Spark Streaming 实战:计算商品总价
下面是一个使用 Spark Streaming 计算商品总价的示例程序。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def calculate_total_price(rdd):
total_price = rdd.map(lambda item: float(item.split(',')[1])).reduce(lambda x, y: x + y)
print('Total price: $', total_price)
if __name__ == "__main__":
# 创建 SparkContext 和 StreamingContext
sc = SparkContext(appName="StreamingExample")
ssc = StreamingContext(sc, 1)
# 创建一个 DStream,从 socket 流中读取数据
lines = ssc.socketTextStream("localhost", 9999)
# 对 DStream 进行处理,计算商品总价
lines.foreachRDD(calculate_total_price)
# 启动 StreamingContext
ssc.start()
# 等待程序终止
ssc.awaitTermination()
在这个示例中,我们首先创建了一个 calculate_total_price 函数,该函数接收一个 RDD 并计算商品的总价。然后,我们创建了一个 SparkContext 和一个 StreamingContext。接下来,我们创建了一个 DStream,从 TCP socket 流中读取数据。然后,我们对 DStream 进行处理,使用 foreachRDD 方法调用 calculate_total_price 函数来计算商品总价。最后,我们启动 StreamingContext 并等待程序终止。
请注意,这只是一个示例程序,实际使用时可能需要根据具体的数据源和处理逻辑进行调整。
原文地址: https://www.cveoy.top/t/topic/o1UG 著作权归作者所有。请勿转载和采集!