
Overview #
Kafka is a high-throughput distributed messaging system. This article walks through its core architecture and advanced usage.
Core Architecture #
Component Relationships #
Producer → Broker (Kafka Cluster) → Consumer
├── Topic
├── Partition
└── Replication
Core Concepts #
| Concept | Description |
|---|---|
| Topic | A message category, similar to a database table |
| Partition | A partition of a topic; the unit of horizontal scaling |
| Replication | Replicas of a partition; the basis of high availability |
| Producer | Writes messages to topics |
| Consumer | Reads messages from topics |
| Consumer Group | A set of consumers that share a topic's partitions |
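To make the Partition concept concrete: a producer routes each keyed message to one partition, so records with the same key stay in order. Below is a minimal stand-alone sketch of that routing. It is simplified: Kafka's default partitioner actually uses murmur2 hashing rather than `String.hashCode()`, and the class name here is made up for illustration.

```java
// Simplified illustration of key-based partition selection.
// Assumption: Kafka's real DefaultPartitioner uses murmur2 hashing;
// String.hashCode() is used only to keep this sketch self-contained.
public class PartitionSketch {
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result stays non-negative even
        // for Integer.MIN_VALUE hash codes, then take the modulo.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("user-42", 3);
        int p2 = partitionFor("user-42", 3);
        // The same key always lands on the same partition,
        // which is what preserves per-key ordering.
        System.out.println(p1 == p2); // true
    }
}
```

Because ordering is only guaranteed within a partition, choosing the message key is effectively choosing the ordering boundary.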
Installation and Deployment #
# Docker Compose
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Core Operations #
Topic Management #
# Create a topic
kafka-topics --create \
--bootstrap-server localhost:9092 \
--topic user-events \
--partitions 3 \
--replication-factor 1
# List and describe topics
kafka-topics --list --bootstrap-server localhost:9092
kafka-topics --describe --topic user-events --bootstrap-server localhost:9092
# Delete a topic
kafka-topics --delete --topic user-events --bootstrap-server localhost:9092
Producer #
// Java client
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // wait for all in-sync replicas to acknowledge
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
Producer<String, String> producer = new KafkaProducer<>(props);
// Synchronous send
ProducerRecord<String, String> record = new ProducerRecord<>("user-events", "key1", "value1");
RecordMetadata metadata = producer.send(record).get();
// Asynchronous send (the callback parameter is renamed to avoid
// shadowing the local `metadata` variable above, which would not compile)
producer.send(record, (meta, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    }
});
producer.close();
Consumer #
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // start from the earliest offset
props.put("enable.auto.commit", "false"); // commit offsets manually
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-events"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
        }
        // Commit manually after the batch has been processed
        consumer.commitSync();
    }
} finally {
    consumer.close(); // without try/finally this line is unreachable
}
Advanced Features #
Offset Management #
// Manually tracked offsets
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
    offsets.put(new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)); // commit the next offset to read
}
consumer.commitSync(offsets);
Consumer Group Rebalancing #
Triggers #
- A consumer joins or leaves the group
- The topic's partitions change
- The group coordinator changes
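The effect of a rebalance can be seen by simulating the contiguous range-style assignment that Kafka's default RangeAssignor performs: when a member joins or leaves, partition ownership is recomputed across the remaining members. This is a simplified stand-alone sketch; the real assignment runs inside the group coordinator and depends on the configured `partition.assignment.strategy`.

```java
import java.util.*;

// Simplified range assignment: N partitions split contiguously among members.
// Mimics the idea behind Kafka's RangeAssignor; the real protocol runs in
// the group coordinator during a rebalance.
public class RangeAssignSketch {
    static Map<String, List<Integer>> assign(int numPartitions, List<String> members) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int per = numPartitions / members.size();   // base partitions per member
        int extra = numPartitions % members.size(); // first `extra` members get one more
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = per + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) parts.add(next++);
            result.put(members.get(i), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        // Before: two consumers share 6 partitions (3 each).
        System.out.println(assign(6, List.of("c1", "c2")));
        // After c3 joins, a rebalance redistributes them 2/2/2.
        System.out.println(assign(6, List.of("c1", "c2", "c3")));
    }
}
```

This is also why a consumer count above the partition count leaves members idle: there are no contiguous ranges left to hand out.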
Tuning #
props.put("session.timeout.ms", "45000"); // session timeout
props.put("heartbeat.interval.ms", "15000"); // heartbeat interval
props.put("max.poll.records", "500"); // max records returned per poll
Partitioning Strategy #
// Custom partitioner
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Hash-partition by user ID. Mask the sign bit instead of using
        // Math.abs, which stays negative for Integer.MIN_VALUE hash codes.
        String userId = (String) key;
        return (userId.hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
Data Processing #
Kafka Streams #
// Build the stream processing topology
StreamsBuilder builder = new StreamsBuilder();
// 1. Read from the source topic
KStream<String, String> orders = builder.stream("orders");
// 2. Keep only paid orders
KStream<String, String> paidOrders = orders
    .filter((key, value) -> value.contains("\"status\":\"paid\""));
// 3. Re-key everything to a single group and count
KTable<String, Long> orderCount = paidOrders
    .groupBy((key, value) -> "total")
    .count();
// 4. Write the result out
orderCount.toStream().to("order-count", Produced.with(Serdes.String(), Serdes.Long()));
// Start the application
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Interactive Queries #
// Materialize the count as a queryable state store
KTable<String, Long> orderCount = builder.table(
    "order-count",
    Materialized.as("order-count-store")
);
String queryStore = orderCount.queryableStoreName(); // takes no arguments
// REST-based query
@Autowired
private KafkaStreams kafkaStreams;
@GetMapping("/order-count")
public Long getOrderCount() {
    ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(
        StoreQueryParameters.fromNameAndType("order-count-store",
            QueryableStoreTypes.keyValueStore()));
    return store.get("total");
}
Fault Tolerance #
Replication #
# Inspect replica status
kafka-topics --describe --topic user-events --bootstrap-server localhost:9092
# Sample output
Topic: user-events PartitionCount: 3 ReplicationFactor: 3
Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
ISR (In-Sync Replicas) #
ISR = {Leader, Replica1, Replica2}
When all replicas are in sync, ISR = {1,2,3}
When the Leader fails:
1. A new Leader is elected from the ISR
2. Cluster metadata is updated
3. The partition resumes serving traffic
Data Consistency #
| Level | Description |
|---|---|
| acks=0 | No acknowledgment; fastest, but data may be lost |
| acks=1 | Wait for the Leader write; balances performance and reliability |
| acks=all | Wait for all in-sync replicas; most reliable |
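One caveat: acks=all is only as strong as the topic's `min.insync.replicas` setting. The broker rejects acks=all writes once the ISR shrinks below that threshold, which turns a degraded cluster into a producer-side error rather than silent data loss. A stand-alone sketch of that accept/reject decision (pure Java; the real check happens inside the broker):

```java
// Sketch of the broker-side check behind acks=all: a produce request is
// accepted only while the in-sync replica set has at least
// min.insync.replicas members. Illustration only, not broker code.
public class MinIsrSketch {
    static boolean acceptWrite(int isrSize, int minInsyncReplicas) {
        return isrSize >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        // Replication factor 3, min.insync.replicas=2:
        System.out.println(acceptWrite(3, 2)); // true  (all replicas in sync)
        System.out.println(acceptWrite(2, 2)); // true  (one lagging replica tolerated)
        System.out.println(acceptWrite(1, 2)); // false (producer gets NotEnoughReplicas)
    }
}
```

With replication factor 3 and min.insync.replicas=2, one replica can lag without blocking writes, while every acknowledged write still exists on at least two brokers.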
Performance Tuning #
Producer Tuning #
props.put("batch.size", 32768); // batch size in bytes
props.put("linger.ms", 5); // how long to wait for a batch to fill
props.put("compression.type", "snappy"); // compression codec
props.put("buffer.memory", 67108864); // total send buffer size
Consumer Tuning #
props.put("fetch.min.bytes", 1); // minimum bytes per fetch
props.put("fetch.max.wait.ms", 500); // maximum wait per fetch
props.put("max.partition.fetch.bytes", 1048576); // maximum bytes per partition
Broker Tuning #
# server.properties
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.flush.interval.messages=10000
log.flush.interval.ms=1000
Monitoring and Alerting #
Key Metrics #
# Kafka metrics (via JMX Exporter)
kafka_network_request_metrics_request_queue_total
kafka_server_broker_topic_metrics_messages_in_total
kafka_server_broker_topic_metrics_bytes_in_total
kafka_server_broker_topic_metrics_bytes_out_total
kafka_consumer_coordinator_metrics_group_coordinator_available
kafka_consumer_fetch_metrics_records_lag_max
Grafana Dashboard #
{
  "panels": [
    {
      "title": "Message throughput",
      "targets": [
        {
          "expr": "sum(rate(kafka_server_broker_topic_metrics_messages_in_total[5m]))"
        }
      ]
    },
    {
      "title": "Consumer lag",
      "targets": [
        {
          "expr": "sum(kafka_consumer_fetch_metrics_records_lag_max)"
        }
      ]
    }
  ]
}
Common Issues #
Duplicate Consumption #
# Reset the consumer group's offsets
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --reset-offsets --to-earliest \
  --group my-group --topic my-topic --execute
Message Backlog #
# Check consumer group status
kafka-consumer-groups --describe --group my-group --bootstrap-server localhost:9092
# Add more consumers to scale out
# Number of consumers <= number of partitions
Data Loss #
// Ensure writes are not lost
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);
Best Practices #
- Size partition counts appropriately, based on throughput and required parallelism
- Use compression to cut network overhead
- Batch sends to raise throughput
- Commit offsets manually for precise control
- Monitor and alert to catch anomalies early
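The first practice, sizing partition counts, is often estimated with a simple throughput heuristic: divide the target topic throughput by the measured per-partition throughput of producers and of consumers, and keep the larger result. This is a rule of thumb, not a hard rule, and the numbers below are assumptions for illustration:

```java
// Heuristic partition count:
// max(ceil(target / producerPerPartition), ceil(target / consumerPerPartition)).
public class PartitionSizing {
    static int partitionsNeeded(double targetMBps, double producerMBps, double consumerMBps) {
        int forProduce = (int) Math.ceil(targetMBps / producerMBps);
        int forConsume = (int) Math.ceil(targetMBps / consumerMBps);
        return Math.max(forProduce, forConsume);
    }

    public static void main(String[] args) {
        // Assumed measurements: 100 MB/s target, 10 MB/s per partition on the
        // producer side, 20 MB/s per partition on the consumer side.
        System.out.println(partitionsNeeded(100, 10, 20)); // 10
    }
}
```

Leave headroom above the computed number, since repartitioning a live topic redistributes keys and breaks per-key ordering guarantees.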
Summary #
Kafka is a powerful distributed messaging system; understanding its internals is what lets you get the most out of it.