八股-消息队列
消息队列相关面试题
MQ 的作用:解耦、异步、削峰
RabbitMQ 如何保证消息不丢失
回答:
MySQL与Redis的数据双写一致性是通过RabbitMQ实现同步的。此过程中,要求保证消息的高可用性,确保消息不丢失。主要从以下三个层面进行考虑:
- 开启生产者确认机制:确保生产者的消息能够到达队列。若出现错误,可先记录到日志中,随后进行数据修复
- 开启持久化功能:确保消息在未消费前不会从队列中丢失。需对交换机、队列和消息进行持久化处理
- 开启消费者确认机制为auto:由Spring确认消息处理成功后完成ack。同时,设置一定的重试次数,我们当时设定为3次。若重试3次后仍未收到消息,则将失败后的消息投递至异常交换机,由人工进行处理

生产者确认机制
RabbitMQ 提供了 publisher confirm
机制,以避免消息在发送至消息队列(MQ)过程中丢失。消息送达 MQ 后,会返回一个结果给发送者,以此表示消息是否已成功处理

消息发送失败后,应采取以下措施:
- 回调方法即时重发
- 记录日志
- 保存至数据库,并设置定时重发机制,成功发送后,即刻从数据库中删除相关数据
消息持久化
MQ默认采用内存存储消息,开启持久化功能可确保缓存于MQ中的消息不会丢失
交换机持久化
@Bean
public DirectExchange simpleExchange() {
// 三个参数:交换机名称、是否持久化、当没有 queue 与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}
队列持久化
@Bean
public Queue simpleQueue() {
// 使用 QueueBuilder 构建队列,durable 表示持久化
return QueueBuilder.durable("simple.queue").build();
}
消息持久化
在Spring AMQP中,消息默认为持久化状态。用户可通过MessageProperties
中的DeliveryMode
属性进行指定:
Message msg = MessageBuilder
.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
.build();
消费者确认
RabbitMQ支持消费者确认机制,即消费者在处理完消息后,可以向MQ发送ack回执。MQ在收到ack回执后,才会删除该消息。而Spring AMQP允许配置三种确认模式:
- manual:手动ack。需要在业务代码结束后,调用API发送ack
- auto:自动ack。由Spring监测listener代码是否出现异常,若无异常则返回ack;若抛出异常则返回nack
- none:关闭ack。MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
Spring框架的retry机制可被用于消费者异常处理。通过本地重试,可设定重试次数。一旦达到重试次数上限,若消息仍未成功处理,则将该消息投递至异常交换机,由人工进行后续处理
RabbitMQ 消息的重复消费问题如何解决
消费者设置了自动确认机制,当服务尚未来得及向MQ确认时,服务发生宕机。导致服务重启后,又消费了一次消息,从而出现了重复消费的情况。这是因为当时处理的支付(订单业务唯一标识)具有业务唯一性,我们在处理消息时,会先到数据库查询该数据是否存在。若不存在,则表示未曾处理过,此时可以正常处理该消息。若数据已存在,则表明消息已被重复消费,我们无需再次消费
其他解决方案:
实际上,这是一个典型的幂等问题。例如,可以使用redis分布式锁或数据库锁等手段来解决
发生网络抖动或者消费者宕机等情况,由于设置了重试机制,导致重复消费

解决方案:
每条消息设置一个唯一的标识 d
幂等方案:【分布式锁、数据库锁(悲观锁、乐观锁)】
RabbitMQ 死信交换机/RabbitMQ 延迟队列
当时的项目中,有一个业务环节需要运用延迟队列,该功能是通过RabbitMQ来实现的
延迟队列的原理是借助死信交换机和TTL(消息存活时间)来实现的。若消息在超时后未被消费,便会转化为死信。在RabbitMQ中,一旦消息变为死信,可以将其绑定至一个死信交换机。在死信交换机上,可以进一步绑定其他队列。发送消息时,可根据需求指定TTL时间,从而实现延迟队列的功能
此外,RabbitMQ还提供了一种实现延迟队列的方法。即在RabbitMQ中安装一个死信插件,这样操作更为便捷。声明交换机时,只需指定其为死信交换机,发送消息时直接指定超时时间即可。与使用死信交换机加TTL相比,这种方法简化了操作步骤
延迟队列:进入队列的消息将被延迟处理,形成的一种队列
应用场景:超时订单处理、限时优惠活动、定时信息发布
延迟队列 = 死信交换机 + TTL(生存时间)
死信交换机
当一个队列中的消息满足以下任一条件时,可被认定为死信(Dead Letter):
- 消费者通过
basic.reject
或basic.nack
操作声明消费失败,且消息的requeue
参数设置为false
- 消息为过期消息,因超时而未被消费
- 待投递的队列消息数量达到上限,最早的消息可能成为死信
若队列配置了dead-letter-exchange
属性,并指定了一个交换机,则队列中的死信将被投递至该交换机。此交换机被称为死信交换机(Dead Letter Exchange,简称DLX)

TTL
TTL,即Time-To-Live。若队列中的消息在TTL到期后仍未被消费,则该消息将变为死信。TTL超时分为以下两种情况:
- 消息所在的队列设置了存活时间
- 消息本身设置了存活时间
存活时间以最短的为主(队列的和消息本身的)
延迟队列插件
DelayExchange插件,需安装于RabbitMQ中
RabbitMQ拥有官方插件社区,其地址为:https://www.rabbitmq.com/community-plugins.html
DelayExchange 的本质仍是官方的三种交换机,只是增添了延迟功能。因此,在使用时仅需声明一个交换机,交换机的类型可以是任意类型,随后将 delayed
属性设置为 true
即可
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayQueue(String message) {
log.info("接收到延迟消息: {}", message);
}
Message message = MessageBuilder
.withBody("hello, delayed message".getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay", 10000) // 设置延迟时间为10秒
.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend(
"delay.direct", // 交换机名称
"delay", // 路由键
message,
correlationData
);
RabbitMQ 消息堆积怎么解决
第一:提升消费者消费能力,可采取多线程消费任务
第二:吸引更多消费者,加快消费速度,采用工作队列模式,配置多个消费者共同消费同一队列中的消息
第三:扩充队列容量,提升堆积上限,可利用RabbitMQ的惰性队列。惰性队列的优势主要包括:
① 接收到消息后直接存入磁盘而非内存;
② 消费者消费消息时才会从磁盘中读取并加载到内存;
③ 支持存储数百万条的消息
当生产者发送消息的速度超过消费者处理消息的速度,便会引发队列中消息的累积,直至队列存储空间达到上限。此后,发送的消息将变为死信,可能被丢弃,此现象即称为消息堆积问题
解决消息堆积可从以下三个方面入手:
- 增加更多消费者,以提升消费速度
- 在消费者内开启线程池,以加快消息处理速度
- 扩大队列容积,提高消息堆积的上限
惰性队列
特征:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
惰性队列声明:
@Bean
public Queue lazyQueue() {
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启 x-queue-mode 为 lazy
.build();
}
监听惰性队列:
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg) {
log.info("接收到 lazy.queue 的消息: {}", msg);
}
RabbitMQ 高可用机制
当时项目在生产环境下,使用的集群为镜像模式集群,由3台机器搭建而成。
镜像队列结构采用一主多从模式,所有操作均由主节点完成,随后同步至镜像节点。若主节点发生宕机,镜像节点将接替成为新的主节点。然而,若主从同步尚未完成而主节点已宕机,则可能出现数据丢失
那若出现数据丢失该如何解决呢?
可以采用仲裁队列。与镜像队列类似,仲裁队列同样采用主从模式,支持主从数据同步。主从同步基于Raft协议,确保强一致性
并且,使用仲裁队列非常简便,无需额外配置。只需在声明队列时指定其为仲裁队列即可
普通集群(不能做到高可用)
普通集群,亦称标准集群(classic cluster),具有以下特性:
- 在集群各节点间共享部分数据,包括:交换机、队列元信息。队列中的消息不在此范围内
- 访问集群任一节点时,若队列不在该节点,数据将从数据所在节点传递至当前节点,并完成返回
- 若队列所在节点发生故障,队列中的消息将可能丢失

镜像集群
镜像集群:本质为主从模式,具备以下特征:
交换机、队列及其中的消息会在各个队列的镜像节点间进行同步备份
创建队列的节点被称为该队列的主节点,而备份至的其他节点则称为该队列的镜像节点
一个队列的主节点可能是另一个队列的镜像节点
所有操作都是主节点完成,然后同步给镜像节点
主节点宕机后,镜像节点会替代成新的主节点

仲裁队列
仲裁队列:仲裁队列是自3.8版本起引入的新功能,旨在替代镜像队列,并具备以下特性:
- 与镜像队列相同,均采用主从模式,并支持主从数据同步
- 配置极其简单,无需复杂设置
- 主从同步采用Raft协议,确保数据强一致性
@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue") // 持久化
.quorum() // 仲裁队列
.build();
}
Kafka 如何保证消息不丢失
Kafka是如何保证消息不丢失
嗯,这个保证机制很多,在发送消息到消费者接收消息,在每个阶段都有可能会丢失消息,所以我们解决的话也是从多个方面考虑
生产者发送消息时:
- 可以使用异步回调发送,如果消息发送失败,可以通过回调获取失败后的消息信息,考虑重试或记录日志,后续再做补偿
- 在生产者这边可以设置消息重试机制,有时候由于网络抖动导致发送不成功,可以使用重试机制解决
Broker存储消息时:
- 通过Kafka的复制机制确保消息不丢失。在生产者发送消息时,可以设置
acks
参数为all
,这样当生产者发送消息到分区后,不仅leader分区保存确认,follower分区也会保存确认,只有当所有副本都保存确认后才算成功发送消息,这样设置可以在很大程度上保证消息不会在Broker丢失
- 通过Kafka的复制机制确保消息不丢失。在生产者发送消息时,可以设置
消费者接收消息时:
- Kafka消费消息是按照
offset
进行标记消费的。消费者默认自动按期提交已消费的偏移量,默认是每隔5秒提交一次。如果出现重平衡的情况,可能会导致重复消费或丢失数据 - 一般会禁用自动提交偏移量,改为手动提交。当消费成功后再报告给Broker消费的位置,这样可以避免消息丢失和重复消费
- Kafka消费消息是按照
Kafka中消息的重复消费问题如何解决
Kafka消费消息是按照offset
进行标记消费的。消费者默认自动按期提交已消费的偏移量,默认是每隔5秒提交一次。如果出现重平衡的情况,可能会导致重复消费或丢失数据。我们一般会禁用自动提交偏移量,改为手动提交。当消费成功后再报告给Broker消费的位置,这样就可以避免消息丢失和重复消费
使用Kafka进行消息收发时,可能存在消息丢失的情况。Kafka针对以下几种丢失场景提供了相应的解决方案:
- 生产者发送消息至Broker丢失
- 消息在Broker中存储丢失
- 消费者从Broker接收消息丢失

生产者发送消息到 Brocker 丢失
设置异步发送
// 同步发送
RecordMetadata recordMetadata = kafkaProducer.send(record).get();
// 异步发送
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println("消息发送失败 | 记录日志");
}
long offset = recordMetadata.offset();
int partition = recordMetadata.partition();
String topic = recordMetadata.topic();
}
});
消息重试
prop.put(ProducerConfig.RETRIES_CONFIG, 10);
消息在 Brocker 中存储丢失
发送确认机制 acks

确认机制 | 说明 |
---|---|
acks=0 | 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 |
acks=1(默认值) | 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 |
acks=all | 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 |
消费者从 Brocker 接收消息丢失
- Kafka中的分区机制指的是将每个主题划分为多个分区(Partition)
- 主题分区中的消息只能由消费者组中的唯一一个消费者处理,不同分区分配给不同的消费者(同一消费者组)


消费者默认行为
消费者默认是自动按期提交已经消费的偏移量,默认是每隔 5 秒 提交一次
重平衡问题
如果出现重平衡的情况,可能会导致重复消费或丢失数据
解决方案
禁用自动提交偏移量,改为手动提交:
- 同步提交
- 异步提交
- 同步 + 异步组合提交
消费者组示例
消费者组中包含一个消费者实例 consumer1
,它从多个分区(P1、P2、P3)中接收消息:
分区 | 消息偏移量 | 消费情况 |
---|---|---|
P1 | 0-11 | 已消费 |
P2 | 0-11 | 已消费 |
P3 | 0-11 | 已消费 |
消费者组名称为 T1
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.key());
}
consumer.commitAsync(); // 异步提交偏移量
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("记录错误信息: " + e);
} finally {
try {
consumer.commitSync(); // 同步提交偏移量
} finally {
consumer.close();
}
}
Kafka 如何保证消费的顺序性
Kafka默认存储和消费消息时,无法保证顺序性,因为一个Topic中的数据可能分散存储在不同的分区中。每个分区都有按照顺序存储的偏移量。若消费者关联了多个分区,则无法保证消息的顺序性
若存在此类需求,可以通过以下两种方式之一实现:将所有消息存储在同一个分区下
方式一:在发送消息时指定分区号
方式二:在发送消息时,根据相同的业务逻辑设置相同的Key。默认情况下,分区是通过Key的hashcode值来选择的,若hash值相同,则分区也相同
Topic分区中,消息仅由消费者组内的单一消费者处理,因此消息必定遵循先后顺序进行处理。然而,这仅确保Topic单个分区的顺序处理,无法保证跨分区的消息顺序。因此,若需对Topic所有消息进行顺序处理,则应仅提供一个分区
Kafka 的高可用机制
面试官:Kafka的高可用机制有了解过嘛
候选人:
嗯,主要是有两个层面,第一个是集群,第二个是提供了复制机制
Kafka集群指的是由多个broker实例组成,即使某一台宕机,也不耽误其他broker继续对外提供服务
复制机制是可以保证kafka的高可用的,一个topic有多个分区,每个分区有多个副本,有一个leader,其余的是follower,副本存储在不同的broker中;所有的分区副本的内容是相同的,如果leader发生故障时,会自动将其中一个follower提升为leader,保证了系统的容错性、高可用性
面试官:解释一下复制机制中的ISR
候选人:
ISR的意思是in-sync replica,就是需要同步复制保存的follower
其中分区副本有很多的follower,分为了两类,一个是ISR,与leader副本同步保存数据,另外一个普通的副本,是异步同步数据,当leader挂掉之后,会优先从ISR副本列表中选取一个作为leader,因为ISR是同步保存数据,数据更加的完整一些,所以优先选择ISR副本列表
集群模式

Kafka的服务器端由被称为Broker的服务进程构成,即一个Kafka集群由多个Broker组成
这样如果集群中某一台机器宕机,其他机器上的Broker也依然能够对外提供服务。这其实就是Kafka提供高可用的手段之一
分区备份机制

一个topic拥有多个分区,每个分区包含多个副本。在这些副本中,有一个被选为领导者(Leader),其余的则是追随者(Follower)。副本分别存储在不同的代理(Broker)上
所有分区副本的内容均保持一致。一旦领导者发生故障,系统会自动将其中一个追随者提升为新的领导者

ISR(同步副本)
需同步复制保存的follower
若leader失效后,需选出新的leader,选举原则如下:
首先:选举时优先从ISR中选定,因该列表中follower的数据与leader同步
其次:若ISR列表中的follower均不可行,则只能从其他follower中选取
一般只设置 1 个 ISR 副本,保证性能
Kafka 数据清理机制
Kafka 文件存储机制
topic 是逻辑概念,真正存储数据的是分区

数据清理机制
日志的清理策略有两个:
根据消息的保留时间:
当消息在 Kafka 中保存的时间超过了指定的时间,就会触发清理过程
配置示例:
# The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168
根据 topic 存储的数据大小:
当 topic 所占的日志文件大小大于一定的阈值,则开始删除最久的消息。需手动开启
配置示例:
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining segments drop below log.retention.bytes. Functions independently of log.retention.hours. log.retention.bytes=1073741824
Kafka 实现高性能的设计
Kafka的高性能是多方面协同作用的结果,涉及宏观架构、分布式存储、ISR数据同步以及高效利用磁盘、操作系统特性等方面。其主要体现如下几点:
- 消息分区:不受单台服务器的限制,能够处理更多数据
- 顺序读写:磁盘顺序读写,提升读写效率
- 页缓存:将磁盘中的数据缓存至内存,将磁盘访问转化为内存访问
- 零拷贝:减少上下文切换及数据拷贝
- 消息压缩:减少磁盘I/O和网络I/O
- 分批发送:将消息打包批量发送,减少网络开销
- 消息分区:不受单台服务器的限制,可以处理更多的数据
- 顺序读写:磁盘顺序读写,提升读写效率
- 页缓存:把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问
- 零拷贝:减少上下文切换及数据拷贝
- 消息压缩:减少磁盘IO和网络IO
- 分批发发送:将消息打包批量发送,减少网络开销
零拷贝
从用户空间到内核空间 0 拷贝(减少内核态切换)
普通情况:4 次拷贝

优化后:2 次拷贝
