使用新的 Spring 5 WebSockets API 以及 Spring WebFlux 提供的响应式特性创建一个快速示例。
WebSocket 是一种众所周知的协议,可以实现客户端和服务器之间的全双工通信,通常用于客户端和服务器需要以高频率和低延迟交换事件的 Web 应用程序。
Spring Framework 5 对框架中的 WebSockets 支持进行了现代化改造,为该通信通道添加了响应式功能。
WebFlux 本身提供了对 WebSocket 协议的支持,处理 WebSocket 请求需要对应的 handler 实现 WebSocketHandler 接口,每一个 WebSocket 都有一个关联的 WebSocketSession,包含了建立请求时的握手信息 HandshakeInfo,以及其它相关的信息。可以通过 session 的 receive() 方法来接收客户端的数据,通过 session 的 send() 方法向客户端发送数据。
1
2
3
4
5
6
7
8
9
10
11
12
|
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>2.7.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.7.5</version>
</dependency>
|
Spring 中的 WebSocket 配置
首先,需要使用SimpleUrlHandlerMapping将WebSocketHandler映射到 URL。 然后我们需要一个WebSocketHandlerAdapter来调用WebSocketHandler。
最后,为了让WebSocketHandlerAdapter了解传入的响应式运行时请求,我们需要使用ReactorNettyRequestUpgradeStrategy配置WebSocketService(因为我们正在使用默认的 Netty 服务器)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@Autowired
private WebSocketHandler webSocketHandler;
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/event-emitter", webSocketHandler);
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
handlerMapping.setUrlMap(map);
return handlerMapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
return new HandshakeWebSocketService(new ReactorNettyRequestUpgradeStrategy());
}
|
ws://localhost:<port>/event-emitter
Spring 中的 WebSocket 消息处理
1
2
3
4
5
6
7
8
9
10
11
12
|
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
// private fields ...
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
return webSocketSession.send(intervalFlux
.map(webSocketSession::textMessage))
.and(webSocketSession.receive()
.map(WebSocketMessage::getPayloadAsText)
.log());
}
}
|
响应式 WebSocket 客户端
1
2
3
4
|
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public class ReactiveJavaClientWebSocket {
public static void main(String[] args) throws InterruptedException {
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(
URI.create("ws://localhost:8080/event-emitter"),
session -> session.send(
Mono.just(session.textMessage("event-spring-reactive-client-websocket")))
.thenMany(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.log())
.then())
.block(Duration.ofSeconds(10L));
}
}
|
WebFlux 默认使用 Jackson 来处理 JSON 对应的字符串
IDEA HTTP 客户端中支持 WebSockets 端点
CORS
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public class MyWebSocketHandler implements WebSocketHandler, CorsConfigurationSource {
@Override
public Mono<Void> handle(WebSocketSession session) {
...
}
@Override
public CorsConfiguration getCorsConfiguration(ServerWebExchange exchange) {
CorsConfiguration configuration = new CorsConfiguration();
configuration.addAllowedOrigin("*");
return configuration;
}
}
|
也可以在 SimpleUrlHandler 上设置 corsConfigurations 属性。
Junit 5 测试
@WebFluxTest 注解和 WebTestClient
1
2
3
4
5
|
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
|
@WebFluxTest注解
它需要完全自动配置,而仅应用与 WebFlux 测试相关的配置(例如@Controller,@ControllerAdvice,@JsonComponent,Converter和WebFluxConfigurer bean,但没有@Component,@Service或@Repository bean)。
默认情况下,用@WebFluxTest注解的测试还将自动配置WebTestClient。
通常,@WebFluxTest与@MockBean或@Import结合使用以创建@Controller bean 所需的任何协作者。
要编写需要完整应用程序上下文的集成测试,请考虑将@SpringBootTest与@AutoConfigureWebTestClient结合使用。
WebTestClient
它是用于测试 Web 服务器的非阻塞式响应客户端,该客户端内部使用响应WebClient来执行请求,并提供流利的 API 来验证响应。
它可以通过 HTTP 连接到任何服务器,或使用模拟请求和响应对象直接绑定到 WebFlux 应用程序,而无需 HTTP 服务器。
WebTestClient与MockMvc相似。 这些测试 Web 客户端之间的唯一区别是WebTestClient旨在测试 WebFlux 端点。
测试 Controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
@ExtendWith(SpringExtension.class)
@WebFluxTest(controllers = EmployeeController.class)
@Import(EmployeeService.class)
public class EmployeeControllerTest
{
@MockBean
EmployeeRepository repository;
@Autowired
private WebTestClient webClient;
@Test
void testCreateEmployee() {
Employee employee = new Employee();
employee.setId(1);
employee.setName("Test");
employee.setSalary(1000);
Mockito.when(repository.save(employee)).thenReturn(Mono.just(employee));
webClient.post()
.uri("/create")
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromObject(employee))
.exchange()
.expectStatus().isCreated();
Mockito.verify(repository, times(1)).save(employee);
}
@Test
void testGetEmployeesByName()
{
Employee employee = new Employee();
employee.setId(1);
employee.setName("Test");
employee.setSalary(1000);
List<Employee> list = new ArrayList<Employee>();
list.add(employee);
Flux<Employee> employeeFlux = Flux.fromIterable(list);
Mockito
.when(repository.findByName("Test"))
.thenReturn(employeeFlux);
webClient.get().uri("/name/{name}", "Test")
.header(HttpHeaders.ACCEPT, "application/json")
.exchange()
.expectStatus().isOk()
.expectBodyList(Employee.class);
Mockito.verify(repository, times(1)).findByName("Test");
}
@Test
void testGetEmployeeById()
{
Employee employee = new Employee();
employee.setId(100);
employee.setName("Test");
employee.setSalary(1000);
Mockito
.when(repository.findById(100))
.thenReturn(Mono.just(employee));
webClient.get().uri("/{id}", 100)
.exchange()
.expectStatus().isOk()
.expectBody()
.jsonPath("$.name").isNotEmpty()
.jsonPath("$.id").isEqualTo(100)
.jsonPath("$.name").isEqualTo("Test")
.jsonPath("$.salary").isEqualTo(1000);
Mockito.verify(repository, times(1)).findById(100);
}
@Test
void testDeleteEmployee()
{
Mono<Void> voidReturn = Mono.empty();
Mockito
.when(repository.deleteById(1))
.thenReturn(voidReturn);
webClient.get().uri("/delete/{id}", 1)
.exchange()
.expectStatus().isOk();
}
}
@RestController
public class EmployeeController
{
@Autowired
private EmployeeService employeeService;
@PostMapping(value = { "/create", "/" })
@ResponseStatus(HttpStatus.CREATED)
public void create(@RequestBody Employee e) {
employeeService.create(e);
}
@GetMapping(value = "/{id}")
@ResponseStatus(HttpStatus.OK)
public ResponseEntity<Mono<Employee>> findById(@PathVariable("id") Integer id) {
Mono<Employee> e = employeeService.findById(id);
HttpStatus status = (e != null) ? HttpStatus.OK : HttpStatus.NOT_FOUND;
return new ResponseEntity<>(e, status);
}
@GetMapping(value = "/name/{name}")
@ResponseStatus(HttpStatus.OK)
public Flux<Employee> findByName(@PathVariable("name") String name) {
return employeeService.findByName(name);
}
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseStatus(HttpStatus.OK)
public Flux<Employee> findAll() {
return employeeService.findAll();
}
@PutMapping(value = "/update")
@ResponseStatus(HttpStatus.OK)
public Mono<Employee> update(@RequestBody Employee e) {
return employeeService.update(e);
}
@DeleteMapping(value = "/delete/{id}")
@ResponseStatus(HttpStatus.OK)
public void delete(@PathVariable("id") Integer id) {
employeeService.delete(id).subscribe();
}
}
|
- 使用@ExtendWith( SpringExtension.class )支持 Junit 5 中的测试。在 Junit 4 中,我们需要使用@RunWith(SpringRunner.class)。
- 使用@Import(EmployeeService.class)为应用程序上下文提供服务依赖,而使用@WebFluxTest时不会自动扫描该上下文。
- 我们模拟了EmployeeRepository类型为ReactiveMongoRepository的模型。 这将阻止实际的数据库插入和更新。
- WebTestClient用于命中控制器的特定端点并验证其是否返回正确的状态代码和主体。
附录