安装 kazoo 库
你需要使用 pip 来安装 kazoo。

pip install kazoo
连接到 Zookeeper
使用 kazoo 的第一步是创建一个 KazooClient 实例并连接到 Zookeeper 集群。
from kazoo.client import KazooClient
# 创建一个 KazooClient 实例
# zk_host 可以是单个主机:端口,也可以是多个主机组成的逗号分隔列表
zk = KazooClient(hosts='127.0.0.1:2181')
# 启动连接
zk.start()
print("Successfully connected to Zookeeper!")
# 关闭连接
# zk.stop()
重要提示:
hosts参数的格式是ip1:port1,ip2:port2,...,Zookeeper 客户端会自动连接到集群中的任意一个节点,并自动发现其他节点。start()方法是异步的,它会立即返回,然后在后台建立连接,如果你需要确保连接已经建立好,可以使用zk.start(timeout=...)带上超时参数,或者在后续操作中通过异常来处理连接问题。- 使用完毕后,务必调用
stop()方法来关闭连接,释放资源。
常用操作
1 检查节点是否存在
# 检查根节点 '/是否存在'
if zk.exists('/'):
print("Root node '/' exists.")
else:
print("Root node '/' does not exist.")
# 检查一个不存在的节点
if zk.exists('/non_existent_node'):
print("Node '/non_existent_node' exists.")
else:
print("Node '/non_existent_node' does not exist.")
2 创建节点
kazoo 提供了多种方式来创建节点。
# 1. 创建一个持久化节点
# 如果父节点不存在,会抛出 NoNodeError
zk.ensure_path('/my_app') # 确保父路径存在,不存在则创建
zk.create('/my_app/data', b'hello', ephemeral=False)
# 2. 创建一个临时节点
# 临时节点在客户端会话结束后会自动被删除
zk.create('/my_app/temp_node', b'temporary_value', ephemeral=True)
# 3. 创建一个顺序持久化节点
# Zookeeper 会在节点名后自动添加一个 10 位的序列号
node_path = zk.create('/my_app/seq_node-', b'sequential', sequence=True)
print(f"Created sequential node: {node_path}") # 输出可能是 /my_app/seq_node-0000000001
# 4. 创建一个顺序临时节点
seq_ephemeral_path = zk.create('/my_app/seq_temp-', b'sequential_temp', sequence=True, ephemeral=True)
print(f"Created sequential ephemeral node: {seq_ephemeral_path}")
参数说明:

value: 节点的数据,必须是bytes类型。ephemeral:True表示创建临时节点,False表示持久化节点。sequence:True表示创建顺序节点,Zookeeper 会自动在路径后附加单调递增的序列号。makepath:create方法的参数,如果为True,则当父节点不存在时会自动创建(类似于ensure_path)。
3 获取和设置节点数据
# 获取节点数据
data, stat = zk.get('/my_app/data')
print(f"Data of '/my_app/data': {data.decode('utf-8')}")
print(f"Stat of '/my_app/data': {stat}") # stat 包含版本号、创建时间等信息
# 设置节点数据
# version 参数用于乐观锁,防止并发修改,如果传入的版本号与服务器不一致,会抛出 BadVersionError。
# -1 表示忽略版本检查,强制更新
zk.set('/my_app/data', b'new_hello', version=stat.version)
# 再次获取数据验证
new_data, new_stat = zk.get('/my_app/data')
print(f"New data of '/my_app/data': {new_data.decode('utf-8')}")
4 获取子节点列表
# 获取 '/my_app' 下的所有子节点
children = zk.get_children('/my_app')
print(f"Children of '/my_app': {children}") # 输出可能是 ['data', 'seq_node-0000000001', 'temp_node']
5 删除节点
# 删除一个节点
# version 参数同样用于乐观锁
zk.delete('/my_app/temp_node', version=-1) # -1 表示忽略版本检查
# 尝试删除一个不存在的节点会抛出 NoNodeError
try:
zk.delete('/my_app/non_existent')
except Exception as e:
print(f"Error deleting node: {e}")
# 递归删除一个节点及其所有子节点
# 需要先导入工具模块
from kazoo.exceptions import NoNodeError
from kazoo.security import OPEN_ACL_UNSAFE
def delete_recursive(path):
try:
children = zk.get_children(path)
for child in children:
child_path = f"{path}/{child}"
delete_recursive(child_path)
zk.delete(path)
print(f"Deleted node: {path}")
except NoNodeError:
# 节点已经被其他客户端删除,忽略即可
pass
delete_recursive('/my_app')
监听机制
Zookeeper 最强大的功能之一就是它的 Watcher 机制,当被监听的节点或子节点发生变化时,Zookeeper 会通知客户端。
1 一次性监听
kazoo 的监听器是一次性的,当你设置一个监听后,它只会在下一次事件触发时生效,触发后自动失效。
def watch_data_change(event):
"""数据变化回调函数"""
print(f"Data changed for node: {event.path}")
print(f"Event type: {event.event_type}") # EVENT_DELETED, EVENT_CHANGED
# 设置监听
zk.get('/my_app/data', watch=watch_data_change)
# 在另一个终端或通过其他方式修改节点数据
# zk.set('/my_app/data', b'changed_by_watcher')
# 你会看到控制台打印出 watch_data_change 函数中的信息
# 再次修改,监听器不会再次触发,因为它是一次性的
# zk.set('/my_app/data', b'changed_again')
2 持久监听
如果你需要持续监听,可以在回调函数中重新设置监听。
def persistent_watch_data_change(event):
"""持久化数据变化监听"""
print(f"[Persistent Watch] Data changed for node: {event.path}")
print(f"[Persistent Watch] Event type: {event.event_type}")
# 重新设置监听
zk.get(event.path, watch=persistent_watch_data_change)
# 设置持久化监听
zk.get('/my_app/data', watch=persistent_watch_data_change)
# 现在你可以多次修改节点数据,每次都会触发回调
# zk.set('/my_app/data', b'persistent_change_1')
# zk.set('/my_app/data', b'persistent_change_2')
3 监听子节点变化
def watch_children_change(event):
"""子节点变化回调函数"""
print(f"Children changed for node: {event.path}")
print(f"Event type: {event.event_type}") # EVENT_CHILDREN_CHANGED
# 监听 '/my_app' 的子节点变化
zk.get_children('/my_app', watch=watch_children_change)
# 创建一个新的子节点
zk.create('/my_app/new_child', b'new_child_data')
# 你会看到控制台打印出 watch_children_change 函数中的信息
完整示例:实现一个简单的分布式锁
Zookeeper 经典的应用场景之一就是实现分布式锁,下面是一个使用 kazoo 的 Lock 对象实现的简单示例。

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kazoo.recipe.lock import Lock
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
# 锁的路径
lock_path = '/my_distributed_lock'
try:
# 创建一个 Lock 对象
# kazoo 会自动处理锁的创建、获取和释放
lock = Lock(zk, lock_path)
print("Attempting to acquire the lock...")
# 尝试获取锁,这是一个阻塞操作
# 可以设置超时 timeout=10
with lock:
print("Lock acquired! Performing critical work...")
# 在这里执行你的需要互斥访问的代码
import time
time.sleep(10) # 模拟耗时操作
print("Critical work finished.")
except Exception as e:
print(f"Failed to acquire lock: {e}")
finally:
# Lock 对象通常使用 with 语句,退出时会自动释放
# 如果手动创建,需要确保最终释放
if 'lock' in locals():
# lock.release() # with 语句会自动处理
pass
zk.stop()
print("Connection closed.")
总结与最佳实践
- 异常处理: 网络问题、节点不存在等都会抛出异常(如
ConnectionClosedError,NoNodeError,NodeExistsError),代码中应妥善处理。 - 连接管理: 使用
try...finally或上下文管理器 (with语句) 来确保start()和stop()成对调用。 - 数据类型: Zookeeper 的节点数据是
bytes类型,Python 中存取时需要进行编解码(通常是utf-8)。 - 版本控制: 在进行更新或删除操作时,尽量使用
stat对象中的version,可以避免并发问题。 - 监听是一次性的:
kazoo的监听器只触发一次,如果需要持续监听,必须在回调中重新设置。 - 使用高级组件: 对于分布式锁、队列等复杂功能,优先使用
kazoo.recipe中提供的现成组件(如Lock,Party,Queue),它们已经处理了各种边界条件和细节。
kazoo 文档非常完善,是深入学习的首选资源:https://kazoo.readthedocs.io/
