Hadoop 的 MultipleOutputs 多文件输出
public class Demo { public static class MultestMapper extends Mapper<Object, Text, Text, NullWritable> { private Text outkey = new Text(""); private MultipleOutputs<Text, NullWritable> mos; protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); if (line.contains("DOWNLOAD:")){ outkey.set(line.substring(line.indexOf("DOWNLOAD:")+9)); mos.write("download", outkey, NullWritable.get()); } else if (line.contains("LOGGING:")){ outkey.set(line.substring(line.indexOf("LOGGING:")+8)); mos.write("logging", outkey, NullWritable.get()); } else if (line.contains("MONITOR:")){ outkey.set(line.substring(line.indexOf("MONITOR:")+8)); mos.write("monitor", outkey, NullWritable.get()); } else if (line.contains("ACTIVITIES:")){ outkey.set(line.substring(line.indexOf("ACTIVITIES:")+11)); mos.write("activities", outkey, NullWritable.get()); } } @Override protected void setup(Context context) throws IOException, InterruptedException { mos = new MultipleOutputs<Text, NullWritable>(context); super.setup(context); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { mos.close(); super.cleanup(context); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: <in> <out>"); System.exit(2); } Job job = new Job(conf, "multest"); job.setJarByClass(Demo.class); job.setMapperClass(MultestMapper.class); job.setNumReduceTasks(0); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); MultipleOutputs.addNamedOutput(job, "download",TextOutputFormat.class, Text.class, NullWritable.class); MultipleOutputs.addNamedOutput(job, "logging",TextOutputFormat.class, 
Text.class, NullWritable.class); MultipleOutputs.addNamedOutput(job, "monitor",TextOutputFormat.class, Text.class, NullWritable.class); MultipleOutputs.addNamedOutput(job, "activities",TextOutputFormat.class, Text.class, NullWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } }优质内容筛选与推荐>>