Spark Streaming 实时数据分析:词频统计和用户访问量
Spark Streaming 实时数据分析:词频统计和用户访问量
本篇博客将提供两道 Spark Streaming 相关的编程练习题,并给出 Python 代码答案。
1. 实时统计每分钟内出现次数最多的单词
题目: 使用 Spark Streaming 实时统计每分钟内出现次数最多的单词。
答案:
from pyspark.streaming import StreamingContext
# 创建StreamingContext
ssc = StreamingContext(sparkContext, 60)
# 创建DStream
lines = ssc.socketTextStream('localhost', 9999)
# 切分每行文本为单词并计数
words = lines.flatMap(lambda line: line.split(' '))
wordCounts = words.countByValue()
# 每分钟内出现次数最多的单词
topWordCounts = wordCounts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False).take(1))
# 输出结果
topWordCounts.pprint()
# 启动StreamingContext
ssc.start()
ssc.awaitTermination()
2. 实时统计每分钟内的用户访问量
题目: 使用 Spark Streaming 实时统计每分钟内的用户访问量。
答案:
from pyspark.streaming import StreamingContext
# 创建StreamingContext
ssc = StreamingContext(sparkContext, 60)
# 创建DStream
logs = ssc.socketTextStream('localhost', 9999)
# 统计每分钟内的用户访问量
userCounts = logs.map(lambda log: (log.split(' ')[0], 1)).reduceByKey(lambda a, b: a + b)
# 输出结果
userCounts.pprint()
# 启动StreamingContext
ssc.start()
ssc.awaitTermination()
代码说明:
- 创建 StreamingContext: 使用
StreamingContext(sparkContext, 60)创建一个 StreamingContext,其中sparkContext是 Spark 的上下文,60表示批处理的时间间隔为 60 秒,即每分钟处理一次数据。 - 创建 DStream: 使用
socketTextStream('localhost', 9999)创建一个 DStream,从本地主机端口 9999 接收文本数据。 - 处理数据: 根据题目要求,对 DStream 进行相应的操作,例如将文本数据切分成单词、统计词频、统计用户访问量等。
- 输出结果: 使用
pprint()将处理后的结果输出到控制台。 - 启动 StreamingContext: 使用
ssc.start()启动 StreamingContext 开始接收数据并处理。 - 等待结束: 使用
ssc.awaitTermination()使程序一直运行直到用户手动停止。
注意: 以上代码需要在已经创建好 Spark 上下文的环境中运行,并需要设置好本地主机端口 9999 用于接收数据。
更多内容: 您可以参考 Spark Streaming 官方文档和示例代码了解更多信息。
原文地址: https://www.cveoy.top/t/topic/fVfD 著作权归作者所有。请勿转载和采集!