Spark Streaming实战:Python实现词频统计(Kafka & HDFS)
Spark Streaming实战:Python实现词频统计(Kafka & HDFS)
本文将介绍如何使用Spark Streaming结合Python,分别从Kafka和HDFS中读取数据,并进行实时的词频统计分析。
案例一:读取Kafka数据并统计词频
本案例模拟从Kafka topic中实时读取数据流,并统计每个单词出现的频率。
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
conf = SparkConf().setAppName('KafkaWordCount')
ssc = StreamingContext(conf, 5)
kafkaParams = {'metadata.broker.list': 'localhost:9092'}
topics = ['test_topic']
kafkaStream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
words = kafkaStream.flatMap(lambda x: x[1].split(' '))
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
代码解释:
- 导入必要的库,包括
KafkaUtils、StreamingContext和SparkConf。 - 创建SparkConf对象,并设置应用程序名称为'KafkaWordCount'。
- 创建StreamingContext对象,设置批处理时间间隔为5秒。
- 定义Kafka参数,包括broker地址和topic名称。
- 使用
KafkaUtils.createDirectStream方法创建Kafka数据流。 - 使用
flatMap方法将每条消息按照空格分割成单词。 - 使用
map方法将每个单词映射为(word, 1)的键值对。 - 使用
reduceByKey方法统计每个单词出现的总次数。 - 使用
pprint方法打印结果。 - 启动Spark Streaming应用程序,并等待程序结束。
案例二:读取HDFS文件并统计词频
本案例模拟从HDFS文件中读取数据流,并统计每个单词出现的频率。
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
conf = SparkConf().setAppName('HDFSWordCount')
ssc = StreamingContext(conf, 5)
lines = ssc.textFileStream('hdfs://localhost:9000/input')
words = lines.flatMap(lambda line: line.split(' '))
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
代码解释:
- 导入必要的库,包括
StreamingContext和SparkConf。 - 创建SparkConf对象,并设置应用程序名称为'HDFSWordCount'。
- 创建StreamingContext对象,设置批处理时间间隔为5秒。
- 使用
textFileStream方法读取HDFS目录下的文件数据流。 - 使用
flatMap方法将每行数据按照空格分割成单词。 - 使用
map方法将每个单词映射为(word, 1)的键值对。 - 使用
reduceByKey方法统计每个单词出现的总次数。 - 使用
pprint方法打印结果。 - 启动Spark Streaming应用程序,并等待程序结束。
总结
本文介绍了如何使用Spark Streaming和Python进行实时词频统计,并分别演示了从Kafka和HDFS读取数据的案例。通过学习这两个案例,您可以了解Spark Streaming的基本操作,并将其应用于实际的实时数据处理场景中。
原文地址: https://www.cveoy.top/t/topic/fVfe 著作权归作者所有。请勿转载和采集!