1. Write a Spark program that reads a text file and counts how many times each word appears. Sort the output by count, from highest to lowest.

Answer:

// Assumes a spark-shell session where `sc` (the SparkContext) is predefined
val textFile = sc.textFile("input.txt")
val wordCounts = textFile.flatMap(line => line.split(" "))  // split each line into words
    .map(word => (word, 1))                                 // pair each word with a count of 1
    .reduceByKey((a, b) => a + b)                           // sum the counts per word
    .sortBy(_._2, ascending = false)                        // sort by count, descending
wordCounts.collect().foreach(println)
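All of these answers assume an interactive spark-shell session, where sc (the SparkContext) and spark (the SparkSession) are predefined. To run them as a standalone application you would create the session yourself; a minimal sketch, with the app name and local master as placeholder choices:

import org.apache.spark.sql.SparkSession

// local[*] runs Spark in-process using all cores; on a real cluster the
// master is usually supplied via spark-submit instead
val spark = SparkSession.builder()
  .appName("SparkExercises")
  .master("local[*]")
  .getOrCreate()

val sc = spark.sparkContext   // the `sc` used in the RDD-based answers
import spark.implicits._      // enables .toDS() and .as[T] in the Dataset answers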
2. Write a Spark program that reads a CSV file, computes the average temperature for each city, and prints the result.

Answer:

case class Weather(city: String, temperature: Double)

// Assumes each line has the form "city,temperature" with no header row
val weatherData = sc.textFile("weather.csv")
    .map(line => line.split(","))
    .map(fields => Weather(fields(0), fields(1).toDouble))

// Pair each city with (temperature, 1), sum both parts, then divide to get the mean
val cityTemperatures = weatherData.map(weather => (weather.city, (weather.temperature, 1)))
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .mapValues(sumCount => sumCount._1 / sumCount._2)

cityTemperatures.collect().foreach(println)
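If the CSV file does have a header row, the DataFrame reader is a more robust alternative to splitting lines by hand; a sketch assuming the columns are named city and temperature:

import org.apache.spark.sql.functions.avg

// header=true consumes the first line as column names;
// inferSchema=true types temperature as a numeric column
val weatherDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("weather.csv")

weatherDF.groupBy("city")
  .agg(avg("temperature").as("avgTemperature"))
  .show()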
3. Write a Spark program that reads a JSON file, computes each user's average purchase amount, and prints the result.

Answer:

import org.apache.spark.sql.functions.avg
import spark.implicits._   // provides the encoder needed by .as[Purchase]

case class Purchase(user: String, amount: Double)

val purchaseData = spark.read.json("purchases.json")
    .as[Purchase]

// Group by user and average the purchase amounts
val userPurchases = purchaseData.groupBy("user")
    .agg(avg("amount").as("avgPurchase"))

userPurchases.show()
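By default, spark.read.json expects JSON Lines input: one complete JSON object per line. A purchases.json matching the schema above might look like:

{"user": "alice", "amount": 12.5}
{"user": "bob", "amount": 30.0}
{"user": "alice", "amount": 7.5}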
4. Write a Spark program that reads a Parquet file, computes each user's average purchase amount, and prints the result.

Answer:

import org.apache.spark.sql.functions.avg
import spark.implicits._

case class Purchase(user: String, amount: Double)

// Parquet files carry their schema, so no manual parsing is needed
val purchaseData = spark.read.parquet("purchases.parquet")
    .as[Purchase]

val userPurchases = purchaseData.groupBy("user")
    .agg(avg("amount").as("avgPurchase"))

userPurchases.show()
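To try this without an existing file, one option is to write a small Dataset out as Parquet first; a sketch with made-up rows, reusing the Purchase case class and the spark.implicits._ import from the answer above:

// Hypothetical test data; overwrites any existing purchases.parquet
Seq(Purchase("alice", 12.5), Purchase("bob", 30.0), Purchase("alice", 7.5))
  .toDS()
  .write.mode("overwrite").parquet("purchases.parquet")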
5. Write a Spark program that reads an Avro file, computes each user's average purchase amount, and prints the result.

Answer:

import org.apache.spark.sql.functions.avg
import spark.implicits._

case class Purchase(user: String, amount: Double)

// Avro support lives in the external spark-avro module, e.g. start with:
// spark-shell --packages org.apache.spark:spark-avro_2.12:<spark-version>
val purchaseData = spark.read.format("avro")
    .load("purchases.avro")
    .as[Purchase]

val userPurchases = purchaseData.groupBy("user")
    .agg(avg("amount").as("avgPurchase"))

userPurchases.show()
6. Write a Spark program that reads a Hive table, computes each user's average purchase amount, and prints the result.

Answer:

import org.apache.spark.sql.functions.avg
import spark.implicits._

case class Purchase(user: String, amount: Double)

// Requires a SparkSession built with Hive support (see the sketch below)
spark.sql("USE mydatabase")
val purchaseData = spark.sql("SELECT * FROM purchases")
    .as[Purchase]

val userPurchases = purchaseData.groupBy("user")
    .agg(avg("amount").as("avgPurchase"))

userPurchases.show()
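Reading Hive tables only works when the session is connected to the Hive metastore. A minimal sketch of building such a session (the database mydatabase and table purchases above are assumed to already exist):

import org.apache.spark.sql.SparkSession

// enableHiveSupport wires the session to the Hive metastore described
// by hive-site.xml on the classpath
val spark = SparkSession.builder()
  .appName("HivePurchases")
  .enableHiveSupport()
  .getOrCreate()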
7. Write a Spark program that reads from a Kafka topic, counts how many times each word appears, and prints the result.

Answer:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Micro-batch interval of 5 seconds
val streamingContext = new StreamingContext(sc, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("test-topic")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Count words within each micro-batch
val wordCounts = stream.flatMap(record => record.value().split(" "))
    .map(word => (word, 1))
    .reduceByKey((a, b) => a + b)
wordCounts.print()

// Nothing runs until the streaming context is started
streamingContext.start()
streamingContext.awaitTermination()
8. Write a Spark program that reads an HBase table, counts how many times each word appears, and prints the result.

Answer:

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, "test-table")

// Load the table as an RDD of (row key, Result) pairs
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

val wordCounts = hbaseRDD
    .flatMap { case (_, result) => result.listCells().asScala }  // all cells of each row
    .map(cell => Bytes.toString(CellUtil.cloneValue(cell)))      // cell value as text
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey((a, b) => a + b)

wordCounts.collect().foreach(println)
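Going through TableInputFormat lets Spark split the scan along region boundaries and count in parallel on the executors, instead of pulling every row through a single client-side scanner on the driver.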
9. Write a Spark program that reads a MongoDB collection, counts how many times each word appears, and prints the result.

Answer:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// Requires the mongo-spark-connector package on the classpath
val mongoConfig = Map(
  "uri" -> "mongodb://localhost:27017/",
  "database" -> "test-database",
  "collection" -> "test-collection"
)

// Loading from the SparkContext yields an RDD of org.bson.Document
val mongoRDD = MongoSpark.load(spark.sparkContext, ReadConfig(mongoConfig))

// Assumes each document has a string field named "text"
val wordCounts = mongoRDD.flatMap(doc => doc.getString("text").split(" "))
    .map(word => (word, 1))
    .reduceByKey((a, b) => a + b)

wordCounts.collect().foreach(println)
10. Write a Spark program that reads a Cassandra table, counts how many times each word appears, and prints the result.

Answer:

import com.datastax.spark.connector._
import org.apache.spark.sql.SparkSession

// Connection settings belong on the Spark configuration when the session is
// created (in spark-shell, pass them with --conf at launch instead);
// cassandraTable does not accept a config map argument
val spark = SparkSession.builder()
  .appName("CassandraWordCount")
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.auth.username", "cassandra")
  .config("spark.cassandra.auth.password", "cassandra")
  .getOrCreate()

// Requires the spark-cassandra-connector package on the classpath
val cassandraRDD = spark.sparkContext.cassandraTable("test_keyspace", "test_table")

// Assumes the table has a text column named "text"
val wordCounts = cassandraRDD.flatMap(row => row.getString("text").split(" "))
    .map(word => (word, 1))
    .reduceByKey((a, b) => a + b)

wordCounts.collect().foreach(println)