PySpark 数据处理:使用 filter 和 map 筛选特定数据

本教程将演示如何使用 PySpark 读取文本文件,并筛选包含特定字符串 'DataBase' 的数据行。我们将分别使用 SparkContext 和 SparkSession 两种方式实现。

方法一:使用 SparkContextpythonfrom pyspark import SparkContext

sc = SparkContext.getOrCreate()lines = sc.textFile('data01.txt')res = lines.map(lambda x: x.split(',')).filter(lambda x: x[1] == 'DataBase')res.count()

这段代码首先创建了一个 SparkContext 对象,然后使用 textFile 方法读取名为 'data01.txt' 的文本文件。接下来,使用 map 函数将每一行数据按照逗号分割成列表,并使用 filter 函数筛选出第二个元素等于 'DataBase' 的数据行。最后,使用 count 方法统计筛选结果的行数。

方法二:使用 SparkSessionpythonfrom pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()lines = spark.read.text('data01.txt').rdd.map(lambda x: x[0])res = lines.map(lambda x: x.split(',')).filter(lambda x: x[1] == 'DataBase')res.count()

这段代码首先创建了一个 SparkSession 对象,然后使用 read.text 方法读取名为 'data01.txt' 的文本文件。由于 read.text 返回的是 DataFrame 对象,我们需要使用 rdd 属性将其转换为 RDD 对象,并使用 map(lambda x: x[0]) 获取每一行的文本内容。接下来的步骤与方法一相同,都是使用 mapfilter 函数筛选数据,并使用 count 方法统计结果。

两种方法都可以实现相同的功能,选择哪种方法取决于实际应用场景和个人偏好。SparkSession 是 Spark 2.0 后引入的更高级的 API,推荐在新项目中优先使用。

PySpark 数据处理:使用 filter 和 map 筛选特定数据

原文地址: https://www.cveoy.top/t/topic/kK1 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录