PySpark Pandas UDF 和 MapInPandas 函数示例

该示例演示了如何在 PySpark 中使用 pandas_udfmapInPandas 函数来执行 Python 原生函数和 pandas 函数。

首先,我们创建一个包含不同类型数据的 DataFrame:

from datetime import datetime, date
import pandas as pd
from pyspark.shell import spark
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df.show()

使用 pandas_udf

定义一个 pandas_udf 函数,该函数接受一个 pandas Series,并返回一个 pandas Series,其中每个元素都加 1:

from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # 使用 pandas Series 加 1
    return series + 1

df.select(pandas_plus_one(df.a)).show()

使用 mapInPandas

定义一个函数,接受一个迭代器,并返回一个 DataFrame,其中仅包含 a 列等于 1 的行:

def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

使用 applyInPandas 对分组数据进行计算

创建一个新的 DataFrame,并使用 groupby 函数按颜色分组。计算每个组的平均值,并使用 applyInPandas 对分组数据进行计算:

df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()
df.groupby('color').avg().show()

# 定义一个函数,将 `v1` 列减去该列的平均值
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

总结

上述代码展示了如何在 PySpark 中使用 pandas_udfmapInPandasapplyInPandas 函数来执行 Python 原生函数和 pandas 函数。这些函数可以帮助我们更方便地处理和分析数据。

PySpark Pandas UDF 和 MapInPandas 函数示例

原文地址: https://www.cveoy.top/t/topic/kUZ8 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录