springboot消费kafka的客户端实现,手动消费得到一个消息,不使用注解实现
时间: 2024-03-09 20:44:17 浏览: 113
如果你不想使用注解来实现Kafka的消费者,你可以手动创建一个Kafka消费者,然后使用Kafka的API来消费消息。以下是一个示例:
1. 添加Kafka依赖和配置Kafka消费者,具体步骤请参考我上一个回答。
2. 创建Kafka消费者
```java
@Service
public class KafkaConsumerService {
private final static String TOPIC_NAME = "your-topic";
private final static String GROUP_ID = "your-group-id";
private final static String BOOTSTRAP_SERVERS = "your-kafka-server:9092";
@Autowired
public KafkaConsumerService() {
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交偏移量
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: " + record.value());
consumer.commitAsync(); // 手动提交偏移量
}
}
}
}
```
在上面的例子中,我们手动创建了一个Kafka消费者,并使用poll()方法来获取Kafka主题中的消息。在收到消息后,我们打印消息并手动提交偏移量。
3. 运行应用程序
现在你已经完成了所有的设置,可以运行你的应用程序并开始消费消息。
注:手动提交偏移量需要注意,否则会导致消息被重复消费或者消息丢失的问题。在确保消息处理成功后,才能手动提交偏移量。
阅读全文
相关推荐
















