hadoop 多文件输出


这两天在网上看了个MapReduce的多文件输出的帖子: http://blog.csdn.net/inkfish。写的不错。

我试着完成了一下。也是分为三个文件:我这三个文件,跟原作者的稍有不同。其中有些类是我原来写的,我直接拷贝过来的,所以有点不同。

My_LineRead.java

[java] view plaincopyprint?
  1. publicclassMy_LineRead<K,V>extendsRecordWriter<K,V>{
  2. privatestaticfinalStringutf8="UTF-8";
  • privatestaticfinalStringcolon="----";//划分符号
  • privatestaticfinalbyte[]newline;
  • static{
  • try{
  • newline="/n".getBytes(utf8);
  • }catch(UnsupportedEncodingExceptionuee){
  • thrownewIllegalArgumentException("can'tfind"+utf8+"encoding");
  • }
  • }
  • protectedDataOutputStreamout;
  • privatefinalbyte[]keyValueSeparator;
  • publicMy_LineRead(DataOutputStreamout){
  • this(out,colon);//调用下面的构造函数
  • }
  • publicMy_LineRead(DataOutputStreamout,StringkeyValueSeparator){
  • //TODOAuto-generatedconstructorstub
  • this.out=out;
  • try{
  • this.keyValueSeparator=keyValueSeparator.getBytes(utf8);
  • }catch(UnsupportedEncodingExceptione){
  • //TODOAuto-generatedcatchblock
  • thrownewIllegalArgumentException("can'tfind"+utf8+"encoding");
  • }
  • }
  • @Override
  • publicvoidclose(TaskAttemptContextarg0)throwsIOException,
  • InterruptedException{
  • //TODOAuto-generatedmethodstub
  • out.close();
  • }
  • @Override
  • publicvoidwrite(Kkey,Vvalue)throwsIOException,
  • InterruptedException{
  • if(!(key==null&&keyinstanceofNullWritable)){
  • //如果key不为空者输出key
  • if((Object)keyinstanceofText){
  • Textto=(Text)key;
  • out.write(to.getBytes(),0,to.getLength());
  • }
  • else
  • {
  • out.write(key.toString().getBytes(utf8));
  • }
  • out.write(keyValueSeparator);
  • }
  • if(!(value==null&&valueinstanceofNullWritable)){
  • //如果value不为空则输出value
  • if((Object)valueinstanceofText){
  • Textto=(Text)value;
  • out.write(to.getBytes(),0,to.getLength());
  • }
  • else
  • {
  • out.write(value.toString().getBytes(utf8));
  • }
  • out.write(newline);
  • }
  • }
  • }

MyMultipleOutputFormat.java //这个类,我添加了些注释便于理解

[java] view plaincopyprint?
  1. publicabstractclassMyMultipleOutputFormat<KextendsWritableComparable<?>,VextendsWritable>
  2. extendsFileOutputFormat<K,V>{
  3. //接口类,需要在主程序中实现generateFileNameForKeyValue来获取文件名
  • privateMultiRecordWriterwriter=null;
  • @Override
  • publicRecordWriter<K,V>getRecordWriter(TaskAttemptContextjob)
  • throwsIOException,InterruptedException{
  • //TODOAuto-generatedmethodstub
  • //如果第一次调用那么writer=null
  • if(writer==null){
  • //getTaskOutputPath获取output路径
  • writer=newMultiRecordWriter(job,getTaskOutputPath(job));
  • }
  • returnwriter;
  • }
  • privatePathgetTaskOutputPath(TaskAttemptContextconf)throwsIOException{
  • PathworkPath=null;
  • OutputCommittercommitter=super.getOutputCommitter(conf);
  • if(committerinstanceofFileOutputCommitter){
  • workPath=((FileOutputCommitter)committer).getWorkPath();
  • }else{
  • PathoutputPath=super.getOutputPath(conf);
  • if(outputPath==null){
  • thrownewIOException("Undefinedjoboutput-path");
  • }
  • workPath=outputPath;
  • }
  • returnworkPath;
  • }
  • /**通过key,value,conf来确定输出文件名(含扩展名)*/
  • //返回值就是文件名。可以根据key,value来判断
  • protectedabstractStringgenerateFileNameForKeyValue(Kkey,Vvalue,Configurationconf);
  • //MultiRecordWriter类
  • publicclassMultiRecordWriterextendsRecordWriter<K,V>{
  • /**RecordWriter的缓存*/
  • privateHashMap<String,RecordWriter<K,V>>recordWriters=null;
  • privateTaskAttemptContextjob=null;
  • /**输出目录*/
  • privatePathworkPath=null;
  • //构造函数
  • publicMultiRecordWriter(TaskAttemptContextjob,PathworkPath){
  • super();
  • this.job=job;
  • this.workPath=workPath;
  • recordWriters=newHashMap<String,RecordWriter<K,V>>();
  • }
  • //关闭,应该可能是多个文件进行关闭,所有采用循环
  • //recordWriters.values()就是指的getBaseRecordWriter返回的值。
  • @Override
  • publicvoidclose(TaskAttemptContextcontext)throwsIOException,InterruptedException{
  • Iterator<RecordWriter<K,V>>values=this.recordWriters.values().iterator();
  • while(values.hasNext()){
  • values.next().close(context);
  • }
  • this.recordWriters.clear();
  • }
  • @Override
  • publicvoidwrite(Kkey,Vvalue)throwsIOException,InterruptedException{
  • //得到输出文件名
  • StringbaseName=generateFileNameForKeyValue(key,value,job.getConfiguration());
  • //如果recordWriters里没有文件名,那么就建立。否则就直接写值。
  • RecordWriter<K,V>rw=this.recordWriters.get(baseName);
  • if(rw==null){
  • rw=getBaseRecordWriter(job,baseName);
  • //放入HashMap
  • this.recordWriters.put(baseName,rw);
  • }
  • rw.write(key,value);
  • }
  • //${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
  • privateRecordWriter<K,V>getBaseRecordWriter(TaskAttemptContextjob,StringbaseName)
  • throwsIOException,InterruptedException{
  • //获取配置文件
  • Configurationconf=job.getConfiguration();
  • //查看是否使用解码器
  • booleanisCompressed=getCompressOutput(job);
  • StringkeyValueSeparator=",";
  • RecordWriter<K,V>recordWriter=null;
  • if(isCompressed){
  • Class<?extendsCompressionCodec>codecClass=getOutputCompressorClass(job,
  • GzipCodec.class);
  • CompressionCodeccodec=ReflectionUtils.newInstance(codecClass,conf);
  • Pathfile=newPath(workPath,baseName+codec.getDefaultExtension());
  • FSDataOutputStreamfileOut=file.getFileSystem(conf).create(file,false);
  • recordWriter=newMy_LineRead<K,V>(newDataOutputStream(codec
  • .createOutputStream(fileOut)),keyValueSeparator);
  • }
  • //如果不使用解码器
  • else{
  • Pathfile=newPath(workPath,baseName);
  • FSDataOutputStreamfileOut=file.getFileSystem(conf).create(file,false);
  • //recordWriter=newMy_LineRead<K,V>(fileOut,keyValueSeparator);
  • //这里我使用的我自己的OutputFormat
  • recordWriter=newMy_LineRead<K,V>(fileOut);
  • }
  • returnrecordWriter;
  • }
  • }
  • }

最后就是测试类,WordCount_MulFileOut.java

[java] view plaincopyprint?
  1. publicclassWordCount_MulFileOut{
  2. publicstaticclasswordcountMapperextends
  • Mapper<LongWritable,Text,Text,IntWritable>{
  • privatefinalstaticIntWritableone=newIntWritable(1);
  • privateTextword=newText();
  • publicvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{
  • Stringline=value.toString();
  • StringTokenizeritr=newStringTokenizer(line);
  • while(itr.hasMoreElements()){
  • word.set(itr.nextToken());
  • context.write(word,one);
  • }
  • }
  • }
  • publicstaticclasswordcountReduceextends
  • Reducer<Text,IntWritable,Text,IntWritable>{
  • publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{
  • intsum=0;
  • for(IntWritablestr:values){
  • sum+=str.get();
  • }
  • context.write(key,newIntWritable(sum));
  • }
  • }
  • publicstaticclassMyMultipleextendsMyMultipleOutputFormat{
  • @Override
  • protectedStringgenerateFileNameForKeyValue(WritableComparablekey,
  • Writablevalue,Configurationconf){
  • //TODOAuto-generatedmethodstub
  • return"other.txt";
  • }
  • }
  • publicstaticvoidmain(Stringargs[])throwsException{
  • Configurationconf=newConfiguration();
  • Jobjob=newJob(conf,"wordcount");
  • job.setJarByClass(WordCount_MulFileOut.class);
  • job.setInputFormatClass(TextInputFormat.class);
  • job.setOutputFormatClass(MyMultiple.class);
  • job.setOutputKeyClass(Text.class);
  • job.setOutputValueClass(IntWritable.class);
  • job.setMapperClass(wordcountMapper.class);
  • job.setReducerClass(wordcountReduce.class);
  • job.setCombinerClass(wordcountReduce.class);
  • FileInputFormat.setInputPaths(job,newPath(args[1]));
  • FileOutputFormat.setOutputPath(job,newPath(args[2]));
  • job.waitForCompletion(true);
  • }
  • }
优质内容筛选与推荐>>
1、Map生成器 map适配器如今能够使用各种不同的Generator,iterator和常量值的组合来填充Map初始化对象
2、Oracle 好书 01 ( Oracle 10g 数据库简介 )
3、HO引擎近况20120710
4、react(二):组件的通信
5、javascript---event.keyCode值(转)


长按二维码向我转账

受苹果公司新规定影响,微信 iOS 版的赞赏功能被关闭,可通过二维码转账支持公众号。

    阅读
    好看
    已推荐到看一看
    你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
    已取消,“好看”想法已同步删除
    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号





    联系我们

    欢迎来到TinyMind。

    关于TinyMind的内容或商务合作、网站建议,举报不良信息等均可联系我们。

    TinyMind客服邮箱:support@tinymind.net.cn