- 为什么进行压力测试?
- 核心工具:
pymemcache - 构建压力测试脚本(从简单到复杂)
- 关键性能指标
- 如何解读和优化结果
- 进阶:与
locust集成
为什么进行压力测试?
对 Memcached 进行压力测试的主要目的包括:

- 性能基准测试:确定你的 Memcached 实例(包括网络、CPU、内存)在特定负载下的最大吞吐量(QPS)和延迟。
- 容量规划:了解随着客户端连接数和请求量的增加,系统何时会达到瓶颈,从而决定需要多少台 Memcached 服务器。
- 代码验证:确保你的 Python 应用程序在高并发读写场景下,对
pymemcache的使用是健壮的,没有死锁、连接泄漏等问题。 - 瓶颈定位:是网络延迟、服务器 CPU 不够用,还是客户端 Python 代码成了瓶颈?压力测试数据可以帮助你找到答案。
核心工具:pymemcache
pymemcache 是目前 Python 社区最推荐使用的 Memcached 客户端,相比老牌的 python-memcached,它有以下优势:
- 高性能:使用
libevent和gevent等异步 I/O 模型,性能极佳。 - 连接池:内置连接池,可以复用 TCP 连接,避免了频繁创建和销毁连接的开销,对高并发至关重要。
- 协议支持:支持二进制协议,比文本协议更高效。
- 故障转移:支持在节点故障时自动将请求转移到下一个健康节点。
安装:
pip install pymemcache
构建压力测试脚本
我们将从最简单的脚本开始,逐步构建一个功能更完善的压力测试工具。
单线程脚本(基础验证)
这个脚本用于快速验证 pymemcache 是否能正常工作,并给出一个非常粗略的性能数据。

import time
from pymemcache.client.base import Client
# --- 配置 ---
MEMCACHED_SERVER = '127.0.0.1:11211'
KEY_PREFIX = 'stress_test_key_'
TEST_VALUE = b'A' * 1024 # 1KB 的测试数据
TOTAL_REQUESTS = 10000
def run_simple_test():
# 创建客户端
# connect_timeout 和 timeout 设置很重要,防止长时间阻塞
client = Client(MEMCACHED_SERVER, connect_timeout=2, timeout=2)
try:
# 确保连接成功
client.version()
print(f"Successfully connected to {MEMCACHED_SERVER}")
# --- 写入测试 ---
print(f"Starting write test for {TOTAL_REQUESTS} keys...")
start_time = time.time()
for i in range(TOTAL_REQUESTS):
key = f"{KEY_PREFIX}{i}"
client.set(key, TEST_VALUE)
write_duration = time.time() - start_time
write_qps = TOTAL_REQUESTS / write_duration
print(f"Write Test: {write_qps:.2f} QPS (Total time: {write_duration:.2f}s)")
# --- 读取测试 ---
print(f"Starting read test for {TOTAL_REQUESTS} keys...")
start_time = time.time()
for i in range(TOTAL_REQUESTS):
key = f"{KEY_PREFIX}{i}"
client.get(key)
read_duration = time.time() - start_time
read_qps = TOTAL_REQUESTS / read_duration
print(f"Read Test: {read_qps:.2f} QPS (Total time: {read_duration:.2f}s)")
except Exception as e:
print(f"An error occurred: {e}")
finally:
client.close()
if __name__ == '__main__':
run_simple_test()
问题:这个脚本只有一个线程,无法模拟真实的高并发场景,它的 QPS 结果通常远低于实际生产环境。
多线程脚本(并发压力)
这是最常用的压力测试方法,我们将使用 Python 的 concurrent.futures.ThreadPoolExecutor 来模拟多个并发客户端。
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from pymemcache.client.base import Client
# --- 配置 ---
MEMCACHED_SERVER = '127.0.0.1:11211'
KEY_PREFIX = 'stress_test_key_'
TEST_VALUE = b'A' * 1024 # 1KB
TOTAL_REQUESTS = 50000 # 总请求数
CONCURRENT_THREADS = 50 # 并发线程数
# 用于线程安全的计数器
request_counter = 0
lock = threading.Lock()
def worker(client, worker_id):
"""工作线程函数,执行读写操作"""
global request_counter
success = 0
fail = 0
while True:
with lock:
if request_counter >= TOTAL_REQUESTS:
break
request_counter += 1
current_req = request_counter
key = f"{KEY_PREFIX}{worker_id}_{current_req}"
# 随机选择操作类型 (70% 读, 30% 写)
operation = random.choices(['get', 'set'], weights=[0.7, 0.3])[0]
try:
if operation == 'set':
client.set(key, TEST_VALUE)
else:
client.get(key)
success += 1
except Exception as e:
fail += 1
# print(f"Worker {worker_id} failed on request {current_req}: {e}")
return success, fail
def run_concurrent_test():
print(f"Starting concurrent test with {CONCURRENT_THREADS} threads and {TOTAL_REQUESTS} total requests...")
start_time = time.time()
# 使用 ThreadPoolExecutor 管理线程池
with ThreadPoolExecutor(max_workers=CONCURRENT_THREADS) as executor:
# 为每个线程创建一个独立的客户端实例
# 这是最佳实践!每个线程有自己的连接,避免锁竞争。
clients = [Client(MEMCACHED_SERVER, connect_timeout=2, timeout=2) for _ in range(CONCURRENT_THREADS)]
futures = [executor.submit(worker, client, i) for i, client in enumerate(clients)]
total_success = 0
total_fail = 0
for future in as_completed(futures):
success, fail = future.result()
total_success += success
total_fail += fail
duration = time.time() - start_time
overall_qps = TOTAL_REQUESTS / duration
print("\n--- Test Summary ---")
print(f"Total Requests: {TOTAL_REQUESTS}")
print(f"Total Time: {duration:.2f} seconds")
print(f"Overall QPS: {overall_qps:.2f}")
print(f"Success: {total_success}, Failures: {total_fail}")
# 关闭所有客户端连接
for client in clients:
client.close()
if __name__ == '__main__':
run_concurrent_test()
关键点解析:
- 每个线程一个客户端:
pymemcache的Client实例不是线程安全的。必须为每个工作线程创建一个独立的Client实例,上面的代码正确地做到了这一点。 - 线程安全计数器:使用
threading.Lock来保护共享的request_counter,确保所有线程协同工作,直到完成所有请求。 - 混合读写:真实应用场景是读写混合的,使用
random.choices可以模拟不同的读写比例。 - 连接池 vs. 独立连接:
pymemcache内部有自己的连接池,每个Client实例都会管理一小撮连接,对于多线程测试,"每个线程一个客户端"的模式是最高效且最清晰的。
关键性能指标
在运行压力测试时,你需要关注以下几个核心指标:

- QPS (Queries Per Second):每秒查询数,这是衡量 Memcached 服务和处理能力最核心的指标,QPS 越高,性能越好。
- 延迟:从发送请求到收到响应的时间,通常使用百分位数来衡量,如 P50, P90, P99, P999。
- P50 (中位数):50% 的请求响应时间小于这个值。
- P99 (99th percentile):99% 的请求响应时间小于这个值,这个指标非常重要,它代表了最差体验的用户感受,P99 延迟突然飙升,说明系统已经开始不稳定。
- 错误率:失败的请求数 / 总请求数,任何高于 0% 的错误率都值得警惕,可能意味着连接超时、服务器过载或网络问题。
- 服务器资源:在测试的同时,必须监控 Memcached 服务器的状态。
memcached -vv:查看详细日志,特别是evictions(淘汰)信息,如果淘汰频繁,说明内存不足。top/htop:查看 CPU 和内存使用率。netstat -an | grep 11211:查看连接数。
如何解读和优化结果
QPS 上不去,延迟很高
- 检查客户端:
- 网络延迟?客户端和 Memcached 服务器是否在同一个网络/VPC?
- Python 代码是否是瓶颈?尝试增加并发线程数,看 QPS 是否线性增长,如果增加线程数 QPS 不再增加,甚至下降,可能是 Python GIL(全局解释器锁)或客户端机器的 CPU/网络成为瓶颈。
- 检查服务器:
CPU使用率是否 100%?如果是,CPU 是瓶颈。内存是否用尽?evictions是否很高?如果是,内存是瓶颈,需要增加内存或增加 Memcached 节点。网络是否成为瓶颈?检查服务器的网卡流量。
错误率很高
- 连接超时:
pymemcache默认的超时时间是None(无限等待),务必设置合理的connect_timeout和timeout,1-2 秒。 - 服务器拒绝连接:检查 Memcached 的最大连接数限制 (
-c参数),默认是 1024,如果并发连接数超过这个限制,新的连接会被拒绝。 - 数据被淘汰:如果你在读取一个刚刚写入的 key 就失败了,很可能是 key 在写入后立即被淘汰了(内存满了),检查
evictions。
优化方向
- 客户端优化:
- 使用
pymemcache的Serde(Serializer/Deserializer) 来优化数据序列化/反序列化过程,特别是对于复杂对象。 - 确保连接池参数配置合理。
- 使用
- 服务器优化:
- 增加内存 (
-m)。 - 增加最大连接数 (
-c)。 - 使用更快的磁盘(如果使用了持久化,但通常 Memcached 是纯内存的)。
- 增加 Memcached 节点,并使用一致性哈希客户端(如
pymemcache的HashClient)进行分片。
- 增加内存 (
进阶:与 locust 集成
对于更复杂的、可动态调整负载的压力测试,可以使用专业的负载测试工具 Locust,它用 Python 编写,非常灵活。
安装 Locust:
pip install locust
编写 Locust 脚本 (locfile.py):
from locust import HttpUser, task, between
from pymemcache.client.base import Client
class MemcachedUser(HttpUser):
# wait_time 是用户在任务之间等待的时间
wait_time = between(0.1, 0.5)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 每个虚拟用户(Locust 中的“用户”)创建一个 memcached 客户端
self.client = Client("127.0.0.1:11211", connect_timeout=2, timeout=2)
self.key_counter = 0
@task
def memcached_read_write(self):
# 模拟读写混合操作
key = f"locust_key_{self.key_counter}"
self.key_counter += 1
# 70% 概率执行读操作
if self.environment.p_random() < 0.7:
try:
self.client.get(key)
except Exception as e:
print(f"Read failed: {e}")
else:
try:
# 写入 1KB 数据
self.client.set(key, b"x" * 1024)
except Exception as e:
print(f"Write failed: {e}")
def on_stop(self):
# 当虚拟用户结束时关闭连接
self.client.close()
运行 Locust:
- 在终端运行:
locust -f locfile.py - 打开浏览器访问
http://127.0.0.1:8089 - 在 Web UI 中设置 Number of users (并发用户数) 和 Hatch rate (每秒启动的用户数),然后点击 "Start Swarming"。
Locust 的优势:
- 动态调整负载:可以在测试过程中实时增加或减少并发用户数。
- 可视化图表:提供实时的 QPS、响应时间分布图表,非常直观。
- 分布式:可以轻松地在多台机器上运行大规模测试。
| 方法 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 单线程脚本 | 简单,快速验证 | 无法模拟并发,结果不准确 | 功能验证,快速检查连通性 |
| 多线程脚本 | 最常用,能模拟真实并发,可控性好 | 需要自己管理线程和计数器,结果分析依赖脚本输出 | 常规压力测试,性能基准测试,容量规划 |
| Locust 集成 | 功能强大,可视化,动态调整负载,分布式 | 学习曲线稍高,需要额外安装工具 | 高级压力测试,需要精细控制负载和实时监控的场景 |
对于大多数 Python 方案二(多线程脚本) 是进行 Memcached 压力测试的“标准武器”,它能提供足够的信息来诊断性能瓶颈和评估系统容量,当需要更复杂的测试场景时,再转向 Locust。
