java 如何消费kafka
时间: 2025-01-30 10:47:11 浏览: 39
在Java中,要消费Kafka主题,通常会使用Apache Kafka官方提供的客户端库——Kafka Clients或者更现代的Kafka Java High Level Consumer (HLPC)。以下是使用HLPC的基本步骤:
1. **添加依赖**:首先,你需要在Maven或Gradle构建文件中添加Kafka的相关依赖。例如,在Maven中:
```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>your-kafka-version</version>
</dependency>
```
2. **创建配置对象**:配置连接到Kafka集群所需的属性,如bootstrap servers地址、group id等,可以创建`Properties`对象:
```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
```
3. **创建消费者实例**:使用`NewConsumer`构造函数,传入配置对象创建消费者实例:
```java
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```
4. **订阅主题**:通过`consumer.subscribe()`方法订阅需要消费的主题列表:
```java
Set<String> topics = new HashSet<>();
topics.add("my-topic");
consumer.subscribe(topics);
```
5. **开始消费**:循环处理消息,直到不再需要消费或关闭消费者:
```java
while (running) {
ConsumerRecords<String, String> records = consumer.poll(100); // 每隔一段时间检查新的消息
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
```
6. **提交偏移量**:在适当的时候,可以手动或自动提交消费进度以保证数据不会丢失(参见配置中的`enable.auto.commit`)。
7. **关闭消费者**:当完成消费或者应用停止时,调用`consumer.close()`关闭连接。
阅读全文
相关推荐

















