编写生产者程序,将 json 文件数据发送给 Kafka;编写消费者程序,读取 Kafka 的 json 文件数据,并手动提交偏移量。Topic 为 json_topic。
时间: 2024-12-21 20:26:40 浏览: 52
在使用Kafka处理JSON数据的过程中,首先需要设置一个名为`json_topic`的主题。以下是使用Python的Kafka客户端库Pika和 confluent-kafka-python分别实现生产者和消费者的基本步骤:
**生产者(Producer)程序:**
```python
from kafka import KafkaProducer
import json
# 创建Kafka生产者实例
producer = KafkaProducer(bootstrap_servers='localhost:9092', # 替换为你的Kafka服务器地址
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
# JSON文件内容
with open('data.json') as file:
data = json.load(file)
# 发送消息到json_topic主题
for record in data:
producer.send('json_topic', key=None, value=record)
# 关闭连接
producer.close()
```
**消费者(Consumer)程序:**
```python
from kafka import KafkaConsumer
import json
# 创建Kafka消费者实例
consumer = KafkaConsumer('json_topic',
bootstrap_servers='localhost:9092', # 替换为你的Kafka服务器地址
auto_offset_reset='earliest', # 自动从最早的偏移量开始消费
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
# 消费并处理消息
for message in consumer:
print(f"Received: {message.value}")
# 手动提交偏移量,这里通常会在消费处理完一条消息后进行
consumer.commit(offset=message.offset)
```
注意:在实际部署中,你需要确保Kafka服务器已经运行并且配置正确,同时还需要处理异常情况和错误日志。
阅读全文
相关推荐

















