Spark Big Data Technology and Applications: 10 Programming Exercises with Answers
Below are 10 Spark programming exercises covering common tasks such as reading from different data sources, counting word occurrences, and computing averages. Each exercise comes with a complete code example.
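The answers assume an interactive environment such as spark-shell, where sc (a SparkContext) and spark (a SparkSession) already exist. In a standalone application they would have to be created first; a minimal sketch (the application name and master URL are illustrative):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SparkExercises")   // illustrative application name
  .master("local[*]")          // for local testing only
  .getOrCreate()
val sc = spark.sparkContext
import spark.implicits._       // needed for .as[...] encoders used below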
1. Write a Spark program that reads a text file and counts the occurrences of each word. Output the results sorted by occurrence count in descending order.
Answer:
val textFile = sc.textFile("input.txt")
val wordCounts = textFile.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
  .sortBy(_._2, ascending = false)
wordCounts.collect().foreach(println)
2. Write a Spark program that reads a CSV file, computes the average temperature for each city, and outputs the result.
Answer:
case class Weather(city: String, temperature: Double)
val weatherData = sc.textFile("weather.csv")
  .map(line => line.split(","))
  .map(fields => Weather(fields(0), fields(1).toDouble))
val cityTemperatures = weatherData.map(weather => (weather.city, (weather.temperature, 1)))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues(sumCount => sumCount._1 / sumCount._2)
cityTemperatures.collect().foreach(println)
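The same computation can be written more compactly with the DataFrame API. A sketch, assuming weather.csv has a header row with columns named city and temperature:
import org.apache.spark.sql.functions.avg

val weatherDF = spark.read
  .option("header", "true")       // assumes a header row
  .option("inferSchema", "true")
  .csv("weather.csv")
val cityAvg = weatherDF.groupBy("city")
  .agg(avg("temperature").as("avgTemperature"))
cityAvg.show()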
3. Write a Spark program that reads a JSON file, computes the average purchase amount for each user, and outputs the result.
Answer:
import org.apache.spark.sql.functions.avg
case class Purchase(user: String, amount: Double)
// .as[Purchase] relies on import spark.implicits._ (see the setup sketch above)
val purchaseData = spark.read.json("purchases.json")
  .as[Purchase]
val userPurchases = purchaseData.groupBy("user")
  .agg(avg("amount").as("avgPurchase"))
userPurchases.show()
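The same aggregation can also be expressed in SQL by registering the Dataset from the answer above as a temporary view (the view name is illustrative; user is backquoted so it cannot clash with the built-in function of the same name):
purchaseData.createOrReplaceTempView("purchases")
spark.sql("SELECT `user`, AVG(amount) AS avgPurchase FROM purchases GROUP BY `user`").show()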
4. Write a Spark program that reads a Parquet file, computes the average purchase amount for each user, and outputs the result.
Answer:
import org.apache.spark.sql.functions.avg
case class Purchase(user: String, amount: Double)
val purchaseData = spark.read.parquet("purchases.parquet")
  .as[Purchase]
val userPurchases = purchaseData.groupBy("user")
  .agg(avg("amount").as("avgPurchase"))
userPurchases.show()
5. Write a Spark program that reads an Avro file, computes the average purchase amount for each user, and outputs the result.
Answer:
import org.apache.spark.sql.functions.avg
case class Purchase(user: String, amount: Double)
val purchaseData = spark.read.format("avro")
  .load("purchases.avro")
  .as[Purchase]
val userPurchases = purchaseData.groupBy("user")
  .agg(avg("amount").as("avgPurchase"))
userPurchases.show()
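Note that Avro support is not bundled with Spark itself; the external spark-avro module must be on the classpath. One way to add it (the version coordinates below are illustrative):
// e.g. when submitting: spark-submit --packages org.apache.spark:spark-avro_2.12:3.3.0 ...
// or declare it before the SparkSession (and its SparkContext) is first created:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.0")  // illustrative version
  .getOrCreate()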
6. Write a Spark program that reads a Hive table, computes the average purchase amount for each user, and outputs the result.
Answer:
import org.apache.spark.sql.functions.avg
case class Purchase(user: String, amount: Double)
spark.sql("USE mydatabase")
val purchaseData = spark.sql("SELECT * FROM purchases")
  .as[Purchase]
val userPurchases = purchaseData.groupBy("user")
  .agg(avg("amount").as("avgPurchase"))
userPurchases.show()
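For Hive tables to be visible, the SparkSession must be created with Hive support enabled and with access to the Hive metastore configuration (hive-site.xml). A minimal sketch, with an illustrative application name:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("HivePurchases")   // illustrative name
  .enableHiveSupport()        // requires hive-site.xml / metastore access on the classpath
  .getOrCreate()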
7. Write a Spark program that reads from a Kafka topic, counts the occurrences of each word, and outputs the result.
Answer:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test-topic")
// streamingContext is a StreamingContext; see the sketch after this answer
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
val wordCounts = stream.flatMap(record => record.value().split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
wordCounts.print()
streamingContext.start()
streamingContext.awaitTermination()
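The answer assumes a StreamingContext named streamingContext already exists; it could be created along these lines (the application name and 5-second batch interval are illustrative):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val streamingContext = new StreamingContext(sparkConf, Seconds(5))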
8. Write a Spark program that reads an HBase table, counts the occurrences of each word, and outputs the result.
Answer:
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, "test-table")
// Read the table as an RDD of (row key, Result) pairs
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
val wordCounts = hbaseRDD
  .flatMap { case (_, result) =>
    result.listCells().asScala.map(cell => Bytes.toString(CellUtil.cloneValue(cell)))
  }
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
wordCounts.collect().foreach(println)
9. Write a Spark program that reads a MongoDB collection, counts the occurrences of each word, and outputs the result.
Answer:
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

val mongoConfig = Map(
  "uri" -> "mongodb://localhost:27017/",
  "database" -> "test-database",
  "collection" -> "test-collection"
)
// Load the collection as an RDD of BSON Documents
val mongoRDD = MongoSpark.load(spark.sparkContext, ReadConfig(mongoConfig))
val wordCounts = mongoRDD.flatMap(doc => doc.getString("text").split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
wordCounts.collect().foreach(println)
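Alternatively, the connector exposes a DataFrame source. A sketch only: the format name depends on the connector version ("mongo" for the 2.x/3.x connectors, "mongodb" for 10.x), as do the option names:
val mongoDF = spark.read.format("mongo")
  .option("uri", "mongodb://localhost:27017/")
  .option("database", "test-database")
  .option("collection", "test-collection")
  .load()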
10. Write a Spark program that reads a Cassandra table, counts the occurrences of each word, and outputs the result.
Answer:
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._

// The Cassandra connection settings must be in the Spark configuration
// before the SparkContext is created
val spark = SparkSession.builder()
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.auth.username", "cassandra")
  .config("spark.cassandra.auth.password", "cassandra")
  .getOrCreate()
val cassandraRDD = spark.sparkContext.cassandraTable("test_keyspace", "test_table")
val wordCounts = cassandraRDD.flatMap(row => row.getString("text").split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
wordCounts.collect().foreach(println)
The example code above is for reference only; real applications may need adjustments for the specific scenario.
We hope these exercises and answers help you better understand Spark big data technology and its applications.