用 IDEA 从 HDFS 下载成绩表,对每个同学去掉一个最高分和一个最低分后求总分和平均分,并把处理结果上传到 HDFS。数据例如:
10001 22 42 60 32 77
10002 35 70 65 31 90
处理结果:
10001 134 45
10002 170 57
代码如下:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
public class ScoreProcess {
public static class ScoreMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text id = new Text();
private Text scoreList = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
String studentId = tokenizer.nextToken();
id.set(studentId);
List<Integer> scores = new ArrayList<Integer>();
while (tokenizer.hasMoreTokens()) {
scores.add(Integer.parseInt(tokenizer.nextToken()));
}
// 去掉最高分和最低分
scores.remove(Collections.min(scores));
scores.remove(Collections.max(scores));
// 计算总分和平均分
int totalScore = 0;
for (int score : scores) {
totalScore += score;
}
double averageScore = (double) totalScore / scores.size();
scoreList.set(totalScore + " " + String.format("%.2f", averageScore));
context.write(id, scoreList);
}
}
public static class ScoreReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
context.write(key, value);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
org.apache.hadoop.mapreduce.Job job = org.apache.hadoop.mapreduce.Job.getInstance(conf);
job.setJarByClass(ScoreProcess.class);
job.setMapperClass(ScoreMapper.class);
job.setReducerClass(ScoreReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, inputPath);
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outputPath);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
在hdfs中执行以下命令:
hadoop jar ScoreProcess.jar ScoreProcess /input/scores.txt /output
其中,/input/scores.txt是原始成绩表在hdfs中的路径,/output是处理结果要输出到的hdfs路径。
处理结果将会输出到/output/part-r-00000文件中,可以使用以下命令查看:
hadoop fs -cat /output/part-r-00000
```
原文地址: https://www.cveoy.top/t/topic/hbSG 著作权归作者所有。请勿转载和采集!