Spark 大数据技术与应用:程序设计练习题 (附 Python 答案)
Spark 大数据技术与应用:程序设计练习题 (附 Python 答案)
本练习题集涵盖了 Spark 大数据技术与应用的常见应用场景,例如文本文件处理、数据存储、数据分析等。每个题目都附带 Python 答案,帮助您更好地理解 Spark 的使用。
1. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并按照出现次数从高到低排序输出。
from pyspark import SparkContext
sc = SparkContext('local', 'wordcount')
text_file = sc.textFile('input.txt')
word_counts = text_file.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], False)
word_counts.saveAsTextFile('output')
2. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 MySQL 数据库中。
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
sc = SparkContext('local', 'wordcount')
spark = SparkSession(sc)
text_file = sc.textFile('input.txt')
word_counts = text_file.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], False)
schema = StructType([
StructField('word', StringType(), True),
StructField('count', IntegerType(), True)
])
df = spark.createDataFrame(word_counts, schema)
df.write \
.format('jdbc') \
.option('url', 'jdbc:mysql://localhost:3306/test') \
.option('driver', 'com.mysql.jdbc.Driver') \
.option('dbtable', 'word_counts') \
.option('user', 'root') \
.option('password', 'password') \
.mode('overwrite') \
.save()
3. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Redis 数据库中。
from pyspark import SparkContext
import redis
sc = SparkContext('local', 'wordcount')
text_file = sc.textFile('input.txt')
word_counts = text_file.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], False) \
.collect()
r = redis.Redis(host='localhost', port=6379, db=0)
for word, count in word_counts:
r.set(word, count)
4. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Kafka 消息队列中。
from pyspark import SparkContext
from kafka import KafkaProducer
sc = SparkContext('local', 'wordcount')
text_file = sc.textFile('input.txt')
word_counts = text_file.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], False) \
.collect()
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
for word, count in word_counts:
producer.send('word_counts', key=word.encode('utf-8'), value=str(count).encode('utf-8'))
5. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Elasticsearch 中。
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from elasticsearch import Elasticsearch
sc = SparkContext('local', 'wordcount')
spark = SparkSession(sc)
text_file = sc.textFile('input.txt')
word_counts = text_file.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], False)
schema = StructType([
StructField('word', StringType(), True),
StructField('count', IntegerType(), True)
])
df = spark.createDataFrame(word_counts, schema)
es = Elasticsearch()
df.write \
.format('org.elasticsearch.spark.sql') \
.option('es.nodes', 'localhost') \
.option('es.port', '9200') \
.option('es.resource', 'word_counts') \
.option('es.mapping.id', 'word') \
.mode('overwrite') \
.save()
6. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Hadoop HDFS 中。
from pyspark import SparkContext
sc = SparkContext('local', 'wordcount')
text_file = sc.textFile('input.txt')
word_counts = text_file.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], False)
word_counts.saveAsTextFile('hdfs://localhost:9000/word_counts')
7. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 AWS S3 中。
from pyspark import SparkContext
sc = SparkContext('local', 'wordcount')
text_file = sc.textFile('s3a://bucket/input.txt')
word_counts = text_file.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], False)
word_counts.saveAsTextFile('s3a://bucket/word_counts')
8. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Azure Blob Storage 中。
from pyspark import SparkContext
sc = SparkContext('local', 'wordcount')
text_file = sc.textFile('wasbs://input@container.blob.core.windows.net')
word_counts = text_file.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], False)
word_counts.saveAsTextFile('wasbs://word_counts@container.blob.core.windows.net')
9. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 Google Cloud Storage 中。
from pyspark import SparkContext
sc = SparkContext('local', 'wordcount')
text_file = sc.textFile('gs://bucket/input.txt')
word_counts = text_file.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], False)
word_counts.saveAsTextFile('gs://bucket/word_counts')
10. 编写一个 Spark 程序,读取一个文本文件,统计其中每个单词出现的次数,并将结果保存到 MongoDB 中。
from pyspark import SparkContext
from pymongo import MongoClient
sc = SparkContext('local', 'wordcount')
text_file = sc.textFile('input.txt')
word_counts = text_file.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], False) \
.collect()
client = MongoClient('localhost', 27017)
db = client['test']
collection = db['word_counts']
for word, count in word_counts:
collection.insert_one({'word': word, 'count': count})
注意:
- 以上代码示例中,'input.txt' 代表待处理的文本文件路径。
- 实际使用时,请根据您的实际环境修改代码中的配置参数,例如数据库连接信息、文件路径等。
- 以上代码示例仅供参考,您可以根据自己的需求进行修改。
原文地址: https://www.cveoy.top/t/topic/jnsQ 著作权归作者所有。请勿转载和采集!