Hadoop 的 MultipleOutputs 多文件输出


/**
 * Map-only Hadoop job that splits input lines into several named outputs
 * according to a marker found in each line.
 *
 * <p>Each line is checked against the markers in {@link #ROUTES}, in order.
 * For the first marker the line contains, the text following the first
 * occurrence of that marker is written to the matching named output. Lines
 * containing none of the markers are not written anywhere.
 */
public class Demo {
    /**
     * Routing table: {@code {marker, namedOutput}} pairs, in priority order.
     * Shared by the mapper (to route records) and by {@link #main} (to register
     * the named outputs), so the two cannot drift apart.
     */
    private static final String[][] ROUTES = {
        {"DOWNLOAD:", "download"},
        {"LOGGING:", "logging"},
        {"MONITOR:", "monitor"},
        {"ACTIVITIES:", "activities"},
    };

    /**
     * Mapper that emits nothing through the regular {@code Context}; every
     * record goes through {@link MultipleOutputs} to one of the named outputs.
     */
    public static class MultestMapper extends
            Mapper<Object, Text, Text, NullWritable> {
        /** Reused output key, overwritten for every routed record. */
        private final Text outkey = new Text("");
        /** Created in {@link #setup} and closed in {@link #cleanup}. */
        private MultipleOutputs<Text, NullWritable> mos;

        /**
         * Routes one input line to the named output of the first marker it contains.
         *
         * @param key     input key (unused)
         * @param value   the input line
         * @param context task context (unused here; output goes through {@code mos})
         */
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            for (String[] route : ROUTES) {
                String marker = route[0];
                int at = line.indexOf(marker);
                if (at < 0) {
                    continue;
                }
                // Keep only the payload after the marker; marker.length()
                // replaces the hand-counted offsets.
                outkey.set(line.substring(at + marker.length()));
                mos.write(route[1], outkey, NullWritable.get());
                // First matching marker wins, same as an if / else-if chain.
                return;
            }
        }

        /** Creates the {@link MultipleOutputs} writer for this task. */
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            mos = new MultipleOutputs<Text, NullWritable>(context);
            super.setup(context);
        }

        /** Closes the {@link MultipleOutputs} writer so its files are flushed. */
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            mos.close();
            super.cleanup(context);
        }
    }

    /**
     * Configures and runs the job.
     *
     * <p>Exits with status 2 when the remaining arguments are not exactly
     * {@code <in> <out>}, with 0 when the job succeeds and with 1 otherwise.
     *
     * @param args generic Hadoop options followed by the input and output paths
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: <in> <out>");
            System.exit(2);
        }
        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "multest");
        job.setJarByClass(Demo.class);
        job.setMapperClass(MultestMapper.class);
        // Map-only job: mapper output is the final output.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // NOTE(review): the mapper never calls context.write, so the default
        // output is presumably left empty; LazyOutputFormat may be worth
        // considering if empty part files are unwanted - confirm before changing.
        for (String[] route : ROUTES) {
            MultipleOutputs.addNamedOutput(job, route[1], TextOutputFormat.class, Text.class, NullWritable.class);
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

优质内容筛选与推荐>>
1、SSIS变量类型映射
2、Python *args 和 **kwargs用法
3、在UITableViewController里面实现UITextField与键盘的自适应
4、L1-027.出租
5、推导式集解


长按二维码向我转账

受苹果公司新规定影响,微信 iOS 版的赞赏功能被关闭,可通过二维码转账支持公众号。

    阅读
    好看
    已推荐到看一看
    你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
    已取消,“好看”想法已同步删除
    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号