10 programming exercises, with answers, for the course "Spark Big Data Technology and Applications"
- Write a Spark program that reads a text file and counts how many times each word occurs. Output the results sorted by count in descending order.
答案:
```scala
val textFile = sc.textFile("input.txt")
val wordCounts = textFile.flatMap(line => line.split(" "))  // split each line into words
  .map(word => (word, 1))                                   // pair each word with a count of 1
  .reduceByKey((a, b) => a + b)                             // sum the counts per word
  .sortBy(_._2, ascending = false)                          // sort by count, descending
wordCounts.collect().foreach(println)
```
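The same word count can also be expressed with the DataFrame API, which the course covers alongside RDDs. A minimal sketch, assuming a SparkSession named `spark` (with its implicits imported) and the same `input.txt`:

```scala
import org.apache.spark.sql.functions.{explode, split, desc}

// Read the file as a one-column DataFrame of lines, split each line into
// words, then group and count, sorting by count in descending order.
val words = spark.read.text("input.txt")
  .select(explode(split($"value", " ")).as("word"))
val wordCounts = words.groupBy("word").count().orderBy(desc("count"))
wordCounts.show()
```

Letting Catalyst optimize the aggregation is often preferable to the RDD version when the rest of the job already uses DataFrames.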
- Write a Spark program that reads a CSV file, computes the average temperature for each city, and outputs the results.
答案:
```scala
case class Weather(city: String, temperature: Double)

// Assumes the CSV has no header row and each line is "city,temperature"
val weatherData = sc.textFile("weather.csv")
  .map(line => line.split(","))
  .map(fields => Weather(fields(0), fields(1).toDouble))
val cityTemperatures = weatherData.map(weather => (weather.city, (weather.temperature, 1)))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))  // sum temperatures and counts per city
  .mapValues(sumCount => sumCount._1 / sumCount._2)   // average = sum / count
cityTemperatures.collect().foreach(println)
```
- Write a Spark program that reads a JSON file, computes the average purchase amount for each user, and outputs the results.
答案:
```scala
import org.apache.spark.sql.functions.avg
import spark.implicits._  // needed for .as[Purchase]

case class Purchase(user: String, amount: Double)
val purchaseData = spark.read.json("purchases.json").as[Purchase]
val userPurchases = purchaseData.groupBy("user")
  .agg(avg("amount").as("avgPurchase"))  // mean purchase amount per user
userPurchases.show()
```
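For reference, `spark.read.json` expects one JSON object per line (JSON Lines), not a single top-level array. A hypothetical `purchases.json` matching the `Purchase` schema above might look like:

```json
{"user": "alice", "amount": 12.5}
{"user": "bob", "amount": 30.0}
{"user": "alice", "amount": 7.5}
```

With this input, the aggregation would report one averaged row per distinct user.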
- Write a Spark program that reads a Parquet file, computes the average purchase amount for each user, and outputs the results.
答案:
```scala
import org.apache.spark.sql.functions.avg
import spark.implicits._  // needed for .as[Purchase]

case class Purchase(user: String, amount: Double)
val purchaseData = spark.read.parquet("purchases.parquet").as[Purchase]
val userPurchases = purchaseData.groupBy("user")
  .agg(avg("amount").as("avgPurchase"))  // mean purchase amount per user
userPurchases.show()
```
- Write a Spark program that reads an Avro file, computes the average purchase amount for each user, and outputs the results.
答案:
```scala
import org.apache.spark.sql.functions.avg
import spark.implicits._  // needed for .as[Purchase]

case class Purchase(user: String, amount: Double)
// Requires the external spark-avro package on the classpath
// (e.g. --packages org.apache.spark:spark-avro_2.12:<version>)
val purchaseData = spark.read.format("avro")
  .load("purchases.avro")
  .as[Purchase]
val userPurchases = purchaseData.groupBy("user")
  .agg(avg("amount").as("avgPurchase"))  // mean purchase amount per user
userPurchases.show()
```
- Write a Spark program that reads a Hive table, computes the average purchase amount for each user, and outputs the results.
答案:
```scala
import org.apache.spark.sql.functions.avg
import spark.implicits._  // needed for .as[Purchase]

case class Purchase(user: String, amount: Double)
// The SparkSession must have been built with .enableHiveSupport()
spark.sql("USE mydatabase")
val purchaseData = spark.sql("SELECT * FROM purchases").as[Purchase]
val userPurchases = purchaseData.groupBy("user")
  .agg(avg("amount").as("avgPurchase"))  // mean purchase amount per user
userPurchases.show()
```
- Write a Spark program that reads from a Kafka topic, counts how many times each word occurs, and outputs the results.
答案:
```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val streamingContext = new StreamingContext(sc, Seconds(5))  // 5-second micro-batches
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test-topic")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
val wordCounts = stream.flatMap(record => record.value().split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)  // word counts within each batch
wordCounts.print()
streamingContext.start()
streamingContext.awaitTermination()
```
- Write a Spark program that reads an HBase table, counts how many times each word occurs, and outputs the results.
答案:
```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("hbase.zookeeper.property.clientPort", "2181")
val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("test-table"))
val scanner = table.getScanner(new Scan())
// Collect the cell values on the driver, then distribute them as an RDD;
// reduceByKey is an RDD operation and is not available on a plain iterator.
val lines = scanner.iterator().asScala
  .flatMap(result => result.listCells().asScala)
  .map(cell => Bytes.toString(CellUtil.cloneValue(cell)))
  .toSeq
val wordCounts = sc.parallelize(lines)
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
wordCounts.collect().foreach(println)
```
- Write a Spark program that reads a MongoDB collection, counts how many times each word occurs, and outputs the results.
答案:
```scala
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

val mongoConfig = Map(
  "uri" -> "mongodb://localhost:27017/",
  "database" -> "test-database",
  "collection" -> "test-collection"
)
// Loading via the SparkContext yields an RDD of BSON Documents,
// whose getString accepts a field name.
val mongoRDD = MongoSpark.load(spark.sparkContext, ReadConfig(mongoConfig))
val wordCounts = mongoRDD.flatMap(doc => doc.getString("text").split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
wordCounts.collect().foreach(println)
```
- Write a Spark program that reads a Cassandra table, counts how many times each word occurs, and outputs the results.
答案:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._

// Connection settings are supplied through the SparkConf;
// cassandraTable takes only the keyspace and table names.
val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "localhost")
  .set("spark.cassandra.auth.username", "cassandra")
  .set("spark.cassandra.auth.password", "cassandra")
val spark = SparkSession.builder().config(conf).getOrCreate()

val cassandraRDD = spark.sparkContext.cassandraTable("test_keyspace", "test_table")
val wordCounts = cassandraRDD.flatMap(row => row.getString("text").split(" "))
  .map(word => (word, 1))
  .reduceByKey((a, b) => a + b)
wordCounts.collect().foreach(println)
```
Source: https://www.cveoy.top/t/topic/fpv1 — copyright belongs to the author.