MapReduce是一种用于大规模数据集并行处理的分布式计算编程模型,由Google提出后被广泛应用于大数据处理场景,核心思想是将复杂的并行计算任务拆解为Map和Reduce两个阶段,通过分而治之的方式提升处理效率。

MapReduce核心组成
MapReduce模型主要包含两个部分,分别对应数据处理的拆分和汇总逻辑:
- Map阶段:负责将输入的大数据集拆分成多个小的键值对片段,对每个片段进行独立处理,生成中间结果键值对。这个阶段可以并行执行,每个Map任务处理一部分输入数据,互不干扰。
- Reduce阶段:负责接收Map阶段输出的中间结果,按照键进行分组汇总,对同一个键对应的所有值进行合并处理,最终输出最终结果。
MapReduce执行流程
一个完整的MapReduce任务执行过程通常包含以下步骤:
- 输入数据被切分成多个分片,每个分片对应一个Map任务
- Map任务读取分片数据,处理生成中间键值对
- 框架对中间结果进行分区、排序、分组,相同键的数据会被发送到同一个Reduce任务
- Reduce任务处理分组后的数据,生成最终结果并输出到存储系统
词频统计示例实现
词频统计是MapReduce最经典的应用场景,下面给出基于Hadoop MapReduce的Java实现代码,统计文本中每个单词出现的次数。
Map阶段代码
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// 自定义Map类,继承Mapper基类
// 输入键值对类型:LongWritable(行偏移量), Text(行内容)
// 输出键值对类型:Text(单词), LongWritable(出现次数1)
public class WordCountMap extends Mapper<LongWritable, Text, Text, LongWritable> {
private Text word = new Text();
private LongWritable one = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将行内容转换为字符串
String line = value.toString();
// 按空格拆分单词
String[] words = line.split(" ");
// 遍历每个单词,输出键值对(单词, 1)
for (String w : words) {
if (w.length() > 0) {
word.set(w);
context.write(word, one);
}
}
}
}
Reduce阶段代码
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// 自定义Reduce类,继承Reducer基类
// 输入键值对类型:Text(单词), LongWritable(出现次数,可迭代)
// 输出键值对类型:Text(单词), LongWritable(总出现次数)
public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
private LongWritable total = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
// 遍历同一个单词对应的所有出现次数,求和
for (LongWritable val : values) {
sum += val.get();
}
total.set(sum);
// 输出最终结果(单词, 总次数)
context.write(key, total);
}
}
任务驱动类代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 创建配置对象和Job实例
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word-count");
// 设置Jar包的主类
job.setJarByClass(WordCountDriver.class);
// 设置Map和Reduce类
job.setMapperClass(WordCountMap.class);
job.setReducerClass(WordCountReduce.class);
// 设置Map输出键值对类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 设置最终输出键值对类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 设置输入路径和输出路径,args[0]为输入路径,args[1]为输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交任务并等待完成
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
运行注意事项
运行上述代码时需要注意以下几点:
- 如果是本地运行,需要配置Hadoop本地环境,并且输出路径不能提前存在,否则任务会执行失败
- 如果是集群运行,需要将代码打包成Jar包,上传到Hadoop集群后通过命令行提交任务
- 实际场景中可以根据需求调整Map和Reduce的数量,优化任务的执行效率
除了Java语言,MapReduce还支持Python、Scala等多种语言实现,核心逻辑都是遵循Map和Reduce两个阶段的处理规则,开发者可以根据自身技术栈选择合适的实现方式。
MapReduce分布式计算大数据处理Map_Reduce示例修改时间:2026-07-04 03:48:26