java连接kafka代码
时间: 2025-02-03 10:36:14 浏览: 36
### Java 连接 Kafka 示例代码
#### 发送消息到 Kafka 的示例代码
为了向 Kafka 主题发送消息,在Java中可以使用`KafkaProducer`类。创建生产者实例并配置必要的属性,如引导服务器地址、键序列化器和值序列化器。
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaMessageSender {
private static final String TOPIC_NAME = "test";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", "value");
producer.send(record);
}
}
}
```
此段代码展示了如何设置一个基本的Kafka生产者,并通过指定主题名称发送一条记录[^2]。
#### 接收来自 Kafka 的消息
对于消费端而言,可以通过实现监听器的方式接收特定主题的消息。下面这段代码定义了一个Spring框架下的组件服务,它会自动订阅给定的主题,并处理传入的数据。
```java
@Component
@Slf4j
public class KafkaListenerService {
@KafkaListener(topics = "my-replicated-topic", groupId = "ss")
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
log.info(cr.toString());
}
}
```
上述代码片段说明了怎样利用`@KafkaListener`注解来简化消费者逻辑,使得每当有新消息到达时都会触发相应的业务方法执行[^3]。
阅读全文
相关推荐


















