外观
10 - Kafka 异步架构
知识图谱
┌────────────────── Kafka 在项目中的角色 ──────────────────┐
│ │
│ ┌──────────────────┐ ┌──────────────────────┐ │
│ │ 秒杀下单 │ │ 点赞持久化 │ │
│ │ │ │ │ │
│ │ Lua成功 │ │ like() │ │
│ │ ↓ │ │ ↓ │ │
│ │ Producer ──→ │ │ Producer ──→ │ │
│ │ createOrder │ │ TOPIC_LIKE_BEHAVIOR │ │
│ │ ↓ │ │ ↓ │ │
│ │ Consumer │ │ Consumer │ │
│ │ → ThreadPool │ │ → DB 写入 │ │
│ │ → Redisson 锁 │ │ → 批量计数聚合 │ │
│ │ → DB 写入 │ │ │ │
│ │ ↓(失败) │ │ ↓(失败) │ │
│ │ save-order- │ │ TOPIC_SAVE_DB_FAILED│ │
│ │ failed-topic │ │ │ │
│ │ ↓ │ │ │ │
│ │ rollback.lua │ │ │ │
│ └──────────────────┘ └──────────────────────┘ │
└──────────────────────────────────────────────────────────┘Kafka 配置
文件: src/main/resources/application.yaml
yaml
spring:
kafka:
bootstrap-servers: 192.168.76.129:9092
producer:
retries: 10 # 生产者重试次数
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: hmdp-test # 消费者组
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual # 手动ACK ← ⚠️ 但消费者没调用Topic 清单
| Topic 常量 | 值 | 生产者 | 消费者 | 用途 |
|---|---|---|---|---|
| TOPIC_CREATE_ORDER | "createOrder" | VoucherOrderServiceImpl | KafkaOrderConsumer | 秒杀异步下单 |
| TOPIC_SAVE_ORDER_FAILED | "save-order-failed-topic" | KafkaOrderConsumer | KafkaOrderConsumer | 订单落库失败回滚 |
| TOPIC_LIKE_BEHAVIOR | "likeBehavior" | LikeBehaviorServiceImpl | KafkaLikeConsumer | 点赞事件处理 |
| TOPIC_SAVE_DB_FAILED | "saveDbFailed" | KafkaLikeConsumer | KafkaLikeConsumer | 点赞落库失败 |
Event 消息体
java
public class Event {
private String topic; // 目标 Topic
private Long userId; // 操作用户ID
private Long entityId; // 业务实体ID (orderId / behaviorId)
private Map<String, Object> data; // 扩展数据
}
// 秒杀示例:
Event {
topic: "createOrder",
userId: 1001,
entityId: 7654321, // orderId
data: {
"voucherId": 10,
"buyNumber": 1
}
}KafkaOrderConsumer 详解
文件: src/main/java/com/hmdp/event/KafkaOrderConsumer.java
java
@Component
public class KafkaOrderConsumer {
// ⚠️ 自建线程池, 没有使用 Spring 的 ThreadPoolConfig Bean
private final ExecutorService executorService =
Executors.newFixedThreadPool(10);
@KafkaListener(topics = {TOPIC_CREATE_ORDER})
public void handleCreateOrder(ConsumerRecord record) {
// 1. 解析消息
Event event = JSONObject.parseObject(
record.value().toString(), Event.class);
// 2. 提交到线程池处理
executorService.submit(() -> {
VoucherOrder voucherOrder = new VoucherOrder()
.setId(event.getEntityId())
.setUserId(event.getUserId())
.setVoucherId(Long.valueOf(data.get("voucherId").toString()))
.setBuyNumber(Integer.valueOf(data.get("buyNumber").toString()));
// 3. 调用 Service 处理 (内含 Redisson 锁 + DB 操作)
voucherOrderService.createVoucherOrder(voucherOrder);
});
// ⚠️ 注意: 没有调用 acknowledgment.acknowledge()
}
@KafkaListener(topics = {TOPIC_SAVE_ORDER_FAILED})
public void handleSaveOrderFailed(ConsumerRecord record) {
// 执行 rollback.lua 恢复 Redis 数据
Event event = JSONObject.parseObject(...);
Long result = stringRedisTemplate.execute(
ROLLBACK_SCRIPT,
Collections.emptyList(),
String.valueOf(event.getEntityId()), // orderId
voucherId.toString(),
String.valueOf(buyNumber),
event.getUserId().toString()
);
}
@PreDestroy
public void shutdown() {
executorService.shutdown();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
}
}面试 Q&A
Q1: 为什么选 Kafka 而不是 RabbitMQ / RocketMQ?
各消息队列的特点:
维度 Kafka RabbitMQ RocketMQ 吞吐量 极高(百万级TPS) 中(万级) 高(十万级) 延迟 毫秒级 微秒级 毫秒级 消息顺序 分区内有序 不保证 队列内有序 消息回溯 支持(offset) 不支持 支持 事务消息 支持(幂等+事务) 不支持 支持 适用场景 日志/大数据/高吞吐 业务消息/路由灵活 电商/金融 本项目选 Kafka 的原因:秒杀场景需要高吞吐量 + 分区内有序(同一用户的订单不会乱序)。
如果是金融级的事务消息需求,RocketMQ 更合适(原生支持半消息和事务回查)。
追问:Kafka 如何保证消息不丢?
三个环节都需要保证:
1. 生产端:
retries=10:发送失败自动重试- 可以加
acks=all:等所有 ISR 副本确认- 可以加回调
send(record, callback)处理发送失败2. Broker 端:
replication.factor >= 2:多副本min.insync.replicas >= 2:至少 2 个副本同步成功- 当前项目是单节点,没有副本保障
3. 消费端:
ack-mode=manual:手动提交 offset- 但当前代码 没有调用
acknowledge(),等同于自动提交- 如果消费者崩溃重启,offset 可能已经被自动提交 → 消息丢失
- 或者 offset 未提交 → 消息重复消费
Q2: 手动 ACK 没有调用 acknowledge() 会怎样?
配置了
ack-mode: manual但不调用acknowledge(),Kafka 的行为取决于enable-auto-commit配置:
如果
enable-auto-commit=true(Spring Kafka 默认):
- 手动 ACK 模式下自动提交被禁用
- offset 永远不提交 → 每次重启都从最早的未提交 offset 重新消费
- 后果:消息重复消费
如果手动提交也不行:
- 消费者可能积累大量 lag
- Kafka broker 的 log retention 到期后,旧消息被删除 → 消息丢失
修复:消费方法签名加
Acknowledgment ack参数,处理完毕后调用ack.acknowledge():java@KafkaListener(topics = {TOPIC_CREATE_ORDER}) public void handleCreateOrder(ConsumerRecord record, Acknowledgment ack) { // ... 处理逻辑 ... ack.acknowledge(); }
Q3: 消费者用线程池处理有什么风险?
executorService.submit()是异步的,消费者线程立即返回继续消费下一条消息。风险:
- 消息丢失:submit 后立即 ACK(如果有的话),但线程池任务还没执行。如果 JVM 崩溃,任务丢失
- 队列积压:
Executors.newFixedThreadPool(10)使用无界队列(LinkedBlockingQueue),如果处理速度跟不上消费速度,任务无限堆积 → OOM- 背压缺失:Kafka 不知道线程池是否过载,继续推送消息
改进方案:
- 使用有界队列 + 拒绝策略
- 在线程池任务完成后才 ACK
- 使用 Kafka 的
max.poll.records控制每次拉取数量
Q4: 生产者发送失败了怎么办?
当前代码中
sendOrderMsgToKafka如果抛异常,被外层try-catch捕获后返回"未知错误"。但此时 seckill.lua 已经执行成功了——Redis 中库存已扣减、buyCount 已增加、orderId 已添加。
后果:用户看到失败,但 Redis 中库存已经减了,且没有触发回滚。
这是一个"部分提交"问题。修复方案:
- Kafka 发送失败时,手动执行 rollback.lua 回滚 Redis
- 或者用 Kafka 事务(
transactional.id),保证发送和 Lua 的原子性
Q5: ThreadPoolConfig 为什么没被使用?
项目配置了一个 Spring 管理的线程池 Bean:
java@Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(50); executor.setThreadNamePrefix("KafkaOrderConsumer-"); return executor; }但
KafkaOrderConsumer自己创建了Executors.newFixedThreadPool(10),没有注入 Spring Bean。差异:
- Spring Bean:有界队列(50),最大20线程,可通过 Spring 管理生命周期
- 自建线程池:无界队列,固定10线程,需要手动
@PreDestroy关闭应该改为注入 Spring Bean,利用其有界队列和拒绝策略。
踩坑点
| 踩坑点 | 说明 | 面试官考察意图 |
|---|---|---|
| 手动ACK未调用 | 配置了 manual 但代码没 acknowledge | 消息可靠性 |
| 线程池无界队列 | Executors.newFixedThreadPool 默认无界 | OOM 风险 |
| Spring Bean 未使用 | ThreadPoolConfig 配置了但没注入 | 代码质量 |
| 生产者失败无回滚 | Kafka 发送失败但 Redis 已扣减 | 分布式事务 |
| 消息幂等性缺失 | 重复消费可能重复插入订单 | 幂等性设计 |
| 单 Broker 无副本 | 192.168.76.129:9092 单节点 | 高可用 |
加分回答
- 提到 消息幂等表:消费前检查 orderId 是否已处理过(DB 唯一索引或 Redis Set)
- 分析 Kafka 的 Exactly-Once 语义:producer 开启
enable.idempotence=true+ consumer 手动管理 offset - 讨论 死信队列(DLQ):多次消费失败的消息放入 DLQ,人工排查
- 提到 事务消息:RocketMQ 的半消息方案可以解决"Lua 成功 + Kafka 发送失败"的问题
- 分析 Kafka Consumer 的 Rebalance 问题:消费者加入/退出时触发 Rebalance,可能导致重复消费
关联文档
- 03-秒杀系统 — Kafka 在秒杀链路中的角色
- 07-点赞子系统 — 点赞事件的 Kafka 处理
- 11-已知问题与优化方向 — ACK、幂等性、线程池问题