用scala写一段sparkETL流程
以下是一个简单的 Spark ETL 流程示例,使用 Scala 编写:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object ETLJob {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("ETLJob")
.getOrCreate()
// 加载源数据
val sourceDF = spark.read
.option("header", true)
.option("inferSchema", true)
.csv("src/main/resources/source-data.csv")
// 转换数据
val transformedDF = sourceDF
.filter(col("age").gt(18))
.groupBy("country")
.agg(avg("salary").as("avg_salary"))
// 保存结果数据
transformedDF.write
.option("header", true)
.csv("src/main/resources/result-data.csv")
// 停止 SparkSession
spark.stop()
}
}
以上代码中,我们创建了一个 SparkSession 对象,加载源数据,对数据进行转换,并将转换后的数据保存为 CSV 文件。其中,我们使用了 Spark SQL 的函数来进行数据过滤和聚合操作。最后,我们停止了 SparkSession 对象,以释放资源。
在运行这段代码之前,我们需要准备一份源数据,可以是 CSV、JSON、Parquet 等格式。在本示例中,我们使用了一个名为 source-data.csv 的 CSV 文件,放置在 src/main/resources 目录下。同时,我们将转换后的结果保存在 result-data.csv 文件中,也放置在 src/main/resources 目录下。
在实际使用中,我们可以将这段代码打包成一个可执行的 JAR 文件,并在命令行中执行,或者将其部署到 Spark 集群上运行。这样,我们就能够轻松地实现 Spark ETL 流程了。
原文地址: https://www.cveoy.top/t/topic/xVo 著作权归作者所有。请勿转载和采集!