PySpark统计交警大队卡口建设占比:详细步骤及代码示例
首先需要安装并配置好 PySpark 环境,然后按照以下步骤编写程序:
- 导入必要的模块和函数
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, when, col
from pyspark.sql.window import Window
- 创建 SparkSession
spark = SparkSession.builder.appName('TrafficPolice').getOrCreate()
- 读取数据源
假设数据源为 CSV 格式,包含以下字段:交警大队名称、卡口名称、建设年份。
df = spark.read.csv('path/to/data.csv', header=True)
- 进行数据处理
首先需要对数据进行分组,计算每个交警大队在不同年份建设的卡口数量,然后再计算占比。
grouped = df.groupBy('交警大队名称', '建设年份') \
.agg(sum(when(col('卡口名称').isNotNull(), 1).otherwise(0)).alias('卡口数量'))
result = grouped.groupBy('交警大队名称') \
.pivot('建设年份') \
.agg(sum('卡口数量')) \
.na.fill(0) \
.withColumnRenamed('交警大队名称', '大队名称') \
.withColumn('总卡口数', sum(col(c) for c in grouped.columns if c != '交警大队名称')) \
.withColumn('占比', col('总卡口数') / sum(col('总卡口数')).over(Window.partitionBy()))
- 输出结果
result.show()
完整代码如下:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, when, col
from pyspark.sql.window import Window
spark = SparkSession.builder.appName('TrafficPolice').getOrCreate()
df = spark.read.csv('path/to/data.csv', header=True)
grouped = df.groupBy('交警大队名称', '建设年份') \
.agg(sum(when(col('卡口名称').isNotNull(), 1).otherwise(0)).alias('卡口数量'))
result = grouped.groupBy('交警大队名称') \
.pivot('建设年份') \
.agg(sum('卡口数量')) \
.na.fill(0) \
.withColumnRenamed('交警大队名称', '大队名称') \
.withColumn('总卡口数', sum(col(c) for c in grouped.columns if c != '交警大队名称')) \
.withColumn('占比', col('总卡口数') / sum(col('总卡口数')).over(Window.partitionBy()))
result.show()
注意:需要将'path/to/data.csv'替换成实际数据源的路径。
原文地址: https://www.cveoy.top/t/topic/nnqW 著作权归作者所有。请勿转载和采集!