Netty是一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
Netty是一个NIO客户端、服务端框架。允许快速简单的开发网络应用程序。例如:服务端和客户端之间的协议。它最牛逼的地方在于简化了网络编程规范。例如:TCP和UDP的Socket服务。
Netty 本身是用于快速构建服务端与客户端之间通信协议的框架。Netty在消息处理上使用责任链模式,用户可以轻松方便的对它进行扩展。官方也提供了大量的优秀的扩展。
1
2
3
4
5
6
|
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.75.Final</version>
</dependency>
|
DISCARD
它是一种丢弃任何接收到的数据而没有任何响应的协议
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
/**
* 处理服务器端通道
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// 以静默方式丢弃接收的数据
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// 出现异常时关闭连接。
cause.printStackTrace();
ctx.close();
}
}
|
一些重要的解释:
-
DiscardServerHandler
扩展了ChannelInboundHandlerAdapter
,它是ChannelInboundHandler
的一个实现。 ChannelInboundHandler
提供了可以覆盖的各种事件处理程序方法。 现在,它只是扩展了ChannelInboundHandlerAdapter
,而不是自己实现处理程序接口。
-
我们在这里覆盖channelRead()
事件处理程序方法。每当从客户端接收到新数据时,使用该方法来接收客户端的消息。 在此示例中,接收到的消息的类型为ByteBuf
。
-
要实现DISCARD
协议,处理程序必须忽略接收到的消息。ByteBuf
是引用计数的对象,必须通过release()
方法显式释放。请记住,处理程序负责释放传递给处理程序的引用计数对象。 通常,channelRead()
处理程序方法实现如下:
1
2
3
4
5
6
7
8
9
10
11
|
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
Java
|
当Netty由于I/O错误或由处理事件时抛出的异常而导致的处理程序实现引发异常时,使用Throwable
调用exceptionCaught()
事件处理程序方法。 在大多数情况下,捕获的异常会被记录,并且其相关的通道应该在这里关闭,这种方法的实现可以根据想要什么样的方式来处理异常情况而有所不同。 例如,您可能希望在关闭连接之前发送带有错误代码的响应消息。
到现在如果没有问题,我们已经实现了DISCARD
服务器的前半部分。 现在剩下的就是编写main()
方法,并使用DiscardServerHandler
启动服务器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
|
-
NioEventLoopGroup
是处理I/O
操作的多线程事件循环。 Netty为不同类型的传输提供了各种EventLoopGroup
实现。 在此示例中,实现的是服务器端应用程序,因此将使用两个NioEventLoopGroup
。 第一个通常称为“boss
”,接受传入连接。 第二个通常称为“worker
”,当“boss
”接受连接并且向“worker
”注册接受连接,则“worker
”处理所接受连接的流量。 使用多少个线程以及如何将它们映射到创建的通道取决于EventLoopGroup
实现,甚至可以通过构造函数进行配置。
-
ServerBootstrap
是一个用于设置服务器的助手类。 您可以直接使用通道设置服务器。 但是,请注意,这是一个冗长的过程,在大多数情况下不需要这样做。
-
在这里,我们指定使用NioServerSocketChannel
类,该类用于实例化新的通道以接受传入连接。
-
此处指定的处理程序将始终由新接受的通道计算。 ChannelInitializer
是一个特殊的处理程序,用于帮助用户配置新的通道。 很可能要通过添加一些处理程序(例如DiscardServerHandler
)来配置新通道的ChannelPipeline
来实现您的网络应用程序。 随着应用程序变得复杂,可能会向管道中添加更多处理程序,并最终将此匿名类提取到顶级类中。
-
还可以设置指定Channel
实现的参数。这里编写的是一个TCP/IP
服务器,所以我们允许设置套接字选项,如tcpNoDelay
和keepAlive
。 请参阅ChannelOption的apidocs和指定的ChannelConfig实现,以了解关于ChannelOptions。
-
你注意到option()
和childOption()
没有? option()
用于接受传入连接的NioServerSocketChannel
。 childOption()
用于由父ServerChannel
接受的通道,在这个示例中为NioServerSocketChannel
。
-
现在准备好了。剩下的是绑定到端口和启动服务器。 这里,我们绑定到机器中所有NIC(网络接口卡)的端口:8080
。 现在可以根据需要多次调用bind()
方法(使用不同的绑定地址)。
恭喜!这就完成了一个基于 Netty 的第一个服务器。
为它是一个”丢弃“(什么也不处理)服务器。所以发送什么请求根本不会得到任何反应。 为了证明它是真的在运行工作,我们还修改一点服务器端上的代码 - 打印它收到了什么东西。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
// 或者直接打印
System.out.println("Yes, A new client in = " + ctx.name());
}
|
TIME 协议
它与先前的示例不同,时间服务器只发送包含32位整数的消息,而不接收任何请求,并在消息发送后关闭连接。 在本示例中,您将学习如何构造和发送消息,以及在完成时关闭连接。
因为时间服务器将忽略任何接收到的数据,但是一旦建立连接就发送消息,所以我们不能使用channelRead()方法。而是覆盖channelActive()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
|
- 如上所述,当建立连接并准备好生成流量时,将调用channelActive()方法。现在在这个方法中编写一个32位的整数来表示当前的时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
Date currentTime = new Date(currentTimeMillis);
System.out.println("Default Date Format:" + currentTime.toString());
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = formatter.format(currentTime);
// 转换一下成中国人的时间格式
System.out.println("Date Format:" + dateString);
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
|
附录