PySpark Pandas UDF 和 MapInPandas 函数示例
PySpark Pandas UDF 和 MapInPandas 函数示例
该示例演示了如何在 PySpark 中使用 pandas_udf 和 mapInPandas 函数来执行 Python 原生函数和 pandas 函数。
首先,我们创建一个包含不同类型数据的 DataFrame:
from datetime import datetime, date
import pandas as pd
from pyspark.shell import spark
from pyspark.sql import Row
df = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df.show()
使用 pandas_udf
定义一个 pandas_udf 函数,该函数接受一个 pandas Series,并返回一个 pandas Series,其中每个元素都加 1:
from pyspark.sql.functions import pandas_udf
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
# 使用 pandas Series 加 1
return series + 1
df.select(pandas_plus_one(df.a)).show()
使用 mapInPandas
定义一个函数,接受一个迭代器,并返回一个 DataFrame,其中仅包含 a 列等于 1 的行:
def pandas_filter_func(iterator):
for pandas_df in iterator:
yield pandas_df[pandas_df.a == 1]
df.mapInPandas(pandas_filter_func, schema=df.schema).show()
使用 applyInPandas 对分组数据进行计算
创建一个新的 DataFrame,并使用 groupby 函数按颜色分组。计算每个组的平均值,并使用 applyInPandas 对分组数据进行计算:
df = spark.createDataFrame([
['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()
df.groupby('color').avg().show()
# 定义一个函数,将 `v1` 列减去该列的平均值
def plus_mean(pandas_df):
return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())
df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()
总结
上述代码展示了如何在 PySpark 中使用 pandas_udf、mapInPandas 和 applyInPandas 函数来执行 Python 原生函数和 pandas 函数。这些函数可以帮助我们更方便地处理和分析数据。
原文地址: https://www.cveoy.top/t/topic/kUZ8 著作权归作者所有。请勿转载和采集!