Spark Big Data Technology and Applications, Programming Exercises: 10 Practice Problems with Answers

Below are 10 programming exercises on Spark big data technology and applications. They cover common operations such as reading from different data sources, counting word occurrences, and computing averages, and each comes with a complete code example.
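
All of the answers assume a spark-shell-style environment in which sc (a SparkContext) and spark (a SparkSession) already exist. When running as a standalone application, a minimal setup might look like the following sketch (the application name is assumed):

import org.apache.spark.sql.SparkSession

// Build a SparkSession and reuse its SparkContext as sc
val spark = SparkSession.builder()
  .appName("SparkExercises")   // hypothetical application name
  .getOrCreate()
val sc = spark.sparkContext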

1. Write a Spark program that reads a text file and counts how many times each word appears. Output the results sorted by word count in descending order.

Answer:

val textFile = sc.textFile("input.txt")
// Split each line into words, count occurrences, and sort by count in descending order
val wordCounts = textFile.flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey((a, b) => a + b)
    .sortBy(_._2, ascending = false)
wordCounts.collect().foreach(println)
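
The same word count can also be expressed with the Dataset API; a minimal sketch, assuming the same input file:

import spark.implicits._
import org.apache.spark.sql.functions._

// Read lines as a Dataset[String], explode into words, group and sort by count
val dsCounts = spark.read.textFile("input.txt")
  .flatMap(_.split(" "))
  .groupBy("value")        // the single column of a Dataset[String] is named "value"
  .count()
  .orderBy(desc("count"))
dsCounts.show()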

2. Write a Spark program that reads a CSV file, computes the average temperature for each city, and outputs the results.

Answer:

case class Weather(city: String, temperature: Double)

// Parse each CSV line into a Weather record (no header row assumed)
val weatherData = sc.textFile("weather.csv")
    .map(line => line.split(","))
    .map(fields => Weather(fields(0), fields(1).toDouble))

// Sum temperatures and counts per city, then divide to get the average
val cityTemperatures = weatherData.map(weather => (weather.city, (weather.temperature, 1)))
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .mapValues(sumCount => sumCount._1 / sumCount._2)

cityTemperatures.collect().foreach(println)
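
If the CSV file has a header row, the DataFrame reader makes the same computation shorter; a sketch, assuming columns named city and temperature:

import org.apache.spark.sql.functions.avg

// Read the CSV with a header and inferred types, then average per city
val weatherDF = spark.read
  .option("header", "true")        // first line contains column names
  .option("inferSchema", "true")   // let Spark infer numeric types
  .csv("weather.csv")

weatherDF.groupBy("city")
  .agg(avg("temperature").as("avgTemperature"))
  .show()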

3. Write a Spark program that reads a JSON file, computes the average purchase amount for each user, and outputs the results.

Answer:

import spark.implicits._
import org.apache.spark.sql.functions.avg

case class Purchase(user: String, amount: Double)

// Read the JSON file as a typed Dataset of Purchase records
val purchaseData = spark.read.json("purchases.json")
    .as[Purchase]

// Average purchase amount per user
val userPurchases = purchaseData.groupBy("user")
    .agg(avg("amount").as("avgPurchase"))

userPurchases.show()
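
The same aggregation can also be written as a SQL query against a temporary view; a short sketch reusing purchaseData from above:

// Register the Dataset as a temporary view and aggregate with SQL
purchaseData.createOrReplaceTempView("purchases")
spark.sql("SELECT user, AVG(amount) AS avgPurchase FROM purchases GROUP BY user").show()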

4. Write a Spark program that reads a Parquet file, computes the average purchase amount for each user, and outputs the results.

Answer:

import spark.implicits._
import org.apache.spark.sql.functions.avg

case class Purchase(user: String, amount: Double)

// Parquet files carry their own schema, so the read needs no extra options
val purchaseData = spark.read.parquet("purchases.parquet")
    .as[Purchase]

val userPurchases = purchaseData.groupBy("user")
    .agg(avg("amount").as("avgPurchase"))

userPurchases.show()
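
The aggregated result can also be written back out in the same format; a one-line sketch with a hypothetical output path:

// Write the per-user averages back to Parquet (output path is hypothetical)
userPurchases.write.mode("overwrite").parquet("user_avg_purchases.parquet")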

5. Write a Spark program that reads an Avro file, computes the average purchase amount for each user, and outputs the results.

Answer:

import spark.implicits._
import org.apache.spark.sql.functions.avg

case class Purchase(user: String, amount: Double)

// Avro is read through the external "avro" data source (spark-avro module)
val purchaseData = spark.read.format("avro")
    .load("purchases.avro")
    .as[Purchase]

val userPurchases = purchaseData.groupBy("user")
    .agg(avg("amount").as("avgPurchase"))

userPurchases.show()
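
Note that the avro data source is not bundled with Spark itself: the spark-avro module has to be added to the application, for example via the --packages option of spark-shell or spark-submit. The exact coordinates depend on your Spark and Scala versions; as an example only:

spark-shell --packages org.apache.spark:spark-avro_2.12:3.3.0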

6. Write a Spark program that reads a Hive table, computes the average purchase amount for each user, and outputs the results.

Answer:

import spark.implicits._
import org.apache.spark.sql.functions.avg

case class Purchase(user: String, amount: Double)

// Requires a SparkSession created with Hive support enabled
spark.sql("USE mydatabase")
val purchaseData = spark.sql("SELECT * FROM purchases")
    .as[Purchase]

val userPurchases = purchaseData.groupBy("user")
    .agg(avg("amount").as("avgPurchase"))

userPurchases.show()
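
Reading Hive tables only works when the SparkSession was built with Hive support; a minimal sketch (the application name is assumed):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("HivePurchases")   // hypothetical application name
  .enableHiveSupport()        // connect Spark SQL to the Hive metastore
  .getOrCreate()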

7. Write a Spark program that reads from a Kafka topic, counts how many times each word appears, and outputs the results.

Answer:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer

// Micro-batch interval of 5 seconds (adjust as needed)
val streamingContext = new StreamingContext(sc, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("test-topic")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Count words within each micro-batch and print the result
val wordCounts = stream.flatMap(record => record.value().split(" "))
    .map(word => (word, 1))
    .reduceByKey((a, b) => a + b)
wordCounts.print()

streamingContext.start()
streamingContext.awaitTermination()
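
With current Spark versions the same job is usually written with Structured Streaming instead of DStreams; a sketch, assuming the same broker and topic and the spark-sql-kafka connector on the classpath:

import spark.implicits._
import org.apache.spark.sql.functions._

// Read the topic as a streaming DataFrame and cast the message value to a string
val lines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS line")

// Split into words, count, and continuously print the full result to the console
val counts = lines.select(explode(split($"line", " ")).as("word"))
  .groupBy("word")
  .count()

counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()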

8. Write a Spark program that reads an HBase table, counts how many times each word appears, and outputs the results.

Answer:

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("hbase.zookeeper.property.clientPort", "2181")

// Scan the table on the driver and collect the cell values as lines of text
val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("test-table"))
val scanner = table.getScanner(new Scan())

val lines = scanner.iterator().asScala
    .flatMap(result => result.listCells().asScala)
    .map(cell => Bytes.toString(CellUtil.cloneValue(cell)))
    .toList

scanner.close()
table.close()
connection.close()

// Distribute the collected lines as an RDD and count the words
val wordCounts = sc.parallelize(lines)
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey((a, b) => a + b)

wordCounts.collect().foreach(println)
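
Collecting the scan on the driver only works for small tables. A more scalable pattern is to expose the table as an RDD through TableInputFormat; a sketch that reuses the configuration object and imports from the answer above:

import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

// Tell the input format which table to read, then build an RDD of (row key, Result)
conf.set(TableInputFormat.INPUT_TABLE, "test-table")
val hbaseRDD = sc.newAPIHadoopRDD(
  conf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)

// Word count over every cell value, executed on the cluster
val distributedCounts = hbaseRDD.flatMap { case (_, result) =>
    result.listCells().asScala.map(cell => Bytes.toString(CellUtil.cloneValue(cell)))
  }
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)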

9. Write a Spark program that reads a MongoDB collection, counts how many times each word appears, and outputs the results.

Answer:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Connection settings for the mongo-spark-connector (the connector package must be on the classpath)
val mongoConfig = Map(
  "uri" -> "mongodb://localhost:27017/",
  "database" -> "test-database",
  "collection" -> "test-collection"
)

// Load the collection as an RDD of BSON documents
val mongoRDD = MongoSpark.load(spark.sparkContext, ReadConfig(mongoConfig))

// Word count over the "text" field of each document
val wordCounts = mongoRDD.flatMap(doc => doc.getString("text").split(" "))
    .map(word => (word, 1))
    .reduceByKey((a, b) => a + b)

wordCounts.collect().foreach(println)

10. Write a Spark program that reads a Cassandra table, counts how many times each word appears, and outputs the results.

Answer:

import com.datastax.spark.connector._

// Cassandra connection settings are supplied as Spark configuration
// (for example when building the SparkSession), not as an argument to cassandraTable:
//   spark.cassandra.connection.host = localhost
//   spark.cassandra.auth.username   = cassandra
//   spark.cassandra.auth.password   = cassandra

// Load the table as an RDD of CassandraRow (requires the spark-cassandra-connector package)
val cassandraRDD = spark.sparkContext.cassandraTable("test_keyspace", "test_table")

// Word count over the "text" column of each row
val wordCounts = cassandraRDD.flatMap(row => row.getString("text").split(" "))
    .map(word => (word, 1))
    .reduceByKey((a, b) => a + b)

wordCounts.collect().foreach(println)
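
A minimal sketch of how those connection settings can be attached to the SparkSession (the application name is assumed):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CassandraWordCount")                           // hypothetical application name
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.auth.username", "cassandra")
  .config("spark.cassandra.auth.password", "cassandra")
  .getOrCreate()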

The example code above is for reference only; real applications may need to adapt it to the specific environment and data.

We hope these exercises and answers help you better understand Spark big data technology and its applications.

