spark kafka案例
时间: 2025-01-13 21:44:51 浏览: 55
### 关于 Apache Spark 与 Apache Kafka 集成使用的案例
#### Direct 方式整合 Spark Streaming 和 Kafka 的简单实例
当采用Direct方式来连接Spark Streaming和Kafka时,这种方式不需要使用Receiver机制,而是通过直接读取Kafka中的数据来进行批处理操作。这使得整个过程更加高效稳定,并且能够更好地支持精确一次语义[^1]。
下面给出一段Python版本的代码片段作为例子:
```python
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
conf = SparkConf().setMaster("local[*]").setAppName("spark_kafka_direct")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, batchDuration=2)
brokers = "localhost:9092"
topics = ["test"]
kvs = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))
wordCounts = words.countByValue()
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
```
这段脚本创建了一个简单的Streaming应用程序,该应用会持续监听名为`test`的主题上的新消息并统计单词频率[^2]。
#### 使用 Structured Streaming 连接 Spark 和 Kafka
对于较新的Spark版本(如3.x),推荐使用Structured Streaming API来替代旧版的DStream API。这是因为Structured Streaming提供了更简洁易懂的数据流抽象以及更好的性能优化特性[^4]。
这里有一个基于PySpark的结构化流处理框架的小型示范程序:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("structured_streaming_example") \
.getOrCreate()
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("subscribe", "topicName").load()\
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
query = df.writeStream.outputMode('append').format('console').start()
query.awaitTermination()
```
此段代码展示了如何利用DataFrame/Dataset API构建一个从Kafka主题中消费记录并将它们打印出来的连续查询作业。
阅读全文
相关推荐




















