Scala Spark ETL 示例:数据清洗与转换
以下是一个简单的 Spark ETL 流程示例,使用 Scala 编写:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object ETLJob {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("ETLJob")
.getOrCreate()
// 加载源数据
val sourceDF = spark.read
.option('header', true)
.option('inferSchema', true)
.csv("src/main/resources/source-data.csv")
// 转换数据
val transformedDF = sourceDF
.filter(col('age').gt(18))
.groupBy('country')
.agg(avg('salary').as('avg_salary'))
// 保存结果数据
transformedDF.write
.option('header', true)
.csv("src/main/resources/result-data.csv")
// 停止 SparkSession
spark.stop()
}
}
以上代码中,我们创建了一个 SparkSession 对象,加载源数据,对数据进行转换,并将转换后的数据保存为 CSV 文件。其中,我们使用了 Spark SQL 的函数来进行数据过滤和聚合操作。最后,我们停止了 SparkSession 对象,以释放资源。
在运行这段代码之前,我们需要准备一份源数据,可以是 CSV、JSON、Parquet 等格式。在本示例中,我们使用了一个名为 'source-data.csv' 的 CSV 文件,放置在 'src/main/resources' 目录下。同时,我们将转换后的结果保存在 'result-data.csv' 文件中,也放置在 'src/main/resources' 目录下。
在实际使用中,我们可以将这段代码打包成一个可执行的 JAR 文件,并在命令行中执行,或者将其部署到 Spark 集群上运行。这样,我们就能够轻松地实现 Spark ETL 流程了。
原文地址: https://www.cveoy.top/t/topic/lLtD 著作权归作者所有。请勿转载和采集!