🚀 消息队列的"翻车"现场:当Kafka和RocketMQ遇到异常时会发生什么?
🎯 开篇:一个"翻车"的故事
想象一下,你是一个快递员(生产者),负责把包裹(消息)送到快递站(Broker),然后由其他快递员(消费者)送到客户手里。
场景1: 你开车去快递站,结果路上堵车了(网络异常)😤
场景2: 你到了快递站,发现快递站关门了(Broker不可用)😱
场景3: 包裹送到了,但是快递员在送包裹时摔了一跤(消费异常)🤕
这些"翻车"现场在消息队列的世界里每天都在上演!今天我们就来聊聊,当Kafka和RocketMQ遇到这些"翻车"
情况时,它们是怎么处理的,以及会不会互相"甩锅"。
🤔 先来几个灵魂拷问
在开始之前,让我们先思考几个问题:
- 生产者发消息失败了,消费者会知道吗? 🤷♀️
- 消费者消费失败了,生产者会收到通知吗? 📢
- 重试机制是自动的还是需要手动写代码? 🔄
- 如果一直重试失败,消息会去哪里? 🗑️
带着这些问题,我们开始今天的"翻车"现场分析!
1. 📤 发消息异常:生产者的"翻车"现场
1.1 消息队列架构对比
在开始分析异常之前,让我们先看看Kafka和RocketMQ的架构对比:
1.2 当生产者遇到"翻车"时
想象你是一个勤劳的快递员,每天要送很多包裹。但是今天运气不太好…
1.2.1 Kafka快递员的"翻车"日记 📝
今天遇到的"翻车"情况:
- 🚗 路上堵车了(网络连接失败)
- 🏢 快递站关门了(Broker不可用)
- 📦 包裹包装有问题(序列化失败)
- 📦 包裹太大了,装不下(消息过大)
- 🔐 快递站不认我的证件(认证失败)
Kafka快递员的内心独白: 😅
Kafka快递员的处理方式: 🛠️
// Kafka快递员的"翻车"处理日记
public class Kafka快递员 {
private KafkaProducer<String, String> 快递车;
public void 送包裹(String 目的地, String 包裹号, String 包裹内容) {
ProducerRecord<String, String> 包裹 = new ProducerRecord<>(目的地, 包裹号, 包裹内容);
try {
// 同步送包裹 - 必须等到送到才走
RecordMetadata 送达证明 = 快递车.send(包裹).get();
System.out.println("🎉 包裹送达成功!位置: " + 送达证明.offset());
} catch (InterruptedException e) {
// 被老板叫停了 😤
Thread.currentThread().interrupt();
System.err.println("😤 老板叫我停手: " + e.getMessage());
} catch (ExecutionException e) {
// 送包裹时出问题了 😱
Throwable 问题原因 = e.getCause();
if (问题原因 instanceof RetriableException) {
// 可以重试的问题 - Kafka会自动帮我重试
System.err.println("🔄 遇到小问题,Kafka会自动帮我重试: " + 问题原因.getMessage());
} else {
// 严重问题 - 需要我手动处理
System.err.println("💥 遇到严重问题: " + 问题原因.getMessage());
// 需要记录到我的小本本上,或者送到"问题包裹处理中心"
处理问题包裹(包裹, 问题原因);
}
}
}
// 异步送包裹 - 放下包裹就走,不等结果
public void 异步送包裹(String 目的地, String 包裹号, String 包裹内容) {
ProducerRecord<String, String> 包裹 = new ProducerRecord<>(目的地, 包裹号, 包裹内容);
快递车.send(包裹, new Callback() {
@Override
public void onCompletion(RecordMetadata 送达证明, Exception 问题) {
if (问题 != null) {
// 送包裹失败了 😢
System.err.println("😢 异步送包裹失败: " + 问题.getMessage());
if (问题 instanceof RetriableException) {
// Kafka会自动帮我重试,我只需要记录一下
System.out.println("🔄 Kafka会自动重试这个包裹");
} else {
// 严重问题,需要我手动处理
处理问题包裹(包裹, 问题);
}
} else {
// 送包裹成功!🎉
System.out.println("🎉 异步送包裹成功!位置: " + 送达证明.offset());
}
}
});
}
private void 处理问题包裹(ProducerRecord<String, String> 包裹, Throwable 问题) {
// 把问题包裹记录到我的小本本上
System.err.println("📝 记录问题包裹: " + 包裹.value());
// 或者送到"问题包裹处理中心"
System.err.println("🚨 送到问题包裹处理中心");
}
}
Kafka快递员的心得总结: 📚
- 🚗 同步送包裹:必须等到送到才走,比较慢但是安全
- 🏃 异步送包裹:放下包裹就走,比较快但是需要等通知
- 🔄 小问题:Kafka会自动帮我重试,我不用操心
- 💥 大问题:需要我手动处理,记录到小本本上
1.2.2 RocketMQ快递员的"翻车"日记 📝
RocketMQ快递员说: “我是阿里巴巴的快递员,我也有自己的处理方式!” 🚀
// RocketMQ快递员的"翻车"处理日记
public class RocketMQ快递员 {
private DefaultMQProducer 快递车;
public void 送包裹(String 目的地, String 标签, String 包裹内容) {
Message 包裹 = new Message(目的地, 标签, 包裹内容.getBytes());
try {
SendResult 送达证明 = 快递车.send(包裹);
System.out.println("🎉 包裹送达成功!包裹ID: " + 送达证明.getMsgId());
} catch (MQClientException e) {
// 我的证件有问题 😅
System.err.println("😅 我的证件有问题: " + e.getMessage());
处理证件问题(包裹, e);
} catch (RemotingException e) {
// 网络问题 - RocketMQ会自动帮我重试
System.err.println("🔄 网络问题,RocketMQ会自动重试: " + e.getMessage());
} catch (MQBrokerException e) {
// 快递站有问题
System.err.println("🏢 快递站有问题: " + e.getMessage());
处理快递站问题(包裹, e);
} catch (InterruptedException e) {
// 被老板叫停了
Thread.currentThread().interrupt();
System.err.println("😤 老板叫我停手: " + e.getMessage());
}
}
private void 处理证件问题(Message 包裹, MQClientException e) {
System.err.println("📝 记录证件问题: " + e.getMessage());
}
private void 处理快递站问题(Message 包裹, MQBrokerException e) {
System.err.println("📝 记录快递站问题: " + e.getMessage());
}
}
RocketMQ快递员的心得总结: 📚
- 🚗 同步送包裹:必须等到送到才走
- 🏃 异步送包裹:放下包裹就走,等通知
- 🔄 网络问题:RocketMQ会自动帮我重试
- 💥 其他问题:需要我手动处理
1.3 生产者异常处理流程图
1.4 发消息异常对消费者的影响
重要结论:发消息异常对消费者没有直接影响! 🎯
想象一下这个场景:
- 快递员A(生产者)送包裹到快递站失败了
- 快递员B(消费者)在快递站等包裹
快递员B的内心独白: 🤷♀️
“我在快递站等包裹,但是快递员A送包裹失败了,我会知道吗?”
“不会!我只会收到成功到达快递站的包裹!”
“快递员A的失败跟我没关系,我继续等我的包裹就行了!”
// 消费者的内心独白
public class 消费者快递员 {
public void 等包裹() {
// 消费者只关心从快递站拿到包裹
// 不会收到生产者送包裹失败的通知
ConsumerRecords<String, String> 包裹列表 = 快递站.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> 包裹 : 包裹列表) {
// 消费者只能处理成功到达快递站的包裹
处理包裹(包裹.value());
}
}
}
原因分析: 🧠
生产者和消费者通过快递站(Broker)解耦,采用异步通信模式。生产者只负责将包裹送到快递站,消费者只处理成功到达快递站的包裹,两者互不影响。
1.5 发消息异常影响分析图
2. 📥 消费消息异常:消费者的"翻车"现场
2.1 消费者异常处理架构对比
在分析消费者异常之前,让我们先看看Kafka和RocketMQ消费者异常处理的架构差异:
2.2 当消费者遇到"翻车"时
现在轮到消费者快递员了!他们从快递站拿到包裹后,在送包裹给客户的过程中也会遇到各种"翻车"情况。
2.2.1 Kafka消费者快递员的"翻车"日记 📝
今天遇到的"翻车"情况:
- 🏠 客户不在家(业务逻辑异常)
- 📦 包裹包装有问题,打不开(反序列化异常)
- 🚗 路上车坏了(网络异常)
- 🏦 银行系统故障,无法收款(数据库连接异常)
Kafka消费者快递员的处理方式: 🛠️
// Kafka消费者快递员的"翻车"处理日记
public class Kafka消费者快递员 {
private KafkaConsumer<String, String> 快递车;
public void 送包裹给客户() {
快递车.subscribe(Arrays.asList("客户地址列表"));
while (true) {
ConsumerRecords<String, String> 包裹列表 = 快递车.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> 包裹 : 包裹列表) {
try {
// 送包裹给客户
送包裹给客户(包裹.value());
// 手动确认包裹已送达
快递车.commitSync();
System.out.println("🎉 包裹送达成功,已确认!");
} catch (客户不在家Exception e) {
// 客户不在家 - 不重试
System.err.println("😅 客户不在家,跳过包裹: " + e.getMessage());
快递车.commitSync(); // 确认跳过此包裹
} catch (包裹打不开Exception e) {
// 包裹打不开 - 不重试
System.err.println("😱 包裹打不开: " + e.getMessage());
处理打不开的包裹(包裹, e);
快递车.commitSync(); // 确认跳过此包裹
} catch (Exception e) {
// 其他问题 - 可选择重试或跳过
System.err.println("💥 送包裹时出问题: " + e.getMessage());
// 策略1:跳过包裹
快递车.commitSync();
// 策略2:重试送包裹(需要自己实现重试逻辑)
// 重试送包裹(包裹);
// 策略3:送到"问题包裹处理中心"
// 送到问题包裹处理中心(包裹);
}
}
}
}
private void 送包裹给客户(String 包裹内容) {
// 模拟送包裹过程
if (包裹内容.contains("客户不在家")) {
throw new 客户不在家Exception("客户不在家");
}
if (包裹内容.contains("包裹打不开")) {
throw new 包裹打不开Exception("包裹打不开");
}
System.out.println("📦 成功送包裹: " + 包裹内容);
}
private void 处理打不开的包裹(ConsumerRecord<String, String> 包裹, Exception e) {
System.err.println("📝 记录打不开的包裹: " + 包裹.value());
}
}
Kafka消费者快递员的心得总结: 📚
- 😅 客户不在家:跳过包裹,不重试
- 😱 包裹打不开:跳过包裹,不重试
- 💥 其他问题:可选择重试、跳过或送到问题处理中心
- ✅ 手动确认:处理完成后需要手动确认包裹状态
2.2.2 RocketMQ消费者快递员的"翻车"日记 📝
RocketMQ消费者快递员说: “我是阿里巴巴的消费者快递员,我有自动重试功能!” 🚀
// RocketMQ消费者快递员的"翻车"处理日记
public class RocketMQ消费者快递员 {
private DefaultMQPushConsumer 快递车;
public void 开始送包裹() {
快递车.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> 包裹列表,
ConsumeConcurrentlyContext 上下文) {
for (MessageExt 包裹 : 包裹列表) {
try {
// 送包裹给客户
送包裹给客户(new String(包裹.getBody()));
System.out.println("🎉 包裹送达成功: " + 包裹.getMsgId());
// 告诉RocketMQ包裹送达成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (客户不在家Exception e) {
// 客户不在家 - 不重试
System.err.println("😅 客户不在家,不重试: " + e.getMessage());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 不重试
} catch (Exception e) {
// 其他问题 - 重试
System.err.println("💥 送包裹失败,RocketMQ会自动重试: " + e.getMessage());
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 重试
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
private void 送包裹给客户(String 包裹内容) {
// 模拟送包裹过程
if (包裹内容.contains("客户不在家")) {
throw new 客户不在家Exception("客户不在家");
}
if (包裹内容.contains("系统故障")) {
throw new RuntimeException("系统故障");
}
System.out.println("📦 成功送包裹: " + 包裹内容);
}
}
RocketMQ消费者快递员的心得总结: 📚
- 😅 客户不在家:不重试,直接跳过
- 💥 系统故障:自动重试,最多重试16次
- 🔄 重试间隔:递增间隔(1s, 5s, 10s, 30s…)
- 🗑️ 死信队列:超过重试次数后自动送到死信队列
2.3 消费消息异常对生产者的影响
重要结论:消费消息异常对生产者没有直接影响! 🎯
想象一下这个场景:
- 快递员A(生产者)成功把包裹送到快递站
- 快递员B(消费者)从快递站拿包裹,但是送包裹给客户时失败了
快递员A的内心独白: 🤷♂️
“我把包裹成功送到快递站了,但是快递员B送包裹给客户时失败了,我会知道吗?”
“不会!我只关心包裹是否成功送到快递站!”
“快递员B的失败跟我没关系,我继续送我的包裹就行了!”
// 生产者的内心独白
public class 生产者快递员 {
public void 送包裹到快递站(String 目的地, String 包裹号, String 包裹内容) {
ProducerRecord<String, String> 包裹 = new ProducerRecord<>(目的地, 包裹号, 包裹内容);
快递车.send(包裹, new Callback() {
@Override
public void onCompletion(RecordMetadata 送达证明, Exception 问题) {
if (问题 == null) {
// 我只关心包裹是否成功送到快递站
System.out.println("🎉 包裹成功送到快递站: " + 送达证明.offset());
// 我不会收到消费者送包裹失败的通知
}
}
});
}
}
原因分析: 🧠
同样基于解耦设计,消费者送包裹失败不会影响生产者。生产者只关心包裹是否成功送到快递站,不关心后续的配送过程。
2.4 消费消息异常影响分析图
2.5 消费者异常处理流程图
3. 🔄 重试机制:自动还是手动?
3.1 重试机制大揭秘
现在让我们来揭秘重试机制!这就像快递员遇到问题时,是自动重试还是需要手动处理。
3.1.1 重试机制对比表 📊
快递员类型 | 生产者重试 | 消费者重试 | 是否需要编码控制 |
---|---|---|---|
Kafka快递员 | ✅ 自动重试 | ❌ 不自动重试 | 生产者:不需要 消费者:需要 |
RocketMQ快递员 | ✅ 自动重试 | ✅ 自动重试 | 生产者:不需要 消费者:不需要 |
3.1.2 重试机制详解
Kafka快递员的重试机制: 🔄
// Kafka快递员的重试配置
public class Kafka快递员配置 {
public void 配置快递车() {
Properties 配置 = new Properties();
配置.put("retries", 3); // 重试3次
配置.put("retry.backoff.ms", 100); // 重试间隔100ms
配置.put("enable.idempotence", true); // 防止重复送包裹
// 重试是自动的,我只需要配置参数
KafkaProducer<String, String> 快递车 = new KafkaProducer<>(配置);
}
public void 送包裹() {
// 我只需要送包裹,重试是自动的
快递车.send(包裹, new Callback() {
@Override
public void onCompletion(RecordMetadata 送达证明, Exception 问题) {
if (问题 != null) {
// 这里的异常是最终结果(重试后仍然失败)
System.err.println("😢 重试后仍然失败: " + 问题.getMessage());
}
}
});
}
}
RocketMQ快递员的重试机制: 🚀
// RocketMQ快递员的重试配置
public class RocketMQ快递员配置 {
public void 配置快递车() {
DefaultMQProducer 快递车 = new DefaultMQProducer("快递员组");
快递车.setRetryTimesWhenSendFailed(3); // 同步送包裹重试3次
快递车.setRetryTimesWhenSendAsyncFailed(3); // 异步送包裹重试3次
// 重试是自动的,我只需要配置参数
}
public void 送包裹() {
try {
// 我只需要送包裹,重试是自动的
SendResult 送达证明 = 快递车.send(包裹);
System.out.println("🎉 送包裹成功: " + 送达证明.getMsgId());
} catch (Exception e) {
// 这里的异常是最终结果(重试后仍然失败)
System.err.println("😢 重试后仍然失败: " + e.getMessage());
}
}
}
3.2 重试机制架构图
3.3 消费者重试机制对比
Kafka消费者:需要手动实现重试 🔧
// Kafka消费者需要手动实现重试逻辑
public class Kafka消费者重试 {
private int 最大重试次数 = 3;
public void 送包裹给客户() {
while (true) {
ConsumerRecords<String, String> 包裹列表 = 快递车.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> 包裹 : 包裹列表) {
boolean 送包裹成功 = false;
int 重试次数 = 0;
while (!送包裹成功 && 重试次数 < 最大重试次数) {
try {
送包裹给客户(包裹.value());
送包裹成功 = true;
快递车.commitSync();
System.out.println("🎉 包裹送达成功");
} catch (Exception e) {
重试次数++;
System.err.println("💥 送包裹失败,重试次数: " + 重试次数);
if (重试次数 >= 最大重试次数) {
// 超过重试次数,处理失败包裹
处理失败包裹(包裹, e);
快递车.commitSync(); // 确认跳过此包裹
} else {
// 等待一段时间后重试
try {
Thread.sleep(1000 * 重试次数); // 递增等待时间
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
}
}
}
RocketMQ消费者:自动重试 🚀
// RocketMQ消费者自动重试
public class RocketMQ消费者重试 {
public void 配置快递车() {
DefaultMQPushConsumer 快递车 = new DefaultMQPushConsumer("消费者组");
快递车.setMaxReconsumeTimes(3); // 最大重试3次
// 重试是自动的,我只需要配置参数
}
public void 开始送包裹() {
快递车.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> 包裹列表,
ConsumeConcurrentlyContext 上下文) {
for (MessageExt 包裹 : 包裹列表) {
try {
送包裹给客户(new String(包裹.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 返回重试状态,RocketMQ会自动重试
System.err.println("💥 送包裹失败,RocketMQ会自动重试: " + e.getMessage());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
}
4. 🎯 灵魂拷问的答案
还记得开头的几个灵魂拷问吗?现在来揭晓答案!
4.1 问题1:生产者发消息失败了,消费者会知道吗?
答案:不会! 🤷♀️
原因: 基于解耦设计,消费者只能处理成功到达Broker的消息。
4.2 问题2:消费者消费失败了,生产者会收到通知吗?
答案:不会! 📢
原因: 生产者只关心消息是否成功发送到Broker,不关心后续处理结果。
4.3 问题3:重试机制是自动的还是需要手动写代码?
答案:看情况! 🔄
- Kafka生产者:自动重试,无需编码
- Kafka消费者:不自动重试,需要编码
- RocketMQ生产者:自动重试,无需编码
- RocketMQ消费者:自动重试,无需编码
4.4 问题4:如果一直重试失败,消息会去哪里?
答案:看情况! 🗑️
- Kafka:没有内置死信队列,需要手动处理
- RocketMQ:自动进入死信队列,需要手动处理
4.5 消息流转时序图
5. 🎉 总结:快递员们的"翻车"心得
5.1 关键结论
-
发消息异常影响:
- 对生产者:直接影响,需要处理异常
- 对消费者:无直接影响,消费者不知道发送失败
-
消费消息异常影响:
- 对消费者:直接影响,需要处理异常
- 对生产者:无直接影响,生产者不知道消费失败
-
重试机制:
- Kafka生产者:自动重试,无需编码控制
- Kafka消费者:不自动重试,需要编码控制
- RocketMQ生产者:自动重试,无需编码控制
- RocketMQ消费者:自动重试,无需编码控制
5.2 设计原则
- 解耦设计:生产者和消费者通过Broker解耦,异常不会跨组件传播
- 重试策略:尽可能使用自动重试机制,区分业务异常和系统异常
- 监控告警:建立完善的监控和告警体系
5.3 快递员们的最终心得
Kafka快递员说: “我是高吞吐量的快递员,但是消费者需要自己处理重试!” 🚀
RocketMQ快递员说: “我是阿里巴巴的快递员,我有完整的重试和死信队列功能!” 🎯
共同心得: “不管遇到什么’翻车’情况,我们都要保持冷静,按照既定的处理流程来!” 💪
5.4 异常处理决策树
📚 官方参考文档
Kafka官方文档
- Kafka官方文档:https://kafka.apache.org/documentation/
- Kafka生产者配置:https://kafka.apache.org/documentation/#producerconfigs
- Kafka消费者配置:https://kafka.apache.org/documentation/#consumerconfigs
- Kafka异常处理:https://kafka.apache.org/documentation/#errorhandling
- Kafka重试机制:https://kafka.apache.org/documentation/#retries
RocketMQ官方文档
- RocketMQ官方文档:https://rocketmq.apache.org/zh/docs/
- RocketMQ快速开始:https://rocketmq.apache.org/zh/docs/quickStart/01quickstart/
- RocketMQ生产者指南:https://rocketmq.apache.org/zh/docs/producer/02message/
- RocketMQ消费者指南:https://rocketmq.apache.org/zh/docs/consumer/01pushconsumer/
- RocketMQ异常处理:https://rocketmq.apache.org/zh/docs/FAQ/01FAQ/
相关技术文档
- Apache Kafka GitHub:https://github.com/apache/kafka
- Apache RocketMQ GitHub:https://github.com/apache/rocketmq
- Spring Kafka文档:https://docs.spring.io/spring-kafka/docs/current/reference/html/
- Spring Cloud Stream文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
最佳实践指南
- Kafka最佳实践:https://kafka.apache.org/documentation/#bestpractices
- RocketMQ最佳实践:https://rocketmq.apache.org/zh/docs/bestPractice/01bestpractice/
- 消息队列设计模式:https://docs.microsoft.com/en-us/azure/architecture/patterns/
通过这个轻松有趣的"快递员"
比喻,我们深入理解了消息队列的异常处理机制。希望这些知识能帮助你在实际工作中更好地处理消息队列的异常情况!