MapReduce 二次排序:根据多个字段对表格数据进行排序
MapReduce 在传递<key, value>对时默认根据 key 进行排序,所以当需要对表格数据按某一字段进行排序时,可以设置排序的字段为 key。然而,有时需要对两个字段分别进行排序,这种自定义排序被称为'二次排序'。
例如,有以下数据:
| first_name | last_name | score | |---|---|---| | A | Smith | 3 | | B | Jones | 5 | | C | Brown | 1 | | B | Davis | 6 | | A | Wilson | 4 | | C | Miller | 5 | | A | Taylor | 4 | | A | Garcia | 3 | | B | Rodriguez | 6 | | B | Jackson | 5 | | C | White | 5 | | C | Thompson | 1 |
如果对上面的数据按第一个字段值进行升序排列,那么有些数据的顺序是相同的,这时候我们就要按照第二个字段值再进行一次排序,例如:先按照第一个值(字母)升序排列,再按照第二个值(数字)进行降序排列得到:
A, 3 B, 5 C, 1 B, 6 A, 4 C, 5 A, 4 A, 3 B, 6 B, 5 C, 5 C, 1
本教程将编写一个 Java 程序,利用 MapReduce 技术对数据进行二次排序,并将其打包成 jar 文件运行得到结果。
步骤:
-
**读取数据:**从给定的表格文件 'datas.xlsx' 中读取数据(表格的第一行为标题,处理时需要排除)。
-
**分区:**根据 'first_name' 字段的首字母(不区分大小写)进行分区,首字母相同的行在同一分区(输出在同一文件,例如:A、a 开头的行在第 1 个分区 'part-r-00000',Z、z 开头的行在第 26 个分区 'part-r-00025')。
-
**二次排序:**在每个分区内,对数据进行二次排序,要求先对 'first_name' 字段降序排列,再对 'last_name' 字段也进行降序排列。
-
**输出结果:**MapReduce 输出的每一行结果中,'first_name' 和 'last_name' 之间用空格分隔,其他字段用英文分号分隔。
代码示例:
由于无法获得数据文件 'datas.xlsx',以下代码仅作为参考示例:
1. 自定义二次排序 Key 类
public class NameScoreKey implements WritableComparable<NameScoreKey> {
private String name;
private int score;
public NameScoreKey() {}
public NameScoreKey(String name, int score) {
this.name = name;
this.score = score;
}
public String getName() {
return name;
}
public int getScore() {
return score;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(score);
}
public void readFields(DataInput in) throws IOException {
name = in.readUTF();
score = in.readInt();
}
public int compareTo(NameScoreKey o) {
int cmp = name.compareTo(o.name);
if (cmp != 0) {
return cmp; // 按 name 升序排列
}
// 按 score 降序排列
return o.score - score;
}
public int hashCode() {
return name.hashCode() + score;
}
public boolean equals(Object o) {
if (!(o instanceof NameScoreKey)) {
return false;
}
NameScoreKey other = (NameScoreKey) o;
return name.equals(other.name) && score == other.score;
}
public String toString() {
return name + ' ' + score;
}
}
2. Map 阶段
public static class SortMapper extends Mapper<LongWritable, Text, Text, NameScoreKey> {
private static final Pattern PATTERN = Pattern.compile("^(\w+),\s*(\w+),\s*(\d+)$");
private Text outputKey = new Text();
private NameScoreKey outputValue = new NameScoreKey();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
if (line.startsWith("first_name")) {
return; // 忽略表头
}
Matcher matcher = PATTERN.matcher(line);
if (matcher.find()) {
String firstName = matcher.group(1);
String lastName = matcher.group(2);
int score = Integer.parseInt(matcher.group(3));
outputKey.set(firstName.toLowerCase()); // 把首字母转为小写,便于分区
outputValue = new NameScoreKey(lastName, score);
context.write(outputKey, outputValue);
}
}
}
3. Reduce 阶段
public static class SortReducer extends Reducer<Text, NameScoreKey, Text, Text> {
private Text outputKey = new Text();
private Text outputValue = new Text();
public void reduce(Text key, Iterable<NameScoreKey> values, Context context)
throws IOException, InterruptedException {
List<NameScoreKey> list = new ArrayList<>();
for (NameScoreKey value : values) {
list.add(new NameScoreKey(value.getName(), value.getScore())); // 需要复制一份,避免共享对象
}
Collections.sort(list); // 对 last_name 进行排序
for (NameScoreKey value : list) {
outputKey.set(key.toString().toUpperCase()); // 把首字母转为大写
outputValue.set(value.getName() + ';' + value.getScore());
context.write(outputKey, outputValue);
}
}
}
4. Driver 阶段
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Secondary Sort");
job.setJarByClass(SecondarySort.class);
job.setMapperClass(SortMapper.class);
job.setReducerClass(SortReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NameScoreKey.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(FirstLetterPartitioner.class); // 自定义分区器
job.setNumReduceTasks(26); // 分区数为 26,一个分区对应一个首字母
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
5. 自定义分区器
public class FirstLetterPartitioner extends Partitioner<Text, NameScoreKey> {
public int getPartition(Text key, NameScoreKey value, int numPartitions) {
char firstLetter = key.toString().charAt(0);
if (firstLetter >= 'a' && firstLetter <= 'z') {
return firstLetter - 'a';
} else if (firstLetter >= 'A' && firstLetter <= 'Z') {
return firstLetter - 'A';
} else {
return 0; // 非字母字符归为第一个分区
}
}
}
运行程序:
- 将代码打包成 jar 文件。
- 运行 jar 文件,并指定输入和输出路径,例如:
hadoop jar secondary_sort.jar SecondarySort datas.xlsx output
注意:
- 本代码示例仅供参考,实际操作中可能需要根据具体情况进行调整。
- 为了方便理解,代码中省略了一些异常处理代码,实际使用时需要进行完善。
- 代码中使用了 Hadoop API,需要在 Hadoop 环境下运行。
- 本教程假设读者已经具备一定 MapReduce 和 Hadoop 的基础知识。
原文地址: https://www.cveoy.top/t/topic/oI3Y 著作权归作者所有。请勿转载和采集!