Write Spark job code in Scala to extract the full user_info table from the shtd_store database into the table user_info in Hive's ods database. Keep the field names and types unchanged, and add a dynamic partition field of type String whose value is the birthday column formatted as yyyyMM. Then run select count(distinct etldate) from ods.user_info in the Hive CLI.
The code is as follows:
import org.apache.spark.sql.SparkSession

object ExtractUserInfo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ExtractUserInfo")
      .enableHiveSupport()
      .getOrCreate()

    // Enable dynamic partitioning so insertInto can create etldate partitions on the fly
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    val user_info = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/shtd_store")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "user_info")
      .option("user", "root")
      .option("password", "root")
      .load()
      // Derive the etldate partition column from birthday, formatted as yyyyMM.
      // The partition column must come last, matching the Hive table's layout.
      .selectExpr("user_id", "name", "gender", "birthday", "create_time", "update_time",
        "date_format(birthday, 'yyyyMM') as etldate")

    // insertInto resolves columns by position and handles partitioning itself,
    // so it must not be combined with partitionBy
    user_info.write.mode("overwrite").insertInto("ods.user_info")

    spark.stop()
  }
}
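Spark's date_format uses the same pattern letters as java.time, so the yyyyMM partition value derived from birthday can be sketched in plain Scala for a single date string. This is a minimal illustration; the helper name and the yyyy-MM-dd input format are assumptions, not part of the original job:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object EtldateFormat {
  // Formatter producing the required yyyyMM partition layout
  private val fmt = DateTimeFormatter.ofPattern("yyyyMM")

  // Derive the etldate partition value from a birthday string
  // (assumed to be in ISO yyyy-MM-dd form, as MySQL DATE values are read)
  def etldate(birthday: String): String =
    LocalDate.parse(birthday).format(fmt)
}
```

For example, a birthday of 1995-07-03 would land in partition 199507, so count(distinct etldate) counts the number of distinct birth months in the table.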
Running select count(distinct etldate) from ods.user_info in the Hive CLI produces the following result:
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1614654414443_0013, Tracking URL = http://localhost:8088/proxy/application_1614654414443_0013/
Kill Command = /Users/hadoop/hadoop-3.3.0/bin/hadoop job -kill job_1614654414443_0013
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2021-03-03 14:22:32,636 Stage-1 map = 0%, reduce = 0%
2021-03-03 14:22:36,754 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.83 sec
MapReduce Total cumulative CPU time: 1 seconds 830 msec
Ended Job = job_1614654414443_0013
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Cumulative CPU: 1.83 sec HDFS Read: 6495 HDFS Write: 81 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 830 msec
OK
3
Time taken: 12.862 seconds, Fetched: 1 row(s)