实时总销售额计算:Spark Streaming 实践指南
以下是一个使用 Spark Streaming 计算实时总销售额的示例程序,该程序从 TCP Socket 接收数据流,并将每行数据按逗号分割为'(key, value)'对,其中 key 为产品名称,value 为销售额。随后,程序使用 reduceByKey 操作计算每个 key 的销售额,并使用 map 操作将所有销售额累加得到总销售额。最后,它输出每个 key 的销售额,并输出总销售额。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SalesTotalCalculator {
def main(args: Array[String]): Unit = {
// 创建 SparkConf 对象并设置应用程序名称
val conf = new SparkConf().setAppName("SalesTotalCalculator")
// 创建 StreamingContext 对象,设置批处理间隔为 1 秒
val ssc = new StreamingContext(conf, Seconds(1))
// 创建 DStream,从 TCP Socket 接收数据流
val lines = ssc.socketTextStream("localhost", 9999)
// 将每行数据按逗号分割,并转换为 (key, value) 对
val pairs = lines.map(line => {
val fields = line.split(",")
(fields(0), fields(1).toDouble)
})
// 对每个 RDD 执行 reduceByKey 操作,计算每个 key 的销售额
val salesByProduct = pairs.reduceByKey(_ + _)
// 对每个 RDD 执行 map 操作,将每个 key 的销售额累加得到总销售额
val totalSales = salesByProduct.map(_._2).reduce(_ + _)
// 输出每个 key 的销售额
salesByProduct.print()
// 输出总销售额
totalSales.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
println(s"Total sales: ${rdd.first()}")
}
})
// 启动 StreamingContext
ssc.start()
ssc.awaitTermination()
}
}
你可以使用以下命令将该程序打包为 JAR 文件,并通过 spark-submit 命令提交到 Spark 集群上运行:
sbt package
spark-submit --class SalesTotalCalculator --master <master-url> <path-to-jar-file>
请注意将 <master-url> 替换为 Spark 集群的主节点 URL,将 <path-to-jar-file> 替换为打包生成的 JAR 文件的路径。
原文地址: https://www.cveoy.top/t/topic/o1Uw 著作权归作者所有。请勿转载和采集!