Skip to main content

Kafka 消息队列深入详解与高阶应用

·1185 words·3 mins

概述
#

Kafka 是高吞吐量的分布式消息系统,本文深入讲解其架构原理和高级应用。

核心架构
#

组件关系
#

Producer → Broker (Kafka Cluster) → Consumer
                ├── Topic
                ├── Partition
                └── Replication

核心概念
#

概念 说明
Topic 消息类别,类似数据库表
Partition 主题的分区,水平扩展
Replication 副本,高可用保障
Producer 消息生产者
Consumer 消息消费者
Consumer Group 消费者组

安装部署
#

# Docker Compose
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

核心操作
#

Topic 管理
#

# 创建主题
kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --partitions 3 \
  --replication-factor 1

# 查看主题
kafka-topics --list --bootstrap-server localhost:9092
kafka-topics --describe --topic user-events --bootstrap-server localhost:9092

# 删除主题
kafka-topics --delete --topic user-events --bootstrap-server localhost:9092

Producer
#

// Java 客户端
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");  // 确保写入成功
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);

Producer<String, String> producer = new KafkaProducer<>(props);

// 同步发送
ProducerRecord<String, String> record = new ProducerRecord<>("user-events", "key1", "value1");
RecordMetadata metadata = producer.send(record).get();

// 异步发送
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    }
});

producer.close();

Consumer
#

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");  // 从最早开始消费
props.put("enable.auto.commit", "false");    // 手动提交

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-events"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());
    }
    
    // 手动提交
    consumer.commitSync();
}

consumer.close();

高级特性
#

偏移量管理
#

// 手动管理偏移量
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

for (ConsumerRecord<String, String> record : records) {
    offsets.put(new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1));
}

consumer.commitSync(offsets);

消费者组再平衡
#

触发条件
#

  • 消费者加入/离开组
  • Topic 分区变更
  • Coordinator 变更

优化策略
#

props.put("session.timeout.ms", "45000");     // 会话超时
props.put("heartbeat.interval.ms", "15000");  // 心跳间隔
props.put("max.poll.records", "500");          // 每次拉取最大记录数

分区策略
#

// 自定义分区器
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        // 按用户ID哈希分区
        String userId = (String) key;
        return Math.abs(userId.hashCode()) % numPartitions;
    }
    
    @Override
    public void close() {}
    
    @Override
    public void configure(Map<String, ?> configs) {}
}

数据处理
#

Kafka Streams
#

// 构建流处理应用
StreamsBuilder builder = new StreamsBuilder();

// 1. 读取主题
KStream<String, String> orders = builder.stream("orders");

// 2. 过滤
KStream<String, String> paidOrders = orders
    .filter((key, value) -> value.contains("\"status\":\"paid\""));

// 3. Key变形
KTable<String, Long> orderCount = paidOrders
    .groupBy((key, value) -> "total")
    .count();

// 4. 输出
orderCount.toStream().to("order-count", Produced.with(Serdes.String(), Serdes.Long()));

// 启动
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

查询示例
#

// QName 交互
KTable<String, Long> orderCount = builder.table(
    "order-count",
    Materialized.as("order-count-store")
);

 queryStore = orderCount.queryableStoreName("order-count-store");

// 基于 REST 的查询
@Autowired
private KafkaStreams kafkaStreams;

@GetMapping("/order-count")
public Long getOrderCount() {
    QueryableStoreType<ReadOnlyKeyValueStore<String, Long>> storeType =
        QueryableStoreTypes.keyValueStore();
    
    ReadOnlyKeyValueStore<String, Long> store =
        kafkaStreams.store(queryStore, storeType);
    
    return store.get("total");
}

容错机制
#

副本机制
#

# 查看副本状态
kafka-topics --describe --topic user-events --bootstrap-server localhost:9092

# 输出示例
Topic: user-events  PartitionCount: 3   ReplicationFactor: 3
Partition: 0  Leader: 1   Replicas: 1,2,3  Isr: 1,2,3
Partition: 1  Leader: 2   Replicas: 2,3,1  Isr: 2,3,1
Partition: 2  Leader: 3   Replicas: 3,1,2  Isr: 3,1,2

ISR (In-Sync Replicas)
#

ISR = {Leader, Replica1, Replica2}
当所有副本都同步时,ISR = {1,2,3}

 Leader 故障时:
 1. 从 ISR 中选举新 Leader
 2. 更新元数据
 3. 恢复服务

数据一致性
#

级别 说明
acks=0 不等待确认,最快但可能丢数据
acks=1 等待 Leader 写入,平衡性能和可靠性
acks=all 等待所有副本写入,最可靠

性能优化
#

生产者优化
#

props.put("batch.size", 32768);     // 批处理大小
props.put("linger.ms", 5);         // 等待时间
props.put("compression.type", "snappy");  // 压缩
props.put("buffer.memory", 67108864);     // 缓冲区大小

消费者优化
#

props.put("fetch.min.bytes", 1);        // 最小拉取字节数
props.put("fetch.max.wait.ms", 500);    // 最大等待时间
props.put("max.partition.fetch.bytes", 1048576);  // 最大分区字节数

Broker 优化
#

# server.properties
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.flush.interval.messages=10000
log.flush.interval.ms=1000

监控告警
#

关键指标
#

# Kafka 指标 (通过 JMX Exporter)
kafka_network_request_metrics_request_queue_total

kafka_server_broker_topic_metrics_messages_in_total
kafka_server_broker_topic_metrics_bytes_in_total
kafka_server_broker_topic_metrics_bytes_out_total

kafka_consumer_coordinator_metrics_group_coordinator_available
kafka_consumer_fetch_metrics_records_lag_max

Grafana Dashboard
#

{
  "panels": [
    {
      "title": "消息吞吐量",
      "targets": [
        {
          "expr": "sum(rate(kafka_server_broker_topic_metrics_messages_in_total[5m]))"
        }
      ]
    },
    {
      "title": "消费者 Lag",
      "targets": [
        {
          "expr": "sum(kafka_consumer_fetch_metrics_records_lag_max)"
        }
      ]
    }
  ]
}

常见问题
#

消费者组重复消费
#

# 重置消费者组偏移量
kafka-consumer-groups --reset-offsets --to-earliest \
  --group my-group --topic my-topic --execute

消息积压
#

# 查看消费者组状态
kafka-consumer-groups --describe --group my-group --bootstrap-server localhost:9092

# 增加消费者数量
# 消费者数 <= 分区数

数据丢失
#

// 确保可靠写入
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);

最佳实践
#

  1. 合理设置分区数 - 按吞吐量和并行度
  2. 使用压缩 - 减少网络开销
  3. 批量发送 - 提高吞吐量
  4. 手动提交 - 精确控制
  5. 监控告警 - 及时发现异常

总结
#

Kafka 是强大的分布式消息系统,需要深入理解其原理才能发挥最大价值。