杰瑞科技汇

Java Netty教程,如何快速上手?

Java Netty 全方位教程:从入门到实战

目录

  1. 为什么选择 Netty?
  2. 核心概念:Netty 的“三板斧”
    • Channel
    • EventLoop
    • ChannelPipeline & ChannelHandler
  3. 第一个 Netty 应用:Echo 服务器
    • 项目结构
    • 服务端代码详解
    • 客户端代码详解
    • 运行与测试
  4. 深入 Netty 核心组件
    • ByteBuf:Netty 的数据容器
    • 编码器与解码器
    • Future & Promise:异步编程的基石
    • Bootstrap & ServerBootstrap:启动器
  5. 实战项目:简易 RPC 框架
    • 需求分析
    • 协议设计
    • 服务端实现
    • 客户端实现
    • 运行与测试
  6. 最佳实践与注意事项
  7. 学习资源与进阶方向

为什么选择 Netty?

在直接上手代码之前,我们首先要明白:Netty 是什么?我们为什么要用它?

Java Netty教程,如何快速上手?-图1
(图片来源网络,侵删)

传统的 Java NIO 编程非常复杂,你需要处理 SelectorChannelBuffer,还要编写大量的样板代码来处理连接、数据读写、异常等,Netty 的出现就是为了简化 NIO 的开发。

Netty 的核心优势:

  • 简单易用:API 设计优雅,对 NIO 底层进行了完美的封装,让你能专注于业务逻辑。
  • 高性能:基于 Reactor 模型(主从 Reactor),采用零拷贝、内存池、无锁并发等技术,性能极高。
  • 功能丰富:内置了多种编解码器、HTTP/WebSocket 协议支持、SSL/TLS 支持等开箱即用的功能。
  • 稳定可靠:经过业界大量顶级项目(如 Dubbo、Elasticsearch、RocketMQ、Zookeeper 等)的长期验证,非常稳定。
  • 社区活跃:文档完善,社区活跃,问题能得到快速响应。

一句话总结:Netty 是一个高性能、异步事件驱动的 NIO 框架,用于快速开发可维护的高性能服务器和客户端。


核心概念:Netty 的“三板斧”

理解 Netty,必须先理解它的三个核心概念,它们构成了 Netty 的骨架。

Java Netty教程,如何快速上手?-图2
(图片来源网络,侵删)

Channel (通道)

你可以把 Channel 看作是 Java NIO 中 java.nio.channels.Channel 的升级版,它代表了一个打开的、可以进行 I/O 操作的连接,比如一个 socket 连接。

  • 作用:进行 I/O 操作,如 read()write()
  • 特点:它是全双工的,既可以读也可以写。Channel 本身不存储数据,它只是一个 I/O 操作的入口。

EventLoop (事件循环)

EventLoop 是 Netty 的“心脏”,是执行处理逻辑的地方。

  • 作用
    1. 处理 I/O 操作EventLoop 会不断地轮询它所绑定的 Channel 上是否有 I/O 事件(如连接建立、数据到达等)。
    2. 执行任务:当 I/O 事件发生时,EventLoop 会调用绑定的 ChannelHandler 来处理该事件。
  • 特点
    • 单线程:一个 EventLoop 在其生命周期内只由一个线程驱动,这避免了多线程竞争带来的复杂性和性能开销。
    • 绑定:一个 EventLoop 会绑定一个或多个 Channel,并且一个 Channel 在其生命周期内也只会被绑定到一个 EventLoop
    • 事件循环EventLoop 会不断地在一个循环中检查和处理 I/O 事件。

ChannelPipeline & ChannelHandler (流水线与处理器)

这两个概念组合在一起,实现了 Netty 的核心设计思想——责任链模式

  • ChannelPipeline (通道流水线)

    • 它是一个 ChannelHandler 的容器,像一个链条,将一系列的 ChannelHandler 串联起来。
    • 当一个 I/O 事件(如读或写)在 Channel 上发生时,这个事件会沿着 ChannelPipeline 传播,依次由链上的 ChannelHandler 处理。
  • ChannelHandler (通道处理器)

    • 它是业务逻辑的真正执行者,你通过实现 ChannelHandler 接口(或其子接口如 ChannelInboundHandler)来定义如何处理 I/O 事件。
    • ChannelInboundHandler:处理入站事件,如读取客户端数据、连接激活等。
    • ChannelOutboundHandler:处理出站事件,如向客户端发送数据、关闭连接等。

它们之间的关系图解:

          +---------------------------+
          |        EventLoop         |
          |  (处理I/O事件和任务)       |
          +-------------+-------------+
                        |
                        | (绑定一个线程)
                        v
          +---------------------------+
          |         Channel           |
          | (I/O操作的入口)           |
          +-------------+-------------+
                        |
                        | (事件流经)
                        v
+----------+    +-------+-------+    +----------+
| Inbound  | <-- | ChannelPipeline  --> | Outbound |
| Handler |    | (责任链)        |    | Handler |
+----------+    +------------------+    +----------+

第一个 Netty 应用:Echo 服务器

让我们通过一个最简单的 "Echo"(回显)服务器来感受 Netty 的魅力,客户端发送什么消息,服务器就原样返回什么消息。

项目结构

netty-tutorial/
├── pom.xml
└── src/
    └── main/
        └── java/
            └── com/
                └── example/
                    └── netty/
                        ├── server/
                        │   └── EchoServer.java
                        │   └── EchoServerHandler.java
                        └── client/
                            └── EchoClient.java
                            └── EchoClientHandler.java

pom.xml (Maven 依赖)

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>netty-tutorial</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <netty.version>4.1.94.Final</netty.version>
    </properties>
    <dependencies>
        <!-- Netty 核心依赖 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.version}</version>
        </dependency>
    </dependencies>
</project>

服务端代码详解

EchoServer.java (服务器启动类)
package com.example.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class EchoServer {
    private final int port;
    public EchoServer(int port) {
        this.port = port;
    }
    public void start() throws Exception {
        // 1. 创建 EventLoopGroup
        // BossGroup 用于接收连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // WorkerGroup 用于处理已接收的连接的 I/O 操作
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 2. 创建 ServerBootstrap,服务器启动辅助类
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup) // 设置 EventLoopGroup
             .channel(NioServerSocketChannel.class) // 指定使用 NioServerSocketChannel 作为服务器通道实现
             .childHandler(new ChannelInitializer<SocketChannel>() { // 设置 childHandler,它会被传递给 ServerChannel 的 accepted Channels
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     // 将我们的业务处理器 EchoServerHandler 添加到 ChannelPipeline 中
                     ch.pipeline().addLast(new EchoServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128) // 设置 TCP 缓冲区
             .childOption(ChannelOption.SO_KEEPALIVE, true); // 设置 TCP keep-alive
            // 3. 绑定端口,开始接收连接
            ChannelFuture f = b.bind(port).sync();
            // 4. 等待服务器关闭
            f.channel().closeFuture().sync();
        } finally {
            // 5. 优雅关闭,释放所有资源
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        new EchoServer(8080).start();
    }
}

代码解析:

  1. EventLoopGroup:创建了两个线程组。bossGroup 只负责处理客户端的连接请求,workerGroup 负责处理后续的 I/O 操作,默认情况下,它们的线程数是 CPU 核心数 * 2。
  2. ServerBootstrap:Netty 提供的工厂类,用于简化服务器的配置和启动。
  3. .group(bossGroup, workerGroup):将两个线程组分别赋给 ServerBootstrap
  4. .channel(NioServerSocketChannel.class):指定服务器监听端口的 Channel 类型。Nio 代表非阻塞 I/O。
  5. .childHandler(...):这是最关键的一步,当一个新的连接被接受时,workerGroup 中的 EventLoop 会创建一个新的 SocketChannel,并执行 ChannelInitializer 中的 initChannel 方法,我们将自定义的 EchoServerHandler 添加到这个新 ChannelChannelPipeline 中。
  6. .option().childOption():前者是设置 ServerSocketChannel 的参数,后者是设置 SocketChannel 的参数。
  7. bind(port).sync():绑定端口并同步等待绑定完成。
  8. closeFuture().sync():获取 Channel 的关闭 Future,并阻塞当前线程,直到服务器关闭。
  9. shutdownGracefully():优雅地关闭线程组,不再接受新任务,并完成已提交的任务。
EchoServerHandler.java (服务器业务处理器)
package com.example.netty.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 当从客户端收到新数据时调用这个方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // msg Netty 的数据容器 ByteBuf
        ByteBuf in = (ByteBuf) msg;
        try {
            // 打印收到的客户端消息
            System.out.println("Server received: " + in.toString(io.netty.util.CharsetUtil.UTF_8));
            // 将收到的消息写回给客户端
            ctx.write(in); // 写入到 ChannelPipeline 的下一个 ChannelHandler
        } finally {
            // 必须释放 ByteBuf 的引用,否则会导致内存泄漏
            ReferenceCountUtil.release(msg);
        }
    }
    /**
     * 在 channelRead 方法执行完后调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // 将所有待发送的消息刷新到远程节点,并关闭该 Channel
        ctx.flush();
    }
    /**
     * 当处理过程中发生异常时调用
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 打印异常堆栈
        cause.printStackTrace();
        // 关闭 Channel
        ctx.close();
    }
}

代码解析:

  1. channelRead:这是核心方法,每当有新数据从客户端到达时,此方法就会被调用。
    • msg 参数是 Netty 的数据容器 ByteBuf
    • ctx.write(in) 将数据写入 ChannelPipeline,它会沿着链向后传播,最终由最后一个出站处理器(通常是 ChannelOutboundHandler)将其发送出去,对于回显服务器,我们直接写回即可。
    • ReferenceCountUtil.release(msg):非常重要!Netty 使用了引用计数来管理 ByteBuf 的内存,当你使用完它后,必须手动释放,否则会导致内存泄漏。
  2. channelReadComplete:当 channelRead 方法执行完毕后,Netty 会调用此方法,我们在这里调用 ctx.flush() 来确保所有数据都被发送出去。
  3. exceptionCaught:当发生异常时,Netty 会调用此方法,一个好的实践是打印异常并关闭 Channel,以防止资源泄漏。

客户端代码详解

客户端与服务端的结构非常相似,但更简单一些。

EchoClient.java (客户端启动类)
package com.example.netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class EchoClient {
    private final String host;
    private final int port;
    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group) // 使用一个 EventLoopGroup 即可
             .channel(NioSocketChannel.class) // 指定使用 NioSocketChannel
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new EchoClientHandler());
                 }
             });
            // 连接到服务器
            ChannelFuture f = b.connect(host, port).sync();
            // 等待连接关闭
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        new EchoClient("localhost", 8080).start();
    }
}
EchoClientHandler.java (客户端业务处理器)
package com.example.netty.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    // 当连接建立成功后,此方法被调用
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 向服务器发送消息
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty Server!", io.netty.util.CharsetUtil.UTF_8));
    }
    // 当收到服务器消息时调用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Client received: " + in.toString(io.netty.util.CharsetUtil.UTF_8));
        // 不需要再写回,因为服务器已经回显了
    }
    // 异常处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

运行与测试

  1. 启动服务器:运行 EchoServermain 方法。
  2. 启动客户端:运行 EchoClientmain 方法。
  3. 观察控制台输出
    • 服务器控制台:会打印 Server received: Hello, Netty Server!
    • 客户端控制台:会打印 Client received: Hello, Netty Server!

恭喜!你已经成功运行了你的第一个 Netty 应用。


深入 Netty 核心组件

ByteBuf:Netty 的数据容器

ByteBuf 是 Netty 的数据核心,它比 Java NIO 的 ByteBuffer 更强大、更易用。

  • 引用计数ByteBuf 使用了引用计数来管理内存,当你创建或获取一个 ByteBuf 时,它的引用计数为 1,每次调用 retain() 方法,计数加 1;每次调用 release() 方法,计数减 1,当计数为 0 时,ByteBuf 的内存会被回收。必须手动释放,否则内存泄漏。
  • 读/写索引分离ByteBuffer 只有一个位置指针,读写切换时需要调用 flip(),容易出错。ByteBuf 有独立的读索引 (readerIndex) 和写索引 (writerIndex),无需 flip(),使用更方便。
  • 容量自动扩展:当写入的数据超过 ByteBuf 的容量时,它可以自动扩容。
  • 支持多种类型:提供了丰富的 API 来读取和写入基本数据类型(如 readInt(), writeLong())。

编码器与解码器

网络中传输的数据都是字节(ByteBuf),而我们的业务逻辑通常是基于 Java 对象的,我们需要在发送前将对象序列化为字节(编码),在接收后将字节反序列化为对象(解码)。

  • MessageToMessageCodec<INBOUND, OUTBOUND>:最常用的编解码器,它同时处理入站和出站。

    • encode(ChannelHandlerContext ctx, OUTBOUND msg, List<Object> out):将业务对象 msg 编码为零个或多个 ByteBuf,并添加到 out 列表中。
    • decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out):将入站的 ByteBuf msg 解码为零个或多个业务对象,并添加到 out 列表中。
  • LengthFieldPrependerLengthFieldBasedFrameDecoder

    • 问题:TCP 是流式协议,多个 ByteBuf 可能会粘在一起,或者一个 ByteBuf 可能被拆分成多个包,导致接收方无法正确区分消息的边界。
    • 解决方案:在发送消息前,在消息头中加上消息的长度。LengthFieldPrepender 负责在编码时添加长度字段;LengthFieldBasedFrameDecoder 负责在解码时根据长度字段来拆分数据包,保证每次只读取一个完整的数据包。这是 Netty 中解决 TCP 粘包/拆包问题的标准方案。

Future & Promise:异步编程的基石

Netty 是一个异步框架,FuturePromise 是其异步编程模型的基石。

  • ChannelFuture:代表一个异步 I/O 操作的最终结果,它提供了 isDone(), isSuccess(), cause() 等方法来查询操作状态,你可以通过 addListener() 方法添加一个监听器,当操作完成时,监听器会被通知。

    • f.channel().closeFuture().sync(); 这行代码就是主线程等待 closeFuture 完成。
  • PromiseFuture 的可写版本,它是一个“承诺”,你可以设置它的结果(成功、失败、取消)。ChannelwriteAndFlush() 方法返回的就是一个 ChannelPromise,你可以在上面设置监听器或修改其状态。

Bootstrap & ServerBootstrap:启动器

这两个是 Netty 的工厂类,用于简化客户端和服务端的启动配置。

  • ServerBootstrap:用于启动服务器。

    • .group(): 设置 EventLoopGroup
    • .channel(): 设置服务端 Channel 类型。
    • .childHandler(): 设置子 Channel(即客户端连接)的处理器。
    • .option(): 设置 ServerSocketChannel 的 TCP 参数。
    • .childOption(): 设置 SocketChannel 的 TCP 参数。
  • Bootstrap:用于启动客户端。

    • .group(): 设置一个 EventLoopGroup
    • .channel(): 设置客户端 Channel 类型。
    • .handler(): 设置客户端 Channel 的处理器。
    • .connect(): 连接到远程服务器。

实战项目:简易 RPC 框架

通过 Echo 服务器,我们只是把字符串来回发了,我们来实现一个更真实的应用:一个简易的 RPC(远程过程调用)框架。

目标:让客户端像调用本地方法一样,调用远程服务器上的方法。

需求分析

  1. 服务端:提供一个服务,UserService,包含一个 getUserInfo 方法。
  2. 客户端:不关心网络细节,像调用本地方法 userService.getUserInfo(1) 一样,就能得到结果。
  3. 通信协议:客户端需要告诉服务器要调用哪个接口的哪个方法,以及参数是什么,服务器需要返回调用结果或异常。

协议设计

为了区分不同的消息,我们需要自定义一个通信协议,一个简单的协议可以这样设计:

字段 类型 描述
Magic Number int 魔数,用于标识这是一个自定义协议的请求,防止错误解析。0xCAFEBABE
Version byte 协议版本号。
Serializer Type byte 序列化方式,如 0 代表 JDK 序列化,1 代表 JSON,2 代表 Protobuf。
Message Type byte 消息类型,如 0 代表请求,1 代表响应。
Sequence ID int 序列号,用于请求和响应的匹配。
Data Length int 消息体的长度。
Data []byte 消息体,序列化后的字节数组。

服务端实现

  1. 定义服务接口和实现

    // UserService.java
    public interface UserService {
        String getUserInfo(int userId);
    }
    // UserServiceImpl.java
    public class UserServiceImpl implements UserService {
        @Override
        public String getUserInfo(int userId) {
            System.out.println("Server received request for user id: " + userId);
            if (userId == 1) {
                return "User 1: Zhang San";
            } else if (userId == 2) {
                return "User 2: Li Si";
            }
            return "User not found";
        }
    }
  2. 创建 RPC 请求和响应对象

    // RpcRequest.java
    public class RpcRequest implements Serializable {
        private String interfaceName;
        private String methodName;
        private Object[] args;
        // ... getters and setters
    }
    // RpcResponse.java
    public class RpcResponse implements Serializable {
        private String requestId;
        private Object result;
        private Exception exception;
        // ... getters and setters
    }
  3. 创建编解码器

    // RpcMessageCodec.java
    @ChannelHandler.Sharable
    public class RpcMessageCodec extends MessageToMessageCodec<ByteBuf, Object> {
        private static final int MAGIC_NUMBER = 0xCAFEBABE;
        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
            ByteBuf byteBuf = ctx.alloc().buffer();
            // 1. 写入魔数
            byteBuf.writeInt(MAGIC_NUMBER);
            // 2. 写入版本号
            byteBuf.writeByte(1);
            // 3. 写入序列化方式 (这里用JDK)
            byteBuf.writeByte(0);
            // 4. 写入消息类型 (请求)
            byteBuf.writeByte(0);
            // 5. 写入序列ID (这里用时间戳)
            int sequenceId = (int) (System.currentTimeMillis() / 1000);
            byteBuf.writeInt(sequenceId);
            // 6. 获取消息体并序列化
            byte[] bytes;
            if (msg instanceof RpcRequest) {
                bytes = serialize((RpcRequest) msg);
            } else {
                bytes = serialize((RpcResponse) msg);
            }
            // 7. 写入数据长度和数据
            byteBuf.writeInt(bytes.length);
            byteBuf.writeBytes(bytes);
            out.add(byteBuf);
        }
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            // 1. 检查魔数
            if (in.readInt() != MAGIC_NUMBER) {
                throw new IllegalArgumentException("Invalid magic number!");
            }
            // 2. 读取版本号、序列化方式、消息类型、序列ID
            in.readByte(); // version
            byte serializerType = in.readByte();
            byte messageType = in.readByte();
            int sequenceId = in.readInt();
            // 3. 读取数据长度和数据
            int dataLength = in.readInt();
            byte[] bytes = new byte[dataLength];
            in.readBytes(bytes);
            // 4. 根据序列化方式反序列化
            ObjectSerializer serializer = getSerializer(serializerType);
            Object obj;
            if (messageType == 0) { // Request
                obj = serializer.deserialize(bytes, RpcRequest.class);
            } else { // Response
                obj = serializer.deserialize(bytes, RpcResponse.class);
            }
            out.add(obj);
        }
        // ... 省略 serialize, deserialize, getSerializer 方法
    }
  4. 创建业务处理器

    // RpcServerHandler.java
    public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
        // 使用一个 Map 来存储服务接口和实现类的映射
        private final Map<String, Object> serviceMap = new HashMap<>();
        public RpcServerHandler() {
            // 注册服务
            serviceMap.put("com.example.netty.UserService", new UserServiceImpl());
        }
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
            System.out.println("Received request: " + request);
            RpcResponse response = new RpcResponse();
            response.setRequestId(request.getRequestId());
            try {
                // 1. 根据请求找到对应的服务
                Object service = serviceMap.get(request.getInterfaceName());
                if (service == null) {
                    throw new RuntimeException("Service not found: " + request.getInterfaceName());
                }
                // 2. 反射调用方法
                Method method = service.getClass().getMethod(request.getMethodName(), request.getArgs().getClass());
                Object result = method.invoke(service, request.getArgs());
                // 3. 设置响应结果
                response.setResult(result);
            } catch (Exception e) {
                // 4. 设置响应异常
                response.setException(e);
            }
            // 5. 将响应写回客户端
            ctx.writeAndFlush(response);
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
  5. 启动服务器

    // RpcServer.java
    public class RpcServer {
        public void start(int port) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel ch) throws Exception {
                         ch.pipeline()
                           .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4)) // 解码器
                           .addLast(new RpcMessageCodec()) // 自定义编解码器
                           .addLast(new RpcServerHandler()); // 业务处理器
                     }
                 })
                 .bind(port).sync().channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
        public static void main(String[] args) throws Exception {
            new RpcServer().start(8080);
        }
    }

客户端实现

  1. 创建代理工厂

    // RpcClientProxy.java
    public class RpcClientProxy {
        private final String host;
        private final int port;
        public RpcClientProxy(String host, int port) {
            this.host = host;
            this.port = port;
        }
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Class<T> interfaceClass) {
            // 使用 JDK 动态代理
            return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                (proxy, method, args) -> {
                    // 1. 创建 RPC 请求
                    RpcRequest request = new RpcRequest();
                    request.setInterfaceName(interfaceClass.getName());
                    request.setMethodName(method.getName());
                    request.setArgs(args);
                    // 2. 发送请求并获取响应
                    RpcClientHandler handler = new RpcClientHandler();
                    Bootstrap bootstrap = new Bootstrap();
                    bootstrap.group(new NioEventLoopGroup())
                             .channel(NioSocketChannel.class)
                             .handler(new ChannelInitializer<SocketChannel>() {
                                 @Override
                                 protected void initChannel(SocketChannel ch) throws Exception {
                                     ch.pipeline()
                                       .addLast(new LengthFieldPrepender(4)) // 编码器
                                       .addLast(new RpcMessageCodec()) // 自定义编解码器
                                       .addLast(handler); // 处理响应的handler
                                 }
                             });
                    ChannelFuture future = bootstrap.connect(host, port).sync();
                    future.channel().writeAndFlush(request).sync();
                    future.channel().closeFuture().sync();
                    // 3. 返回调用结果
                    return handler.getResponse().getResult();
                }
            );
        }
    }
  2. 创建客户端响应处理器

    // RpcClientHandler.java
    public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
        private RpcResponse response;
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
            this.response = response;
        }
        public RpcResponse getResponse() {
            return response;
        }
    }
  3. 启动客户端

    // RpcClient.java
    public class RpcClient {
        public static void main(String[] args) {
            RpcClientProxy proxy = new RpcClientProxy("localhost", 8080);
            UserService userService = proxy.getProxy(UserService.class);
            String result1 = userService.getUserInfo(1);
            System.out.println("Client received: " + result1);
            String result2 = userService.getUserInfo(2);
            System.out.println("Client received: " + result2);
        }
    }

运行与测试

  1. 启动 RpcServer
  2. 运行 RpcClientmain 方法。
  3. 观察服务器和客户端的控制台输出,你会发现客户端成功调用了远程服务。

这个简易 RPC 框架已经具备了 RPC 的核心功能,虽然还很简陋(如没有服务注册中心、没有负载均衡、序列化方式单一等),但它清晰地展示了 Netty 在构建复杂网络应用时的强大能力。


最佳实践与注意事项

  1. 内存泄漏:务必记住释放 ByteBuf 的引用,在 ChannelInboundHandlerAdapter 中,使用 ReferenceCountUtil.release(msg) 是一个好习惯,更推荐使用 SimpleChannelInboundHandler,它会在 channelRead0 方法执行完后自动释放消息。
  2. 线程模型EventLoop 是单线程的,不要在 ChannelHandler 中执行耗时的同步任务(如复杂计算、数据库查询、网络请求等),否则会阻塞 EventLoop 线程,导致整个 Netty 应用性能急剧下降,对于耗时任务,应该将其提交到业务线程池中异步执行。
  3. 线程安全:默认情况下,ChannelHandler 实例在 ChannelPipeline 中是共享的(@Sharable 注解),因此它们必须是线程安全的,如果你的 Handler 有状态(比如有成员变量),请确保它是线程安全的(使用 ConcurrentHashMap 或加锁),或者不要将其设为 @Sharable
  4. 资源释放:在 exceptionCaught 方法中,通常需要调用 ctx.close() 来关闭连接,防止资源泄漏。
  5. 使用正确的编解码器:处理 TCP 粘包/拆包问题时,LengthFieldPrependerLengthFieldBasedFrameDecoder 是黄金搭档。

学习资源与进阶方向

学习资源

  • 官方文档Netty 官方网站,特别是 用户指南API 文档
  • 书籍
    • 《Netty in Action》:必读!Netty 之父写的一本书,内容详实,案例丰富。
    • 《Netty 权威指南》:国内经典的 Netty 书籍,讲解细致。
  • 源码:阅读 Netty 的源码是提升最快的方式,从 Echo 示例开始,逐步深入到 ByteBufEventLoopPipeline 等核心组件的源码。

进阶方向

  1. 高性能调优:学习 Netty 的各种参数(如 SO_RCVBUF, TCP_NODELAY),了解 Netty 的内存分配策略(PoolArena),进行性能压测和调优。
  2. 自定义协议:设计并实现更复杂的、适用于特定场景的应用层协议。
  3. 集成中间件:学习如何将 Netty 与其他中间件(如 Redis、数据库)集成,构建完整的分布式系统。
  4. 框架源码分析:深入分析基于 Netty 的知名框架(如 Dubbo、Elasticsearch)是如何使用 Netty 的,学习其设计思想。
  5. 安全:学习如何在 Netty 中集成 SSL/TLS,实现安全的通信。

希望这份教程能为你打开 Netty 的大门,Netty 是一个博大精深的框架,需要不断地学习和实践才能掌握,祝你学习愉快!

分享:
扫描分享到社交APP
上一篇
下一篇