使用MapReduce和Spark对日志数据进行简单的清洗和汇总
首先使用MapReduce的map阶段对日志进行分割,将time、ip、url提取出来,再用reduce进行整合,统计每个ip地址的出现次数,并将结果输出到HDFS中。在整合中我使用了bean结构来存储数据,该bean实现了WritableComparable接口。
使用时先将BaiduLog和LogBean两个类导入项目,并配置相应的Maven依赖,然后将项目打包成jar并上传到虚拟机,再将日志文件上传到HDFS,最后使用以下命令运行:
hadoop jar rain-hadoop-1.0-SNAPSHOT.jar com.rain.mapreduce.BaiduLog /data/baidu.log /data/log/clean5
BaiduLog.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
<br />import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class BaiduLog { public static class BaiduLogMapper extends Mapper<LongWritable,Text, Text, LogBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // super.map(key, value, context); String log = value.toString(); String str = "(cn.baidu.core.inteceptor.LogInteceptor:55)"; if (log.indexOf(str)!=-1){ String[] log_arr = log.split(str); String time = log_arr[0].substring(1, 10); String[] log_arr2 = log_arr[1].split("\t"); String ip = log_arr2[1]; String url = log_arr2[2]; if (url.equals("null")){ url = log_arr2[3]; } LogBean logbean = new LogBean(time,ip,url); context.write(new Text(ip),logbean); } } } public static class BaiduLogReducer extends Reducer<Text,LogBean,IntWritable,Text>{ @Override protected void reduce(Text key, Iterable<LogBean> values, Context context) throws IOException, InterruptedException { // super.reduce(key, values, context); int sum = 0; StringBuffer str = new StringBuffer(); int flag = 0; for (LogBean logbean:values){ sum++; if (flag==0){ str.append(logbean.toString()); flag = 1; } } context.write(new IntWritable(sum),new Text(str.toString())); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "avg"); job.setJarByClass(BaiduLog.class); job.setMapperClass(BaiduLog.BaiduLogMapper.class); job.setReducerClass(BaiduLog.BaiduLogReducer.class); // job.setCombinerClass(BaiduLog.BaiduLogReducer.class); 
job.setOutputKeyClass(Text.class); job.setOutputValueClass(LogBean.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } } |
LogBean.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
<br />import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class LogBean implements WritableComparable<LogBean> { private String time; private String ip; private String url; public LogBean() { super(); } public LogBean(String time, String ip, String url) { this.time = time; this.ip = ip; this.url = url; } @Override public String toString() { return "LogBean{" + "time='" + time + ' ' + ", ip='" + ip + ' ' + ", url='" + url + ' ' + '}'; } public String getTime() { return time; } public void setTime(String time) { this.time = time; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } @Override public int compareTo(LogBean o) { return 0; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(time); out.writeUTF(ip); out.writeUTF(url); } @Override public void readFields(DataInput in) throws IOException { time = in.readUTF(); ip = in.readUTF(); url = in.readUTF(); } } |