以下是一个使用 Spark Streaming 计算实时总销售额的示例程序,该程序从 TCP Socket 接收数据流,并将每行数据按逗号分割为'(key, value)'对,其中 key 为产品名称,value 为销售额。随后,程序使用 reduceByKey 操作计算每个 key 的销售额,并使用 map 操作将所有销售额累加得到总销售额。最后,它输出每个 key 的销售额,并输出总销售额。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SalesTotalCalculator {
  def main(args: Array[String]): Unit = {
    // 创建 SparkConf 对象并设置应用程序名称
    val conf = new SparkConf().setAppName("SalesTotalCalculator")
    // 创建 StreamingContext 对象,设置批处理间隔为 1 秒
    val ssc = new StreamingContext(conf, Seconds(1))

    // 创建 DStream,从 TCP Socket 接收数据流
    val lines = ssc.socketTextStream("localhost", 9999)

    // 将每行数据按逗号分割,并转换为 (key, value) 对
    val pairs = lines.map(line => {
      val fields = line.split(",")
      (fields(0), fields(1).toDouble)
    })

    // 对每个 RDD 执行 reduceByKey 操作,计算每个 key 的销售额
    val salesByProduct = pairs.reduceByKey(_ + _)

    // 对每个 RDD 执行 map 操作,将每个 key 的销售额累加得到总销售额
    val totalSales = salesByProduct.map(_._2).reduce(_ + _)

    // 输出每个 key 的销售额
    salesByProduct.print()

    // 输出总销售额
    totalSales.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        println(s"Total sales: ${rdd.first()}")
      }
    })

    // 启动 StreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
}

你可以使用以下命令将该程序打包为 JAR 文件,并通过 spark-submit 命令提交到 Spark 集群上运行:

sbt package
spark-submit --class SalesTotalCalculator --master <master-url> <path-to-jar-file>

请注意将 <master-url> 替换为 Spark 集群的主节点 URL,将 <path-to-jar-file> 替换为打包生成的 JAR 文件的路径。


原文地址: https://www.cveoy.top/t/topic/o1Uw 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录