Spark 大数据技术与应用:程序设计练习题 (附 Python 答案)

本练习题集涵盖了 Spark 大数据技术与应用的常见应用场景,例如文本文件处理、数据存储、数据分析等。每个题目都附带 Python 答案,帮助您更好地理解 Spark 的使用。

1. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并按照出现次数从高到低排序输出。

from pyspark import SparkContext

sc = SparkContext('local', 'wordcount')

text_file = sc.textFile('input.txt')

word_counts = text_file.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], False)

word_counts.saveAsTextFile('output')

2. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 MySQL 数据库中。

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

sc = SparkContext('local', 'wordcount')
spark = SparkSession(sc)

text_file = sc.textFile('input.txt')

word_counts = text_file.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], False)

schema = StructType([
    StructField('word', StringType(), True),
    StructField('count', IntegerType(), True)
])

df = spark.createDataFrame(word_counts, schema)

df.write \
    .format('jdbc') \
    .option('url', 'jdbc:mysql://localhost:3306/test') \
    .option('driver', 'com.mysql.jdbc.Driver') \
    .option('dbtable', 'word_counts') \
    .option('user', 'root') \
    .option('password', 'password') \
    .mode('overwrite') \
    .save()

3. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Redis 数据库中。

from pyspark import SparkContext
import redis

sc = SparkContext('local', 'wordcount')

text_file = sc.textFile('input.txt')

word_counts = text_file.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], False) \
    .collect()

r = redis.Redis(host='localhost', port=6379, db=0)

for word, count in word_counts:
    r.set(word, count)

4. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Kafka 消息队列中。

from pyspark import SparkContext
from kafka import KafkaProducer

sc = SparkContext('local', 'wordcount')

text_file = sc.textFile('input.txt')

word_counts = text_file.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], False) \
    .collect()

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

for word, count in word_counts:
    producer.send('word_counts', key=word.encode('utf-8'), value=str(count).encode('utf-8'))

5. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Elasticsearch 中。

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from elasticsearch import Elasticsearch

sc = SparkContext('local', 'wordcount')
spark = SparkSession(sc)

text_file = sc.textFile('input.txt')

word_counts = text_file.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], False)

schema = StructType([
    StructField('word', StringType(), True),
    StructField('count', IntegerType(), True)
])

df = spark.createDataFrame(word_counts, schema)

es = Elasticsearch()

df.write \
    .format('org.elasticsearch.spark.sql') \
    .option('es.nodes', 'localhost') \
    .option('es.port', '9200') \
    .option('es.resource', 'word_counts') \
    .option('es.mapping.id', 'word') \
    .mode('overwrite') \
    .save()

6. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Hadoop HDFS 中。

from pyspark import SparkContext

sc = SparkContext('local', 'wordcount')

text_file = sc.textFile('input.txt')

word_counts = text_file.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], False)

word_counts.saveAsTextFile('hdfs://localhost:9000/word_counts')

7. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 AWS S3 中。

from pyspark import SparkContext

sc = SparkContext('local', 'wordcount')

text_file = sc.textFile('s3a://bucket/input.txt')

word_counts = text_file.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], False)

word_counts.saveAsTextFile('s3a://bucket/word_counts')

8. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Azure Blob Storage 中。

from pyspark import SparkContext

sc = SparkContext('local', 'wordcount')

text_file = sc.textFile('wasbs://input@container.blob.core.windows.net')

word_counts = text_file.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], False)

word_counts.saveAsTextFile('wasbs://word_counts@container.blob.core.windows.net')

9. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Google Cloud Storage 中。

from pyspark import SparkContext

sc = SparkContext('local', 'wordcount')

text_file = sc.textFile('gs://bucket/input.txt')

word_counts = text_file.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], False)

word_counts.saveAsTextFile('gs://bucket/word_counts')

10. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 MongoDB 中。

from pyspark import SparkContext
from pymongo import MongoClient

sc = SparkContext('local', 'wordcount')

text_file = sc.textFile('input.txt')

word_counts = text_file.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], False) \
    .collect()

client = MongoClient('localhost', 27017)
db = client['test']
collection = db['word_counts']

for word, count in word_counts:
    collection.insert_one({'word': word, 'count': count})

注意:

  • 以上代码示例中,'input.txt' 代表待处理的文本文件路径。
  • 实际使用时,请根据您的实际环境修改代码中的配置参数,例如数据库连接信息、文件路径等。
  • 以上代码示例仅供参考,您可以根据自己的需求进行修改。
Spark 大数据技术与应用:程序设计练习题 (附 Python 答案)

原文地址: https://www.cveoy.top/t/topic/jnsQ 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录