用spark 跑sql文件日志中如何知道每段insert overight了多少行数据。
在Spark中,可以通过使用日志级别来获取每个INSERT OVERWRITE语句的行数。
首先,将日志级别设置为INFO,以便在Spark应用程序中获取详细的日志信息。可以在启动Spark应用程序时使用以下命令来设置日志级别:
spark-submit --conf spark.driver.extraJavaOptions="-Dlog4j.configuration=file:log4j.properties" --conf spark.executor.extraJavaOptions="-Dlog4j.configuration=file:log4j.properties" --class <main class> <application jar>
然后,在log4j.properties文件中配置日志级别:
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
接下来,在Spark应用程序中运行SQL文件。在执行每个INSERT OVERWRITE语句之后,可以使用sparkSession.sparkContext.setLogLevel("INFO")将日志级别设置为INFO,并使用sparkSession.sparkContext.getConf().get("spark.driver.log.level")获取当前日志级别。
在INSERT OVERWRITE语句之前和之后,可以通过以下方式获取每个语句插入的行数:
val startTime = System.currentTimeMillis()
// 执行INSERT OVERWRITE语句
val endTime = System.currentTimeMillis()
val logLevel = sparkSession.sparkContext.getConf().get("spark.driver.log.level")
sparkSession.sparkContext.setLogLevel("INFO")
val logLines = sparkSession.sparkContext.textFile("path/to/spark/application/logs")
val filteredLines = logLines.filter(line => line.contains("INFO") && line.contains("Rows Inserted") && !line.contains("INFO org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol: Job commit failed")).collect()
sparkSession.sparkContext.setLogLevel(logLevel)
val numRowsInserted = filteredLines.map(line => line.split(" ")(4).toInt).sum
println(s"Number of rows inserted: $numRowsInserted")
这段代码会从Spark应用程序日志中过滤出包含"INFO"和"Rows Inserted"的行,并将每行的插入行数提取出来。最后,将所有插入行数相加,得到每个语句插入的总行数。
请注意,这种方法假设日志中只包含INSERT OVERWRITE语句的信息,并且每个语句的日志行包含"INFO"和"Rows Inserted"。如果日志中还包含其他信息或者日志格式不一致,可能需要根据实际情况进行调整
原文地址: https://www.cveoy.top/t/topic/iDTV 著作权归作者所有。请勿转载和采集!