目录
-
环境准备
(图片来源网络,侵删)- 安装 ZooKeeper 服务
- 准备 Java 开发环境
- 添加 ZooKeeper Java 客户端依赖
-
核心代码:连接 ZooKeeper
- 创建
ZooKeeper实例 - 处理连接状态(Watcher)
- 关闭连接
- 创建
-
常用 API 操作
- 创建节点
- 获取节点数据
- 更新节点数据
- 获取子节点列表
- 删除节点
-
最佳实践与注意事项
- 连接字符串格式
- Watcher 机制
- 异常处理
- 会话管理
- 使用连接池 (推荐)
-
完整示例代码
(图片来源网络,侵删)
环境准备
a. 安装 ZooKeeper 服务
你需要一个正在运行的 ZooKeeper 服务,你可以从 Apache ZooKeeper 官网 下载二进制包并按照官方文档进行安装和启动。
假设你的 ZooKeeper 服务运行在本地,默认端口为 2181。
b. 准备 Java 开发环境
确保你已经安装了 JDK (建议 JDK 8 或更高版本) 和 Maven。
c. 添加 ZooKeeper Java 客户端依赖
在 Maven 项目的 pom.xml 文件中添加 ZooKeeper 客户端的依赖,我们推荐使用 org.apache.zookeeper:zookeeper 的最新稳定版本。

<dependencies>
<!-- ZooKeeper Client -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.3</version> <!-- 请使用最新的稳定版本 -->
</dependency>
<!-- 用于日志输出,可选但推荐 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.12</version>
</dependency>
</dependencies>
核心代码:连接 ZooKeeper
a. 创建 ZooKeeper 实例
连接 ZooKeeper 的第一步是创建 ZooKeeper 类的实例,构造函数是异步的,它会立即返回,不会等待连接成功。
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
public class ZKConnection {
private ZooKeeper zooKeeper;
// 连接字符串,格式为 "host:port[,host:port,...]"
private final String connectionString = "localhost:2181";
// 会话超时时间(毫秒)
private final int sessionTimeout = 30000;
public void connect() throws Exception {
// 创建 ZooKeeper 实例
// 参数1: 连接字符串
// 参数2: 会话超时时间
// 参数3: Watcher 对象,用于接收来自 ZooKeeper 的事件通知
zooKeeper = new ZooKeeper(connectionString, sessionTimeout, event -> {
// Watcher 的回调方法,当事件发生时被调用
System.out.println("收到事件: " + event);
// 可以在这里处理连接状态变化
if (event.getState() == States.CONNECTED) {
System.out.println("已成功连接到 ZooKeeper");
} else if (event.getState() == States.DISCONNECTED) {
System.out.println("与 ZooKeeper 连接断开");
}
});
// 注意:new ZooKeeper() 是异步的,上面的回调会在连接成功后触发
// 所以不能立即使用 zooKeeper 对象进行操作
}
}
b. 处理连接状态(Watcher)
Watcher 是 ZooKeeper Java 客户端的核心机制之一,它是一个接口,你需要实现它的 process(WatchedEvent event) 方法,当 ZooKeeper 服务端的事件(如连接状态变化、节点数据变化等)发生时,客户端会回调这个方法。
c. 关闭连接
使用完毕后,必须关闭 ZooKeeper 连接,以释放资源。
public void close() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
System.out.println("ZooKeeper 连接已关闭");
}
}
常用 API 操作
在确认连接成功后(在 Watcher 的 CONNECTED 事件中),你就可以使用 ZooKeeper 实例进行各种操作了。
a. 创建节点
create() 方法用于在 ZooKeeper 中创建一个新节点。
// 创建一个持久节点
String path1 = zooKeeper.create("/my-app", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("创建持久节点: " + path1);
// 创建一个临时节点(客户端会话结束后,节点会被自动删除)
String path2 = zooKeeper.create("/my-app/ephemeral-node", "data2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("创建临时节点: " + path2);
// 创建一个顺序持久节点(节点名后会自动附加一个10位数的序列号)
String path3 = zooKeeper.create("/my-app/sequential-", "data3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("创建顺序节点: " + path3);
b. 获取节点数据
getData() 方法用于获取节点的数据和更新 Watcher。
// 获取节点数据,并设置一个 Watcher
byte[] data = zooKeeper.getData("/my-app", event -> {
System.out.println("节点 /my-app 的数据被修改了,事件: " + event);
}, null); // 最后一个参数是节点的 stat,如果不需要可以传 null
System.out.println("节点 /my-app 的数据是: " + new String(data));
c. 更新节点数据
setData() 方法用于更新节点的数据,如果版本号不匹配,更新会失败。
// 更新节点数据,-1 表示匹配任何版本
Stat stat = zooKeeper.setData("/my-app", "new-data".getBytes(), -1);
System.out.println("数据更新成功,新的版本号是: " + stat.getVersion());
d. 获取子节点列表
getChildren() 方法用于获取一个节点的所有直接子节点。
// 获取子节点列表,并设置一个 Watcher
List<String> children = zooKeeper.getChildren("/my-app", event -> {
System.out.println("节点 /my-app 的子节点列表变化了,事件: " + event);
});
System.out.println("节点 /my-app 的子节点有: " + children);
e. 删除节点
delete() 方法用于删除一个节点,必须确保节点为空(没有子节点),并且版本号匹配。
// 删除节点,-1 表示匹配任何版本
zooKeeper.delete("/my-app/ephemeral-node", -1);
System.out.println("节点 /my-app/ephemeral-node 已删除");
最佳实践与注意事项
a. 连接字符串格式
host:port[,host:port,...]
"192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181",这种方式可以实现高可用,客户端会自动尝试连接列表中的其他服务器。
b. Watcher 机制
- 一次性:Watcher 被触发一次后就会失效,如果需要持续监听,必须在回调中重新设置 Watcher。
- 异步:所有操作都是异步的,结果通过 Watcher 或
Sync()方法返回的Stat对象来体现。
c. 异常处理
ZooKeeper 的操作可能会抛出多种异常,最常见的是:
KeeperException:ZooKeeper 服务端返回的错误。ConnectionLossException:连接丢失。NodeExistsException:节点已存在。NoNodeException:节点不存在。SessionExpiredException:会话已过期。
InterruptedException:线程被中断。IOException:网络等 I/O 问题。
生产代码中必须妥善处理这些异常。
d. 会话管理
ZooKeeper 会话是维持连接的关键,如果客户端长时间与服务器没有通信,会话可能会过期,客户端会有一个后台线程维持与服务器的心跳,以保持会话有效。
e. 使用连接池 (强烈推荐)
在高并发场景下,为每个请求创建和销毁 ZooKeeper 连接是非常低效的,强烈建议使用连接池来管理 ZooKeeper 连接。
- Curator Framework:是 Apache 官方推出的高级客户端库,它极大地简化了 ZooKeeper 的使用,并内置了强大的连接池、重试机制、Recipe(如分布式锁、分布式计数器)等功能。对于任何生产环境,都推荐直接使用 Curator 而不是原生的
ZooKeeperAPI。
完整示例代码
这是一个结合了上述所有要点的完整示例。
pom.xml
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.12</version>
</dependency>
</dependencies>
ZooKeeperDemo.java
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class ZooKeeperDemo implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperDemo.class);
private static final String CONNECTION_STRING = "localhost:2181";
private static final int SESSION_TIMEOUT = 5000;
private ZooKeeper zooKeeper;
// 使用 CountDownLatch 确保 connect 方法在连接成功后返回
private final CountDownLatch connectedLatch = new CountDownLatch(1);
public static void main(String[] args) {
ZooKeeperDemo demo = new ZooKeeperDemo();
try {
// 1. 连接 ZooKeeper
demo.connect();
// 等待连接成功
connectedLatch.await();
// 2. 创建节点
String path1 = demo.createNode("/demo-app", "initial-data", CreateMode.PERSISTENT);
logger.info("创建节点成功: {}", path1);
// 3. 获取并设置数据 Watcher
demo.getDataWithWatcher(path1);
// 4. 更新节点数据,触发 Watcher
demo.updateNodeData(path1, "updated-data");
// 等待一下,以便观察 Watcher 的输出
TimeUnit.SECONDS.sleep(1);
// 5. 创建子节点
String childPath = demo.createNode(path1 + "/child", "child-data", CreateMode.PERSISTENT);
logger.info("创建子节点成功: {}", childPath);
// 6. 获取子节点列表,并设置 Watcher
demo.getChildrenWithWatcher(path1);
// 删除子节点,触发 Watcher
demo.deleteNode(childPath);
TimeUnit.SECONDS.sleep(1);
// 7. 再次更新数据,验证 Watcher 的一次性
demo.updateNodeData(path1, "final-data");
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
logger.error("发生异常", e);
} finally {
// 8. 关闭连接
demo.close();
}
}
@Override
public void process(WatchedEvent event) {
// 连接状态事件
if (event.getState() == Event.KeeperState.SyncConnected) {
logger.info("ZooKeeper 连接已建立。");
connectedLatch.countDown(); // 通知主线程连接成功
}
// 节点数据变化事件
else if (event.getType() == Event.EventType.NodeDataChanged) {
logger.info("节点数据变化事件: {}", event.getPath());
}
// 子节点列表变化事件
else if (event.getType() == Event.EventType.NodeChildrenChanged) {
logger.info("子节点列表变化事件: {}", event.getPath());
}
}
public void connect() throws Exception {
logger.info("正在连接 ZooKeeper: {}", CONNECTION_STRING);
zooKeeper = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, this);
}
public void close() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
logger.info("ZooKeeper 连接已关闭。");
}
}
public String createNode(String path, String data, CreateMode mode) throws Exception {
return zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
}
public void getDataWithWatcher(String path) throws Exception {
Stat stat = new Stat();
byte[] data = zooKeeper.getData(path, true, stat); // true 表示设置 Watcher
logger.info("节点 [{}] 的数据是: {}, 版本: {}", path, new String(data), stat.getVersion());
}
public void updateNodeData(String path, String data) throws Exception, KeeperException, InterruptedException {
Stat stat = zooKeeper.setData(path, data.getBytes(), -1); // -1 表示匹配任何版本
logger.info("节点 [{}] 数据更新成功,新版本: {}", path, stat.getVersion());
}
public void getChildrenWithWatcher(String path) throws Exception {
List<String> children = zooKeeper.getChildren(path, true); // true 表示设置 Watcher
logger.info("节点 [{}] 的子节点列表: {}", path, children);
}
public void deleteNode(String path) throws Exception, KeeperException, InterruptedException {
zooKeeper.delete(path, -1); // -1 表示匹配任何版本
logger.info("节点 [{}] 已删除", path);
}
}
- 入门:使用
new ZooKeeper(connectionString, timeout, watcher)建立连接,并通过Watcher监听事件。 - 核心操作:掌握
create,getData,setData,getChildren,delete等基本 API。 - 生产实践:
- 务必处理异常。
- 强烈推荐使用 Curator,它能让你从繁琐的底层 API 和连接管理中解放出来,专注于业务逻辑,Curator 的
CuratorFramework和CuratorClient是更现代、更健壮的选择。
