首先需要安装并配置好 PySpark 环境,然后按照以下步骤编写程序:

  1. 导入必要的模块和函数
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, when, col
from pyspark.sql.window import Window
  1. 创建 SparkSession
spark = SparkSession.builder.appName('TrafficPolice').getOrCreate()
  1. 读取数据源

假设数据源为 CSV 格式,包含以下字段:交警大队名称、卡口名称、建设年份。

df = spark.read.csv('path/to/data.csv', header=True)
  1. 进行数据处理

首先需要对数据进行分组,计算每个交警大队在不同年份建设的卡口数量,然后再计算占比。

grouped = df.groupBy('交警大队名称', '建设年份') \
           .agg(sum(when(col('卡口名称').isNotNull(), 1).otherwise(0)).alias('卡口数量'))

result = grouped.groupBy('交警大队名称') \
                .pivot('建设年份') \
                .agg(sum('卡口数量')) \
                .na.fill(0) \
                .withColumnRenamed('交警大队名称', '大队名称') \
                .withColumn('总卡口数', sum(col(c) for c in grouped.columns if c != '交警大队名称')) \
                .withColumn('占比', col('总卡口数') / sum(col('总卡口数')).over(Window.partitionBy()))
  1. 输出结果
result.show()

完整代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, when, col
from pyspark.sql.window import Window

spark = SparkSession.builder.appName('TrafficPolice').getOrCreate()

df = spark.read.csv('path/to/data.csv', header=True)

grouped = df.groupBy('交警大队名称', '建设年份') \
           .agg(sum(when(col('卡口名称').isNotNull(), 1).otherwise(0)).alias('卡口数量'))

result = grouped.groupBy('交警大队名称') \
                .pivot('建设年份') \
                .agg(sum('卡口数量')) \
                .na.fill(0) \
                .withColumnRenamed('交警大队名称', '大队名称') \
                .withColumn('总卡口数', sum(col(c) for c in grouped.columns if c != '交警大队名称')) \
                .withColumn('占比', col('总卡口数') / sum(col('总卡口数')).over(Window.partitionBy()))

result.show()

注意:需要将'path/to/data.csv'替换成实际数据源的路径。


原文地址: https://www.cveoy.top/t/topic/nnqW 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录