下面是一个使用 Spark Streaming 计算商品单价乘以数量的示例程序:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# 创建 SparkContext 和 StreamingContext
sc = SparkContext('local[2]', 'ProductStream')
ssc = StreamingContext(sc, 1)

# 创建一个 DStream 来接收流数据
lines = ssc.socketTextStream('localhost', 9999)

# 将每行数据以逗号分割为商品和数量
pairs = lines.map(lambda line: line.split(','))
# 将商品和数量转换为键值对,键为商品,值为数量
product_counts = pairs.map(lambda pair: (pair[0], int(pair[1])))

# 计算商品的单价乘以数量
product_prices = product_counts.mapValues(lambda count: count * 10)  # 假设单价为 10

# 打印结果
product_prices.pprint()

# 启动 StreamingContext
ssc.start()
ssc.awaitTermination()

在上述程序中,我们首先创建了一个 StreamingContext 对象,并指定了 Spark 的 local 模式以及每秒处理一条数据。然后,我们创建了一个 DStream 来接收流数据,通过 map 操作将每行数据以逗号分割为商品和数量,并将其转换为键值对形式,其中键为商品,值为数量。接下来,我们通过 mapValues 操作计算商品的单价乘以数量。最后,我们使用 pprint 方法打印计算结果。

要运行此程序,您需要在终端上启动一个网络服务,监听本地的 9999 端口,并将商品和数量以逗号分隔的形式发送到该端口。例如,您可以使用以下命令来启动一个简单的服务:

nc -lk 9999

然后,您可以在终端上输入商品和数量,并按 Enter 键发送给程序。程序将计算商品的单价乘以数量,并将结果打印出来。


原文地址: https://www.cveoy.top/t/topic/o1UQ 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录