杰瑞科技汇

websocket服务端JAVA

WebSocket 简介

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据,这对于实时性要求高的应用(如在线聊天、实时数据推送、在线游戏等)非常有用。

websocket服务端JAVA-图1
(图片来源网络,侵删)

与传统的 HTTP 轮询相比,WebSocket 具有以下优势:

  • 真正的全双工通信:服务器可以随时主动向客户端发送消息。
  • 低延迟:一旦连接建立,数据帧可以双向传输,无需每次都建立新的 HTTP 连接。
  • 轻量级:基于 TCP,协议开销比 HTTP 小。

Java WebSocket API

Java 提供了标准的 WebSocket API,位于 javax.websocket 包中,这个 API 是 Java EE(Jakarta EE)的一部分,因此我们可以直接使用它来构建服务端。

主要有三个核心组件:

  1. @ServerEndpoint:这是一个注解,用于将一个 Java 类标记为 WebSocket 端点,它的值是端点的 URI 路径。
  2. Session:代表一个 WebSocket 连接会话,通过它,你可以获取连接信息、发送消息、关闭连接等。
  3. Endpoint 类的生命周期方法
    • @OnOpen:当一个新的 WebSocket 连接建立时被调用。
    • @OnClose:当 WebSocket 连接关闭时被调用。
    • @OnMessage:当收到来自客户端的消息时被调用。
    • @OnError:当连接过程中发生错误时被调用。

创建 WebSocket 服务端(基础版)

我们将创建一个简单的 WebSocket 服务端,它能够接收客户端的消息,并将消息广播给所有连接的客户端。

websocket服务端JAVA-图2
(图片来源网络,侵删)

步骤 1:项目环境

如果你使用 Maven,在 pom.xml 中添加依赖,这个依赖通常包含在 Jakarta EE / Java EE 容器(如 Tomcat, Jetty, WildFly)中,但如果你想在独立的 Java 应用(如 Spring Boot)中使用,可能需要显式添加。

<!-- 如果你使用的是 Tomcat 9+ 或 Jakarta EE 8+ -->
<dependency>
    <groupId>jakarta.platform</groupId>
    <artifactId>jakarta.jakartaee-api</artifactId>
    <version>9.1.0</version>
    <scope>provided</scope> <!-- 如果在 Tomcat 等 Web 容器中运行,通常设为 provided -->
</dependency>
<!-- 如果你使用的是较旧的 Java EE 7 (Tomcat 7/8) -->
<dependency>
    <groupId>javax</groupId>
    <artifactId>javax.websocket-api</artifactId>
    <version>1.1</version>
    <scope>provided</scope>
</dependency>

注意scope 设为 provided 是因为这些 API 会在运行时(如 Tomcat)由容器提供,如果你使用的是 Spring Boot,它会自动处理这些依赖。

步骤 2:编写 WebSocket 端点类

创建一个 Java 类,并使用 @ServerEndpoint 注解。

import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
 * WebSocket 服务端
 * @ServerEndpoint 注解用于定义一个WebSocket端点
 * 值 "/chat" 是客户端连接时的URI路径
 */
@ServerEndpoint("/chat")
public class ChatServer {
    // 使用一个静态的 Set 来存储所有活跃的会话,实现广播功能
    // 注意:在多线程环境下,Set 需要是线程安全的,这里使用 Collections.synchronizedSet
    private static final Set<Session> chatroomUsers = Collections.synchronizedSet(new HashSet<>());
    /**
     * 当新的 WebSocket 连接建立时调用
     * @param session 代表客户端与服务器之间的会话
     */
    @OnOpen
    public void onOpen(Session session) {
        // 将新连接的会话添加到集合中
        chatroomUsers.add(session);
        System.out.println("新连接加入: " + session.getId());
        // 广播新用户加入的消息
        broadcast("用户 " + session.getId() + " 加入了聊天室,当前在线人数: " + chatroomUsers.size());
    }
    /**
     * 当收到来自客户端的消息时调用
     * @param message 客户端发送的消息
     * @param session 发送消息的会话
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("来自 " + session.getId() + " 的消息: " + message);
        // 广播消息给所有用户
        broadcast("用户 " + session.getId() + ": " + message);
    }
    /**
     * 当 WebSocket 连接关闭时调用
     * @param session 关闭的会话
     * @param closeReason 关闭原因
     */
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        // 从集合中移除已关闭的会话
        chatroomUsers.remove(session);
        System.out.println("连接关闭: " + session.getId() + ", 原因: " + closeReason.getReasonPhrase());
        // 广播用户离开的消息
        broadcast("用户 " + session.getId() + " 离开了聊天室,当前在线人数: " + chatroomUsers.size());
    }
    /**
     * 当连接过程中发生错误时调用
     * @param session 出错的会话
     * @param error 错误信息
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.err.println("发生错误: " + session.getId());
        error.printStackTrace();
    }
    /**
     * 广播消息给所有连接的客户端
     * @param message 要广播的消息
     */
    private void broadcast(String message) {
        // 同步遍历,避免并发修改问题
        synchronized (chatroomUsers) {
            for (Session user : chatroomUsers) {
                if (user.isOpen()) {
                    try {
                        // getBasicRemote() 是同步的,适合简单的消息发送
                        user.getBasicRemote().sendText(message);
                    } catch (IOException e) {
                        System.err.println("向用户 " + user.getId() + " 发送消息失败: " + e.getMessage());
                        // 如果发送失败,可能连接已断开,可以尝试从集合中移除
                        // chatroomUsers.remove(user);
                    }
                }
            }
        }
    }
}

步骤 3:部署到 Web 容器

这个类需要被部署到一个支持 WebSocket 的 Web 容器中,Apache Tomcat

  1. 将你的项目打包成一个 .war 文件。
  2. .war 文件放入 Tomcat 的 webapps 目录下。
  3. 启动 Tomcat。
  4. 你的 WebSocket 服务端将在 http://your-host:your-port/your-app-name/chat 上可用。

创建一个简单的 HTML 客户端进行测试

在同一项目的 webapp 目录下创建一个 index.html 文件,用于测试我们的服务端。

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">WebSocket 聊天室测试</title>
    <style>
        body { font-family: Arial, sans-serif; }
        #chat-log {
            border: 1px solid #ccc;
            padding: 10px;
            height: 300px;
            overflow-y: scroll;
            margin-bottom: 10px;
        }
        #message-input {
            width: 80%;
            padding: 5px;
        }
        #send-button {
            padding: 5px 15px;
        }
    </style>
</head>
<body>
    <h1>WebSocket 聊天室</h1>
    <div id="chat-log"></div>
    <div>
        <input type="text" id="message-input" placeholder="输入消息...">
        <button id="send-button">发送</button>
    </div>
    <script>
        // 连接到 WebSocket 服务端
        // 注意:这里的路径必须和 @ServerEndpoint 的值匹配
        const socket = new WebSocket("ws://" + window.location.host + "${pageContext.request.contextPath}/chat");
        const chatLog = document.getElementById('chat-log');
        const messageInput = document.getElementById('message-input');
        const sendButton = document.getElementById('send-button');
        // 连接打开时的处理
        socket.onopen = function(event) {
            console.log("WebSocket 连接已建立。");
            addLog("你已连接到聊天室。");
        };
        // 接收消息时的处理
        socket.onmessage = function(event) {
            console.log("收到消息: " + event.data);
            addLog(event.data);
        };
        // 连接关闭时的处理
        socket.onclose = function(event) {
            if (event.wasClean) {
                console.log(`连接正常关闭,代码=${event.code} 原因=${event.reason}`);
            } else {
                console.error('连接被异常中断');
            }
            addLog("你已断开连接。");
        };
        // 发生错误时的处理
        socket.onerror = function(error) {
            console.error("WebSocket 错误: " + error);
            addLog("发生错误,请检查控制台。");
        };
        // 发送消息
        function sendMessage() {
            const message = messageInput.value.trim();
            if (message && socket.readyState === WebSocket.OPEN) {
                socket.send(message);
                messageInput.value = '';
            }
        }
        // 添加日志到聊天窗口
        function addLog(message) {
            const p = document.createElement('p');
            p.textContent = message;
            chatLog.appendChild(p);
            chatLog.scrollTop = chatLog.scrollHeight; // 自动滚动到底部
        }
        // 绑定发送按钮的点击事件
        sendButton.onclick = sendMessage;
        // 绑定输入框的回车事件
        messageInput.onkeypress = function(event) {
            if (event.key === 'Enter') {
                sendMessage();
            }
        };
    </script>
</body>
</html>

你可以打开多个浏览器窗口访问这个 index.html,它们将能够互相发送消息,实现一个简单的聊天室。

高级主题与最佳实践

1 使用 @OnMessage 处理二进制数据

@OnMessage 方法可以重载以处理不同类型的数据。

// 处理文本消息
@OnMessage
public void onMessage(String message, Session session) {
    // ...
}
// 处理二进制消息 (字节缓冲区)
@OnMessage
public void onMessage(ByteBuffer bytes, Session session) {
    // ...
}
// 处理二进制消息 (输入流)
@OnMessage
public void onMessage(InputStream stream, Session session) {
    // ...
}

2 异步消息发送

使用 getAsyncRemote() 进行异步消息发送,它不会阻塞当前线程,性能更高。

// 在 broadcast 方法中替换 getBasicRemote()
session.getAsyncRemote().sendText(message, new SendHandler() {
    @Override
    public void onResult(SendResult result) {
        if (!result.isOK()) {
            System.err.println("消息发送失败: " + result.getException());
        }
    }
});

2 使用 WebSocket 容器配置(ServerEndpointConfig

如果你想进行更高级的配置,比如为每个端点配置自定义的 Configurator,可以使用 ServerEndpointConfig

import jakarta.websocket.server.ServerEndpointConfig;
public class MyEndpointConfigurator extends ServerEndpointConfig.Configurator {
    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
        // 在握手阶段,可以从 HTTP 请求中获取参数或 Header
        // 获取用户名
        String username = request.getHeaders().get("X-Username").get(0);
        sec.getUserProperties().put("username", username);
        super.modifyHandshake(sec, request, response);
    }
}

然后在你的端点类上使用这个配置:

@ServerEndpoint(
    value = "/chat",
    configurator = MyEndpointConfigurator.class
)
public class ChatServer {
    // ...
    @OnOpen
    public void onOpen(Session session) {
        // 可以通过 session.getUserProperties() 获取配置信息
        String username = (String) session.getUserProperties().get("username");
        System.out.println("用户 " + username + " (" + session.getId() + ") 加入了聊天室。");
    }
    // ...
}

3 集成到 Spring Boot

在 Spring Boot 中,创建 WebSocket 服务端非常简单,Spring Boot 自动配置了 WebSocket 支持。

  1. 添加依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
  2. 配置 WebSocket 端点: 你可以继续使用 @ServerEndpoint,或者使用 Spring 的 @EnableWebSocketWebSocketConfigurer 方式,后者更符合 Spring 的编程模型。

    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.config.annotation.EnableWebSocket;
    import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
    import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
    @Configuration
    @EnableWebSocket
    public class WebSocketConfig implements WebSocketConfigurer {
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            // 注册我们的自定义 WebSocket 处理器
            registry.addHandler(new MyWebSocketHandler(), "/chat-spring")
                    .setAllowedOrigins("*"); // 允许所有来源,生产环境请设置具体域名
        }
    }
  3. 实现 WebSocketHandler: Spring 提供了 WebSocketHandler 接口,让你可以用更 Spring 的方式来处理 WebSocket 事件。

    import org.springframework.web.socket.*;
    import java.util.concurrent.ConcurrentHashMap;
    public class MyWebSocketHandler implements WebSocketHandler {
        private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            sessions.put(session.getId(), session);
            System.out.println("新连接建立: " + session.getId());
            // 广播
            broadcast("用户 " + session.getId() + " 加入了。");
        }
        @Override
        public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
            System.out.println("收到来自 " + session.getId() + " 的消息: " + message.getPayload());
            broadcast("用户 " + session.getId() + ": " + message.getPayload());
        }
        @Override
        public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
            System.err.println("传输错误: " + session.getId());
            sessions.remove(session.getId());
        }
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
            sessions.remove(session.getId());
            System.out.println("连接关闭: " + session.getId());
            broadcast("用户 " + session.getId() + " 离开了。");
        }
        @Override
        public boolean supportsPartialMessages() {
            return false;
        }
        private void broadcast(String message) {
            sessions.forEach((id, session) -> {
                if (session.isOpen()) {
                    try {
                        session.sendMessage(new TextMessage(message));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
  • 基础实现:使用 javax.websocket (或 jakarta.websocket) API 和 @ServerEndpoint 注解是创建 Java WebSocket 服务端最直接的方式。
  • 核心概念:理解 Session、生命周期方法 (@OnOpen, @OnMessage, @OnClose, @OnError) 和广播机制是关键。
  • 测试:使用简单的 HTML/JavaScript 客户端是测试服务端功能的最佳方式。
  • 进阶:根据项目需求,可以考虑异步发送、自定义配置器以及与 Spring Boot 等框架集成,以获得更好的可维护性和扩展性。
分享:
扫描分享到社交APP
上一篇
下一篇