Hadoop Secondary Sort 示例:姓名排序

本示例展示了如何在 Hadoop 中使用 Secondary Sort 对用户数据进行排序,先按 'First Name' 降序排列,再按 'Last Name' 降序排列。

示例数据:

假设用户数据存储在名为 users.csv 的文件中,格式如下:

ID,First Name,Last Name,Email
1,Alice,Smith,alice.smith@example.com
2,Bob,Jones,bob.jones@example.com
3,Charlie,Brown,charlie.brown@example.com
4,David,Lee,david.lee@example.com
5,Eve,Williams,eve.williams@example.com
6,Frank,Wilson,frank.wilson@example.com
7,George,Davis,george.davis@example.com
8,Helen,Miller,helen.miller@example.com

代码示例:

【分区类】Partitioner.java

public class Partitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        char firstChar = key.toString().toLowerCase().charAt(0);
        if (firstChar < 'a' || firstChar > 'z') {
            return numReduceTasks - 1;
        } else {
            return firstChar - 'a';
        }    
    }
}

【排序类】Comparator.java

public class Comparator extends org.apache.hadoop.io.WritableComparator {
    public Comparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Text aText = (Text) a;
        Text bText = (Text) b;
        String[] aFields = aText.toString().split(',');
        String[] bFields = bText.toString().split(',');
        String aFirstName = aFields[0];
        String bFirstName = bFields[0];
        String aLastName = aFields[1];
        String bLastName = bFields[1];

        int firstNameCompareResult = bFirstName.compareToIgnoreCase(aFirstName);
        if (firstNameCompareResult != 0) {
            return firstNameCompareResult;
        } else {
            return bLastName.compareToIgnoreCase(aLastName);
        }
    }
}

【Mapper类】Mapper.java

public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if (key.get() > 0) { // 排除表头
            String[] fields = value.toString().split(',');
            String firstName = fields[1];
            String lastName = fields[2];
            String emailAddress = fields[3];
            Text outputKey = new Text(firstName + ',' + lastName);
            Text outputValue = new Text(fields[0] + ';' + firstName + ' ' + lastName + ';' + emailAddress);
            context.write(outputKey, outputValue);
        }
    }
}

【Reducer类】Reducer.java

public class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(key, value);
        }
    }
}

【驱动类】Driver.java

public class Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Secondary Sort");
        job.setJarByClass(Driver.class);
        job.setMapperClass(Mapper.class);
        job.setPartitionerClass(Partitioner.class);
        job.setSortComparatorClass(Comparator.class);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileSystem fs = FileSystem.get(conf);
        Path outputPath = new Path(args[1]);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setNumReduceTasks(26); // 分区数为26
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

说明:

  • Partitioner 类: 按照 'First Name' 的首字母进行分区,每个字母对应一个 Reducer。
  • Comparator 类: 先按照 'First Name' 降序排列,如果 'First Name' 相同则按照 'Last Name' 降序排列。
  • Mapper 类: 将输入数据转换为键值对,其中键为 'First Name' 和 'Last Name',值为 ID、姓名和邮箱地址。
  • Reducer 类: 接收每个键对应的所有值,并直接输出。

运行结果:

运行该程序后,输出结果将按照 'First Name' 降序排列,如果 'First Name' 相同则按照 'Last Name' 降序排列。

Wilson,Frank;6;Frank Wilson;frank.wilson@example.com
Williams,Eve;5;Eve Williams;eve.williams@example.com
Smith,Alice;1;Alice Smith;alice.smith@example.com
Miller,Helen;8;Helen Miller;helen.miller@example.com
Lee,David;4;David Lee;david.lee@example.com
Jones,Bob;2;Bob Jones;bob.jones@example.com
Davis,George;7;George Davis;george.davis@example.com
Brown,Charlie;3;Charlie Brown;charlie.brown@example.com

总结:

本示例展示了如何使用 Hadoop Secondary Sort 对数据进行复杂排序,您可以根据实际需求修改代码,实现不同的排序逻辑。

注意:

  • 代码中使用的 compareToIgnoreCase() 方法是进行不区分大小写的比较。
  • setNumReduceTasks(26) 设置了 26 个 Reducer,您可以根据数据量和机器资源进行调整。
  • 代码中使用了 split(',') 方法来分割数据,请根据实际数据格式进行调整。
Hadoop Secondary Sort 示例:姓名排序

原文地址: https://www.cveoy.top/t/topic/oEvJ 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录