编程要求 根据提示,在右侧编辑器补充代码,使用 Kafka Producer API 对名为 demo 的 Topic 进行数据生产 package net.educoder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * kafka producer 简单模式 */ public class App { public static void main(String[] args) { /** * 1.创建配置文件对象,一般采用 Properties */ /**----------------begin-----------------------*/ /**-----------------end-------------------------*/ /** * 2.设置kafka的一些参数 * bootstrap.servers --> kafka的连接地址 kafka-01:9092,kafka-02:9092,kafka-03:9092 * key、value的序列化类 -->org.apache.kafka.common.serialization.StringSerializer * acks:1,-1,0 */ /**-----------------begin-----------------------*/ /**-----------------end-------------------------*/ /** * 3.构
时间: 2025-05-19 14:23:18 浏览: 17
### Kafka Producer API 实现向 `demo` Topic 发送数据
以下是使用 Kafka Producer API 向名为 `demo` 的 Topic 发送数据的完整代码示例,包括配置 `Properties` 和创建 `ProducerRecord` 的实现细节。
#### 配置 `Properties`
Kafka Producer 的配置通过 `Properties` 对象完成。主要设置包括 Bootstrap Server 地址、键和值的序列化器等。以下是一个典型的配置:
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 创建并初始化 Producer 的配置属性
Properties properties = new Properties();
// 设置 Kafka Broker 列表地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 设置 Key 和 Value 的序列化方式
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 初始化 KafkaProducer 实例
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 构建 ProducerRecord 并发送消息
sendMessagesToTopic(producer);
// 关闭 Producer 资源
producer.close();
}
private static void sendMessagesToTopic(KafkaProducer<String, String> producer) {
// 定义要发送的目标 Topic 名称
String topicName = "demo";
// 循环发送多条消息至指定 Topic
for (int i = 0; i < 10; i++) {
String key = "key-" + i;
String value = "value-" + i;
// 创建 ProducerRecord 实例
org.apache.kafka.clients.producer.ProducerRecord<String, String> record =
new org.apache.kafka.clients.producer.ProducerRecord<>(topicName, key, value);
// 将记录发送到 Kafka Cluster 中
producer.send(record);
}
}
}
```
上述代码展示了如何配置 Kafka Producer,并通过循环发送多条消息到名为 `demo` 的 Topic[^1]。每条消息都包含一个键 (`key`) 和一个值 (`value`),并通过 `ProducerRecord` 类封装后传递给 `send()` 方法[^2]。
#### 运行环境说明
为了运行此代码,需确保以下条件满足:
- 已安装 Apache Kafka,并启动了 Kafka Broker。
- Maven 或 Gradle 项目中引入了 Kafka Client 库依赖项。例如,在 Maven 的 `pom.xml` 文件中添加以下内容[^5]:
```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
```
#### 日志输出验证
当成功执行以上代码时,可以通过查看 Kafka Broker 的日志文件来确认消息已正确写入目标 Topic。类似于以下的日志会显示在控制台中[^4]:
```
kafka-producer-network-thread | client-id - Sample: record(offset, keyValue), partition(demo-0), offset(xxx)
```
这表明消息已被成功提交到 Kafka Cluster 中的指定分区。
---
相关问题
阅读全文
相关推荐
















