Hadoop实例之利用MapReduce实现日志清洗(附源代码)


通过hadoop的分布式文件系统与MR完成日常日志文件的数据处理,以求达到数据清洗的目的。

日志数据格式:

27.19.74.143 - - [30/Mar/2015:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127

需要得到的数据:

源代码:

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;


import org.apache.hadoop.conf.Configuration;                                                                                   
import org.apache.hadoop.fs.Path;                                                                                              
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;                                                                                              
import org.apache.hadoop.mapreduce.Job;                                                                                        
import org.apache.hadoop.mapreduce.Mapper;                                                                                     
import org.apache.hadoop.mapreduce.Reducer;                                                                                    
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;                                                                     
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;                                                                     
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;                                                                   
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;                                                                   
public class Logtest1 {                                                                                                            
    public static class Map extends Mapper<Object , Text , IntWritable,Text >{                                                    
    private static Text goods=new Text();                                                                                         
    private static IntWritable num=new IntWritable();   
    public  static  final  IntWritable  one  =  new  IntWritable(1);  

//下面这部分代码是将获取的时间处理为标准格式
public static final SimpleDateFormat FORMAT = new SimpleDateFormat( "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH); public static final SimpleDateFormat dateformat1 = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss"); private Date parseDateFormat(String string) { Date parse = null; try { parse = FORMAT.parse(string); } catch (ParseException e) { e.printStackTrace(); } return parse; } private String parseTime(String line) { final int first = line.indexOf("["); String time = line.substring(first + 1, line.length()-1).trim(); Date date = parseDateFormat(time); return dateformat1.format(date); }
int number=1; //日志条数 当做主键
public void map(Object key,Text value,Context context) throws IOException, InterruptedException{ try { String line=value.toString(); //读取每一行数据转化为字符串 String arr[]=line.split(" "); //将得到的字符串以空格为分隔符分割,存入数组 String content=null; String time = parseTime(arr[3]); 将时间转为标准时间格式 content=arr[0]+","+time+","+arr[6]+","+arr[8]+","+arr[9]; //将需要的数据拼接 // System.out.println(content);
num.set(number); //存入num,当做主键传递给reduces number
++; //条数加一
goods.set(content); //将内容当做value context.write(num,goods);
}
catch (Exception e) { // TODO: handle exception } } } public static class Reduce extends Reducer< IntWritable, Text, IntWritable, Text>{ private static IntWritable result= new IntWritable(); public void reduce(IntWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException{ for(Text val:values){ context.write(key,val); System.out.println(key); System.out.println(val); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf=new Configuration(); Job job =new Job(conf,"Logtest1"); job.setJarByClass(Logtest1.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path in=new Path("hdfs://192.168.68.130:9000/user/hadoop/access_2015_03_30.log"); //需要清洗的日志所在位置 Path out=new Path("hdfs://192.168.68.130:9000/user/hadoop/outtest6"); //输入的目的,注意outtest6不能存在 FileInputFormat.addInputPath(job,in); FileOutputFormat.setOutputPath(job,out); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

优质内容筛选与推荐>>
1、mac地址和ip地址要同时存在么?
2、spring boot拦截器中获取request post请求中的参数
3、svn文件被锁不能提交的解决办法
4、jquery iframe父子框架中的元素访问方法
5、【转】全国最佳医院排名


长按二维码向我转账

受苹果公司新规定影响,微信 iOS 版的赞赏功能被关闭,可通过二维码转账支持公众号。

    阅读
    好看
    已推荐到看一看
    你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
    已取消,“好看”想法已同步删除
    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号