目录

WebSocket

WebSocket是一种网络传输协议,可在单个TCP连接上进行全双工通信,位于OSI模型的应用层。WebSocket协议在2011年由IETF标准化为RFC 6455,后由RFC 7936补充规范。Web IDL中的WebSocket API由W3C标准化。

WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。

WebSocket 简介

WebSocket 协议RFC 6455提供了一种标准化方式,可通过单个 TCP 连接在客户端和服务器之间建立全双工双向通信通道。它是与 HTTP 不同的 TCP 协议,但旨在通过 HTTP 工作,使用端口 80 和 443,并允许重复使用现有的防火墙规则。

WebSocket 交互以 HTTP 请求开始,该请求使用 HTTPUpgrade标头进行升级,或者在这种情况下,切换到 WebSocket 协议。以下示例显示了这样的交互:

客户端请求:

1
2
3
4
5
6
7
8
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13

服务器回应:

1
2
3
4
5
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

握手成功后,HTTP 升级请求底层的 TCP 套接字保持打开状态,供客户端和服务器继续发送和接收消息。

字段说明

  • Connection必须设置Upgrade,表示客户端希望连接升级。
  • Upgrade字段必须设置Websocket,表示希望升级到Websocket协议。
  • Sec-WebSocket-Key是随机的字符串,服务器端会用这些数据来构造出一个SHA-1的信息摘要。把“Sec-WebSocket-Key”加上一个特殊字符串“258EAFA5-E914-47DA-95CA-C5AB0DC85B11”,然后计算SHA-1摘要,之后进行Base64编码,将结果做为“Sec-WebSocket-Accept”头的值,返回给客户端。如此操作,可以尽量避免普通HTTP请求被误认为Websocket协议。
  • Sec-WebSocket-Version 表示支持的Websocket版本。RFC6455要求使用的版本是13,之前草案的版本均应当弃用。
  • Origin字段是必须的。如果缺少origin字段,WebSocket服务器需要回复HTTP 403 状态码(禁止访问)。
  • 其他一些定义在HTTP协议中的字段,如Cookie等,也可以在Websocket中使用。

何时使用 WebSocket

WebSockets 可以使网页动态和交互。但是,在许多情况下,Ajax 和 HTTP 流或长轮询的组合可以提供简单有效的解决方案。

例如,新闻、邮件和社交订阅源需要动态更新,但每隔几分钟更新一次可能完全没问题。另一方面,协作、游戏和金融应用程序需要更接近实时。

延迟本身并不是决定性因素。如果消息量相对较少(例如,监控网络故障),HTTP 流式传输或轮询可以提供有效的解决方案。正是低延迟、高频率和高容量的组合,才成为使用 WebSocket 的最佳案例。

还要记住,在 Internet 上,不受您控制的限制性代理可能会阻止 WebSocket 交互,因为它们未配置为传递 Upgrade标头,或者因为它们关闭了看起来空闲的长期连接。这意味着将 WebSocket 用于防火墙内的内部应用程序是一个比面向公众的应用程序更直接的决定。

WebSocket API

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
<!-- https://mvnrepository.com/artifact/jakarta.servlet/jakarta.servlet-api -->
<dependency>
    <groupId>jakarta.servlet</groupId>
    <artifactId>jakarta.servlet-api</artifactId>
    <version>6.0.0</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/jakarta.websocket/jakarta.websocket-api -->
<dependency>
    <groupId>jakarta.websocket</groupId>
    <artifactId>jakarta.websocket-api</artifactId>
    <version>2.1.0</version>
    <scope>provided</scope>
</dependency>

Spring Framework 提供了一个 WebSocket API,也可以使用它来编写处理 WebSocket 消息的客户端和服务器端应用程序。

集成 websocket

原生注解

1
2
3
4
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Component
@ServerEndpoint("/j2ee-ws/{msg}")
public class WebSocketServer {

    //建立连接成功调用
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "msg") String msg){
        System.out.println("WebSocketServer 收到连接: " + session.getId() + ", 当前消息:" + msg);
    }

    //收到客户端信息
    @OnMessage
    public void onMessage(Session session, String message) throws IOException {
        message = "WebSocketServer 收到连接:" + session.getId() +  ",已收到消息:" + message;
        System.out.println(message);
        session.getBasicRemote().sendText(message);
    }

    //连接关闭
    @OnClose
    public void onclose(Session session){
        System.out.println("连接关闭");
    }

}

Tomcat 容器 为 WS 所做的支持,这套代码的包名前缀叫做:javax.websocket。

  • @ServerEndpoint 这里就像 RequestMapping 一样,放入一个 WS 服务器监听的 URL。
  • @OnOpen 这个注解修饰的方法会在 WS 连接开始时执行。
  • @OnClose 这个注解修饰的方法则会在 WS 关闭时执行。
  • @OnMessage 这个注解则是修饰消息接受的方法,并且由于消息有文本和二进制两种方式,所以此方法参数上可以使用 String 或者二进制数组的方式
  • @OnError 当 websocket 建立连接时出现异常会触发这个注解修饰的方法,注意它有一个 Session 参数
1
2
3
4
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

内嵌容器需要被 Spring 管理并初始化,所以需要如下配置。

1
2
3
4
5
6
7
8
@Configuration
@EnableWebSocket
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpoint() {
        return new ServerEndpointExporter();
    }
}

Spring 封装

1
2
3
4
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Component
public class HttpAuthHandler extends TextWebSocketHandler {

    /**
     * socket 建立成功事件
     *
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Object token = session.getAttributes().get("token");
        if (token != null) {
            // 用户连接成功,放入在线用户缓存
            WsSessionManager.add(token.toString(), session);
        } else {
            throw new RuntimeException("用户登录已经失效!");
        }
    }

    /**
     * 接收消息事件
     *
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 获得客户端传来的消息
        String payload = message.getPayload();
        Object token = session.getAttributes().get("token");
        System.out.println("server 接收到 " + token + " 发送的 " + payload);
        session.sendMessage(new TextMessage("server 发送给 " + token + " 消息 " + payload + " " + LocalDateTime.now().toString()));
    }

    /**
     * socket 断开连接时
     *
     * @param session
     * @param status
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        Object token = session.getAttributes().get("token");
        if (token != null) {
            // 用户退出,移除缓存
            WsSessionManager.remove(token.toString());
        }
    }
}

通过继承 TextWebSocketHandler 类并覆盖相应方法,可以对 websocket 的事件进行处理,这里可以同原生注解的那几个注解连起来看

  • afterConnectionEstablished 方法是在 socket 连接成功后被触发,同原生注解里的 @OnOpen 功能
  • afterConnectionClosed 方法是在 socket 连接关闭后被触发,同原生注解里的 @OnClose 功能
  • handleTextMessage 方法是在客户端发送信息时触发,同原生注解里的 @OnMessage 功能
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Slf4j
public class WsSessionManager {
    /**
     * 保存连接 session 的地方
     */
    private static ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();

    /**
     * 添加 session
     *
     * @param key
     */
    public static void add(String key, WebSocketSession session) {
        // 添加 session
        SESSION_POOL.put(key, session);
    }

    /**
     * 删除 session,会返回删除的 session
     *
     * @param key
     * @return
     */
    public static WebSocketSession remove(String key) {
        // 删除 session
        return SESSION_POOL.remove(key);
    }

    /**
     * 删除并同步关闭连接
     *
     * @param key
     */
    public static void removeAndClose(String key) {
        WebSocketSession session = remove(key);
        if (session != null) {
            try {
                // 关闭连接
                session.close();
            } catch (IOException e) {
                // todo: 关闭出现异常处理
                e.printStackTrace();
            }
        }
    }

    /**
     * 获得 session
     *
     * @param key
     * @return
     */
    public static WebSocketSession get(String key) {
        // 获得 session
        return SESSION_POOL.get(key);
    }
}

简单通过 ConcurrentHashMap 来实现了一个 session 池,用来保存已经登录的 web socket 的 session。前文提过,服务端发送消息给客户端必须要通过这个 session。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Component
public class MyInterceptor implements HandshakeInterceptor {

    /**
     * 握手前
     *
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        System.out.println("握手开始");
        // 获得请求参数
        HashMap<String, String> paramMap = HttpUtil.decodeParamMap(request.getURI().getQuery(), "utf-8");
        String uid = paramMap.get("token");
        if (StrUtil.isNotBlank(uid)) {
            // 放入属性域
            attributes.put("token", uid);
            System.out.println("用户 token " + uid + " 握手成功!");
            return true;
        }
        System.out.println("用户登录已失效");
        return false;
    }

    /**
     * 握手后
     *
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        System.out.println("握手完成");
    }
}

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private HttpAuthHandler httpAuthHandler;
    @Autowired
    private MyInterceptor myInterceptor;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry
                .addHandler(httpAuthHandler, "myWS")
                .addInterceptors(myInterceptor)
                .setAllowedOrigins("*");
    }
}

STOMP

1
2
3
4
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 配置客户端尝试连接地址
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 设置广播节点
        registry.enableSimpleBroker("/topic", "/user");
        // 客户端向服务端发送消息需有/app 前缀
        registry.setApplicationDestinationPrefixes("/app");
        // 指定用户发送(一对一)的前缀 /user/
        registry.setUserDestinationPrefix("/user/");
    }
}
  • 通过实现 WebSocketMessageBrokerConfigurer 接口和加上 @EnableWebSocketMessageBroker 来进行 stomp 的配置与注解扫描。
  • 其中覆盖 registerStompEndpoints 方法来设置暴露的 stomp 的路径,其它一些跨域、客户端之类的设置。
  • 覆盖 configureMessageBroker 方法来进行节点的配置。
  • 其中 enableSimpleBroker 配置的广播节点,也就是服务端发送消息,客户端订阅就能接收消息的节点。
  • 覆盖 setApplicationDestinationPrefixes 方法,设置客户端向服务端发送消息的节点。
  • 覆盖 setUserDestinationPrefix 方法,设置一对一通信的节点。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Controller
public class WSController {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @MessageMapping("/hello")
    @SendTo("/topic/hello")
    public ResponseMessage hello(RequestMessage requestMessage) {
        System.out.println("接收消息:" + requestMessage);
        return new ResponseMessage("服务端接收到你发的:" + requestMessage);
    }

    @GetMapping("/sendMsgByUser")
    public @ResponseBody
    Object sendMsgByUser(String token, String msg) {
        simpMessagingTemplate.convertAndSendToUser(token, "/msg", msg);
        return "success";
    }

    @GetMapping("/sendMsgByAll")
    public @ResponseBody
    Object sendMsgByAll(String msg) {
        simpMessagingTemplate.convertAndSend("/topic", msg);
        return "success";
    }

    @GetMapping("/test")
    public String test() {
        return "test-stomp.html";
    }
}

Session 共享的问题

上面反复提到一个问题就是,服务端如果要主动发送消息给客户端一定要用到 session。而大家都知道的是 session 这个东西是不跨 jvm 的。如果有多台服务器,在 http 请求的情况下,我们可以通过把 session 放入缓存中间件中来共享解决这个问题,通过 spring session 几条配置就解决了。但是 web socket 不可以。他的 session 是不能序列化的,当然这样设计的目的不是为了为难你,而是出于对 http 与 web socket 请求的差异导致的。目前网上找到的最简单方案就是通过 redis 订阅广播的形式,主要代码跟第二种方式差不多,你要在本地放个 map 保存请求的 session。也就是说每台服务器都会保存与他连接的 session 于本地。然后发消息的地方要修改,并不是现在这样直接发送,而通过 redis 的订阅机制。服务器要发消息的时候,你通过 redis 广播这条消息,所有订阅的服务端都会收到这个消息,然后本地尝试发送。最后肯定只有有这个对应用户 session 的那台才能发送出去。

如何选择

如果你的业务要求比较灵活多变,推荐使用前两种,更推荐第二种 Spring 封装的形式。如果只是简单的服务器双向通信,推荐 stomp 的形式,因为他更容易规范使用。

附录