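By default, MapReduce sorts key-value pairs by key as it passes them along, so to sort tabular data by a given field you can make that field the key. Sometimes, however, two fields need to be sorted, each in its own order; a custom sort built for this requirement is called a secondary sort. For example, if a data set is sorted in ascending order by its first field, some records tie on that field, and those records then have to be sorted again by the second field: first in ascending alphabetical order by the first value, and then numerically by the second value.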
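The two-level ordering described above can be illustrated outside Hadoop with a plain Java comparator. This is only a minimal sketch: the `Row` type and the sample rows are hypothetical, and the tie-break direction (score descending) is an assumption taken from the `compareTo` method in the code in this answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SecondarySortSketch {
    /** A (name, score) record; hypothetical type used only for this illustration. */
    public static final class Row {
        public final String name;
        public final int score;

        public Row(String name, int score) {
            this.name = name;
            this.score = score;
        }

        @Override
        public String toString() {
            return name + " " + score;
        }
    }

    /** Primary order: name ascending. Tie-break: score descending (assumed direction). */
    public static final Comparator<Row> ORDER =
        Comparator.comparing((Row r) -> r.name)
                  .thenComparing(Comparator.comparingInt((Row r) -> r.score).reversed());

    /** Returns the rows rendered as strings, in secondary-sort order. */
    public static List<String> sorted(List<Row> rows) {
        List<Row> copy = new ArrayList<>(rows);
        copy.sort(ORDER);
        List<String> out = new ArrayList<>();
        for (Row r : copy) {
            out.add(r.toString());
        }
        return out;
    }

    /** Hypothetical sample data; two rows tie on the first field. */
    public static List<Row> sample() {
        return Arrays.asList(new Row("b", 2), new Row("a", 1), new Row("a", 3));
    }

    public static void main(String[] args) {
        for (String line : sorted(sample())) {
            System.out.println(line);
        }
    }
}
```

The two rows named `a` tie on the first field, so the second comparator decides their relative order; the row named `b` comes last regardless of its score.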
Because the data file datas.xlsx is not available, the following code is only a reference example:
- Custom secondary-sort key class
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class NameScoreKey implements WritableComparable<NameScoreKey> {
    private String name;
    private int score;

    // Hadoop needs a no-argument constructor to create instances before readFields()
    public NameScoreKey() {}

    public NameScoreKey(String name, int score) {
        this.name = name;
        this.score = score;
    }

    public String getName() {
        return name;
    }

    public int getScore() {
        return score;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(score);
    }

    // Fields must be read in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        score = in.readInt();
    }

    @Override
    public int compareTo(NameScoreKey o) {
        int cmp = name.compareTo(o.name);
        if (cmp != 0) {
            return cmp; // name ascending
        }
        // score descending; Integer.compare avoids the overflow that o.score - score can cause
        return Integer.compare(o.score, score);
    }

    @Override
    public int hashCode() {
        return name.hashCode() + score;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof NameScoreKey)) {
            return false;
        }
        NameScoreKey other = (NameScoreKey) o;
        return name.equals(other.name) && score == other.score;
    }

    @Override
    public String toString() {
        return name + " " + score;
    }
}
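The `write` and `readFields` methods only work if both sides agree on the field order. The following plain-Java sketch round-trips the same two fields through `DataOutputStream` and `DataInputStream` without any Hadoop dependency; the class name `KeyRoundTripSketch` and the sample values are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class KeyRoundTripSketch {
    /** Serializes (name, score) in the same field order as NameScoreKey.write(). */
    public static byte[] encode(String name, int score) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);  // 2-byte length prefix + modified UTF-8 bytes
            out.writeInt(score); // 4 bytes, big-endian
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order and renders them like toString(). */
    public static String decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String name = in.readUTF();
            int score = in.readInt();
            return name + " " + score;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample values
        System.out.println(decode(encode("smith", 90)));
    }
}
```

If the reads were swapped (`readInt` before `readUTF`), the decoder would interpret the string's length prefix as part of the integer and fail or return garbage, which is why the order in `readFields` has to mirror `write`.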
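- Map phase
// Nested in the driver class (referred to as SecondarySort in the Driver code). Needs imports:
// java.io.IOException, java.util.regex.Matcher, java.util.regex.Pattern,
// org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text, org.apache.hadoop.mapreduce.Mapper
public static class SortMapper extends Mapper<LongWritable, Text, Text, NameScoreKey> {
    // Expected line format: first_name, last_name, score
    private static final Pattern PATTERN = Pattern.compile("^(\\w+),\\s*(\\w+),\\s*(\\d+)$");
    private final Text outputKey = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.startsWith("first_name")) {
            return; // skip the header row
        }
        Matcher matcher = PATTERN.matcher(line);
        if (matcher.matches()) { // lines that do not match the format are skipped
            String firstName = matcher.group(1);
            String lastName = matcher.group(2);
            int score = Integer.parseInt(matcher.group(3));
            // Lowercase the first name so that keys differing only in case are grouped together
            outputKey.set(firstName.toLowerCase());
            context.write(outputKey, new NameScoreKey(lastName, score));
        }
    }
}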
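The line parsing done in the mapper can be tried out in isolation. The sketch below reuses the same regular expression in a plain Java method; the class name `LineParseSketch` and the sample lines are hypothetical, and the comma-separated `first_name, last_name, score` layout is inferred from the pattern rather than from the unavailable data file.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LineParseSketch {
    // Same pattern as in the mapper: first_name, last_name, score
    private static final Pattern PATTERN =
        Pattern.compile("^(\\w+),\\s*(\\w+),\\s*(\\d+)$");

    /**
     * Returns {lowercased first name, last name, score} for a data line,
     * or null for the header row and for lines that do not match the format.
     */
    public static String[] parse(String line) {
        String trimmed = line.trim();
        if (trimmed.startsWith("first_name")) {
            return null; // header row
        }
        Matcher m = PATTERN.matcher(trimmed);
        if (!m.matches()) {
            return null; // malformed line
        }
        return new String[] { m.group(1).toLowerCase(), m.group(2), m.group(3) };
    }

    public static void main(String[] args) {
        // Hypothetical sample lines
        String[] lines = { "first_name,last_name,score", "Alice, Smith, 90", "Alice;Smith;90" };
        for (String line : lines) {
            String[] fields = parse(line);
            System.out.println(fields == null ? "skipped" : String.join("|", fields));
        }
    }
}
```

Note that `\d+` accepts arbitrarily long digit runs, so a real mapper may also want to guard `Integer.parseInt` against values that do not fit in an `int`.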
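- Reduce phase
// Nested in the driver class. Needs imports: java.io.IOException, java.util.ArrayList,
// java.util.Collections, java.util.List, org.apache.hadoop.io.Text, org.apache.hadoop.mapreduce.Reducer
public static class SortReducer extends Reducer<Text, NameScoreKey, Text, Text> {
    private final Text outputKey = new Text();
    private final Text outputValue = new Text();

    @Override
    public void reduce(Text key, Iterable<NameScoreKey> values, Context context)
            throws IOException, InterruptedException {
        // Note: all values for one key are buffered in memory before sorting
        List<NameScoreKey> list = new ArrayList<>();
        for (NameScoreKey value : values) {
            // Copy each value: Hadoop reuses the same object across iterations
            list.add(new NameScoreKey(value.getName(), value.getScore()));
        }
        // Sort by last name ascending, then score descending (see NameScoreKey.compareTo)
        Collections.sort(list);
        outputKey.set(key.toString().toUpperCase()); // emit the first name in upper case
        for (NameScoreKey value : list) {
            outputValue.set(value.getName() + ";" + value.getScore());
            context.write(outputKey, outputValue);
        }
    }
}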
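The copy made inside the reducer loop matters because Hadoop hands the reducer the same value object on every iteration and overwrites its fields. The sketch below simulates that behavior in plain Java; `Box` and the `reusing` iterable are hypothetical stand-ins for a `Writable` value and the reducer's `Iterable`, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ReuseSketch {
    /** Mutable holder standing in for a Writable value (hypothetical). */
    public static final class Box {
        public int score;
    }

    /** An Iterable that returns the same Box every time, overwriting its contents. */
    public static Iterable<Box> reusing(int... scores) {
        Box shared = new Box();
        return () -> new Iterator<Box>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < scores.length;
            }

            @Override
            public Box next() {
                if (i >= scores.length) {
                    throw new NoSuchElementException();
                }
                shared.score = scores[i++];
                return shared;
            }
        };
    }

    /** Keeps references to the shared object: every entry ends up with the last score. */
    public static List<Integer> collectWithoutCopy(int... scores) {
        List<Box> kept = new ArrayList<>();
        for (Box b : reusing(scores)) {
            kept.add(b);
        }
        List<Integer> out = new ArrayList<>();
        for (Box b : kept) {
            out.add(b.score);
        }
        return out;
    }

    /** Copies each value before storing it, as the reducer does. */
    public static List<Integer> collectWithCopy(int... scores) {
        List<Integer> out = new ArrayList<>();
        for (Box b : reusing(scores)) {
            Box copy = new Box();
            copy.score = b.score;
            out.add(copy.score);
        }
        return out;
    }
}
```

With the inputs 1, 2, 3, the version without copying yields three references to one object whose score is 3, while the copying version preserves 1, 2, 3.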
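- Driver
// Member of the SecondarySort class. Needs imports: org.apache.hadoop.conf.Configuration,
// org.apache.hadoop.fs.Path, org.apache.hadoop.io.Text, org.apache.hadoop.mapreduce.Job,
// org.apache.hadoop.mapreduce.lib.input.FileInputFormat,
// org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Secondary Sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(SortMapper.class);
    job.setReducerClass(SortReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NameScoreKey.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setPartitionerClass(FirstLetterPartitioner.class); // custom partitioner
    job.setNumReduceTasks(26); // 26 partitions, one per initial letter
    FileInputFormat.addInputPath(job, new Path(args[0]));   // args[0]: input path
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // args[1]: output path (must not exist yet)
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}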
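- Custom partitioner
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstLetterPartitioner extends Partitioner<Text, NameScoreKey> {
    @Override
    public int getPartition(Text key, NameScoreKey value, int numPartitions) {
        String s = key.toString();
        if (s.isEmpty()) {
            return 0; // empty keys go to the first partition
        }
        char firstLetter = s.charAt(0);
        int index;
        if (firstLetter >= 'a' && firstLetter <= 'z') {
            index = firstLetter - 'a';
        } else if (firstLetter >= 'A' && firstLetter <= 'Z') {
            index = firstLetter - 'A';
        } else {
            index = 0; // non-letter characters go to the first partition
        }
        // Keep the result in range if the job runs with fewer than 26 reducers
        return index % numPartitions;
    }
}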
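The mapping from a key's first letter to a partition number can be checked without a cluster. The sketch below restates the same logic as a plain static method; the class name `PartitionSketch`, the method name `partitionFor`, and the sample keys are hypothetical, and the final modulo is an added safeguard for jobs configured with fewer than 26 reducers.

```java
public class PartitionSketch {
    /**
     * Maps a key to a partition by its first letter (a/A -> 0, ..., z/Z -> 25).
     * Empty keys and keys starting with a non-letter go to partition 0.
     * The result is always in the range [0, numPartitions).
     */
    public static int partitionFor(String key, int numPartitions) {
        if (key.isEmpty()) {
            return 0;
        }
        char c = key.charAt(0);
        int index;
        if (c >= 'a' && c <= 'z') {
            index = c - 'a';
        } else if (c >= 'A' && c <= 'Z') {
            index = c - 'A';
        } else {
            index = 0;
        }
        return index % numPartitions;
    }

    public static void main(String[] args) {
        // Hypothetical sample keys
        for (String key : new String[] { "alice", "Zoe", "9lives" }) {
            System.out.println(key + " -> " + partitionFor(key, 26));
        }
    }
}
```

A partitioner must never return a value outside `[0, numPartitions)`, so the modulo keeps the method safe even when the reducer count is changed in the driver.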
Original URL: https://www.cveoy.top/t/topic/hf87 Copyright belongs to the author. Do not repost or scrape.