Spark Streaming 滑动窗口计算实时平均车速
假设我们有一个实时数据流表示车辆通过某个路段的速度和时间戳。我们想要计算每个 5 分钟内车辆的平均速度,同时每 1 分钟更新一次结果。可以使用 Spark Streaming 的滑动窗口和步长来实现。
首先,创建一个 Spark StreamingContext 对象:
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
sc = SparkContext(appName='average_speed')
ssc = StreamingContext(sc, 60) # 每 60 秒一个批次
接下来,创建一个 DStream 来表示实时数据流:
lines = ssc.socketTextStream('localhost', 9999)
假设每行数据的格式为'速度,时间戳',可以使用 map 操作将其转换为 (key, value) 对,其中 key 是每个 5 分钟的时间段,value 是速度:
speeds = lines.map(lambda x: (int(int(x.split(',')[1])/300)*300, float(x.split(',')[0])))
这里使用了 int(int(x.split(',')[1])/300)*300 来将时间戳舍去秒级别,只保留每个 5 分钟的时间段。
接下来,对 DStream 进行窗口操作,并计算每个窗口内的平均速度:
windowed_speeds = speeds.window(300, 60) # 窗口大小为 5 分钟,步长为 1 分钟
avg_speeds = windowed_speeds.groupByKey().mapValues(lambda x: sum(x)/len(x))
这里使用了窗口大小为 300(即 5 分钟),步长为 60(即 1 分钟),然后对每个窗口内的速度进行 groupByKey 操作,计算平均速度。
最后,将结果输出到控制台:
avg_speeds.pprint()
完整代码如下:
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
sc = SparkContext(appName='average_speed')
ssc = StreamingContext(sc, 60)
lines = ssc.socketTextStream('localhost', 9999)
speeds = lines.map(lambda x: (int(int(x.split(',')[1])/300)*300, float(x.split(',')[0])))
windowed_speeds = speeds.window(300, 60)
avg_speeds = windowed_speeds.groupByKey().mapValues(lambda x: sum(x)/len(x))
avg_speeds.pprint()
ssc.start()
ssc.awaitTermination()
原文地址: https://www.cveoy.top/t/topic/oyTy 著作权归作者所有。请勿转载和采集!