kafka消费消息的代码
时间: 2025-01-31 11:02:18 浏览: 47
Kafka消费消息通常涉及到使用Kafka客户端库,如Apache Kafka的Java Consumer API,或者是第三方框架如Spring Cloud Stream、Kafka Clients for Apache Curator等。这里是一个简单的Java代码示例,展示如何使用官方消费者API从Kafka主题中消费消息:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KafkaConsumerExample {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "your-topic-name";
private static final String GROUP_ID = "your-consumer-group";
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("group.id", GROUP_ID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); // Poll for messages, timeout in ms
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
consumer.close(); // Close the consumer when done
}
}
```
这个例子创建了一个`KafkaConsumer`实例,并订阅了指定的主题。然后它在一个无限循环中定期检查新的消息,获取并打印出来。
阅读全文
相关推荐


















