Spark 数据落标检测:根据标准 ID 抽样检测 Hive 表数据质量
// 导入相关包 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._
// 创建 SparkSession 对象 val spark = SparkSession.builder() .appName('DataStandardCheck') .master('local[*]') .getOrCreate()
// 读取数据标准表 val dataStdDF = spark.read .format('jdbc') .option('url', 'jdbc:mysql://localhost:3306/test') .option('dbtable', 'data_std') .option('user', 'root') .option('password', '123456') .load()
// 读取标准代码表 val dataStdCodeDF = spark.read .format('jdbc') .option('url', 'jdbc:mysql://localhost:3306/test') .option('dbtable', 'data_std_code') .option('user', 'root') .option('password', '123456') .load()
// 读取数据落标表 val dataStdDownDF = spark.read .format('jdbc') .option('url', 'jdbc:mysql://localhost:3306/test') .option('dbtable', 'data_std_down') .option('user', 'root') .option('password', '123456') .load()
// 指定要检测的标准 id val stdId = 'xxx'
// 根据标准 id 过滤数据标准表 val filteredStdDF = dataStdDF.filter($'std_id' === stdId)
// 获取标准字段名、长度、精度、类型等信息 val stdName = filteredStdDF.select('std_name').head().getString(0) val stdLength = filteredStdDF.select('std_length').head().getInt(0) val stdPrecision = filteredStdDF.select('std_precision').head().getInt(0) val stdType = filteredStdDF.select('std_type').head().getString(0)
// 根据标准 id 过滤标准代码表 val filteredStdCodeDF = dataStdCodeDF.filter($'std_id' === stdId)
// 获取标准可用值列表 val stdCodeList = filteredStdCodeDF.select('std_code').collect().map(_.getString(0)).toList
// 根据标准 id 过滤数据落标表 val filteredStdDownDF = dataStdDownDF.filter($'std_id' === stdId)
// 遍历数据落标表中的每个 hive 表,进行数据检测并写入结果 filteredStdDownDF.foreach(row => { val hiveTable = row.getString(1) val result = checkData(hiveTable, stdName, stdLength, stdPrecision, stdType, stdCodeList) val stdDownId = row.getInt(0) // 更新数据落标表中的检测结果 dataStdDownDF.where($'std_down_id' === stdDownId).update('check_result', result) })
// 数据检测函数 def checkData(hiveTable: String, stdName: String, stdLength: Int, stdPrecision: Int, stdType: String, stdCodeList: List[String]): String = { // 读取 hive 表中的数据 val hiveDF = spark.read.table(hiveTable) // 进行数据检测 val total = hiveDF.count() val filtered = stdType match { case 'string' => hiveDF.filter(length(col(stdName)) === stdLength) case 'double' => hiveDF.filter(precision(col(stdName)) === stdPrecision) case 'int' => hiveDF.filter(precision(col(stdName)) === stdPrecision) case 'date' => hiveDF.filter(precision(col(stdName)) === stdPrecision) case _ => hiveDF } val valid = filtered.filter(col(stdName).isin(stdCodeList:_*)).count() val ratio = valid / total // 返回检测结果 s'共检测$total条数据,其中满足标准的数据有$valid条,占比为$ratio。' }
原文地址: https://www.cveoy.top/t/topic/ouaZ 著作权归作者所有。请勿转载和采集!