Skip to content

10 - Kafka 异步架构

知识图谱

┌────────────────── Kafka 在项目中的角色 ──────────────────┐
│                                                          │
│  ┌──────────────────┐      ┌──────────────────────┐     │
│  │   秒杀下单        │      │    点赞持久化          │     │
│  │                  │      │                      │     │
│  │  Lua成功         │      │  like()              │     │
│  │    ↓             │      │    ↓                  │     │
│  │  Producer ──→    │      │  Producer ──→        │     │
│  │  createOrder     │      │  TOPIC_LIKE_BEHAVIOR │     │
│  │    ↓             │      │    ↓                  │     │
│  │  Consumer        │      │  Consumer            │     │
│  │  → ThreadPool    │      │  → DB 写入            │     │
│  │  → Redisson 锁   │      │  → 批量计数聚合       │     │
│  │  → DB 写入       │      │                      │     │
│  │    ↓(失败)       │      │    ↓(失败)            │     │
│  │  save-order-     │      │  TOPIC_SAVE_DB_FAILED│     │
│  │  failed-topic    │      │                      │     │
│  │    ↓             │      │                      │     │
│  │  rollback.lua    │      │                      │     │
│  └──────────────────┘      └──────────────────────┘     │
└──────────────────────────────────────────────────────────┘

Kafka 配置

文件: src/main/resources/application.yaml

yaml
spring:
  kafka:
    bootstrap-servers: 192.168.76.129:9092
    producer:
      retries: 10                  # 生产者重试次数
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: hmdp-test          # 消费者组
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual             # 手动ACK ← ⚠️ 但消费者没调用

Topic 清单

Topic 常量生产者消费者用途
TOPIC_CREATE_ORDER"createOrder"VoucherOrderServiceImplKafkaOrderConsumer秒杀异步下单
TOPIC_SAVE_ORDER_FAILED"save-order-failed-topic"KafkaOrderConsumerKafkaOrderConsumer订单落库失败回滚
TOPIC_LIKE_BEHAVIOR"likeBehavior"LikeBehaviorServiceImplKafkaLikeConsumer点赞事件处理
TOPIC_SAVE_DB_FAILED"saveDbFailed"KafkaLikeConsumerKafkaLikeConsumer点赞落库失败

Event 消息体

java
public class Event {
    private String topic;       // 目标 Topic
    private Long userId;        // 操作用户ID
    private Long entityId;      // 业务实体ID (orderId / behaviorId)
    private Map<String, Object> data;  // 扩展数据
}

// 秒杀示例:
Event {
    topic: "createOrder",
    userId: 1001,
    entityId: 7654321,  // orderId
    data: {
        "voucherId": 10,
        "buyNumber": 1
    }
}

KafkaOrderConsumer 详解

文件: src/main/java/com/hmdp/event/KafkaOrderConsumer.java

java
@Component
public class KafkaOrderConsumer {
    // ⚠️ 自建线程池, 没有使用 Spring 的 ThreadPoolConfig Bean
    private final ExecutorService executorService =
            Executors.newFixedThreadPool(10);

    @KafkaListener(topics = {TOPIC_CREATE_ORDER})
    public void handleCreateOrder(ConsumerRecord record) {
        // 1. 解析消息
        Event event = JSONObject.parseObject(
                record.value().toString(), Event.class);

        // 2. 提交到线程池处理
        executorService.submit(() -> {
            VoucherOrder voucherOrder = new VoucherOrder()
                    .setId(event.getEntityId())
                    .setUserId(event.getUserId())
                    .setVoucherId(Long.valueOf(data.get("voucherId").toString()))
                    .setBuyNumber(Integer.valueOf(data.get("buyNumber").toString()));

            // 3. 调用 Service 处理 (内含 Redisson 锁 + DB 操作)
            voucherOrderService.createVoucherOrder(voucherOrder);
        });
        // ⚠️ 注意: 没有调用 acknowledgment.acknowledge()
    }

    @KafkaListener(topics = {TOPIC_SAVE_ORDER_FAILED})
    public void handleSaveOrderFailed(ConsumerRecord record) {
        // 执行 rollback.lua 恢复 Redis 数据
        Event event = JSONObject.parseObject(...);
        Long result = stringRedisTemplate.execute(
                ROLLBACK_SCRIPT,
                Collections.emptyList(),
                String.valueOf(event.getEntityId()),  // orderId
                voucherId.toString(),
                String.valueOf(buyNumber),
                event.getUserId().toString()
        );
    }

    @PreDestroy
    public void shutdown() {
        executorService.shutdown();
        if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
            executorService.shutdownNow();
        }
    }
}

面试 Q&A

Q1: 为什么选 Kafka 而不是 RabbitMQ / RocketMQ?

各消息队列的特点:

维度KafkaRabbitMQRocketMQ
吞吐量极高(百万级TPS)中(万级)高(十万级)
延迟毫秒级微秒级毫秒级
消息顺序分区内有序不保证队列内有序
消息回溯支持(offset)不支持支持
事务消息支持(幂等+事务)不支持支持
适用场景日志/大数据/高吞吐业务消息/路由灵活电商/金融

本项目选 Kafka 的原因:秒杀场景需要高吞吐量 + 分区内有序(同一用户的订单不会乱序)。

如果是金融级的事务消息需求,RocketMQ 更合适(原生支持半消息和事务回查)。

追问:Kafka 如何保证消息不丢?

三个环节都需要保证:

1. 生产端

  • retries=10:发送失败自动重试
  • 可以加 acks=all:等所有 ISR 副本确认
  • 可以加回调 send(record, callback) 处理发送失败

2. Broker 端

  • replication.factor >= 2:多副本
  • min.insync.replicas >= 2:至少 2 个副本同步成功
  • 当前项目是单节点,没有副本保障

3. 消费端

  • ack-mode=manual:手动提交 offset
  • 但当前代码 没有调用 acknowledge(),等同于自动提交
  • 如果消费者崩溃重启,offset 可能已经被自动提交 → 消息丢失
  • 或者 offset 未提交 → 消息重复消费

Q2: 手动 ACK 没有调用 acknowledge() 会怎样?

配置了 ack-mode: manual 但不调用 acknowledge(),Kafka 的行为取决于 enable-auto-commit 配置:

  • 如果 enable-auto-commit=true(Spring Kafka 默认):

    • 手动 ACK 模式下自动提交被禁用
    • offset 永远不提交 → 每次重启都从最早的未提交 offset 重新消费
    • 后果:消息重复消费
  • 如果手动提交也不行:

    • 消费者可能积累大量 lag
    • Kafka broker 的 log retention 到期后,旧消息被删除 → 消息丢失

修复:消费方法签名加 Acknowledgment ack 参数,处理完毕后调用 ack.acknowledge()

java
@KafkaListener(topics = &#123;TOPIC_CREATE_ORDER&#125;)
public void handleCreateOrder(ConsumerRecord record, Acknowledgment ack) &#123;
    // ... 处理逻辑 ...
    ack.acknowledge();
&#125;

Q3: 消费者用线程池处理有什么风险?

executorService.submit() 是异步的,消费者线程立即返回继续消费下一条消息。

风险:

  1. 消息丢失:submit 后立即 ACK(如果有的话),但线程池任务还没执行。如果 JVM 崩溃,任务丢失
  2. 队列积压Executors.newFixedThreadPool(10) 使用无界队列(LinkedBlockingQueue),如果处理速度跟不上消费速度,任务无限堆积 → OOM
  3. 背压缺失:Kafka 不知道线程池是否过载,继续推送消息

改进方案:

  1. 使用有界队列 + 拒绝策略
  2. 在线程池任务完成后才 ACK
  3. 使用 Kafka 的 max.poll.records 控制每次拉取数量

Q4: 生产者发送失败了怎么办?

当前代码中 sendOrderMsgToKafka 如果抛异常,被外层 try-catch 捕获后返回"未知错误"。

但此时 seckill.lua 已经执行成功了——Redis 中库存已扣减、buyCount 已增加、orderId 已添加。

后果:用户看到失败,但 Redis 中库存已经减了,且没有触发回滚。

这是一个"部分提交"问题。修复方案:

  1. Kafka 发送失败时,手动执行 rollback.lua 回滚 Redis
  2. 或者用 Kafka 事务(transactional.id),保证发送和 Lua 的原子性

Q5: ThreadPoolConfig 为什么没被使用?

项目配置了一个 Spring 管理的线程池 Bean:

java
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() &#123;
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(20);
    executor.setQueueCapacity(50);
    executor.setThreadNamePrefix("KafkaOrderConsumer-");
    return executor;
&#125;

KafkaOrderConsumer 自己创建了 Executors.newFixedThreadPool(10),没有注入 Spring Bean。

差异:

  • Spring Bean:有界队列(50),最大20线程,可通过 Spring 管理生命周期
  • 自建线程池:无界队列,固定10线程,需要手动 @PreDestroy 关闭

应该改为注入 Spring Bean,利用其有界队列和拒绝策略。


踩坑点

踩坑点说明面试官考察意图
手动ACK未调用配置了 manual 但代码没 acknowledge消息可靠性
线程池无界队列Executors.newFixedThreadPool 默认无界OOM 风险
Spring Bean 未使用ThreadPoolConfig 配置了但没注入代码质量
生产者失败无回滚Kafka 发送失败但 Redis 已扣减分布式事务
消息幂等性缺失重复消费可能重复插入订单幂等性设计
单 Broker 无副本192.168.76.129:9092 单节点高可用

加分回答

  • 提到 消息幂等表:消费前检查 orderId 是否已处理过(DB 唯一索引或 Redis Set)
  • 分析 Kafka 的 Exactly-Once 语义:producer 开启 enable.idempotence=true + consumer 手动管理 offset
  • 讨论 死信队列(DLQ):多次消费失败的消息放入 DLQ,人工排查
  • 提到 事务消息:RocketMQ 的半消息方案可以解决"Lua 成功 + Kafka 发送失败"的问题
  • 分析 Kafka Consumer 的 Rebalance 问题:消费者加入/退出时触发 Rebalance,可能导致重复消费

关联文档