MapReduce 在传递<key, value>对时默认根据 key 进行排序,所以当需要对表格数据按某一字段进行排序时,可以设置排序的字段为 key。然而,有时需要对两个字段分别进行排序,这种自定义排序被称为'二次排序'。

例如,有以下数据:

| first_name | last_name | score | |---|---|---| | A | Smith | 3 | | B | Jones | 5 | | C | Brown | 1 | | B | Davis | 6 | | A | Wilson | 4 | | C | Miller | 5 | | A | Taylor | 4 | | A | Garcia | 3 | | B | Rodriguez | 6 | | B | Jackson | 5 | | C | White | 5 | | C | Thompson | 1 |

如果对上面的数据按第一个字段值进行升序排列,那么有些数据的顺序是相同的,这时候我们就要按照第二个字段值再进行一次排序,例如:先按照第一个值(字母)升序排列,再按照第二个值(数字)进行降序排列得到:

A, 3 B, 5 C, 1 B, 6 A, 4 C, 5 A, 4 A, 3 B, 6 B, 5 C, 5 C, 1

本教程将编写一个 Java 程序,利用 MapReduce 技术对数据进行二次排序,并将其打包成 jar 文件运行得到结果。

步骤:

  1. **读取数据:**从给定的表格文件 'datas.xlsx' 中读取数据(表格的第一行为标题,处理时需要排除)。

  2. **分区:**根据 'first_name' 字段的首字母(不区分大小写)进行分区,首字母相同的行在同一分区(输出在同一文件,例如:A、a 开头的行在第 1 个分区 'part-r-00000',Z、z 开头的行在第 26 个分区 'part-r-00025')。

  3. **二次排序:**在每个分区内,对数据进行二次排序,要求先对 'first_name' 字段降序排列,再对 'last_name' 字段也进行降序排列。

  4. **输出结果:**MapReduce 输出的每一行结果中,'first_name' 和 'last_name' 之间用空格分隔,其他字段用英文分号分隔。

代码示例:

由于无法获得数据文件 'datas.xlsx',以下代码仅作为参考示例:

1. 自定义二次排序 Key 类

public class NameScoreKey implements WritableComparable<NameScoreKey> {
    private String name;
    private int score;

    public NameScoreKey() {}

    public NameScoreKey(String name, int score) {
        this.name = name;
        this.score = score;
    }

    public String getName() {
        return name;
    }

    public int getScore() {
        return score;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(score);
    }

    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        score = in.readInt();
    }

    public int compareTo(NameScoreKey o) {
        int cmp = name.compareTo(o.name);
        if (cmp != 0) {
            return cmp; // 按 name 升序排列
        }
        // 按 score 降序排列
        return o.score - score;
    }

    public int hashCode() {
        return name.hashCode() + score;
    }

    public boolean equals(Object o) {
        if (!(o instanceof NameScoreKey)) {
            return false;
        }
        NameScoreKey other = (NameScoreKey) o;
        return name.equals(other.name) && score == other.score;
    }

    public String toString() {
        return name + ' ' + score;
    }
}

2. Map 阶段

public static class SortMapper extends Mapper<LongWritable, Text, Text, NameScoreKey> {
    private static final Pattern PATTERN = Pattern.compile("^(\w+),\s*(\w+),\s*(\d+)$");

    private Text outputKey = new Text();
    private NameScoreKey outputValue = new NameScoreKey();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.startsWith("first_name")) {
            return; // 忽略表头
        }
        Matcher matcher = PATTERN.matcher(line);
        if (matcher.find()) {
            String firstName = matcher.group(1);
            String lastName = matcher.group(2);
            int score = Integer.parseInt(matcher.group(3));
            outputKey.set(firstName.toLowerCase()); // 把首字母转为小写,便于分区
            outputValue = new NameScoreKey(lastName, score);
            context.write(outputKey, outputValue);
        }
    }
}

3. Reduce 阶段

public static class SortReducer extends Reducer<Text, NameScoreKey, Text, Text> {
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    public void reduce(Text key, Iterable<NameScoreKey> values, Context context)
            throws IOException, InterruptedException {
        List<NameScoreKey> list = new ArrayList<>();
        for (NameScoreKey value : values) {
            list.add(new NameScoreKey(value.getName(), value.getScore())); // 需要复制一份,避免共享对象
        }
        Collections.sort(list); // 对 last_name 进行排序
        for (NameScoreKey value : list) {
            outputKey.set(key.toString().toUpperCase()); // 把首字母转为大写
            outputValue.set(value.getName() + ';' + value.getScore());
            context.write(outputKey, outputValue);
        }
    }
}

4. Driver 阶段

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Secondary Sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(SortMapper.class);
    job.setReducerClass(SortReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NameScoreKey.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setPartitionerClass(FirstLetterPartitioner.class); // 自定义分区器
    job.setNumReduceTasks(26); // 分区数为 26,一个分区对应一个首字母
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

5. 自定义分区器

public class FirstLetterPartitioner extends Partitioner<Text, NameScoreKey> {
    public int getPartition(Text key, NameScoreKey value, int numPartitions) {
        char firstLetter = key.toString().charAt(0);
        if (firstLetter >= 'a' && firstLetter <= 'z') {
            return firstLetter - 'a';
        } else if (firstLetter >= 'A' && firstLetter <= 'Z') {
            return firstLetter - 'A';
        } else {
            return 0; // 非字母字符归为第一个分区
        }
    }
}

运行程序:

  1. 将代码打包成 jar 文件。
  2. 运行 jar 文件,并指定输入和输出路径,例如:
hadoop jar secondary_sort.jar SecondarySort datas.xlsx output

注意:

  • 本代码示例仅供参考,实际操作中可能需要根据具体情况进行调整。
  • 为了方便理解,代码中省略了一些异常处理代码,实际使用时需要进行完善。
  • 代码中使用了 Hadoop API,需要在 Hadoop 环境下运行。
  • 本教程假设读者已经具备一定 MapReduce 和 Hadoop 的基础知识。
MapReduce 二次排序:根据多个字段对表格数据进行排序

原文地址: https://www.cveoy.top/t/topic/oI3Y 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录