Hadoop日记Day18---MapReduce排序分组


本节所用到的数据下载地址为:http://pan.baidu.com/s/1bnfELmZ

MapReduce的排序分组任务与要求

  我们知道排序分组是MapReduce中Mapper端的第四步,其中分组排序都是基于Key的,我们可以通过下面这几个例子来体现出来。其中的数据和任务如下图1.1,1.2所示。

#首先按照第一列升序排列,当第一列相同时,第二列升序排列
3    3
3    2
3    1
2    2
2    1
1    1
-------------------
#结果
1    1
2    1
2    2
3    1
3    2
3    3

图 1.1 排序

#当第一列相同时,求出第二列的最小值
3    3
3    2
3    1
2    2
2    1
1    1
-------------------
#结果
3    1
2    1
1    1

图 1.2 分组

一、 排序算法

1.1 MapReduce默认排序算法

  使用MapReduce默认排序算法代码如下1.1所示,在代码中我将第一列作为键,第二列作为值。

 1 package sort;
 2 
 3 import java.io.IOException;
 4 import java.net.URI;
 5 
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.fs.FileStatus;
 8 import org.apache.hadoop.fs.FileSystem;
 9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.LongWritable;
11 import org.apache.hadoop.io.Text;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Reducer;
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
20 
21 public class SortApp {
22     private static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
23     private static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";
24     public static void main(String[] args) throws Exception {
25         Configuration conf=new Configuration();
26         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
27         final Path outpath = new Path(OUT_PATH);
28         if(fileSystem.exists(outpath)){
29             fileSystem.delete(outpath,true);
30         }
31         
32         final Job job = new Job(conf,SortApp.class.getSimpleName());
33         
34         //1.1 指定输入文件路径
35         FileInputFormat.setInputPaths(job, INPUT_PATH);        
36         job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件
37                 
38         //1.2指定自定义的Mapper类
39         job.setMapperClass(MyMapper.class);        
40         job.setMapOutputKeyClass(LongWritable.class);//指定输出<k2,v2>的类型
41         job.setMapOutputValueClass(LongWritable.class);
42                 
43         //1.3 指定分区类
44         job.setPartitionerClass(HashPartitioner.class);
45         job.setNumReduceTasks(1);
46                 
47         //1.4 TODO 排序、分区
48                 
49         //1.5  TODO (可选)合并
50                 
51         //2.2 指定自定义的reduce类
52         job.setReducerClass(MyReducer.class);        
53         job.setOutputKeyClass(LongWritable.class);//指定输出<k3,v3>的类型
54         job.setOutputValueClass(LongWritable.class);
55                 
56         //2.3 指定输出到哪里
57         FileOutputFormat.setOutputPath(job, outpath);        
58         job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类                        
59         job.waitForCompletion(true);//把代码提交给JobTracker执行        
60     }
61     static class MyMapper extends Mapper<LongWritable, Text,LongWritable,LongWritable>{
62 
63         @Override
64         protected void map(
65                 LongWritable key,
66                 Text value,
67                 Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context)
68                 throws IOException, InterruptedException {
69             final String[] splited = value.toString().split("\t");
70             final long k2 = Long.parseLong(splited[0]);
71             final long v2 = Long.parseLong(splited[1]);
72             context.write(new LongWritable(k2),new LongWritable(v2));
73         }    
74     }
75     static class MyReducer extends Reducer<LongWritable,LongWritable,LongWritable,LongWritable>{
76 
77         @Override
78         protected void reduce(
79                 LongWritable k2,
80                 Iterable<LongWritable> v2s,
81                 Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context)
82                 throws IOException, InterruptedException {
83             for(LongWritable v2:v2s){
84                 context.write(k2, v2);
85             }            
86         }    
87     }
88 }
View Code

代码 1.1

  运行结果如下图1.3所示

1    1
2    2
2    1
3    3
3    2
3    1

图 1.3

  从上面图中运行结果可以看出,MapReduce默认排序算法只对Key进行了排序,并没有对value进行排序,没有达到我们的要求,所以要实现我们的要求,还要我们自定义一个排序算法

1.2 自定义排序算法

  从上面图中运行结果可以知道,MapReduce默认排序算法只对Key进行了排序,并没有对value进行排序,没有达到我们的要求,所以要实现我们的要求,还要我们自定义一个排序算法。在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2和v2组装成新的类作为k 2 ,才能参与比较。所以在这里我们新建一个新的类型NewK2类型来封装原来的k2和v2。代码如1.2所示。

  1 package sort;
  2 
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import java.net.URI;
  7 
  8 import org.apache.hadoop.conf.Configuration;
  9 import org.apache.hadoop.fs.FileSystem;
 10 import org.apache.hadoop.fs.Path;
 11 import org.apache.hadoop.io.LongWritable;
 12 import org.apache.hadoop.io.Text;
 13 import org.apache.hadoop.io.WritableComparable;
 14 import org.apache.hadoop.mapreduce.Job;
 15 import org.apache.hadoop.mapreduce.Mapper;
 16 import org.apache.hadoop.mapreduce.Reducer;
 17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 18 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 20 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 21 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 22 
 23 public class SortApp {
 24     static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
 25     static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";
 26     public static void main(String[] args) throws Exception{
 27         final Configuration configuration = new Configuration();
 28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
 29         if(fileSystem.exists(new Path(OUT_PATH))){
 30             fileSystem.delete(new Path(OUT_PATH), true);
 31         }
 32         final Job job = new Job(configuration, SortApp.class.getSimpleName());
 33         //1.1 指定输入文件路径
 34         FileInputFormat.setInputPaths(job, INPUT_PATH);        
 35         job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件
 36         
 37         //1.2指定自定义的Mapper类
 38         job.setMapperClass(MyMapper.class);        
 39         job.setMapOutputKeyClass(NewK2.class);//指定输出<k2,v2>的类型
 40         job.setMapOutputValueClass(LongWritable.class);
 41         
 42         //1.3 指定分区类
 43         job.setPartitionerClass(HashPartitioner.class);
 44         job.setNumReduceTasks(1);
 45         
 46         //2.2 指定自定义的reduce类
 47         job.setReducerClass(MyReducer.class);        
 48         job.setOutputKeyClass(LongWritable.class);//指定输出<k3,v3>的类型
 49         job.setOutputValueClass(LongWritable.class);
 50         
 51         //2.3 指定输出到哪里
 52         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));        
 53         job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类
 54         job.waitForCompletion(true);//把代码提交给JobTracker执行
 55     }
 56 
 57     
 58     static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
 59         protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
 60             final String[] splited = value.toString().split("\t");
 61             final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
 62             final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
 63             context.write(k2, v2);
 64         };
 65     }
 66     
 67     static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{
 68         protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
 69             context.write(new LongWritable(k2.first), new LongWritable(k2.second));
 70         };
 71     }
 72     
 73     /**
 74      * 问:为什么实现该类?
 75      * 答:因为原来的v2不能参与排序,把原来的k2和v2封装到一个类中,作为新的k2
 76      *
 77      */
 78     static class  NewK2 implements WritableComparable<NewK2>{
 79         Long first;
 80         Long second;
 81         
 82         public NewK2(){}
 83         
 84         public NewK2(long first, long second){
 85             this.first = first;
 86             this.second = second;
 87         }
 88         
 89         
 90         @Override
 91         public void readFields(DataInput in) throws IOException {
 92             this.first = in.readLong();
 93             this.second = in.readLong();
 94         }
 95 
 96         @Override
 97         public void write(DataOutput out) throws IOException {
 98             out.writeLong(first);
 99             out.writeLong(second);
100         }
101 
102         /**
103          * 当k2进行排序时,会调用该方法.
104          * 当第一列不同时,升序;当第一列相同时,第二列升序
105          */
106         @Override
107         public int compareTo(NewK2 o) {
108             final long minus = this.first - o.first;
109             if(minus !=0){
110                 return (int)minus;
111             }
112             return (int)(this.second - o.second);
113         }
114         
115         @Override
116         public int hashCode() {
117             return this.first.hashCode()+this.second.hashCode();
118         }
119         
120         @Override
121         public boolean equals(Object obj) {
122             if(!(obj instanceof NewK2)){
123                 return false;
124             }
125             NewK2 oK2 = (NewK2)obj;
126             return (this.first==oK2.first)&&(this.second==oK2.second);
127         }
128     }
129     
130 }
View Code

代码 1.2

  从上面的代码中我们可以发现,我们的新类型NewK2实现了WritableComparable接口,其中该接口中有一个compareTo()方法,当对关键字进行比较会调用该方法,而我们就在该方法中实现了我们想要做的事。

  运行结果如下图1.4所示。

1    1
2    1
2    2
3    1
3    2
3    3

图 1.4

二、分组算法

2.1 MapReduce默认分组

  分组是在MapReduce中Mapper端的第四步,分组也是基于Key进行的,将相同key的value放到一个集合中去。还以上面排序代码为例,业务逻辑如下图2.1所示。在代码中以NewK2为关键字,每个键都不相同,所以会将数据分为六组,这样就不能实现我们的业务要求,但利用自定义类型NewK2,可以自定义排序算法的同时我们也可以自定义分组算法。

#当第一列相同时,求出第二列的最小值
3    3
3    2
3    1
2    2
2    1
1    1
-------------------
#结果
3    1
2    1
1    1

图 2.1

2.2 自定义分组比较器

  由于业务要求分组是按照第一列分组,但是NewK2的比较规则决定了不能按照第一列分,只能自定义分组比较器,代码如下2.1所示。

  1 package group;
  2 
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import java.net.URI;
  7 
  8 import org.apache.hadoop.conf.Configuration;
  9 import org.apache.hadoop.fs.FileSystem;
 10 import org.apache.hadoop.fs.Path;
 11 import org.apache.hadoop.io.LongWritable;
 12 import org.apache.hadoop.io.RawComparator;
 13 import org.apache.hadoop.io.Text;
 14 import org.apache.hadoop.io.WritableComparable;
 15 import org.apache.hadoop.io.WritableComparator;
 16 import org.apache.hadoop.mapreduce.Job;
 17 import org.apache.hadoop.mapreduce.Mapper;
 18 import org.apache.hadoop.mapreduce.Reducer;
 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 20 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 22 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 23 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 24 
 25 public class GroupApp {
 26     static final String INPUT_PATH = "hdfs://hadoop:9000/newinput";
 27     static final String OUT_PATH = "hdfs://hadoop:9000/newoutput";
 28     public static void main(String[] args) throws Exception{
 29         final Configuration configuration = new Configuration();
 30         
 31         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
 32         if(fileSystem.exists(new Path(OUT_PATH))){
 33             fileSystem.delete(new Path(OUT_PATH), true);
 34         }        
 35         final Job job = new Job(configuration, GroupApp.class.getSimpleName());
 36         
 37         //1.1 指定输入文件路径
 38         FileInputFormat.setInputPaths(job, INPUT_PATH);        
 39         job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件
 40         
 41         //1.2指定自定义的Mapper类
 42         job.setMapperClass(MyMapper.class);        
 43         job.setMapOutputKeyClass(NewK2.class);//指定输出<k2,v2>的类型
 44         job.setMapOutputValueClass(LongWritable.class);
 45         
 46         //1.3 指定分区类
 47         job.setPartitionerClass(HashPartitioner.class);
 48         job.setNumReduceTasks(1);
 49         
 50         //1.4 TODO 排序、分区
 51         job.setGroupingComparatorClass(MyGroupingComparator.class);
 52         //1.5  TODO (可选)合并
 53         
 54         //2.2 指定自定义的reduce类
 55         job.setReducerClass(MyReducer.class);        
 56         job.setOutputKeyClass(LongWritable.class);//指定输出<k3,v3>的类型
 57         job.setOutputValueClass(LongWritable.class);
 58         
 59         //2.3 指定输出到哪里
 60         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));        
 61         job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类        
 62         job.waitForCompletion(true);//把代码提交给JobTracker执行
 63     }
 64 
 65     
 66     static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
 67         protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
 68             final String[] splited = value.toString().split("\t");
 69             final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
 70             final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
 71             context.write(k2, v2);
 72         };
 73     }
 74     
 75     static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{
 76         protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
 77             long min = Long.MAX_VALUE;
 78             for (LongWritable v2 : v2s) {
 79                 if(v2.get()<min){
 80                     min = v2.get();
 81                 }
 82             }
 83             
 84             context.write(new LongWritable(k2.first), new LongWritable(min));
 85         };
 86     }
 87     
 88     /**
 89      * 问:为什么实现该类?
 90      * 答:因为原来的v2不能参与排序,把原来的k2和v2封装到一个类中,作为新的k2
 91      *
 92      */
 93     static class  NewK2 implements WritableComparable<NewK2>{
 94         Long first;
 95         Long second;
 96         
 97         public NewK2(){}
 98         
 99         public NewK2(long first, long second){
100             this.first = first;
101             this.second = second;
102         }
103         
104         
105         @Override
106         public void readFields(DataInput in) throws IOException {
107             this.first = in.readLong();
108             this.second = in.readLong();
109         }
110 
111         @Override
112         public void write(DataOutput out) throws IOException {
113             out.writeLong(first);
114             out.writeLong(second);
115         }
116 
117         /**
118          * 当k2进行排序时,会调用该方法.
119          * 当第一列不同时,升序;当第一列相同时,第二列升序
120          */
121         @Override
122         public int compareTo(NewK2 o) {
123             final long minus = this.first - o.first;
124             if(minus !=0){
125                 return (int)minus;
126             }
127             return (int)(this.second - o.second);
128         }
129         
130         @Override
131         public int hashCode() {
132             return this.first.hashCode()+this.second.hashCode();
133         }
134         
135         @Override
136         public boolean equals(Object obj) {
137             if(!(obj instanceof NewK2)){
138                 return false;
139             }
140             NewK2 oK2 = (NewK2)obj;
141             return (this.first==oK2.first)&&(this.second==oK2.second);
142         }
143     }
144     
145     static class MyGroupingComparator implements RawComparator<NewK2>{
146 
147         @Override
148         public int compare(NewK2 o1, NewK2 o2) {
149             return (int)(o1.first - o2.first);
150         }
151     
152         @Override
153         public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
154                 int arg4, int arg5) {
155             return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8);
156         }
157         
158     }
159 }
View Code

代码2.1

  从上面的代码中我们可以知道,我们自定义了一个分组比较器MyGroupingComparator,该类实现了RawComparator接口,RawComparator又继承了Comparator接口,这两个接口的代码如下:

public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

}
public interface Comparator<T> {
    int compare(T o1, T o2);
    boolean equals(Object obj);
}

  在类MyGroupingComparator中分别对着两个接口中的方法进行了实现,RawComparator中的compare()方法是基于字节的比较,Comparator中的compare()方法是基于对象的比较。在该方法一共有六个参数,如下:
* @param arg0 表示第一个参与比较的字节数组
* @param arg1 表示第一个参与比较的字节数组的起始位置
* @param arg2 表示第一个参与比较的字节数组的偏移量
*
* @param arg3 表示第二个参与比较的字节数组
* @param arg4 表示第二个参与比较的字节数组的起始位置
* @param arg5 表示第二个参与比较的字节数组的偏移量

  在于NewK2中存储着两个long类型,每个long类型为8字节,.compareBytes()方法的参数如下:.compareBytes(arg0, arg1, 8, arg3, arg4, 8);因为比较的是第一列,所以读取的偏移量为8字节。由于我们要求出每一分组的最小值,所以还重写Reduce方法,求出每一分租的最小值。最后的运行结果如下图2.1所示

1    1
2    1
3    1

图 2.1

三、MapReduce的一些算法

3.1 MapReduce中Shuffle过程

  Shuffle是MapReduce过程的核心,了解Shuffle非常有助于理解MapReduce的工作原理。huffle的正常意思是洗牌或弄乱,可能大家更熟悉的是Java API里的Collections.shuffle(List)方法,它会随机地打乱参数list里的元素顺序。如果你不知道MapReduce里Shuffle是什么,那么请看这张图:

  在该图中分为Map任务和Reduce任务两个部分,从map端到reduce端的红色和绿色的线表示数据流的一个过程,也就是从<K1,V1>到<K2,V2>到<K3,V3>的一个过程。

Map端

  <1>在map端首先接触的是InputSplit,在InputSplit中含有DataNode中的数据,每一个InputSplit都会分配一个Mapper任务,Mapper任务结束后产生<K2,V 2>的输出,这些输出显存放在缓存中,每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spil l.percent),一个后台线程就把内容写到(spill)Linux本地磁盘中的指定目录(mapred.local.dir)下的新建的一个溢出写文件。

  <2>写磁盘前,要partition,sort。通过分区,将不同类型的数据分开处理,之后对不同分区的数据进行排序,如果有Combiner,还要对排序后的数据进行co mbine。等最后记录写完,将全部溢出文件合并为一个分区且排序的文件。

  <3>最后将磁盘中的数据送到Reduce中,从图中可以看出Map输出有三个分区,有一个分区数据被送到图示的Reduce任务中,剩下的两个分区被送到其他Reducer任务中。而图示的Reducer任务的其他的三个输入来自其他的Map输出。

Reduce端

  <1>Reducer通过Http方式得到输出文件的分区。
  <2>TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。
  <3>排序阶段合并map输出。然后走Reduce阶段。

3.2 Hadoop压缩算法

3.2.1 算法介绍

  Hadoop的压缩过程并不是一个必须的过程,但为什么还要使用呢?在哪些阶段可以使用,有什么好处呢?
<1>在Map输出到Reduce时可以使用,因为map端输出的数据要通过网络输出到Reduce端,为了减少传输的数据量我们可以采用压缩的方式来减少延迟。
<2>在整个作业的输出也可以使用
  Codec为是压缩,解压缩的算法的实现,在Hadoop中,codec由CompressionCode的实现来表示。下面是一些实现,如下图3.1所示。

图 3.1

3.2.2 MapReduce的输出进行压缩

  输出的压缩属性,和使用方式:如下图3.2,3.3所示。

图 3.2

图3.3

3.3 常见算法

3.3.1 MapReduce常见算介绍

<1>单词计数(已介绍)
<2>数据去重(去掉重复数据不难理解吧)
<3>排序(在上节已介绍)
<4>Top K(是求最值问题,下面会介绍)

下面算法,跟我们数据库中的方法比较类似,
<5>选择---行

    数据库中:该操作的结果应该是一行一行的显示,相当于where。

    MapReduce的实现:以求最值为例,从100万数据中选出一行最小值。
<6>投影---列

    数据库中:该操作的结果应该是一列一列的显示,相当于select。    

    MapReduce的实现:以求处理手机上网日志为例,从其11个字段选出了五个字段来显示我们的手机上网流量。
<7>分组

    数据库中:相当于group by。        

    MapReduce的实现:相当于分区,以求处理手机上网日志为例,喊手机号和非手机号分为两组。
<9>多表连接

    MapReduce中:在MapReduce中可以同时进入多个文件进行操作,其中两个文件有关系就相当于表连接。那么如何知道文件之间的关系呢?我可以通过map函数的context参数来获得文件路径代码如下

  final FileSplit inputSplit = (FileSplit) context.getInputSplit();
  final String path = inputSplit.getPath().toString();

<10>单表关联  

  通过上面的分析我们可以知道,sql中的方法也可以在MapReduce中实现,也就是说当把关系型数据库中的算法全部在MapReduce中实现时,也就意味着sql的使用范围扩展到了Hadoop,也就是大数据领域,这样意义是非常大的。

3.3.2 Top K 最值案例

  求最值的方法,在我们的生活中应用非常的广,比如找出高考中的最高分,也就是状元,就非常类似分布式计算,要选出全国的最高分就首先选出各省份的,要选出各省份就得选出各市级的等等,而这些数据量非常大,无法直接全部加载到内存中,面对如此大的数据量我就可以考虑使用分布式计算的方式。我们以从100万的数据中求出其中的最大值为例,介绍该方法。

  求最值最简单的办法就是对该文件进行一次遍历得出最值,但是现实中数据比量比较大,这种方法不能实现。在传统的MapReduce思想中,将文件的数据经过map迭代出来送到reduce中,在Reduce中求出最大值。但这个方法显然不够优化,我们可采用“分而治之”的思想,不需要map的所有数据全部送到reduce中,我们可以在map中先求出最大值,将该map任务的最大值送reduce中,这样就减少了数据的传输量。那么什么时候该把这个数据写出去呢?我们知道,每一个键值对都会调用一次map(),由于数据量大调用map()的次数也就多了,显然在map()函数中将该数据写出去是不明智的,所以最好的办法该Mapper任务结束后将该数据写出去。我们又知道,当Mapper/Reducer任务结束后会调用cleanup函数,所以我们可以在该函数中将该数据写出去。了解了这些我们可以看一下程序的代码如3.1所示。

 1 package suanfa;
 2 
 3 import java.net.URI;
 4 
 5 import mapreduce.WordCountApp;
 6 
 7 import org.apache.hadoop.conf.Configuration;
 8 import org.apache.hadoop.fs.FileSystem;
 9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.LongWritable;
11 import org.apache.hadoop.io.NullWritable;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.mapreduce.Job;
14 import org.apache.hadoop.mapreduce.Mapper;
15 import org.apache.hadoop.mapreduce.Reducer;
16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
17 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
18 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
19 
20 public class TopKApp {
21     static final String INPUT_PATH = "hdfs://hadoop:9000/input2";
22     static final String OUT_PATH = "hdfs://hadoop:9000/out2";
23     
24     public static void main(String[] args) throws Exception {
25         Configuration conf = new Configuration();
26         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
27         final Path outPath = new Path(OUT_PATH);
28         if(fileSystem.exists(outPath)){
29             fileSystem.delete(outPath, true);
30         }
31         
32         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
33         FileInputFormat.setInputPaths(job, INPUT_PATH);
34         job.setMapperClass(MyMapper.class);
35         job.setReducerClass(MyReducer.class);
36         job.setOutputKeyClass(LongWritable.class);
37         job.setOutputValueClass(NullWritable.class);
38         FileOutputFormat.setOutputPath(job, outPath);
39         job.waitForCompletion(true);
40     }
41     static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
42         long max = Long.MIN_VALUE;
43         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
44             final long temp = Long.parseLong(v1.toString());
45             if(temp>max){
46                 max = temp;
47             }
48         };
49         
50         protected void cleanup(org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,LongWritable, NullWritable>.Context context) throws java.io.IOException ,InterruptedException {
51             context.write(new LongWritable(max), NullWritable.get());
52         };
53     }
54 
55     static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
56         long max = Long.MIN_VALUE;
57         protected void reduce(LongWritable k2, java.lang.Iterable<NullWritable> arg1, org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context arg2) throws java.io.IOException ,InterruptedException {
58             final long temp = k2.get();
59             if(temp>max){
60                 max = temp;
61             }
62         };
63         
64         protected void cleanup(org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context context) throws java.io.IOException ,InterruptedException {
65             context.write(new LongWritable(max), NullWritable.get());
66         };
67     }        
68 }
View Code

代码3.1

运行结果为:32767,也就是我们数据中的最大值

优质内容筛选与推荐>>
1、JDBC批处理---(java 对数据库的回滚) .
2、FIND_IN_SET()
3、HiWord()
4、MongoDB入门理解
5、Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy


长按二维码向我转账

受苹果公司新规定影响,微信 iOS 版的赞赏功能被关闭,可通过二维码转账支持公众号。

    阅读
    好看
    已推荐到看一看
    你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
    已取消,“好看”想法已同步删除
    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号





    联系我们

    欢迎来到TinyMind。

    关于TinyMind的内容或商务合作、网站建议,举报不良信息等均可联系我们。

    TinyMind客服邮箱:support@tinymind.net.cn