WordCount经常在我们初学mapreduce的时候,被作为最简单的例子来讲解,这次我们就从官方给出的源码入手,看看是怎么回事。
所谓Mapreduce,就是Map(映射)”和”Reduce(归约)。map过程,就是把一组<key,value>映射成新的<key,value>;reduce过程,就是把map过程产生的一些列<key,value>,其中那些key相同的,归约成<key,<values>>来处理,一个key对应一组value。
有了这点认识,我们来看看hadoop的mapreduce源码范例:wordcount。
这个程序用来统计一个文本里面各个单词出现的个数。具体怎么运行可以参看我之前写的《centos+虚拟机配置hadoop2.5.2-mapreduce-wordcount例子》
下面分析一下WordCount源码:
Map过程:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
关于mapper类的要点:
- maps将输入的键值对输出为中间键值对,输出不一定是和输入是同样类型的,一个输入键值对可以输出多个键值对,也可能不输出。
- hadoop的Map-Reduce框架对每一个InputSplit键值对都生成一个map任务,这些键值对是由InputFormat生成的。
- 框架首先调用setup函数,随后是map函数(InputSplit产生的每一个键值对都调用一次),最后调用finally函数。在wordcount中,只重写了map函数。
- mapper的输出被分成了不同组,供每个reducer来处理,我们可以通过实现抽象类Partitioner来控制排序和分组。
TokenizerMapper类的简单解析:
由于继承mapper类,因此要实现map()方法。
其中有这么一行代码:
StringTokenizer itr = new StringTokenizer(value.toString());
value是传进来的值,对于这个例子而言,是一行的文本,将一行分割成一个又一个单词,然后,用一个while循环迭代,生成新的(key,value)。代码如下:
while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); }
其中的one这个变量,是在刚开始声明的:
private final static IntWritable one = new IntWritable(1);
用来计数,这里是1,所以,输出的(key,value)都是这样的形式:(“单词”,1)。可供之后处理。
Reduce过程:
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
关于reducer类的要点:
- reducer将中间输出键值对中那些键相同的合并,值为集合。
- reduce主要有三个阶段:
- shuffle洗牌:reducer将排序好的输出从每个mapper里面复制出来,整个过程用HTTP来通信。
- sort排序:框架将reducer的具有相同键的输入合并排序,因为不同的mapper可能有相同的键,shuffle过程和sort过程是同时进行的。
- Reduce归约:在reduce阶段,每个key传进来,reduce方法都被调用一次,进行归约。
IntSumReducer类的简单解析:
重写了reduce方法,由于之前已经有map过程了,因此此时传进来的键值对的形式是<key,>,即value不是一个值,而是值的集合。用一个for循环,即可遍历某个key里面的所有值:
for (IntWritable val : values) { sum += val.get(); }
这个for循环将某个key对应的所有的value累加,即某单词出现次数的累加。
然后把结果输出:
result.set(sum); context.write(key, result);
其中的result在之前声明了:
private IntWritable result = new IntWritable();
是整型,所以之后context.write(key,result)就把某单词出现的次数输出了。
Mapreduce的整个过程:
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }
要点:
- Configuration conf = new Configuration(); 这句新建了一个配置对象conf,可以配置mapreduce的一些参数,这里没有对配置作过多的调整。
- Job job = new Job(conf, “word count”); 这句新建了一个job,用于控制整个工作流程,接下来的6个set函数:
job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);
它们分别是:
1.设置工作的类名,这里是WordCount
2.设置mapper的类:TokenizerMapper.class)
3.设置combiner的类:IntSumReducer.class
4.设置reducer的类:IntSumReducer.class,和combiner是一样的。
5.设置输出key的格式,text类型
6.设置输出value的格式,int类型。
- 接着就是设置输入输出路径,都由命令行参数来指定
- 最后调用job.waitForCompletion(true) 来开始工作。
最后附上完整源代码:
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }