使用MapReduce统计天气数据中的最高气温
本例改写自《Hadoop权威指南》。使用时需要先将代码打包成jar,再将测试数据和jar包上传到HDFS中,然后通过命令运行作业,最终得到每一年的最高气温统计结果(具体的运行命令此处从略)。代码本身很简单,只需分别实现map和reduce两个操作:map按固定的列位置从每行记录中截取年份和气温,并将结果传给reduce;reduce再对每个年份求出最高气温。
完整代码如下:
package com.rain.mapreduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * MapReduce job that finds the maximum air temperature recorded for each year.
 * Adapted from the example in "Hadoop: The Definitive Guide".
 *
 * <p>The mapper reads fixed-width text records and emits {@code (year, temperature)}
 * pairs; the reducer keeps the largest temperature seen for each year.
 */
public class MaxTemperature {

    /**
     * Extracts the year and air temperature from each fixed-width input line and
     * emits them as a {@code (year, temperature)} pair when the reading is usable.
     */
    public static class MaxTemperatureMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        /** Sentinel temperature value; readings equal to it are not emitted. */
        private static final int MISSING = 9999;

        /** Single-character quality codes for which a reading is emitted. */
        private static final String ACCEPTED_QUALITY_CODES = "01459";

        /** Column layout of the record: [start, end) offsets into the line. */
        private static final int YEAR_START = 15;
        private static final int YEAR_END = 19;
        private static final int TEMPERATURE_START = 87;
        private static final int TEMPERATURE_END = 92;
        private static final int QUALITY_INDEX = 92;

        /** Shortest line that still contains every column read by {@link #map}. */
        private static final int MIN_RECORD_LENGTH = QUALITY_INDEX + 1;

        /**
         * Parses one input line and writes {@code (year, temperature)} to the context.
         *
         * @param key     input key supplied by the framework (not used here)
         * @param value   one line of the input file
         * @param context sink for the emitted pair and for job counters
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            // A truncated or blank line would make substring()/charAt() throw and
            // fail the whole task; skip it and record the fact in a job counter.
            if (line.length() < MIN_RECORD_LENGTH) {
                context.getCounter("MaxTemperature", "MALFORMED_RECORDS").increment(1);
                return;
            }

            String year = line.substring(YEAR_START, YEAR_END);

            // The sign column is dropped when it is an explicit '+'; otherwise it is
            // handed to parseInt together with the digits.
            int digitsStart = line.charAt(TEMPERATURE_START) == '+'
                    ? TEMPERATURE_START + 1
                    : TEMPERATURE_START;
            int airTemperature = Integer.parseInt(line.substring(digitsStart, TEMPERATURE_END));

            // Membership test on the single quality character; equivalent to matching
            // the regex [01459] without compiling a pattern for every record.
            char quality = line.charAt(QUALITY_INDEX);
            boolean qualityAccepted = ACCEPTED_QUALITY_CODES.indexOf(quality) >= 0;

            if (airTemperature != MISSING && qualityAccepted) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }

    /**
     * Reduces all temperatures emitted for one year to their maximum.
     */
    public static class MaxTemperatureReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        /**
         * Writes {@code (year, max temperature)} for the given key.
         *
         * @param key     the year
         * @param values  every temperature emitted for that year
         * @param context sink for the result
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    /**
     * Configures and submits the job, then exits with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a configured job class cannot be loaded
     * @throws InterruptedException   if interrupted while waiting for completion
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        // Job.getInstance() replaces the deprecated Job() constructor.
        Job job = Job.getInstance();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}