MapReduce 是一种用于大规模数据集处理的编程模型,它将一个大任务分解成两个主要阶段:Map(映射) 和 Reduce(规约)。

核心思想
想象一下一个图书馆,你想统计所有书籍中每个单词出现的总次数。
-
Map (映射) 阶段:
- 你雇佣了很多实习生(Mapper)。
- 每个实习生拿到一本书(输入数据的一个分片)。
- 他们逐页阅读,将遇到的每一个单词记录下来,并标记为 "1"(
("hello", 1))。 - 实习生们不进行累加,只负责标记和输出,你会得到一堆像
("hello", 1), ("world", 1), ("hello", 1)` 这样的记录。
-
Shuffle & Sort (洗牌与排序) 阶段:
- 你有一个管理员。
- 管理员收集所有实习生提交的记录。
- 他将所有相同单词的记录分到同一个文件夹里,所有
("hello", 1)的记录都被放在一起,所有("world", 1)的记录被放在一起。 - 对每个文件夹内的记录,按单词进行排序,这个阶段是 MapReduce 的“魔法”所在,它为 Reduce 阶段做好了准备。
-
Reduce (规约) 阶段:
(图片来源网络,侵删)- 你再雇佣一批高级分析师(Reducer)。
- 每个分析师负责一个文件夹(一个唯一的 Key)。
- 分析师打开他负责的文件夹,里面全是
("hello", 1)这样的记录。 - 他将所有的 "1" 加起来,得到
("hello", 3)(假设 "hello" 出现了3次)。 - 每个分析师输出一个最终的计数结果。
- Map: 将输入数据分解,并生成中间的键值对。
- Shuffle & Sort: 框架自动完成,将相同 Key 的中间值聚合在一起。
- Reduce: 对每个唯一的 Key,聚合其对应的 Value,生成最终结果。
Python 实现 MapReduce 的方式
在 Python 中,你有几种方式来实现 MapReduce:
- 纯 Python 手动实现:理解原理的最佳方式,但不适合生产环境。
- 使用
mrjob库:由 Yelp 开发,是 Python 生态中最流行、最成熟的 MapReduce 库,它可以在你的本地机器上运行,也可以轻松部署到 Amazon EMR 等云平台上。这是最推荐的方式。 - 使用 Hadoop Streaming:通过标准输入/输出与 Hadoop 集群通信,需要你有 Hadoop 环境。
下面我们重点讲解 mrjob,因为它最实用。
使用 mrjob 库进行编程
mrjob 让你可以用纯 Python 编写 MapReduce 作业,而无需关心底层的 Hadoop 或集群管理细节。
安装 mrjob
pip install mrjob
一个完整的单词计数示例
我们将实现上面提到的图书馆单词计数任务。

步骤 1:准备输入数据
创建一个名为 input.txt 的文件,内容如下:
hello world
hello mrjob
this is a test
hello again
步骤 2:编写 Python 脚本
创建一个名为 wordcount.py 的文件,并写入以下代码:
from mrjob.job import MRJob
class MRWordCount(MRJob):
"""
一个用于统计单词数量的 MapReduce 作业
"""
def mapper(self, _, line):
"""
Map 函数
:param _: 输入的键,对于文本文件,每一行的键通常是 None
:param line: 输入的值,即文件中的一行文本
"""
# 将一行文本分割成单词列表
words = line.split()
# 遍历每个单词,并输出 (word, 1) 的键值对
for word in words:
# yield 关键字用于输出中间结果
yield word, 1
def reducer(self, word, counts):
"""
Reduce 函数
:param word: 从 Mapper 阶段接收到的键(经过 Shuffle & Sort 后)
:param counts: 与该键关联的所有值的迭代器(对于 "hello",它会是 1, 1, 1)
"""
# 对与 word 关联的所有 counts 求和
yield word, sum(counts)
if __name__ == '__main__':
# 这行代码会启动作业
MRWordCount.run()
代码解释:
class MRWordCount(MRJob): 我们创建一个继承自MRJob的类。mapper(self, _, line):mrjob会逐行读取输入文件,并将每一行作为参数传递给mapper函数。line.split()将字符串按空格分割成单词列表。yield word, 1为每个单词产生一个键值对,对于 "hello world",它会产生("hello", 1)和("world", 1)。
reducer(self, word, counts):mrjob的框架会自动收集所有相同word的中间结果,并将word和所有对应的1传递给reducer。counts是一个迭代器,包含了某个单词对应的所有 "1",对于 "hello",counts会迭代出1,1,1。sum(counts)将这些 "1" 相加,得到最终的总数。yield word, sum(counts)输出最终结果。
步骤 3:运行作业
打开终端,进入 wordcount.py 和 input.txt 所在的目录,然后运行:
# 使用本地运行模式 python wordcount.py input.txt
预期输出:
"again" 1
"a" 1
"hello" 3
"is" 1
"mrjob" 1
"test" 1
"this" 1
"world" 1
你看,我们成功统计了每个单词的出现次数!
更复杂的 mrjob 示例:计算每个 IP 地址的访问次数
这个例子更贴近实际应用,比如分析 web 服务器日志。
步骤 1:准备输入数据
创建一个名为 access.log 的文件,内容如下:
168.1.1 - - [10/Oct/2025:13:55:36 +0000] "GET /index.html HTTP/1.1" 200 1024
10.0.0.5 - - [10/Oct/2025:13:56:05 +0000] "POST /login HTTP/1.1" 200 512
192.168.1.1 - - [10/Oct/2025:13:56:36 +0000] "GET /home.html HTTP/1.1" 200 2048
10.0.0.5 - - [10/Oct/2025:13:57:01 +0000] "GET /dashboard HTTP/1.1" 404 256
192.168.1.1 - - [10/Oct/2025:13:57:36 +0000] "GET /about.html HTTP/1.1" 200 512
我们想要统计每个 IP 地址(168.1.1)的访问总次数。
步骤 2:编写 Python 脚本
创建一个名为 ipcount.py 的文件:
from mrjob.job import MRJob
import re
class MRIPCount(MRJob):
# 定义 Mapper 函数
def mapper(self, _, line):
"""
从日志行中提取 IP 地址。
日志格式: IP - - [date] "request" status size
"""
# 使用正则表达式提取 IP 地址
# IP 地址的模式: 四个由点分隔的 1-3 位数字
ip_address = re.search(r'\d+\.\d+\.\d+\.\d+', line)
if ip_address:
# 如果找到了 IP 地址,则将其作为键,1 作为值
yield ip_address.group(), 1
# 定义 Reducer 函数
def reducer(self, ip_address, counts):
"""
对每个 IP 地址的访问次数进行求和。
"""
# 将与该 IP 地址关联的所有 counts (都是 1) 相加
yield ip_address, sum(counts)
if __name__ == '__main__':
MRIPCount.run()
步骤 3:运行作业
python ipcount.py access.log
预期输出:
"10.0.0.5" 2
"192.168.1.1" 3
mrjob 的高级特性
mrjob 还支持更复杂的场景:
-
Combiners (组合器):在数据发送到 Reducer 之前,先在本地进行一次聚合,这可以极大地减少网络传输的数据量,提高性能,Combiner 的逻辑通常和 Reducer 相同。
# 在你的 MRJob 类中添加 def combiner(self, word, counts): yield word, sum(counts)mrjob会自动决定何时使用 Combiner。 -
Multiple Mappers/Reducers:一个作业可以有多个 Mapper 和多个 Reducer。
class MyJob(MRJob): def steps(self): return [ self.mr(mapper=self.mapper1, reducer=self.reducer1), self.mr(mapper=self.mapper2, reducer=self.reducer2) ] -
Runner Options:
mrjob提供了丰富的命令行选项来控制作业的运行。--local: 在本地运行(默认)。--runner=hadoop: 在 Hadoop 集群上运行。--runner=emr: 在 Amazon EMR 上运行。-r emr:-r是--runner的缩写。--num-reducers=N: 指定 Reducer 的数量。--output-dir=s3://my-bucket/output: 将输出直接保存到 Amazon S3。
纯 Python 手动实现(为了理解原理)
如果你没有安装 mrjob,或者想手动实现一遍来加深理解,可以这样做。
代码逻辑:
- 模拟 Map 阶段:读取文件,处理每一行,生成一个包含所有
(word, 1)的列表。 - 模拟 Shuffle & Sort 阶段:对列表进行排序,并将相同
word的1收集到一起。 - 模拟 Reduce 阶段:遍历排序后的列表,对每个
word的1进行求和。
# 1. 准备输入数据 (同上)
# input.txt 内容:
# hello world
# hello mrjob
# this is a test
# hello again
# 2. 手动实现 MapReduce
def map_phase(file_path):
"""模拟 Mapper 阶段"""
intermediate = []
with open(file_path, 'r') as f:
for line in f:
words = line.split()
for word in words:
intermediate.append((word, 1))
return intermediate
def shuffle_sort_phase(intermediate):
"""模拟 Shuffle & Sort 阶段"""
# 先按单词排序
intermediate.sort(key=lambda x: x[0])
# 然后将相同的单词分组
# [('a', 1), ('a', 1), ('b', 1)] -> {'a': [1, 1], 'b': [1]}
grouped = {}
for word, count in intermediate:
if word not in grouped:
grouped[word] = []
grouped[word].append(count)
return grouped
def reduce_phase(grouped_data):
"""模拟 Reducer 阶段"""
final_result = []
for word, counts in grouped_data.items():
final_result.append((word, sum(counts)))
return final_result
# 3. 运行流程
if __name__ == '__main__':
input_file = 'input.txt'
# 执行 Map
intermediate_data = map_phase(input_file)
print("Map 阶段输出 (中间结果):")
print(intermediate_data)
print("-" * 20)
# 执行 Shuffle & Sort
grouped_data = shuffle_sort_phase(intermediate_data)
print("Shuffle & Sort 阶段输出 (分组后):")
print(grouped_data)
print("-" * 20)
# 执行 Reduce
final_result = reduce_phase(grouped_data)
print("Reduce 阶段输出 (最终结果):")
# 按单词排序后输出,使其与 mrjob 结果一致
final_result.sort(key=lambda x: x[0])
for word, count in final_result:
print(f'"{word}"\t{count}')
手动实现的缺点:
- 内存限制:所有中间数据都存储在内存中,对于海量数据(如 TB 级),这会立刻导致内存溢出。
- 效率低下:没有并行处理,所有操作都在单台机器的单个进程中完成。
- 复杂度高:手动处理文件 I/O、排序、分组等逻辑非常繁琐。
| 特性 | mrjob (推荐) |
纯 Python 手动实现 |
|---|---|---|
| 易用性 | 高,API 简洁,只需定义 mapper 和 reducer |
低,需要手动实现所有逻辑 |
| 性能 | 高,支持并行、Combiner,可部署到集群 | 低,单线程,内存受限 |
| 可扩展性 | 极高,可轻松扩展到 Hadoop/Spark/EMR | 无,无法处理超大规模数据 |
| 适用场景 | 生产环境、大数据分析、学习 MapReduce 框架 | 学习算法原理、处理小规模数据集 |
对于任何实际应用或深入学习,强烈建议使用 mrjob,它为你屏蔽了分布式系统的复杂性,让你可以专注于业务逻辑的实现。
