kafka发送实体类
时间: 2025-03-24 19:17:47 浏览: 27
### Kafka 发送实体类对象的序列化与反序列化示例
在 Kafka 中发送实体类对象时,通常需要自定义序列化器和反序列化器来处理复杂的数据结构。以下是基于提供的引用内容以及常见实践的一个完整示例。
#### 1. 配置生产者的属性
为了支持实体类对象的传输,在配置生产者时需指定 `key.serializer` 和 `value.serializer` 属性。这里可以使用内置的 `StringSerializer` 或者自定义的序列化器[^5]:
```properties
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomObjectSerializer.class);
```
其中,`CustomObjectSerializer` 是一个自定义的序列化器类。
---
#### 2. 定义实体类
创建一个简单的 Java 实体类作为消息的内容。以下是一个名为 `Order` 的实体类示例[^2]:
```java
public class Order implements Serializable {
private static final long serialVersionUID = 1L;
private Integer orderId;
private String title;
public Order() {}
public Order(Integer orderId, String title) {
this.orderId = orderId;
this.title = title;
}
// Getter and Setter methods omitted for brevity.
}
```
---
#### 3. 创建自定义序列化器
实现 `org.apache.kafka.common.serialization.Serializer` 接口以完成对象到字节数组的转换。下面展示了一个基于 JSON 格式的序列化器[^4]:
```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
public class CustomObjectSerializer<T> implements Serializer<T> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public byte[] serialize(String topic, T data) {
try {
if (data == null) {
return null;
}
return objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
@Override
public void close() {}
}
```
此序列化器利用 Jackson 库将对象转为 JSON 字符串并进一步编码成字节流。
---
#### 4. 创建自定义反序列化器
同样地,实现 `Deserializer` 接口用于从字节数组恢复原始对象实例:
```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.DeserializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
public class CustomObjectDeserializer<T> implements Deserializer<T> {
private Class<T> typeClass;
private ObjectMapper objectMapper = new ObjectMapper();
@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
TypeReference<T> typeRef = (TypeReference<T>) configs.get("type");
this.typeClass = (Class<T>) typeRef.getType();
}
@Override
public T deserialize(String topic, byte[] data) {
try {
if (data == null || data.length == 0) {
return null;
}
return objectMapper.readValue(data, typeClass);
} catch (Exception e) {
throw new DeserializationException("Error deserializing JSON message", e);
}
}
@Override
public void close() {}
}
```
上述代码中的 `configure()` 方法允许传递目标类型的元信息以便正确解析 JSON 数据。
---
#### 5. 使用生产者发送消息
设置好所有组件之后,可以通过 Kafka 生产者向主题发布消息。例如:
```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomObjectSerializer.class.getName());
KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
Order order = new Order(123, "Sample Title");
producer.send(new ProducerRecord<>("orders_topic", "order_key", order));
producer.close();
```
---
#### 6. 消费端接收消息
消费者部分则负责读取消息并将字节重新映射回原生对象形式。其核心逻辑依赖于之前定义好的反序列化器:
```java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomObjectDeserializer.class.getName());
props.put("type", new TypeReference<Order>() {});
KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders_topic"));
while (true) {
ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, Order> record : records) {
System.out.printf("Received key=%s value=%s%n", record.key(), record.value().toString());
}
}
```
注意此处额外设置了 `"type"` 参数用来指示期望的目标类型。
---
### 总结
以上展示了完整的流程,包括如何通过 Kafka 发送带有自定义序列化的实体类对象,并确保两端能够正常通信。实际应用中可能还需要考虑更多细节比如异常捕获机制、性能优化等问题。
阅读全文
相关推荐


















