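Writing a MapReduce program to analyze mobile traffic data: total upload traffic, total download traffic, and total traffic
Assume each line of the mobile traffic data is one record containing a phone number, upload traffic, and download traffic, separated by commas.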
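The sketch below parses one record in the assumed format, in plain Java with no Hadoop needed. Several details are illustrative assumptions, not part of the original task: the class `FlowRecord`, the sample phone number, and the choice to skip malformed lines (including a possible header row).

```java
import java.util.Optional;

// Hypothetical helper for the assumed record format "phone,upload,download",
// e.g. "13800000000,100,200". Traffic is stored as long to leave room for large totals.
final class FlowRecord {
    final String phone;
    final long upload;
    final long download;

    FlowRecord(String phone, long upload, long download) {
        this.phone = phone;
        this.upload = upload;
        this.download = download;
    }

    // Empty result for lines without exactly three fields or with non-numeric traffic (e.g. a header row)
    static Optional<FlowRecord> parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 3) {
            return Optional.empty();
        }
        try {
            return Optional.of(new FlowRecord(fields[0].trim(),
                    Long.parseLong(fields[1].trim()),
                    Long.parseLong(fields[2].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        for (String line : new String[] {"13800000000,100,200", "phone,upload,download", "13800000000,100"}) {
            // Prints the parsed fields, or "skipped" for rejected lines
            System.out.println(line + " -> " + parse(line)
                    .map(r -> r.phone + " up=" + r.upload + " down=" + r.download)
                    .orElse("skipped"));
        }
    }
}
```

Only the first sample line is accepted. The header-like line fails number parsing, and the two-field line fails the field-count check.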
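Map phase:
- Split each record on commas into three fields: phone number, upload traffic, and download traffic.
- Emit the phone number as the key and the upload and download traffic together as the value, sending them to the Reducer.
Reduce phase:
- For each phone number, accumulate its upload and download traffic to obtain the total upload traffic and the total download traffic.
- Add the total upload traffic and the total download traffic to obtain the total traffic.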
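The Map and Reduce steps above can be checked without a cluster. The sketch below is a plain-Java, in-memory imitation of the data flow, assuming well-formed numeric input. A `TreeMap` stands in for Hadoop's shuffle-and-sort: it groups the (upload, download) pairs per phone number. A second loop then plays the reducer. The class `FlowTotalsSketch` and the sample records are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory walk-through of map -> shuffle (group by phone) -> reduce.
// Not Hadoop code; assumes numeric traffic fields.
class FlowTotalsSketch {
    // Returns phone -> {totalUpload, totalDownload, totalFlow}
    static Map<String, long[]> totals(List<String> lines) {
        // Map + shuffle: group the (upload, download) pairs by phone number
        Map<String, List<long[]>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            if (f.length != 3) {
                continue; // skip records without three fields
            }
            long[] pair = {Long.parseLong(f[1].trim()), Long.parseLong(f[2].trim())};
            grouped.computeIfAbsent(f[0].trim(), k -> new ArrayList<>()).add(pair);
        }
        // Reduce: per phone, sum upload and download, then add them for the total
        Map<String, long[]> result = new TreeMap<>();
        for (Map.Entry<String, List<long[]>> e : grouped.entrySet()) {
            long up = 0;
            long down = 0;
            for (long[] p : e.getValue()) {
                up += p[0];
                down += p[1];
            }
            result.put(e.getKey(), new long[] {up, down, up + down});
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "13800000000,100,200",
                "13900000000,50,25",
                "13800000000,10,20");
        // Prints "13800000000	110	220	330" then "13900000000	50	25	75"
        totals(sample).forEach((phone, t) ->
                System.out.println(phone + "\t" + t[0] + "\t" + t[1] + "\t" + t[2]));
    }
}
```

In the output, the two records for 13800000000 are merged before summing. This mirrors how a Hadoop reducer receives all values for one key in a single `reduce` call.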
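Example code:
Map phase:
```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input line format: phone,upload,download (e.g. 13800000000,100,200)
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text phone = new Text();
    private final Text flow = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length != 3) {
            return; // skip malformed lines
        }
        try {
            long upload = Long.parseLong(fields[1].trim());
            long download = Long.parseLong(fields[2].trim());
            phone.set(fields[0].trim());
            // Send upload and download together as "upload,download" so the reducer can tell them apart
            flow.set(upload + "," + download);
            context.write(phone, flow);
        } catch (NumberFormatException e) {
            // skip lines whose traffic fields are not numbers, e.g. a header row
        }
    }
}
```
Reduce phase:
```java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, Text, Text, Text> {
    private final Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // long instead of int so large totals do not overflow
        long uploadSum = 0;
        long downloadSum = 0;
        for (Text value : values) {
            // Each value is "upload,download" as written by FlowCountMapper
            String[] parts = value.toString().split(",");
            uploadSum += Long.parseLong(parts[0]);
            downloadSum += Long.parseLong(parts[1]);
        }
        long totalSum = uploadSum + downloadSum;
        // One output line per phone: phone \t totalUpload \t totalDownload \t totalFlow
        result.set(uploadSum + "\t" + downloadSum + "\t" + totalSum);
        context.write(key, result);
    }
}
```
Driver code:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flow count");
        job.setJarByClass(FlowCountDriver.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // Map output and final output are both Text -> Text
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // args[0]: input path; args[1]: output path (must not already exist)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```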
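The steps above compute totals per phone number. The title can also be read as asking for one grand total over all phones. For that reading, a common MapReduce pattern is to map every record to a single constant key, so that one reduce call sees every value. The in-memory sketch below imitates that pattern. The key `ALL` and the class `GrandTotalSketch` are hypothetical names, and input lines are assumed to be well-formed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: grand totals over all phones via a single constant key.
// Not Hadoop code; assumes numeric traffic fields.
class GrandTotalSketch {
    static final String ALL = "ALL"; // hypothetical constant key

    // Map: every valid line is assigned to key ALL (the phone number is dropped),
    // so the shuffle would deliver all values to one reduce call; here we just collect them
    static List<long[]> mapAll(List<String> lines) {
        List<long[]> values = new ArrayList<>();
        for (String line : lines) {
            String[] f = line.split(",");
            if (f.length == 3) {
                values.add(new long[] {Long.parseLong(f[1].trim()), Long.parseLong(f[2].trim())});
            }
        }
        return values;
    }

    // Reduce: the single call for key ALL sums upload, download, and their total
    static long[] reduceAll(List<long[]> values) {
        long up = 0;
        long down = 0;
        for (long[] v : values) {
            up += v[0];
            down += v[1];
        }
        return new long[] {up, down, up + down};
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "13800000000,100,200", "13900000000,50,25", "13800000000,10,20");
        long[] t = reduceAll(mapAll(sample));
        System.out.println(ALL + "\t" + t[0] + "\t" + t[1] + "\t" + t[2]); // ALL	160	245	405
    }
}
```

Routing everything to one key means a single reducer does all the summing, which is simple but serializes the final step. Registering a combiner with `job.setCombinerClass` is one common way to shrink the data that reaches that reducer.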
Original article: https://www.cveoy.top/t/topic/fTxC. Copyright belongs to the author. Do not reproduce or scrape.