[java] MyMultipleOutputFormat.java
- publicabstractclassMyMultipleOutputFormat<KextendsWritableComparable<?>,VextendsWritable>
- extendsFileOutputFormat<K,V>{
- privateMultiRecordWriterwriter=null;
- @Override
- publicRecordWriter<K,V>getRecordWriter(TaskAttemptContextjob)
- throwsIOException,InterruptedException{
- if(writer==null){
- writer=newMultiRecordWriter(job,getTaskOutputPath(job));
- }
- returnwriter;
- }
- privatePathgetTaskOutputPath(TaskAttemptContextconf)throwsIOException{
- PathworkPath=null;
- OutputCommittercommitter=super.getOutputCommitter(conf);
- if(committerinstanceofFileOutputCommitter){
- workPath=((FileOutputCommitter)committer).getWorkPath();
- }else{
- PathoutputPath=super.getOutputPath(conf);
- if(outputPath==null){
- thrownewIOException("Undefinedjoboutput-path");
- }
- workPath=outputPath;
- }
- returnworkPath;
- }
- protectedabstractStringgenerateFileNameForKeyValue(Kkey,Vvalue,Configurationconf);
- publicclassMultiRecordWriterextendsRecordWriter<K,V>{
- privateHashMap<String,RecordWriter<K,V>>recordWriters=null;
- privateTaskAttemptContextjob=null;
- privatePathworkPath=null;
- publicMultiRecordWriter(TaskAttemptContextjob,PathworkPath){
- super();
- this.job=job;
- this.workPath=workPath;
- recordWriters=newHashMap<String,RecordWriter<K,V>>();
- }
- @Override
- publicvoidclose(TaskAttemptContextcontext)throwsIOException,InterruptedException{
- Iterator<RecordWriter<K,V>>values=this.recordWriters.values().iterator();
- while(values.hasNext()){
- values.next().close(context);
- }
- this.recordWriters.clear();
- }
- @Override
- publicvoidwrite(Kkey,Vvalue)throwsIOException,InterruptedException{
- StringbaseName=generateFileNameForKeyValue(key,value,job.getConfiguration());
- RecordWriter<K,V>rw=this.recordWriters.get(baseName);
- if(rw==null){
- rw=getBaseRecordWriter(job,baseName);
- this.recordWriters.put(baseName,rw);
- }
- rw.write(key,value);
- }
- privateRecordWriter<K,V>getBaseRecordWriter(TaskAttemptContextjob,StringbaseName)
- throwsIOException,InterruptedException{
- Configurationconf=job.getConfiguration();
- booleanisCompressed=getCompressOutput(job);
- StringkeyValueSeparator=",";
- RecordWriter<K,V>recordWriter=null;
- if(isCompressed){
- Class<?extendsCompressionCodec>codecClass=getOutputCompressorClass(job,
- GzipCodec.class);
- CompressionCodeccodec=ReflectionUtils.newInstance(codecClass,conf);
- Pathfile=newPath(workPath,baseName+codec.getDefaultExtension());
- FSDataOutputStreamfileOut=file.getFileSystem(conf).create(file,false);
- recordWriter=newMy_LineRead<K,V>(newDataOutputStream(codec
- .createOutputStream(fileOut)),keyValueSeparator);
- }
- else{
- Pathfile=newPath(workPath,baseName);
- FSDataOutputStreamfileOut=file.getFileSystem(conf).create(file,false);
- recordWriter=newMy_LineRead<K,V>(fileOut);
- }
- returnrecordWriter;
- }
- }
- }
最后就是测试类：WordCount_MulFileOut.java
长按二维码向我转账
受苹果公司新规定影响，微信 iOS 版的赞赏功能被关闭，可通过二维码转账支持公众号。
阅读
好看
已推荐到看一看
你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
取消
分享想法到看一看
确定
最多200字,当前共字
微信扫一扫
关注该公众号