目录

Java NIO

IO – java.io

Java 1.0 中引入了 java.io 包, Java 1.1 中引入了 Reader

  • InputStreamOutputStream – 一次提供一个字节的数据
  • ReaderWriter – 字符流的便利包装器
  • blocking mode(阻塞模式) – 等待一条完整的消息

NIO – java.nio

java.nio 包是在 Java 1.4 中引入的,并在 Java 1.7 (NIO.2)中进行了更新,具有增强的文件操作和一个 ASynchronousSocketChannel。它提供:

  • Buffer(缓冲区) – 一次读取若干数据块(chunks)
  • CharsetDecoder – for mapping raw bytes to/from readable characters
  • Channel – for communicating with the outside world
  • Selector – 在 SelectableChannel 上启用多路复用并提供对准备好 I/O 的任何 Channel 的访问
  • non-blocking mode(非阻塞模式) – 读取任何准备好的东西
IO NIO
面向流 面向缓冲区
阻塞IO 非阻塞IO
选择器
1
2
InetSocketAddress address = new InetSocketAddress("localhost", 8888);
SocketChannel socketChannel = SocketChannel.open(address);

Java NIO 包

Channel

  • FileChannel for file system read/write
  • DatagramChannel for read/write over a network using UDP
  • SocketChannel for read/write over a network using TCP
  • ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。
  • AsynchronousServerSocketChannel 和 AsynchronousSocketChannel NIO2加入,是上面两个 Channel 的 AIO 版本

Buffer

缓冲区是一块内存,我们可以从中读取或写入数据。NIO Buffer对象包装了一个内存块。Buffer类提供了一组与内存块一起工作的功能。要使用Buffer对象,我们需要了解Buffer类的三个主要属性:容量、位置和限制。

  • 容量定义了内存块的大小。当我们将数据写入缓冲区时,我们只能写入有限的长度。当缓冲区已满时,我们需要读取数据或清除数据。
  • 位置是我们写入数据的起点。一个空缓冲区从 0 开始到容量 - 1。另外,当我们读取数据时,我们从位置值开始。
  • 限制意味着我们如何从缓冲区写入和读取。

除 Boolean 外每个原始类型一个Buffer, eg IntBuffer,还有 MappedByteBuffer

要使用缓冲区,我们需要知道一些重要的方法:

  • allocate(int value) — 创建一个特定大小的缓冲区。
  • flip() - 此方法用于从写入模式切换到读取模式
  • clear() – 清除缓冲区内容的方法
  • compact() – 只清除我们已经阅读的内容的方法
  • rewind() - 将位置重置为 0,以便我们可以重新读取缓冲区中的数据
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Test
public void convertUsingNewStringFromBufferArray_thenOK() {
    String content = "ynthm";
    ByteBuffer byteBuffer = ByteBuffer.wrap(content.getBytes());

    if (byteBuffer.hasArray()) {
        String newContent = new String(byteBuffer.array(), charset);

        assertEquals(content, newContent);
    }
}

@Test
public void convertUsingNewStringFromByteBufferGetBytes_thenOK() {
    String content = "ynthm";
    ByteBuffer byteBuffer = ByteBuffer.wrap(content.getBytes());

    byte[] bytes = new byte[byteBuffer.remaining()];
    byteBuffer.get(bytes);
    String newContent = new String(bytes, charset);

    assertEquals(content, newContent);
}

@Test
public void convertUsingCharsetDecode_thenOK() {
    String content = "ynthm";
    ByteBuffer byteBuffer = ByteBuffer.wrap(content.getBytes());
    String newContent = charset.decode(byteBuffer).toString();

    assertEquals(content, newContent);
}

Selector

Java NIO Selector 允许我们用一个线程管理多个通道。要使用选择器对象监控多个通道,每个通道实例必须处于非阻塞模式,并且我们必须注册它。通道注册后,我们得到一个SelectionKey对象,表示通道和选择器之间的连接。当我们有多个通道连接到一个选择器时,我们可以使用 select() 方法来检查有多少通道可供使用。调用 select() 方法后,我们可以使用 selectedKeys() 方法获取所有准备好的通道。

使用选择器,我们可以使用一个线程而不是多个线程来管理多个通道。线程之间的上下文切换对于操作系统来说代价高昂,此外,每个线程都会占用内存。因此,我们使用的线程越少越好。但是,重要的是要记住,现代操作系统和 CPU 在多任务处理方面不断进步,因此多线程的开销会随着时间的推移而不断减少。选择器不仅可以帮助您读取数据;还可以侦听传入的网络连接并通过慢速通道写入数据。

向选择器注册的任何通道都必须是 SelectableChannel 的子类。这些是一种特殊类型的通道,可以置于非阻塞模式。 Selector 是 SelectableChannel 对象的多路复用器, 可以同时监控多个 SelectableChannel 的 IO 状态。

在使用选择器注册通道之前,它必须处于非阻塞模式

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

可以监听四种不同的事件,每一种都由 SelectionKey 类中的一个常量表示:

  • 连接-当客户端尝试连接到服务器时。由 SelectionKey.OP_CONNECT 表示
  • 接受-当服务器接受来自客户端的连接时。由 SelectionKey.OP_ACCEPT 表示
  • 读取-当服务器准备好从通道读取时。由 SelectionKey.OP_READ 表示
  • 写入-当服务器准备好写入通道时。由 SelectionKey.OP_WRITE 表示

返回的对象 SelectionKey 表示 SelectableChannel 向选择器的注册。

SelectionKey

The Interest Set 兴趣集

兴趣集定义了我们希望选择器在此通道上注意的事件集。它是一个整数值;我们可以通过以下方式获取这些信息。

首先,我们有由SelectionKey的interestOps方法返回的兴趣集。然后我们在前面看到的SelectionKey中有事件常量。

当我们 AND 这两个值时,我们得到一个布尔值,它告诉我们是否正在监视事件:

1
2
3
4
5
6
int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

The Ready Set

1
2
3
4
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWriteable();

Pipe

管道用于在两个线程之间建立单向数据连接。它有一个槽通道和源通道。数据正在写入槽通道,然后可以从源通道读取该数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class PipeDemo {
  public static void main(String[] args) throws IOException {
    // The Pipe is created
    Pipe pipe = Pipe.open();
    // For accessing the pipe sink channel
    ByteBuffer bb;
    try (Pipe.SinkChannel skChannel = pipe.sink()) {
      String td = "Data is successfully sent for checking the java NIO Channel Pipe.";
      bb = ByteBuffer.allocate(1024);
      bb.clear();
      bb.put(td.getBytes());
      bb.flip();
      // write the data into a sink channel.
      while (bb.hasRemaining()) {
        skChannel.write(bb);
      }
    }
    // For accessing the pipe source channel
    try (Pipe.SourceChannel sourceChannel = pipe.source()) {
      bb = ByteBuffer.allocate(512);
      // The data is writing to the console
      while (sourceChannel.read(bb) > 0) {
        bb.flip();

        while (bb.hasRemaining()) {
          char testData = (char) bb.get();
          System.out.print(testData);
        }
        bb.clear();
      }
    }
  }
}

FileLock

FileLock锁定或尝试锁定文件的给定部分。它属于java.nio.channels包,该功能在JDK 1.4以上版本可用。 FileLock用于在共享模式或非共享模式下锁定文件。它有两个重要的方法如下:

  • FileLock.lock(long position, long size, boolean shared)
  • FileLock.tryLock(long position, long size, boolean shared)
1
2
3
public final FileLock lock() throws IOException {
  return lock(0L, Long.MAX_VALUE, false);
}

上述方法使用参数作为初始位置,文件大小锁定和一个参数来决定是否共享锁定。

  • 共享锁:允许多个线程进行文件的读取操作
  • 独占锁: 只允许一个线程进行文件的读/写操作

当使用FileChannel或AsynchronousFileChannel的lock()或tryLock()方法之一获取文件锁时,将创建文件锁定对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class FileLockDemo {
  public static void main(String[] args) throws IOException {
    String input = "* end of the file.";
    System.out.println("Input string to the test file is: " + input);
    ByteBuffer buf = ByteBuffer.wrap(input.getBytes());
    String fp = "testout-file.txt";
    Path pt = Paths.get(fp);
    try (FileChannel fc =
        FileChannel.open(pt, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
      System.out.println("File channel is open for write and Acquiring lock...");
      // position of a cursor at the end of file
      fc.position(fc.size() - 1);
      FileLock lock = fc.lock();
      System.out.println("The Lock is shared: " + lock.isShared());
      fc.write(buf);
      // Releases the Lock fc.close();
    }
    System.out.println(
        "Content Writing is complete. Therefore close the channel and release the lock.");print(fp);
  }
  public static void print(String path) throws IOException {
    FileReader filereader = new FileReader(path);
    BufferedReader bufferedreader = new BufferedReader(filereader);
    String tr = bufferedreader.readLine();
    System.out.println("The Content of testout-file.txt file is: ");
    while (tr != null) {
      System.out.println("    " + tr);
      tr = bufferedreader.readLine();
    }
    filereader.close();
    bufferedreader.close();
  }
}

NIO 封装缺点

java.nio包引入的变化更多与底层数据 IO 相关。虽然他们允许非阻塞 API,但其他方面仍然存在问题:

  • 对符号链接的有限支持
  • 对文件属性访问的有限支持
  • 缺少更好的文件系统管理工具

Java NIO2

Java 1.7 引入了新的 java.nio.file 包,也称为 NIO.2 包。此包遵循java.nio包中不支持的非阻塞 IO 的异步方法。最重要的变化与高级文件操作有关。它们与Files、Path和Paths类一起添加。最显着的低级更改是添加了AsynchroniousFileChannelAsyncroniousSocketChannel

**Path 对象表示由分隔符分隔的目录和文件名的分层序列。**根组件在最左边,而文件在右边。此类提供实用方法,例如getFileName()、 getParent()等。 Path类还提供解析和相对化方法,帮助构建不同文件之间的路径。Paths 类是一组静态实用程序方法,它们接收String或URI以创建Path实例。

**Files 类提供了使用前面描述的Path类并对文件、目录和符号链接进行操作的实用方法。**它还提供了一种使用readAttributes()方法读取许多文件属性的方法。

1
2
3
4
5
@Test
public void readFromFileUsingNIO2() throws Exception {
    List<String> strings = Files.readAllLines(Paths.get("src/test/resources/nio-vs-nio2.txt"));
    assertThat(strings.get(0)).isEqualTo("Hello from file!");
}

java.nio和java.nio.file包的基础知识。我们可以看到,NIO.2 并不是 NIO 包的新版本。NIO 包引入了用于非阻塞 IO 的低级 API,而 NIO.2 引入了更好的文件管理。这两个包不是同义词,而是相互补充。

AsynchronousFileChannel

读取数据

读取AsynchronousFileChannel的数据有两种方式。每种方法都会调用AsynchronousFileChannel的一个read()接口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
Path path = Paths.get("data/test.xml");
AsynchronousFileChannel fileChannel = 
    AsynchronousFileChannel.open(path, StandardOpenOption.READ);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

Future<Integer> operation = fileChannel.read(buffer, position);

while(!operation.isDone());

buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println(new String(data));
buffer.clear();
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
Path path = Paths.get("data/test.xml");
AsynchronousFileChannel fileChannel = 
    AsynchronousFileChannel.open(path, StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;
fileChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        System.out.println("result = " + result);

        attachment.flip();
        byte[] data = new byte[attachment.limit()];
        attachment.get(data);
        System.out.println(new String(data));
        attachment.clear();
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {

    }
});

写数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
Path path = Paths.get("data/test-write.txt");
if(!Files.exists(path)){
    Files.createFile(path);
}
AsynchronousFileChannel fileChannel = 
    AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

buffer.put("test data".getBytes());
buffer.flip();

Future<Integer> operation = fileChannel.write(buffer, position);
buffer.clear();

while(!operation.isDone());

System.out.println("Write done");
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Path path = Paths.get("data/test-write.txt");
if(!Files.exists(path)){
    Files.createFile(path);
}
AsynchronousFileChannel fileChannel = 
    AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);

ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;

buffer.put("test data".getBytes());
buffer.flip();

fileChannel.write(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {

    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        System.out.println("bytes written: " + result);
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("Write failed");
        exc.printStackTrace();
    }
});

例子

ServerSocketChannel 与 SocketChannel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class EchoServer {

    private static final String POISON_PILL = "POISON_PILL";

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        ByteBuffer buffer = ByteBuffer.allocate(256);

        while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {

                SelectionKey key = iter.next();

                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }

                if (key.isReadable()) {
                    answerWithEcho(buffer, key);
                }
                iter.remove();
            }
        }
    }

    private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
      throws IOException {
 
        SocketChannel client = (SocketChannel) key.channel();
        client.read(buffer);
        if (new String(buffer.array()).trim().equals(POISON_PILL)) {
            client.close();
            System.out.println("Not accepting client messages anymore");
        }
        else {
            buffer.flip();
            client.write(buffer);
            buffer.clear();
        }
    }

    private static void register(Selector selector, ServerSocketChannel serverSocket)
      throws IOException {
 
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }

    public static Process start() throws IOException, InterruptedException {
        String javaHome = System.getProperty("java.home");
        String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
        String classpath = System.getProperty("java.class.path");
        String className = EchoServer.class.getCanonicalName();

        ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);

        return builder.start();
    }
}

客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class EchoClient {
    private static SocketChannel client;
    private static ByteBuffer buffer;
    private static EchoClient instance;

    public static EchoClient start() {
        if (instance == null)
            instance = new EchoClient();

        return instance;
    }

    public static void stop() throws IOException {
        client.close();
        buffer = null;
    }

    private EchoClient() {
        try {
            client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            buffer = ByteBuffer.allocate(256);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public String sendMessage(String msg) {
        buffer = ByteBuffer.wrap(msg.getBytes());
        String response = null;
        try {
            client.write(buffer);
            buffer.clear();
            client.read(buffer);
            response = new String(buffer.array()).trim();
            System.out.println("response=" + response);
            buffer.clear();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;

    }
}

测试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class EchoTest {

    Process server;
    EchoClient client;

    @Before
    public void setup() throws IOException, InterruptedException {
        server = EchoServer.start();
        client = EchoClient.start();
    }

    @Test
    public void givenServerClient_whenServerEchosMessage_thenCorrect() {
        String resp1 = client.sendMessage("hello");
        String resp2 = client.sendMessage("world");
        assertEquals("hello", resp1);
        assertEquals("world", resp2);
    }

    @After
    public void teardown() throws IOException {
        server.destroy();
        EchoClient.stop();
    }
}

AsynchronousServerSocketChannel 和 AsynchronousSocketChannel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
AsynchronousServerSocketChannel serverChannel
  = AsynchronousServerSocketChannel.open();
  server.bind(new InetSocketAddress("127.0.0.1", 4999));
  while (true) {
    serverChannel.accept(
      null, new CompletionHandler<AsynchronousSocketChannel,Object>() {

        @Override
        public void completed(
          AsynchronousSocketChannel result, Object attachment) {
            if (serverChannel.isOpen()){
                serverChannel.accept(null, this);
            }

            clientChannel = result;
            if ((clientChannel != null) && (clientChannel.isOpen())) {
                ReadWriteHandler handler = new ReadWriteHandler();
                ByteBuffer buffer = ByteBuffer.allocate(32);

                Map<String, Object> readInfo = new HashMap<>();
                readInfo.put("action", "read");
                readInfo.put("buffer", buffer);

                clientChannel.read(buffer, readInfo, handler);
             }
         }
         @Override
         public void failed(Throwable exc, Object attachment) {
             // process error
         }
    });
    System.in.read();
}

class ReadWriteHandler implements 
  CompletionHandler<Integer, Map<String, Object>> {
    
    @Override
    public void completed(
      Integer result, Map<String, Object> attachment) {
        Map<String, Object> actionInfo = attachment;
        String action = (String) actionInfo.get("action");

        if ("read".equals(action)) {
            ByteBuffer buffer = (ByteBuffer) actionInfo.get("buffer");
            buffer.flip();
            actionInfo.put("action", "write");

            clientChannel.write(buffer, actionInfo, this);
            buffer.clear();

        } else if ("write".equals(action)) {
            ByteBuffer buffer = ByteBuffer.allocate(32);

            actionInfo.put("action", "read");
            actionInfo.put("buffer", buffer);

            clientChannel.read(buffer, actionInfo, this);
        }
    }
    
    @Override
    public void failed(Throwable exc, Map<String, Object> attachment) {
        // 
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
Future<Void> future = client.connect(hostAddress);


public String sendMessage(String message) {
    byte[] byteMsg = new String(message).getBytes();
    ByteBuffer buffer = ByteBuffer.wrap(byteMsg);
    Future<Integer> writeResult = client.write(buffer);

    // do some computation

    writeResult.get();
    buffer.flip();
    Future<Integer> readResult = client.read(buffer);
    
    // do some computation

    readResult.get();
    String echo = new String(buffer.array()).trim();
    buffer.clear();
    return echo;
}