杰瑞科技汇

java 连接zookeeper

目录

  1. 环境准备

    java 连接zookeeper-图1
    (图片来源网络,侵删)
    • 安装 ZooKeeper 服务
    • 准备 Java 开发环境
    • 添加 ZooKeeper Java 客户端依赖
  2. 核心代码:连接 ZooKeeper

    • 创建 ZooKeeper 实例
    • 处理连接状态(Watcher)
    • 关闭连接
  3. 常用 API 操作

    • 创建节点
    • 获取节点数据
    • 更新节点数据
    • 获取子节点列表
    • 删除节点
  4. 最佳实践与注意事项

    • 连接字符串格式
    • Watcher 机制
    • 异常处理
    • 会话管理
    • 使用连接池 (推荐)
  5. 完整示例代码

    java 连接zookeeper-图2
    (图片来源网络,侵删)

环境准备

a. 安装 ZooKeeper 服务

你需要一个正在运行的 ZooKeeper 服务,你可以从 Apache ZooKeeper 官网 下载二进制包并按照官方文档进行安装和启动。

假设你的 ZooKeeper 服务运行在本地,默认端口为 2181

b. 准备 Java 开发环境

确保你已经安装了 JDK (建议 JDK 8 或更高版本) 和 Maven。

c. 添加 ZooKeeper Java 客户端依赖

在 Maven 项目的 pom.xml 文件中添加 ZooKeeper 客户端的依赖,我们推荐使用 org.apache.zookeeper:zookeeper 的最新稳定版本。

java 连接zookeeper-图3
(图片来源网络,侵删)
<dependencies>
    <!-- ZooKeeper Client -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.8.3</version> <!-- 请使用最新的稳定版本 -->
    </dependency>
    <!-- 用于日志输出,可选但推荐 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.36</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.12</version>
    </dependency>
</dependencies>

核心代码:连接 ZooKeeper

a. 创建 ZooKeeper 实例

连接 ZooKeeper 的第一步是创建 ZooKeeper 类的实例,构造函数是异步的,它会立即返回,不会等待连接成功。

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
public class ZKConnection {
    private ZooKeeper zooKeeper;
    // 连接字符串,格式为 "host:port[,host:port,...]"
    private final String connectionString = "localhost:2181";
    // 会话超时时间(毫秒)
    private final int sessionTimeout = 30000;
    public void connect() throws Exception {
        // 创建 ZooKeeper 实例
        // 参数1: 连接字符串
        // 参数2: 会话超时时间
        // 参数3: Watcher 对象,用于接收来自 ZooKeeper 的事件通知
        zooKeeper = new ZooKeeper(connectionString, sessionTimeout, event -> {
            // Watcher 的回调方法,当事件发生时被调用
            System.out.println("收到事件: " + event);
            // 可以在这里处理连接状态变化
            if (event.getState() == States.CONNECTED) {
                System.out.println("已成功连接到 ZooKeeper");
            } else if (event.getState() == States.DISCONNECTED) {
                System.out.println("与 ZooKeeper 连接断开");
            }
        });
        // 注意:new ZooKeeper() 是异步的,上面的回调会在连接成功后触发
        // 所以不能立即使用 zooKeeper 对象进行操作
    }
}

b. 处理连接状态(Watcher)

Watcher 是 ZooKeeper Java 客户端的核心机制之一,它是一个接口,你需要实现它的 process(WatchedEvent event) 方法,当 ZooKeeper 服务端的事件(如连接状态变化、节点数据变化等)发生时,客户端会回调这个方法。

c. 关闭连接

使用完毕后,必须关闭 ZooKeeper 连接,以释放资源。

public void close() throws InterruptedException {
        if (zooKeeper != null) {
            zooKeeper.close();
            System.out.println("ZooKeeper 连接已关闭");
        }
    }

常用 API 操作

在确认连接成功后(在 WatcherCONNECTED 事件中),你就可以使用 ZooKeeper 实例进行各种操作了。

a. 创建节点

create() 方法用于在 ZooKeeper 中创建一个新节点。

// 创建一个持久节点
String path1 = zooKeeper.create("/my-app", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("创建持久节点: " + path1);
// 创建一个临时节点(客户端会话结束后,节点会被自动删除)
String path2 = zooKeeper.create("/my-app/ephemeral-node", "data2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("创建临时节点: " + path2);
// 创建一个顺序持久节点(节点名后会自动附加一个10位数的序列号)
String path3 = zooKeeper.create("/my-app/sequential-", "data3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("创建顺序节点: " + path3);

b. 获取节点数据

getData() 方法用于获取节点的数据和更新 Watcher。

// 获取节点数据,并设置一个 Watcher
byte[] data = zooKeeper.getData("/my-app", event -> {
    System.out.println("节点 /my-app 的数据被修改了,事件: " + event);
}, null); // 最后一个参数是节点的 stat,如果不需要可以传 null
System.out.println("节点 /my-app 的数据是: " + new String(data));

c. 更新节点数据

setData() 方法用于更新节点的数据,如果版本号不匹配,更新会失败。

// 更新节点数据,-1 表示匹配任何版本
Stat stat = zooKeeper.setData("/my-app", "new-data".getBytes(), -1);
System.out.println("数据更新成功,新的版本号是: " + stat.getVersion());

d. 获取子节点列表

getChildren() 方法用于获取一个节点的所有直接子节点。

// 获取子节点列表,并设置一个 Watcher
List<String> children = zooKeeper.getChildren("/my-app", event -> {
    System.out.println("节点 /my-app 的子节点列表变化了,事件: " + event);
});
System.out.println("节点 /my-app 的子节点有: " + children);

e. 删除节点

delete() 方法用于删除一个节点,必须确保节点为空(没有子节点),并且版本号匹配。

// 删除节点,-1 表示匹配任何版本
zooKeeper.delete("/my-app/ephemeral-node", -1);
System.out.println("节点 /my-app/ephemeral-node 已删除");

最佳实践与注意事项

a. 连接字符串格式

host:port[,host:port,...] "192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181",这种方式可以实现高可用,客户端会自动尝试连接列表中的其他服务器。

b. Watcher 机制

  • 一次性:Watcher 被触发一次后就会失效,如果需要持续监听,必须在回调中重新设置 Watcher。
  • 异步:所有操作都是异步的,结果通过 Watcher 或 Sync() 方法返回的 Stat 对象来体现。

c. 异常处理

ZooKeeper 的操作可能会抛出多种异常,最常见的是:

  • KeeperException:ZooKeeper 服务端返回的错误。
    • ConnectionLossException:连接丢失。
    • NodeExistsException:节点已存在。
    • NoNodeException:节点不存在。
    • SessionExpiredException:会话已过期。
  • InterruptedException:线程被中断。
  • IOException:网络等 I/O 问题。

生产代码中必须妥善处理这些异常。

d. 会话管理

ZooKeeper 会话是维持连接的关键,如果客户端长时间与服务器没有通信,会话可能会过期,客户端会有一个后台线程维持与服务器的心跳,以保持会话有效。

e. 使用连接池 (强烈推荐)

在高并发场景下,为每个请求创建和销毁 ZooKeeper 连接是非常低效的,强烈建议使用连接池来管理 ZooKeeper 连接。

  • Curator Framework:是 Apache 官方推出的高级客户端库,它极大地简化了 ZooKeeper 的使用,并内置了强大的连接池、重试机制、Recipe(如分布式锁、分布式计数器)等功能。对于任何生产环境,都推荐直接使用 Curator 而不是原生的 ZooKeeper API。

完整示例代码

这是一个结合了上述所有要点的完整示例。

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.8.3</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.12</version>
    </dependency>
</dependencies>

ZooKeeperDemo.java

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class ZooKeeperDemo implements Watcher {
    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperDemo.class);
    private static final String CONNECTION_STRING = "localhost:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private ZooKeeper zooKeeper;
    // 使用 CountDownLatch 确保 connect 方法在连接成功后返回
    private final CountDownLatch connectedLatch = new CountDownLatch(1);
    public static void main(String[] args) {
        ZooKeeperDemo demo = new ZooKeeperDemo();
        try {
            // 1. 连接 ZooKeeper
            demo.connect();
            // 等待连接成功
            connectedLatch.await();
            // 2. 创建节点
            String path1 = demo.createNode("/demo-app", "initial-data", CreateMode.PERSISTENT);
            logger.info("创建节点成功: {}", path1);
            // 3. 获取并设置数据 Watcher
            demo.getDataWithWatcher(path1);
            // 4. 更新节点数据,触发 Watcher
            demo.updateNodeData(path1, "updated-data");
            // 等待一下,以便观察 Watcher 的输出
            TimeUnit.SECONDS.sleep(1);
            // 5. 创建子节点
            String childPath = demo.createNode(path1 + "/child", "child-data", CreateMode.PERSISTENT);
            logger.info("创建子节点成功: {}", childPath);
            // 6. 获取子节点列表,并设置 Watcher
            demo.getChildrenWithWatcher(path1);
            // 删除子节点,触发 Watcher
            demo.deleteNode(childPath);
            TimeUnit.SECONDS.sleep(1);
            // 7. 再次更新数据,验证 Watcher 的一次性
            demo.updateNodeData(path1, "final-data");
            TimeUnit.SECONDS.sleep(1);
        } catch (Exception e) {
            logger.error("发生异常", e);
        } finally {
            // 8. 关闭连接
            demo.close();
        }
    }
    @Override
    public void process(WatchedEvent event) {
        // 连接状态事件
        if (event.getState() == Event.KeeperState.SyncConnected) {
            logger.info("ZooKeeper 连接已建立。");
            connectedLatch.countDown(); // 通知主线程连接成功
        }
        // 节点数据变化事件
        else if (event.getType() == Event.EventType.NodeDataChanged) {
            logger.info("节点数据变化事件: {}", event.getPath());
        }
        // 子节点列表变化事件
        else if (event.getType() == Event.EventType.NodeChildrenChanged) {
            logger.info("子节点列表变化事件: {}", event.getPath());
        }
    }
    public void connect() throws Exception {
        logger.info("正在连接 ZooKeeper: {}", CONNECTION_STRING);
        zooKeeper = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, this);
    }
    public void close() throws InterruptedException {
        if (zooKeeper != null) {
            zooKeeper.close();
            logger.info("ZooKeeper 连接已关闭。");
        }
    }
    public String createNode(String path, String data, CreateMode mode) throws Exception {
        return zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
    }
    public void getDataWithWatcher(String path) throws Exception {
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData(path, true, stat); // true 表示设置 Watcher
        logger.info("节点 [{}] 的数据是: {}, 版本: {}", path, new String(data), stat.getVersion());
    }
    public void updateNodeData(String path, String data) throws Exception, KeeperException, InterruptedException {
        Stat stat = zooKeeper.setData(path, data.getBytes(), -1); // -1 表示匹配任何版本
        logger.info("节点 [{}] 数据更新成功,新版本: {}", path, stat.getVersion());
    }
    public void getChildrenWithWatcher(String path) throws Exception {
        List<String> children = zooKeeper.getChildren(path, true); // true 表示设置 Watcher
        logger.info("节点 [{}] 的子节点列表: {}", path, children);
    }
    public void deleteNode(String path) throws Exception, KeeperException, InterruptedException {
        zooKeeper.delete(path, -1); // -1 表示匹配任何版本
        logger.info("节点 [{}] 已删除", path);
    }
}
  • 入门:使用 new ZooKeeper(connectionString, timeout, watcher) 建立连接,并通过 Watcher 监听事件。
  • 核心操作:掌握 create, getData, setData, getChildren, delete 等基本 API。
  • 生产实践
    • 务必处理异常
    • 强烈推荐使用 Curator,它能让你从繁琐的底层 API 和连接管理中解放出来,专注于业务逻辑,Curator 的 CuratorFrameworkCuratorClient 是更现代、更健壮的选择。
分享:
扫描分享到社交APP
上一篇
下一篇