使用 MapReduce 处理学生信息和成绩数据并输出到单独文件
以下是一个可能的实现方案,你可以根据自己的需求进行调整和优化。
- 自定义 OutputFormat
首先,我们需要自定义一个 OutputFormat 类,用于将每个学生的成绩信息输出到单独的文件中。
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
public class StudentOutputFormat extends FileOutputFormat<Text, Text> {
@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException {
Path outputDir = FileOutputFormat.getOutputPath(context);
String extension = '.txt';
// 创建一个FileOutputCommitter对象,用于处理输出目录
FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
// 获取当前任务的任务ID
int taskID = context.getTaskAttemptID().getTaskID().getId();
// 构造输出文件的名称,并拼接上输出目录的路径
String filename = 'part-' + taskID + extension;
Path outputFile = new Path(outputDir, filename);
// 创建文件系统对象,并打开输出流
FileSystem fs = outputFile.getFileSystem(context.getConfiguration());
FSDataOutputStream fileOut = fs.create(outputFile, false);
// 返回自定义的RecordWriter对象
return new StudentRecordWriter(fileOut);
}
private static class StudentRecordWriter extends RecordWriter<Text, Text> {
private FSDataOutputStream fileOut;
public StudentRecordWriter(FSDataOutputStream fileOut) {
this.fileOut = fileOut;
}
@Override
public void write(Text key, Text value) throws IOException {
fileOut.write(key.getBytes(), 0, key.getLength());
fileOut.write('\t');
fileOut.write(value.getBytes(), 0, value.getLength());
fileOut.write('\n');
}
@Override
public void close(TaskAttemptContext context) throws IOException {
fileOut.close();
}
}
}
- MapReduce 程序
接下来,我们可以编写一个 MapReduce 程序,使用自定义的 OutputFormat 将每个学生的成绩信息分别存储到单独的文件中,并使用 reduceJoin 将两个文件数据合并。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class StudentScoreAnalysis {
public static void main(String[] args) throws Exception {
// 创建一个配置对象
Configuration conf = new Configuration();
// 创建一个Job对象
Job job = Job.getInstance(conf, 'Student Score Analysis');
// 设置主类
job.setJarByClass(StudentScoreAnalysis.class);
// 设置Mapper和Reducer的类
job.setMapperClass(ScoreMapper.class);
job.setReducerClass(ScoreReducer.class);
// 设置Mapper的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置Reducer的输出类型
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
// 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置输入格式和输出格式
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(StudentOutputFormat.class);
// 提交Job并等待完成
job.waitForCompletion(true);
}
}
- Mapper 类
下面是 ScoreMapper 类的一个示例实现,用于从 score.txt 文件中读取成绩信息,将学生姓名作为 key,将课程名和成绩作为 value 输出。
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ScoreMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split('\t');
if (fields.length == 4) {
String studentID = fields[0];
String courseID = fields[1];
String courseName = fields[2];
String score = fields[3];
// 将学生姓名作为key,将课程名和成绩作为value输出
context.write(new Text(studentID), new Text(courseName + '\t' + score));
}
}
}
- Reducer 类
下面是 ScoreReducer 类的一个示例实现,用于将每个学生的成绩信息按照学生姓名分别存储到单独的文件中。
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class ScoreReducer extends Reducer<Text, Text, NullWritable, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 拼接学生的成绩信息
StringBuilder sb = new StringBuilder();
for (Text value : values) {
sb.append(key.toString()).append('\t').append(value.toString()).append('\n');
}
// 输出学生的成绩信息
context.write(NullWritable.get(), new Text(sb.toString()));
}
}
- 执行程序
将以上代码编译打包,并将 student.txt 和 score.txt 上传到 Hadoop 的输入路径中。然后执行以下命令运行程序:
hadoop jar student-score-analysis.jar student.txt output
程序将会将每个学生的成绩信息分别存储到单独的文件中,并将最终结果输出到 output 目录中的 part-r-00000 文件中。你可以根据自己的需求进行调整和优化。
原文地址: https://www.cveoy.top/t/topic/pgq9 著作权归作者所有。请勿转载和采集!