go 语言的kafka 使用
时间: 2025-05-10 08:59:28 浏览: 13
### Go语言中使用Kafka的示例
以下是基于 `confluent-kafka-go` 的 Kafka 生产者和消费者的基本实现方式。此包由 Confluent 提供,是官方推荐用于 Golang 开发的 Kafka 客户端库[^1]。
#### 1. Kafka生产者的简单实现
下面是一个简单的 Kafka 生产者代码示例:
```go
package main
import (
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
// 创建一个新的生产者实例
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092", // 替换为实际的Kafka服务器地址
})
if err != nil {
log.Fatalf("Failed to create producer: %s\n", err)
}
defer p.Close()
// 处理信号以便优雅退出
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
// 配置交付报告回调函数
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// 发送消息到指定主题
topic := "test-topic" // 替换为目标主题名称
deliveryChan := make(chan kafka.Event)
for _, word := range strings.Fields("Hello world this is a test") {
err := p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, deliveryChan)
if err != nil {
log.Println(err)
}
}
// 等待所有消息发送完成
close(deliveryChan)
for e := range deliveryChan {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Printf("Delivery error: %v\n", ev.TopicPartition.Error)
} else {
log.Printf("Message delivered to topic %s [%d]\n",
*ev.TopicPartition.Topic, ev.TopicPartition.Partition)
}
}
}
<-signals
}
```
上述代码展示了如何创建一个 Kafka 生产者,并通过循环将字符串分割后的单词作为独立的消息发布到目标主题上。
---
#### 2. Kafka消费者的简单实现
下面是 Kafka 消费者的代码示例:
```go
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092", // 替换为实际的Kafka服务器地址
"group.id": "my-group-id", // 设置消费组ID
"auto.offset.reset": "earliest", // 自动重置偏移量策略
})
if err != nil {
log.Fatalf("Failed to create consumer: %s\n", err)
}
defer consumer.Close()
topics := []string{"test-topic"} // 替换为目标主题列表
err = consumer.SubscribeTopics(topics, nil)
if err != nil {
log.Fatalf("Failed to subscribe topics: %s\n", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
run := true
for run {
select {
case <-ctx.Done():
run = false
default:
msg, err := consumer.ReadMessage(-1)
if err == nil {
fmt.Printf("Received message: %s\n", string(msg.Value))
} else {
log.Printf("Error while consuming messages: %v\n", err)
}
}
}
consumer.Close()
}
```
这段代码展示了一个基本的 Kafka 消费者逻辑,它会订阅特定的主题并持续读取消息直到超时结束。
---
#### 关于Kafka的应用场景
Kafka通常被应用于两大类应用:一是构建实时数据管道以可靠地获取数据;二是构建实时流处理应用程序[^2]。这使得像上面这样的生产和消费模式非常适合用来传输日志、监控指标或者事件通知等。
---
#### 高级配置说明
如果需要更深入地定制 Kafka 节点的行为,则可以通过调整 server.properties 文件来完成。例如,在 KRaft(Kafka Raft Metadata Protocol)模式下启动节点时,可以设置如下参数[^4]:
- **process.roles**: 定义该节点的角色,比如既是 broker 又是 controller。
- **node.id**: 明确当前节点的身份编号。
- **controller.quorum.voters**: 列举参与投票的所有控制器成员及其网络位置。
- **listeners 和 advertised.listeners**: 分别定义内部通信协议以及对外暴露的服务地址。
- **log.dirs**: 数据文件存储路径。
这些选项允许开发者灵活控制集群拓扑结构和服务可用性。
---
阅读全文
相关推荐


















