Hadoop Secondary Sort: 先对 first_name 字段降序排列,再对 last_name 字段也进行降序排列
Hadoop Secondary Sort: 先对 first_name 字段降序排列,再对 last_name 字段也进行降序排列
本文将介绍如何使用 Hadoop 进行二次排序,以实现先对 'first_name' 字段降序排列,再对 'last_name' 字段也进行降序排列。
代码示例
【分区类】Partitioner.java
依赖库:org.apache.hadoop.mapreduce.Partitioner
public class Partitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, Text> {
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
char firstChar = key.toString().toLowerCase().charAt(0);
if (firstChar < 'a' || firstChar > 'z') {
return numReduceTasks - 1;
} else {
return firstChar - 'a';
}
}
}
【排序类】Comparator.java
依赖库:org.apache.hadoop.io.WritableComparator
public class Comparator extends org.apache.hadoop.io.WritableComparator {
public Comparator() {
super(Text.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Text aText = (Text) a;
Text bText = (Text) b;
String[] aFields = aText.toString().split(',');
String[] bFields = bText.toString().split(',');
String aFirstName = aFields[0];
String bFirstName = bFields[0];
String aLastName = aFields[1];
String bLastName = bFields[1];
int firstNameCompareResult = bFirstName.compareToIgnoreCase(aFirstName);
if (firstNameCompareResult != 0) {
return firstNameCompareResult;
} else {
return bLastName.compareToIgnoreCase(aLastName);
}
}
}
【Mapper类】Mapper.java
依赖库:org.apache.hadoop.io.LongWritable、org.apache.hadoop.io.Text、org.apache.hadoop.mapreduce.Mapper
public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
if (key.get() > 0) { // 排除表头
String[] fields = value.toString().split(',');
String firstName = fields[1];
String lastName = fields[2];
String emailAddress = fields[3];
Text outputKey = new Text(firstName + ',' + lastName);
Text outputValue = new Text(fields[0] + ';' + firstName + ' ' + lastName + ';' + emailAddress);
context.write(outputKey, outputValue);
}
}
}
【Reducer类】Reducer.java
依赖库:org.apache.hadoop.io.Text、org.apache.hadoop.mapreduce.Reducer
public class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(key, value);
}
}
}
【驱动类】Driver.java
依赖库:org.apache.hadoop.conf.Configuration、org.apache.hadoop.fs.FileSystem、org.apache.hadoop.fs.Path、org.apache.hadoop.io.Text、org.apache.hadoop.mapreduce.Job、org.apache.hadoop.mapreduce.lib.input.FileInputFormat、org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
public class Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Secondary Sort");
job.setJarByClass(Driver.class);
job.setMapperClass(Mapper.class);
job.setPartitionerClass(Partitioner.class);
job.setSortComparatorClass(Comparator.class);
job.setReducerClass(Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileSystem fs = FileSystem.get(conf);
Path outputPath = new Path(args[1]);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(job, outputPath);
job.setNumReduceTasks(26); // 分区数为26
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
代码说明
- 分区类:根据 'first_name' 字段的第一个字母进行分区,以便将相同第一个字母的记录分配到同一个 reducer 上进行处理。
- 排序类:首先根据 'first_name' 字段进行降序排列,如果 'first_name' 相同,则根据 'last_name' 字段进行降序排列。
- Mapper 类:将输入数据解析成 key-value 对,其中 key 为 'first_name,last_name',value 为其他信息。
- Reducer 类:将相同 key 的 value 集合进行处理,并将最终结果输出到文件中。
- 驱动类:设置 job 的配置信息,并启动 job 执行。
运行结果
经过二次排序后,输出文件中的数据将按照 'first_name' 字段降序排列,如果 'first_name' 相同,则按照 'last_name' 字段降序排列。
总结
本文介绍了如何使用 Hadoop 进行二次排序,以实现先对 'first_name' 字段降序排列,再对 'last_name' 字段也进行降序排列。该方法可以应用于多种场景,例如对用户数据进行排序、对商品数据进行排序等。
原文地址: https://www.cveoy.top/t/topic/oEv1 著作权归作者所有。请勿转载和采集!