八股-消息队列
消息队列相关面试题
======= MQ =======
消息队列选型
Kafka、ActiveMQ、RabbitMQ、RocketMQ来进行不同维度对比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级 | 万级 | 10 万级 | 10 万级 |
时效性 | 毫秒级 | 微秒级 | 毫秒级 | 毫秒级 |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
消息重复 | 至少一次 | 至少一次 | 至少一次 最多一次 | 至少一次最多一次 |
消息顺序性 | 有序 | 有序 | 有序 | 分区有序 |
支持主题数 | 千级 | 百万级 | 千级 | 百级,多了性能严重下滑 |
消息回溯 | 不支持 | 不支持 | 支持(按时间回溯) | 支持(按offset回溯) |
管理界面 | 普通 | 普通 | 完善 | 普通 |
消息队列使用场景
MQ的作用:解耦、异步、削峰
- 解耦:可在多个系统间实现解耦,将原本通过网络调用的方式转变为使用MQ进行消息的异步通信。非同步操作可改为通过MQ实现不同系统间的连接。如此,项目之间无耦合,系统间影响不大。即便某个系统出现故障,消息仅会在MQ中堆积,无人消费,不会影响其他系统
- 异步:若一个操作涉及多个步骤,且这些步骤间无需同步完成,例如客户创建订单后,还需在客户轨迹系统添加轨迹、更新库存、修改客户状态等。若系统直接调用,将耗费大量时间,不利于客户体验。像添加客户轨迹这类操作无需同步。若将订单创建时需更新的轨迹、库存、状态等信息通过MQ异步处理,可加快系统访问速度,提升客户体验
- 削峰:系统访问流量有高峰与低峰时期。如中午抢购活动期间,系统流量可能激增至每秒5000个并发请求,而系统处理能力仅限于每秒2000个请求。使用MQ进行流量削峰,将用户大量消息存入MQ,系统按自身最大消费能力处理消息,确保系统稳定。可能需调整业务逻辑,向用户返回特定页面或稍后通知结果
消息队列的可靠性、顺序性怎么保证
消息可靠性保障方法
- 消息持久化:确保消息队列能够持久化消息至关重要。在系统崩溃、重启或网络故障等情况下,未处理的消息不应丢失。例如,RabbitMQ 可通过配置将消息持久化至磁盘,通过将队列和消息都设置为持久化(设置
durable = true
),这样在服务器重启后,消息依然可以被重新读取和处理 - 消息确认机制:消费者在成功处理消息后,应向消息队列发送确认(acknowledgment)。消息队列仅在收到确认后,才会将消息从队列中移除。若未收到确认,消息队列可能会在一定时间后重新发送消息给其他消费者或再次发送给同一消费者。以 Kafka 为例,消费者通过
commitSync
或commitAsync
方法提交偏移量(offset),从而确认消息的消费 - 消息重试策略:当消费者处理消息失败时,需有合理的重试策略。可设置重试次数和重试间隔时间。例如,在第一次处理失败后,等待一段时间(如 5 秒)后进行第二次重试,若重试多次(如 3 次)后仍失败,可将消息发送至死信队列,以便后续人工排查或采取其他特殊处理
消息顺序性保障方法
- 有序消息处理场景识别:首先需明确业务场景中哪些消息是需要保证顺序的。例如,在金融交易系统中,对于同用户的转账操作顺序不能被打乱。对于需要顺序处理的消息,要确保消息队列和消费者能够按照特定顺序进行处理
- 消息队列对顺序性的支持:部分消息队列本身提供了顺序性保证的功能。例如,Kafka 可通过将消息划分到同一个分区(Partition)来保证消息在分区内是有序的,消费者按照分区顺序读取消息即可保证消息顺序。但这也可能会限制消息的并行处理程度,需在顺序性和吞吐量之间进行权衡
- 消费者顺序处理策略:消费者在处理顺序消息时,应避免并发处理可能导致顺序打乱的情况。例如,可通过单线程或使用线程池对顺序消息进行串行化处理,确保消息按照正确的顺序被消费
如何保证幂等写
幂等性指的是同一操作的多次执行对系统状态的影响与一次执行结果一致。例如,支付接口若因网络重试被多次调用,最终应确保仅扣款一次。实现幂等性的核心方案包括:
- 唯一标识(幂等键) :客户端为每个请求生成全局唯一ID(如UUID、业务主键),服务端校验该ID是否已处理,适用于接口调用、消息消费等场景
- 数据库事务 + 乐观锁:通过版本号或状态字段控制并发更新,确保多次更新等同于单次操作,适用于数据库记录更新(如余额扣减、订单状态变更)场景
- 数据库唯一约束:利用数据库唯一索引防止重复数据写入,适用于数据插入场景(如订单创建)
- 分布式锁:通过锁机制保证同一时刻仅有一个请求执行关键操作,适用于高并发下的资源抢夺(如秒杀)场景
- 消息去重:消息队列生产者为每条消息生成唯一的消息ID,消费者在处理消息前,先检查该消息ID是否已经处理过,如果已经处理过则丢弃该消息
如何保证数据一致性,事务消息如何实现?
一条普通的MQ消息,从产生到被消费,大概流程如下:

- 生产者产生消息,发送带MQ服务器
- MQ收到消息后,将消息持久化到存储系统
- MQ服务器返回ACK到生产者
- MQ服务器把消息push给消费者
- 消费者消费完消息,响应ACK
- MQ服务器收到ACK,认为消息消费成功,即在存储中删除消息
举个下订单的例子吧。订单系统创建完订单后,再发送消息给下游系统。如果订单创建成功,然后消息没有成功发送出去,下游系统就无法感知这个事情,出导致数据不一致
如何保证数据一致性呢?可以使用事务消息

- 生产者产生消息,发送一条半事务消息到MQ服务器
- MQ收到消息后,将消息持久化到存储系统,这条消息的状态是待发送状态
- MQ服务器返回ACK确认到生产者,此时MQ不会触发消息推送事件
- 生产者执行本地事务
- 如果本地事务执行成功,即commit执行结果到MQ服务器;如果执行失败,发送rollback
- 如果是正常的commit,MQ服务器更新消息状态为可发送;如果是rollback,即删除消息
- 如果消息状态更新为可发送,则MQ服务器会push消息给消费者。消费者消费完就回ACK
- 如果MQ服务器长时间没有收到生产者的commit或者rollback,它会反查生产者,然后根据查询到的结果执行最终状态
消息队列参考哪种设计模式
是参考了观察者模式和发布订阅模式,两种设计模式思路是一样的,举个例子:
- 观察者模式:某公司给自己员工发月饼发粽子,是由公司的行政部门发送的,这件事不适合交给第三方,原因是“公司”和“员工”是一个整体
- 发布-订阅模式:某公司要给其他人发各种快递,因为“公司”和“其他人”是独立的,其唯一的桥梁是“快递”,所以这件事适合交给第三方快递公司解决
上述过程中,如果公司自己去管理快递的配送,那公司就会变成一个快递公司,业务繁杂难以管理,影响公司自身的主营业务,因此使用何种模式需要考虑什么情况两者是需要耦合的
观察者模式
观察者模式实际上就是一个一对多的关系,在观察者模式中存在一个主题和多个观察者,主题也是被观察者,当主题发布消息时,会通知各个观察者,观察者将会收到最新消息,图解如下:每个观察者首先订阅主题,订阅成功后当主题发送消息时会循环整个观察者列表,逐一发送消息通知
发布订阅模式
发布订阅模式和观察者模式的区别就是发布者和订阅者完全解耦,通过中间的发布订阅中心进行消息通知,发布者并不知道自己发布的消息会通知给谁,因此发布订阅模式有三个重要角色,发布者->发布订阅中心->订阅者
图解如下:当发布者发布消息到发布订阅中心后,发布订阅中心会将消息通知给所有订阅该发布者的订阅者
🚩自己写一个消息队列,如何进行架构设计
可以从这几个角度去思考:

- 首先是消息队列的整体流程,producer发送消息给broker,broker存储好,broker再发送给consumer消费,consumer回复消费确认等
- producer发送消息给broker,broker发消息给consumer消费,那就需要两次RPC了,RPC如何设计呢?可以参考开源框架Dubbo,你可以说说服务发现、序列化协议等等
- broker考虑如何持久化呢,是放文件系统还是数据库呢,会不会消息堆积呢,消息堆积如何处理呢
- 消费关系如何保存呢?点对点还是广播方式呢?广播关系又是如何维护呢?zk还是config server
- 消息可靠性如何保证呢?如果消息重复了,如何幂等处理呢?
- 消息队列的高可用如何设计呢?可以参考Kafka的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务
- 消息事务特性,与本地业务同个事务,本地消息落库;消息投递到服务端,本地才删除;定时任务扫描本地消息库,补偿发送
- MQ得伸缩性和可扩展性,如果消息积压或者资源不够时,如何支持快速扩容,提高吞吐?可以参照一下 Kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了
======= RocketMQ =======
为什么选择 RocketMQ
选择RocketMQ的原因是:
- 开发语言优势。RocketMQ 使用 Java 语言开发,比起使用 Erlang 开发的 RabbitMQ 来说,有着更容易上手的阅读体验和受众。在遇到 RocketMQ 较为底层的问题时,大部分熟悉 Java 的同学都可以深入阅读其源码,分析、排查问题
- 社区氛围活跃。RocketMQ 是阿里巴巴开源且内部在大量使用的消息队列,说明 RocketMQ 是的确经得起残酷的生产环境考验的,并且能够针对线上环境复杂的需求场景提供相应的解决方案
- 特性丰富。根据 RocketMQ 官方文档的列举,其高级特性达到了
12 种
,例如顺序消息、事务消息、消息过滤、定时消息等。顺序消息、事务消息、消息过滤、定时消息。RocketMQ 丰富的特性,能够为我们在复杂的业务场景下尽可能多地提供思路及解决方案
RocketMQ与Kafka的区别及其技术选型指南
Kafka的优缺点
优点:
- Kafka拥有卓越的高吞吐量,即使在普通配置(如4CPU、8G内存)的机器上,单台机器也能承受高达十几万的QPS,这一点十分显著
- Kafka支持集群部署,即使部分机器发生故障,也不会影响其正常使用
缺点:
- Kafka存在数据丢失的风险,因为它在接收到消息时,并非直接写入物理磁盘,而是先写入磁盘缓冲区。此外,Kafka功能相对单一,主要支持消息的收发,缺乏高级功能,适用场景受限
RocketMQ的优缺点
优点:
- RocketMQ支持丰富的功能,如延迟队列、消息事务等,吞吐量也较高,单机吞吐量可达10万级,支持大规模集群部署,线性扩展方便,采用Java语言开发,满足了国内大多数公司的技术栈需求
缺点:
- 相较于Kafka,RocketMQ的性能略逊一筹,因为Kafka采用了sendfile的零拷贝技术,而RocketMQ主要使用mmap+write来实现零拷贝
如何选择
单一需求场景:
- 如果业务仅涉及消息的收发,且可以容忍小部分数据丢失,同时要求极高的吞吐量和性能,直接选择Kafka即可。例如,公司需要收集和传输用户行为日志以及其他相关日志的处理,Kafka中间件是合适的选择
复杂业务需求:
- 如果公司需要通过MQ实现诸如延迟队列、消息事务等业务需求,且技术栈主要为Java语言,则直接选择RocketMQ将更为便捷,这样可以节省大量时间和精力
RocketMQ延时消息的底层原理

Broker在接收到延时消息时,会将该消息存入延时Topic的队列中。随后,ScheduleMessageService会为每个队列执行定时任务,不断检查队列中哪些消息已达到设定时间,并将这些消息转发至消息的原始Topic。这些消息随后会被各自的Producer消费
🚩RocektMQ怎么处理分布式事务
RocketMQ是一种最终一致性的分布式事务,意味着它确保消息的最终一致性,而非像2PC、3PC、TCC那样的强一致性分布式事务
假设A向B转账100元,而它们不在同一服务上。目标是在A的账户中减去100元,在B的账户中增加100元
实际情况可能包含以下四种:
- 1)A账户减100元(成功),B账户加100元(成功)
- 2)A账户减100元(失败),B账户加100元(失败)
- 3)A账户减100元(成功),B账户加100元(失败)
- 4)A账户减100元(失败),B账户加100元(成功)
第1和第2种情况能够保证事务的一致性,但第3和第4种情况则无法保证事务的一致性

分布式事务流程如下:
- 1、A服务首先发送一个Half Message(指暂不能被Consumer消费的消息。Producer 已将消息成功发送至Broker端,但该消息被标记为暂不能投递状态,此类状态下的消息称为半消息。需Producer进行二次确认后,Consumer才能消费它)至Broker端,消息中包含B服务即将+100元的信息
- 2、当A服务确认Half Message发送成功后,随即开始执行第3步,即本地事务
- 3、执行本地事务(可能出现以下三种情况:1、执行成功。2、执行失败。3、因网络等原因导致无响应)
- 4.1)、若本地事务执行成功,则Producer向Broker服务器发送Commit,B服务即可消费该消息
- 4.2)、若本地事务执行失败,则Producer向Broker服务器发送Rollback,从而直接删除上述半消息
- 4.3)、若因网络等原因长时间无响应,将触发RocketMQ的回调接口,进行事务回查
从上述流程可知,只有A服务本地事务执行成功,B服务才能消费该消息
那么,若A账户减100(成功),B账户加100(失败),此时B服务失败如何处理?
若B服务最终执行失败,几乎可以确定是由于代码问题引起的异常。因为消费端RocketMQ具有重试机制,除非是代码问题,否则一般重试几次即可成功
若因代码原因导致多次重试失败,无需担心,将异常记录下来,由人工处理。人工处理完毕后,即可使事务达到最终的一致性
RocketMQ消息顺序保证机制
消息的有序性指的是消息的消费顺序应严格保持与发送顺序一致。例如,一个订单会产生3条消息,分别是订单创建、订单付款和订单完成。在消息消费过程中,同一条订单的消息应严格按此顺序进行消费,否则可能导致业务混乱。同时,不同订单的消息可以并发消费,例如,可以先处理第三个订单的付款,再处理第二个订单的创建
RocketMQ采用局部顺序一致性机制,实现了单个队列中消息的严格有序性。这意味着,若要保证顺序消费,必须将一组消息发送到同一个队列中,随后由消费者进行有序消费
RocketMQ推荐的顺序消费解决方案如下:根据业务需求划分不同的队列,将需要顺序消费的消息发送至同一队列。不同业务间的消息仍采用并发消费。这种方式既满足了顺序消费的需求,又提高了消息处理速度,在一定程度上避免了消息堆积问题
RocketMQ顺序消息的原理包括:
- Producer(生产者) 将需要保证顺序的一批消息发送到同一个
MessageQueue
- Consumer(消费者) 通过加锁机制确保消息消费的顺序性,Broker端通过对
MessageQueue
进行加锁,确保同一个MessageQueue
只能被同一个 Consumer 进行消费
======= RabbitMQ =======
RabbitMQ 特性
RabbitMQ 以 可靠性、灵活性 和 易扩展性 为核心优势,适合需要稳定消息传递的复杂系统。其丰富的插件和协议支持使其在微服务、IoT、金融等领域广泛应用,比较核心的特性有如下:
- 持久化机制:RabbitMQ 支持消息、队列和交换器的持久化。当启用持久化时,消息会被写入磁盘,即使 RabbitMQ 服务器重启,消息也不会丢失。例如,在声明队列时可以设置
durable
参数为true
来实现队列的持久化:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明一个持久化队列
channel.queue_declare(queue='durable_queue', durable=True)
- 消息确认机制:提供了生产者确认和消费者确认机制。生产者可以设置
confirm
模式,当消息成功到达 RabbitMQ 服务器时,会收到确认消息;消费者在处理完消息后,可以向 RabbitMQ 发送确认信号,告知服务器该消息已被成功处理,服务器才会将消息从队列中删除 - 镜像队列:支持创建镜像队列,将队列的内容复制到多个节点上,提高消息的可用性和可靠性。当一个节点出现故障时,其他节点仍然可以提供服务,确保消息不会丢失
- 多种交换器类型:RabbitMQ 提供了多种类型的交换器,如直连交换器(Direct Exchange)、扇形交换器(Fanout Exchange)、主题交换器(Topic Exchange)和头部交换器(Headers Exchange)。不同类型的交换器根据不同的规则将消息路由到队列中。例如,扇形交换器会将接收到的消息广播到所有绑定的队列中;主题交换器则根据消息的路由键和绑定键的匹配规则进行路由
RabbitMQ 底层架构

以下是 RabbitMQ 的一些核心架构组件和特性:
- 核心组件:生产者负责发送消息到 RabbitMQ、消费者负责从 RabbitMQ 接收并处理消息、RabbitMQ 本身负责存储和转发消息
- 交换机:交换机接收来自生产者的消息,并根据 routing key 和绑定规则将消息路由到一个或多个队列
- 持久化:RabbitMQ 支持消息的持久化,可以将消息保存在磁盘上,以确保在 RabbitMQ 重启后消息不丢失,队列也可以设置为持久化,以保证其结构在重启后不会丢失
- 确认机制:为了确保消息可靠送达,RabbitMQ 使用确认机制,费者在处理完消息后发送确认给 RabbitMQ,未确认的消息会重新入队
- 高可用性:RabbitMQ 提供了集群模式,可以将多个 RabbitMQ 实例组成一个集群,以提高可用性和负载均衡。通过镜像队列,可以在多个节点上复制同一队列的内容,以防止单点故障
RabbitMQ 如何保证消息不丢失
回答:
MySQL与Redis的数据双写一致性是通过RabbitMQ实现同步的。此过程中,要求保证消息的高可用性,确保消息不丢失。主要从以下三个层面进行考虑:
- 开启生产者确认机制:确保生产者的消息能够到达队列。若出现错误,可先记录到日志中,随后进行数据修复
- 开启持久化功能:确保消息在未消费前不会从队列中丢失。需对交换机、队列和消息进行持久化处理
- 开启消费者确认机制为auto:由Spring确认消息处理成功后完成ack。同时,设置一定的重试次数,当时设定为3次。若重试3次后仍未收到消息,则将失败后的消息投递至异常交换机,由人工进行处理

生产者确认机制
RabbitMQ 提供了 publisher confirm
机制,以避免消息在发送至消息队列(MQ)过程中丢失。消息送达 MQ 后,会返回一个结果给发送者,以此表示消息是否已成功处理

消息发送失败后,应采取以下措施:
- 回调方法即时重发
- 记录日志
- 保存至数据库,并设置定时重发机制,成功发送后,即刻从数据库中删除相关数据
消息持久化
MQ默认采用内存存储消息,开启持久化功能可确保缓存于MQ中的消息不会丢失
交换机持久化
@Bean
public DirectExchange simpleExchange() {
// 三个参数:交换机名称、是否持久化、当没有 queue 与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}
队列持久化
@Bean
public Queue simpleQueue() {
// 使用 QueueBuilder 构建队列,durable 表示持久化
return QueueBuilder.durable("simple.queue").build();
}
消息持久化
在Spring AMQP中,消息默认为持久化状态。用户可通过MessageProperties
中的DeliveryMode
属性进行指定:
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.build();
消费者确认
RabbitMQ支持消费者确认机制,即消费者在处理完消息后,可以向MQ发送ack回执。MQ在收到ack回执后,才会删除该消息。而Spring AMQP允许配置三种确认模式:
- manual:手动ack。需要在业务代码结束后,调用API发送ack
- auto:自动ack。由Spring监测listener代码是否出现异常,若无异常则返回ack;若抛出异常则返回nack
- none:关闭ack。MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
Spring框架的retry机制可被用于消费者异常处理。通过本地重试,可设定重试次数。一旦达到重试次数上限,若消息仍未成功处理,则将该消息投递至异常交换机,由人工进行后续处理
RabbitMQ 消息的重复消费问题如何解决
消费者设置了自动确认机制,当服务尚未来得及向MQ确认时,服务发生宕机。导致服务重启后,又消费了一次消息,从而出现了重复消费的情况。这是因为当时处理的支付(订单业务唯一标识)具有业务唯一性,在处理消息时,会先到数据库查询该数据是否存在。若不存在,则表示未曾处理过,此时可以正常处理该消息。若数据已存在,则表明消息已被重复消费,无需再次消费
其他解决方案:
实际上,这是一个典型的幂等问题。例如,可以使用redis分布式锁或数据库锁等手段来解决
发生网络抖动或者消费者宕机等情况,由于设置了重试机制,导致重复消费

解决方案:
每条消息设置一个唯一的标识 d
幂等方案:【分布式锁、数据库锁(悲观锁、乐观锁)】
RabbitMQ 死信交换机/RabbitMQ 延迟队列
当时的项目中,有一个业务环节需要运用延迟队列,该功能是通过RabbitMQ来实现的
延迟队列的原理是借助死信交换机和TTL(消息存活时间)来实现的。若消息在超时后未被消费,便会转化为死信。在RabbitMQ中,一旦消息变为死信,可以将其绑定至一个死信交换机。在死信交换机上,可以进一步绑定其他队列。发送消息时,可根据需求指定TTL时间,从而实现延迟队列的功能
此外,RabbitMQ还提供了一种实现延迟队列的方法。即在RabbitMQ中安装一个死信插件,这样操作更为便捷。声明交换机时,只需指定其为死信交换机,发送消息时直接指定超时时间即可。与使用死信交换机加TTL相比,这种方法简化了操作步骤
延迟队列:进入队列的消息将被延迟处理,形成的一种队列
应用场景:超时订单处理、限时优惠活动、定时信息发布
延迟队列 = 死信交换机 + TTL(生存时间)
死信交换机
当一个队列中的消息满足以下任一条件时,可被认定为死信(Dead Letter):
- 消费者通过
basic.reject
或basic.nack
操作声明消费失败,且消息的requeue
参数设置为false
- 消息为过期消息,因超时而未被消费
- 待投递的队列消息数量达到上限,最早的消息可能成为死信
若队列配置了dead-letter-exchange
属性,并指定了一个交换机,则队列中的死信将被投递至该交换机。此交换机被称为死信交换机(Dead Letter Exchange,简称DLX)

TTL
TTL,即Time-To-Live。若队列中的消息在TTL到期后仍未被消费,则该消息将变为死信。TTL超时分为以下两种情况:
- 消息所在的队列设置了存活时间
- 消息本身设置了存活时间
存活时间以最短的为主(队列的和消息本身的)
延迟队列插件
DelayExchange插件,需安装于RabbitMQ中
RabbitMQ拥有官方插件社区,其地址为:https://www.rabbitmq.com/community-plugins.html
DelayExchange 的本质仍是官方的三种交换机,只是增添了延迟功能。因此,在使用时仅需声明一个交换机,交换机的类型可以是任意类型,随后将 delayed
属性设置为 true
即可
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayQueue(String message) {
log.info("接收到延迟消息: {}", message);
}
Message message = MessageBuilder
.withBody("hello, delayed message".getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay", 10000) // 设置延迟时间为10秒
.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend(
"delay.direct", // 交换机名称
"delay", // 路由键
message,
correlationData
);
RabbitMQ 消息堆积怎么解决
第一:提升消费者消费能力,可采取多线程消费任务
第二:吸引更多消费者,加快消费速度,采用工作队列模式,配置多个消费者共同消费同一队列中的消息
第三:扩充队列容量,提升堆积上限,可利用RabbitMQ的惰性队列。惰性队列的优势主要包括:
① 接收到消息后直接存入磁盘而非内存;
② 消费者消费消息时才会从磁盘中读取并加载到内存;
③ 支持存储数百万条的消息
当生产者发送消息的速度超过消费者处理消息的速度,便会引发队列中消息的累积,直至队列存储空间达到上限。此后,发送的消息将变为死信,可能被丢弃,此现象即称为消息堆积问题
解决消息堆积可从以下三个方面入手:
- 增加更多消费者,以提升消费速度
- 在消费者内开启线程池,以加快消息处理速度
- 扩大队列容积,提高消息堆积的上限
惰性队列
特征:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
惰性队列声明:
@Bean
public Queue lazyQueue() {
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启 x-queue-mode 为 lazy
.build();
}
监听惰性队列:
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {
log.info("接收到 lazy.queue 的消息: {}", msg);
}
RabbitMQ 高可用机制
当时项目在生产环境下,使用的集群为镜像模式集群,由3台机器搭建而成。
镜像队列结构采用一主多从模式,所有操作均由主节点完成,随后同步至镜像节点。若主节点发生宕机,镜像节点将接替成为新的主节点。然而,若主从同步尚未完成而主节点已宕机,则可能出现数据丢失
那若出现数据丢失该如何解决呢?
可以采用仲裁队列。与镜像队列类似,仲裁队列同样采用主从模式,支持主从数据同步。主从同步基于Raft协议,确保强一致性
并且,使用仲裁队列非常简便,无需额外配置。只需在声明队列时指定其为仲裁队列即可
普通集群(不能做到高可用)
普通集群,亦称标准集群(classic cluster),具有以下特性:
- 在集群各节点间共享部分数据,包括:交换机、队列元信息。队列中的消息不在此范围内
- 访问集群任一节点时,若队列不在该节点,数据将从数据所在节点传递至当前节点,并完成返回
- 若队列所在节点发生故障,队列中的消息将可能丢失

镜像集群
镜像集群:本质为主从模式,具备以下特征:
交换机、队列及其中的消息会在各个队列的镜像节点间进行同步备份
创建队列的节点被称为该队列的主节点,而备份至的其他节点则称为该队列的镜像节点
一个队列的主节点可能是另一个队列的镜像节点
所有操作都是主节点完成,然后同步给镜像节点
主节点宕机后,镜像节点会替代成新的主节点

仲裁队列
仲裁队列:仲裁队列是自3.8版本起引入的新功能,旨在替代镜像队列,并具备以下特性:
- 与镜像队列相同,均采用主从模式,并支持主从数据同步
- 配置极其简单,无需复杂设置
- 主从同步采用Raft协议,确保数据强一致性
@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue") // 持久化
.quorum() // 仲裁队列
.build();
}
======= Kafka =======
特点
- 高吞吐量、低延迟:Kafka每秒可处理数十万条消息,其延迟最低可达几毫秒。每个topic可分多个partition,consumer group 对partition进行consume操作
- 可扩展性:Kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并支持数据备份,以防数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
为什么Kafka速度快
- 顺序写入优化:Kafka将消息顺序写入磁盘,减少了磁盘的寻道时间。这种方式比随机写入更高效,因为磁盘读写头在顺序写入时只需移动一次
- 批量处理技术:Kafka支持批量发送消息,生产者在发送消息时可以等待直到有足够的数据积累到一定量,然后再发送。这种方法减少了网络开销和磁盘I/O操作的次数,从而提高了吞吐量
- 零拷贝技术:Kafka使用零拷贝技术,可以直接将数据从磁盘发送到网络套接字,避免了在用户空间和内核空间之间的多次数据拷贝。这大幅降低了CPU和内存的负载,提高了数据传输效率
- 压缩技术:Kafka支持对消息进行压缩,这不仅减少了网络传输的数据量,还提高了整体的吞吐量
🚩Kafka模型简介,Kafka是推送还是拉取?
消费者模型
消息由生产者发送至Kafka集群后,随即被消费者消费。通常,我们的消费模型分为两种:推送模型(Push)和拉取模型(Pull)
推送模型(Push)
- 基于推送模型的消息系统,消息代理会记录消费者的消费状态
- 消息代理在将消息推送到消费者后,会标记该消息为已消费,但这种方法无法有效保证消息已被处理
- 若要确保消息被处理,消息代理在发送完消息后,需将状态设置为“已发送”,只有当收到消费者的确认请求后,才将其更新为“已消费”。这种方式需要代理记录所有消费状态,但显然不切实际
缺点:
- Push模式难以适应消费速率不同的消费者
- 因为消息发送速率由Broker决定,Push模式的目标是尽可能快速传递消息,但这样很容易导致Consumer来不及处理消息,典型表现就是拒绝服务以及网络拥塞
拉取模型(Pull)
Kafka采用拉取模型,消费者自行记录消费状态,每个消费者独立、顺序地拉取每个分区的消息

说明:
- 两个消费者(不同消费者组)拉取同一个主题的消息,消费者A的消费进度是3,消费者B的消费进度是6
- 消费者拉取的最大上限由最高水位(Watermark)控制,生产者最新写入的消息如果尚未达到备份数量,对消费者不可见
- 由消费者控制偏移量的优点是:消费者可以按任意顺序消费消息。例如,消费者可以重置到旧的偏移量,重新处理之前已消费的消息;或者直接跳到最近的位置,从当前时刻开始消费
消费者组
Kafka消费者以Consumer Group的方式协同工作,一个或多个消费者组成一个组,共同消费一个Topic。每个分区在同一时间只能由Group中的一个消费者读取,但多个Group可以同时消费该分区

上图中,有一个由三个消费者组成的Group,其中一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也称作该消费者是该分区的拥有者
优点:
- 消费者可以通过水平扩展同时读取大量消息
- 如果一个消费者失败,其他Group成员将自动负载均衡读取之前失败的消费者读取的分区
消费方式
Kafka消费者采用Pull(拉)模式从Broker中读取数据
Pull的优点:
- Pull模式可以根据Consumer的消费能力以适当的速率消费消息
缺点:
- 如果Kafka没有数据,消费者可能会陷入循环,持续返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数Timeout,如果当前没有数据可供消费,Consumer会等待一段时间后再返回,这段时长即为Timeout
Kafka 如何保证消息不丢失
Kafka是如何保证消息不丢失
嗯,这个保证机制很多,在发送消息到消费者接收消息,在每个阶段都有可能会丢失消息,所以解决的话也是从多个方面考虑
生产者发送消息时:
- 可以使用异步回调发送,如果消息发送失败,可以通过回调获取失败后的消息信息,考虑重试或记录日志,后续再做补偿
- 在生产者这边可以设置消息重试机制,有时候由于网络抖动导致发送不成功,可以使用重试机制解决
Broker存储消息时:
- 通过Kafka的复制机制确保消息不丢失。在生产者发送消息时,可以设置
acks
参数为all
,这样当生产者发送消息到分区后,不仅leader分区保存确认,follower分区也会保存确认,只有当所有副本都保存确认后才算成功发送消息,这样设置可以在很大程度上保证消息不会在Broker丢失
- 通过Kafka的复制机制确保消息不丢失。在生产者发送消息时,可以设置
消费者接收消息时:
- Kafka消费消息是按照
offset
进行标记消费的。消费者默认自动按期提交已消费的偏移量,默认是每隔5秒提交一次。如果出现重平衡的情况,可能会导致重复消费或丢失数据 - 一般会禁用自动提交偏移量,改为手动提交。当消费成功后再报告给Broker消费的位置,这样可以避免消息丢失和重复消费
- Kafka消费消息是按照
Kafka中消息的重复消费问题如何解决
Kafka消费消息是按照offset
进行标记消费的。消费者默认自动按期提交已消费的偏移量,默认是每隔5秒提交一次。如果出现重平衡的情况,可能会导致重复消费或丢失数据。一般会禁用自动提交偏移量,改为手动提交。当消费成功后再报告给Broker消费的位置,这样就可以避免消息丢失和重复消费
使用Kafka进行消息收发时,可能存在消息丢失的情况。Kafka针对以下几种丢失场景提供了相应的解决方案:
- 生产者发送消息至Broker丢失
- 消息在Broker中存储丢失
- 消费者从Broker接收消息丢失

生产者发送消息到 Brocker 丢失
设置异步发送
// 同步发送
RecordMetadata recordMetadata = kafkaProducer.send(record).get();
// 异步发送
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println("消息发送失败 | 记录日志");
}
long offset = recordMetadata.offset();
int partition = recordMetadata.partition();
String topic = recordMetadata.topic();
}
});
消息重试
prop.put(ProducerConfig.RETRIES_CONFIG, 10);
消息在 Brocker 中存储丢失
发送确认机制 acks

确认机制 | 说明 |
---|---|
acks=0 | 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 |
acks=1(默认值) | 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 |
acks=all | 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 |
消费者从 Brocker 接收消息丢失
- Kafka中的分区机制指的是将每个主题划分为多个分区(Partition)
- 主题分区中的消息只能由消费者组中的唯一一个消费者处理,不同分区分配给不同的消费者(同一消费者组)


消费者默认行为
消费者默认是自动按期提交已经消费的偏移量,默认是每隔 5 秒 提交一次
重平衡问题
如果出现重平衡的情况,可能会导致重复消费或丢失数据
解决方案
禁用自动提交偏移量,改为手动提交:
- 同步提交
- 异步提交
- 同步 + 异步组合提交
消费者组示例
消费者组中包含一个消费者实例 consumer1
,它从多个分区(P1、P2、P3)中接收消息:
分区 | 消息偏移量 | 消费情况 |
---|---|---|
P1 | 0-11 | 已消费 |
P2 | 0-11 | 已消费 |
P3 | 0-11 | 已消费 |
消费者组名称为 T1
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.key());
}
consumer.commitAsync(); // 异步提交偏移量
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("记录错误信息: " + e);
} finally {
try {
consumer.commitSync(); // 同步提交偏移量
} finally {
consumer.close();
}
}
Kafka 如何保证消费的顺序性
Kafka默认存储和消费消息时,无法保证顺序性,因为一个Topic中的数据可能分散存储在不同的分区中。每个分区都有按照顺序存储的偏移量。若消费者关联了多个分区,则无法保证消息的顺序性
若存在此类需求,可以通过以下两种方式之一实现:将所有消息存储在同一个分区下
方式一:在发送消息时指定分区号
方式二:在发送消息时,根据相同的业务逻辑设置相同的Key。默认情况下,分区是通过Key的hashcode值来选择的,若hash值相同,则分区也相同
Topic分区中,消息仅由消费者组内的单一消费者处理,因此消息必定遵循先后顺序进行处理。然而,这仅确保Topic单个分区的顺序处理,无法保证跨分区的消息顺序。因此,若需对Topic所有消息进行顺序处理,则应仅提供一个分区
Kafka为什么一个分区只能由消费者组的一个消费者消费?这样设计的意义是什么?
同一时刻,一条消息只能被组中的一个消费者实例消费

如果两个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,则会造成很多浪费,因为这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序
Kafka 和 RocketMQ 消息确认机制有什么不同?
Kafka的消息确认机制有三种:0,1,-1:
- ACK=0:这是最不可靠的模式。生产者在发送消息后不会等待来自服务器的确认。这意味着消息可能会在发送之后丢失,而生产者将无法知道它是否成功到达服务器
- ACK=1:这是默认模式,也是一种折衷方式。在这种模式下,生产者会在消息发送后等待来自分区领导者(leader)的确认,但不会等待所有副本(replicas)的确认。这意味着只要消息被写入分区领导者,生产者就会收到确认。如果分区领导者成功写入消息,但在同步到所有副本之前宕机,消息可能会丢失
- ACK=-1:这是最可靠的模式。在这种模式下,生产者会在消息发送后等待所有副本的确认。只有在所有副本都成功写入消息后,生产者才会收到确认。这确保了消息的可靠性,但会导致更长的延迟
RocketMQ 提供了三种消息发送方式:同步发送、异步发送和单向发送:
- 同步发送:是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等
- 异步发送:是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式,但是需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。适用于链路耗时较长,对响应时间较为敏感的业务场景,例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等
- 单向发送:发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
Kafka 和 RocketMQ 的 broker 架构有什么区别
- Kafka 的 broker 架构:Kafka 的 broker 架构采用了分布式的设计,每个 Kafka broker 是一个独立的服务实例,负责存储和处理一部分消息数据。Kafka 的 topic 被分区存储在不同的 broker 上,实现了水平扩展和高可用性
- RocketMQ 的 broker 架构:RocketMQ 的 broker 架构也是分布式的,但是每个 RocketMQ broker 有主从之分,一个主节点和多个从节点组成一个 broker 集群。主节点负责消息的写入和消费者的拉取,从节点负责消息的复制和消费者的负载均衡,提高了消息的可靠性和可用性
Kafka 的高可用机制
面试官:Kafka的高可用机制有了解过嘛
候选人:
嗯,主要是有两个层面,第一个是集群,第二个是提供了复制机制
Kafka集群指的是由多个broker实例组成,即使某一台宕机,也不耽误其他broker继续对外提供服务
复制机制是可以保证kafka的高可用的,一个topic有多个分区,每个分区有多个副本,有一个leader,其余的是follower,副本存储在不同的broker中;所有的分区副本的内容是相同的,如果leader发生故障时,会自动将其中一个follower提升为leader,保证了系统的容错性、高可用性
面试官:解释一下复制机制中的ISR
候选人:
ISR的意思是in-sync replica,就是需要同步复制保存的follower
其中分区副本有很多的follower,分为了两类,一个是ISR,与leader副本同步保存数据,另外一个普通的副本,是异步同步数据,当leader挂掉之后,会优先从ISR副本列表中选取一个作为leader,因为ISR是同步保存数据,数据更加的完整一些,所以优先选择ISR副本列表
集群模式

Kafka的服务器端由被称为Broker的服务进程构成,即一个Kafka集群由多个Broker组成
这样如果集群中某一台机器宕机,其他机器上的Broker也依然能够对外提供服务。这其实就是Kafka提供高可用的手段之一
分区备份机制

一个topic拥有多个分区,每个分区包含多个副本。在这些副本中,有一个被选为领导者(Leader),其余的则是追随者(Follower)。副本分别存储在不同的代理(Broker)上
所有分区副本的内容均保持一致。一旦领导者发生故障,系统会自动将其中一个追随者提升为新的领导者

ISR(同步副本)
需同步复制保存的follower
若leader失效后,需选出新的leader,选举原则如下:
首先:选举时优先从ISR中选定,因该列表中follower的数据与leader同步
其次:若ISR列表中的follower均不可行,则只能从其他follower中选取
一般只设置 1 个 ISR 副本,保证性能
Kafka 数据清理机制
Kafka 文件存储机制
topic 是逻辑概念,真正存储数据的是分区

数据清理机制
日志的清理策略有两个:
根据消息的保留时间:
当消息在 Kafka 中保存的时间超过了指定的时间,就会触发清理过程
配置示例:
# The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168
根据 topic 存储的数据大小:
当 topic 所占的日志文件大小大于一定的阈值,则开始删除最久的消息。需手动开启
配置示例:
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining segments drop below log.retention.bytes. Functions independently of log.retention.hours. log.retention.bytes=1073741824
Kafka 实现高性能的设计
Kafka的高性能是多方面协同作用的结果,涉及宏观架构、分布式存储、ISR数据同步以及高效利用磁盘、操作系统特性等方面。其主要体现如下几点:
- 消息分区:不受单台服务器的限制,能够处理更多数据
- 顺序读写:磁盘顺序读写,提升读写效率
- 页缓存:将磁盘中的数据缓存至内存,将磁盘访问转化为内存访问
- 零拷贝:减少上下文切换及数据拷贝
- 消息压缩:减少磁盘I/O和网络I/O
- 分批发送:将消息打包批量发送,减少网络开销
- 消息分区:不受单台服务器的限制,可以处理更多的数据
- 顺序读写:磁盘顺序读写,提升读写效率
- 页缓存:把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问
- 零拷贝:减少上下文切换及数据拷贝
- 消息压缩:减少磁盘IO和网络IO
- 分批发发送:将消息打包批量发送,减少网络开销
零拷贝
从用户空间到内核空间 0 拷贝(减少内核态切换)
普通情况:4 次拷贝

优化后:2 次拷贝
