IO – java.io
Java 1.0 中引入了 java.io
包, Java 1.1 中引入了 Reader
InputStream
和 OutputStream
– 一次提供一个字节的数据
Reader
和 Writer
– 字符流的便利包装器
- blocking mode(阻塞模式) – 等待一条完整的消息
NIO – java.nio
java.nio
包是在 Java 1.4 中引入的,并在 Java 1.7 (NIO.2)中进行了更新,具有增强的文件操作和一个 ASynchronousSocketChannel
。它提供:
Buffer
(缓冲区) – 一次读取若干数据块(chunks)
CharsetDecoder
– for mapping raw bytes to/from readable characters
Channel
– for communicating with the outside world
Selector
– 在 SelectableChannel
上启用多路复用并提供对准备好 I/O 的任何 Channel
的访问
- non-blocking mode(非阻塞模式) – 读取任何准备好的东西
IO |
NIO |
面向流 |
面向缓冲区 |
阻塞IO |
非阻塞IO |
无 |
选择器 |
1
2
|
InetSocketAddress address = new InetSocketAddress("localhost", 8888);
SocketChannel socketChannel = SocketChannel.open(address);
|
Java NIO 包
Channel
- FileChannel for file system read/write
- DatagramChannel for read/write over a network using UDP
- SocketChannel for read/write over a network using TCP
- ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。
- AsynchronousServerSocketChannel 和 AsynchronousSocketChannel NIO2加入,是上面两个 Channel 的 AIO 版本
Buffer
缓冲区是一块内存,我们可以从中读取或写入数据。NIO Buffer对象包装了一个内存块。Buffer类提供了一组与内存块一起工作的功能。要使用Buffer对象,我们需要了解Buffer类的三个主要属性:容量、位置和限制。
- 容量定义了内存块的大小。当我们将数据写入缓冲区时,我们只能写入有限的长度。当缓冲区已满时,我们需要读取数据或清除数据。
- 位置是我们写入数据的起点。一个空缓冲区从 0 开始到容量 - 1。另外,当我们读取数据时,我们从位置值开始。
- 限制意味着我们如何从缓冲区写入和读取。
除 Boolean 外每个原始类型一个Buffer, eg IntBuffer,还有 MappedByteBuffer
要使用缓冲区,我们需要知道一些重要的方法:
- allocate(int value) — 创建一个特定大小的缓冲区。
- flip() - 此方法用于从写入模式切换到读取模式
- clear() – 清除缓冲区内容的方法
- compact() – 只清除我们已经阅读的内容的方法
- rewind() - 将位置重置为 0,以便我们可以重新读取缓冲区中的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
@Test
public void convertUsingNewStringFromBufferArray_thenOK() {
String content = "ynthm";
ByteBuffer byteBuffer = ByteBuffer.wrap(content.getBytes());
if (byteBuffer.hasArray()) {
String newContent = new String(byteBuffer.array(), charset);
assertEquals(content, newContent);
}
}
@Test
public void convertUsingNewStringFromByteBufferGetBytes_thenOK() {
String content = "ynthm";
ByteBuffer byteBuffer = ByteBuffer.wrap(content.getBytes());
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String newContent = new String(bytes, charset);
assertEquals(content, newContent);
}
@Test
public void convertUsingCharsetDecode_thenOK() {
String content = "ynthm";
ByteBuffer byteBuffer = ByteBuffer.wrap(content.getBytes());
String newContent = charset.decode(byteBuffer).toString();
assertEquals(content, newContent);
}
|
Selector
Java NIO Selector 允许我们用一个线程管理多个通道。要使用选择器对象监控多个通道,每个通道实例必须处于非阻塞模式,并且我们必须注册它。通道注册后,我们得到一个SelectionKey对象,表示通道和选择器之间的连接。当我们有多个通道连接到一个选择器时,我们可以使用 select()
方法来检查有多少通道可供使用。调用 select()
方法后,我们可以使用 selectedKeys()
方法获取所有准备好的通道。
使用选择器,我们可以使用一个线程而不是多个线程来管理多个通道。线程之间的上下文切换对于操作系统来说代价高昂,此外,每个线程都会占用内存。因此,我们使用的线程越少越好。但是,重要的是要记住,现代操作系统和 CPU 在多任务处理方面不断进步,因此多线程的开销会随着时间的推移而不断减少。选择器不仅可以帮助您读取数据;还可以侦听传入的网络连接并通过慢速通道写入数据。
向选择器注册的任何通道都必须是 SelectableChannel 的子类。这些是一种特殊类型的通道,可以置于非阻塞模式。
Selector 是 SelectableChannel 对象的多路复用器, 可以同时监控多个
SelectableChannel 的 IO 状态。
在使用选择器注册通道之前,它必须处于非阻塞模式
1
2
|
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
|
可以监听四种不同的事件,每一种都由 SelectionKey
类中的一个常量表示:
- 连接-当客户端尝试连接到服务器时。由
SelectionKey.OP_CONNECT
表示
- 接受-当服务器接受来自客户端的连接时。由
SelectionKey.OP_ACCEPT
表示
- 读取-当服务器准备好从通道读取时。由
SelectionKey.OP_READ
表示
- 写入-当服务器准备好写入通道时。由
SelectionKey.OP_WRITE
表示
返回的对象 SelectionKey
表示 SelectableChannel
向选择器的注册。
SelectionKey
The Interest Set 兴趣集:
兴趣集定义了我们希望选择器在此通道上注意的事件集。它是一个整数值;我们可以通过以下方式获取这些信息。
首先,我们有由SelectionKey的interestOps方法返回的兴趣集。然后我们在前面看到的SelectionKey中有事件常量。
当我们 AND 这两个值时,我们得到一个布尔值,它告诉我们是否正在监视事件:
1
2
3
4
5
6
|
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
|
The Ready Set:
1
2
3
4
|
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWriteable();
|
Pipe
管道用于在两个线程之间建立单向数据连接。它有一个槽通道和源通道。数据正在写入槽通道,然后可以从源通道读取该数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public class PipeDemo {
public static void main(String[] args) throws IOException {
// The Pipe is created
Pipe pipe = Pipe.open();
// For accessing the pipe sink channel
ByteBuffer bb;
try (Pipe.SinkChannel skChannel = pipe.sink()) {
String td = "Data is successfully sent for checking the java NIO Channel Pipe.";
bb = ByteBuffer.allocate(1024);
bb.clear();
bb.put(td.getBytes());
bb.flip();
// write the data into a sink channel.
while (bb.hasRemaining()) {
skChannel.write(bb);
}
}
// For accessing the pipe source channel
try (Pipe.SourceChannel sourceChannel = pipe.source()) {
bb = ByteBuffer.allocate(512);
// The data is writing to the console
while (sourceChannel.read(bb) > 0) {
bb.flip();
while (bb.hasRemaining()) {
char testData = (char) bb.get();
System.out.print(testData);
}
bb.clear();
}
}
}
}
|
FileLock
FileLock锁定或尝试锁定文件的给定部分。它属于java.nio.channels包,该功能在JDK 1.4以上版本可用。
FileLock用于在共享模式或非共享模式下锁定文件。它有两个重要的方法如下:
- FileLock.lock(long position, long size, boolean shared)
- FileLock.tryLock(long position, long size, boolean shared)
1
2
3
|
public final FileLock lock() throws IOException {
return lock(0L, Long.MAX_VALUE, false);
}
|
上述方法使用参数作为初始位置,文件大小锁定和一个参数来决定是否共享锁定。
- 共享锁:允许多个线程进行文件的读取操作
- 独占锁: 只允许一个线程进行文件的读/写操作
当使用FileChannel或AsynchronousFileChannel的lock()或tryLock()方法之一获取文件锁时,将创建文件锁定对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public class FileLockDemo {
public static void main(String[] args) throws IOException {
String input = "* end of the file.";
System.out.println("Input string to the test file is: " + input);
ByteBuffer buf = ByteBuffer.wrap(input.getBytes());
String fp = "testout-file.txt";
Path pt = Paths.get(fp);
try (FileChannel fc =
FileChannel.open(pt, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
System.out.println("File channel is open for write and Acquiring lock...");
// position of a cursor at the end of file
fc.position(fc.size() - 1);
FileLock lock = fc.lock();
System.out.println("The Lock is shared: " + lock.isShared());
fc.write(buf);
// Releases the Lock fc.close();
}
System.out.println(
"Content Writing is complete. Therefore close the channel and release the lock.");print(fp);
}
public static void print(String path) throws IOException {
FileReader filereader = new FileReader(path);
BufferedReader bufferedreader = new BufferedReader(filereader);
String tr = bufferedreader.readLine();
System.out.println("The Content of testout-file.txt file is: ");
while (tr != null) {
System.out.println(" " + tr);
tr = bufferedreader.readLine();
}
filereader.close();
bufferedreader.close();
}
}
|
NIO 封装缺点
java.nio包引入的变化更多与底层数据 IO 相关。虽然他们允许非阻塞 API,但其他方面仍然存在问题:
- 对符号链接的有限支持
- 对文件属性访问的有限支持
- 缺少更好的文件系统管理工具
Java NIO2
Java 1.7 引入了新的 java.nio.file
包,也称为 NIO.2 包。此包遵循java.nio包中不支持的非阻塞 IO 的异步方法。最重要的变化与高级文件操作有关。它们与Files、Path和Paths类一起添加。最显着的低级更改是添加了AsynchroniousFileChannel
和 AsyncroniousSocketChannel
。
**Path 对象表示由分隔符分隔的目录和文件名的分层序列。**根组件在最左边,而文件在右边。此类提供实用方法,例如getFileName()、 getParent()等。 Path类还提供解析和相对化方法,帮助构建不同文件之间的路径。Paths 类是一组静态实用程序方法,它们接收String或URI以创建Path实例。
**Files 类提供了使用前面描述的Path类并对文件、目录和符号链接进行操作的实用方法。**它还提供了一种使用readAttributes()方法读取许多文件属性的方法。
1
2
3
4
5
|
@Test
public void readFromFileUsingNIO2() throws Exception {
List<String> strings = Files.readAllLines(Paths.get("src/test/resources/nio-vs-nio2.txt"));
assertThat(strings.get(0)).isEqualTo("Hello from file!");
}
|
java.nio和java.nio.file包的基础知识。我们可以看到,NIO.2 并不是 NIO 包的新版本。NIO 包引入了用于非阻塞 IO 的低级 API,而 NIO.2 引入了更好的文件管理。这两个包不是同义词,而是相互补充。
AsynchronousFileChannel
读取数据
读取AsynchronousFileChannel的数据有两种方式。每种方法都会调用AsynchronousFileChannel的一个read()接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
Path path = Paths.get("data/test.xml");
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;
Future<Integer> operation = fileChannel.read(buffer, position);
while(!operation.isDone());
buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println(new String(data));
buffer.clear();
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
Path path = Paths.get("data/test.xml");
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;
fileChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("result = " + result);
attachment.flip();
byte[] data = new byte[attachment.limit()];
attachment.get(data);
System.out.println(new String(data));
attachment.clear();
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
});
|
写数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
Path path = Paths.get("data/test-write.txt");
if(!Files.exists(path)){
Files.createFile(path);
}
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;
buffer.put("test data".getBytes());
buffer.flip();
Future<Integer> operation = fileChannel.write(buffer, position);
buffer.clear();
while(!operation.isDone());
System.out.println("Write done");
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
Path path = Paths.get("data/test-write.txt");
if(!Files.exists(path)){
Files.createFile(path);
}
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
ByteBuffer buffer = ByteBuffer.allocate(1024);
long position = 0;
buffer.put("test data".getBytes());
buffer.flip();
fileChannel.write(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("bytes written: " + result);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("Write failed");
exc.printStackTrace();
}
});
|
例子
ServerSocketChannel 与 SocketChannel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
public class EchoServer {
private static final String POISON_PILL = "POISON_PILL";
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost", 5454));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256);
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
register(selector, serverSocket);
}
if (key.isReadable()) {
answerWithEcho(buffer, key);
}
iter.remove();
}
}
}
private static void answerWithEcho(ByteBuffer buffer, SelectionKey key)
throws IOException {
SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
if (new String(buffer.array()).trim().equals(POISON_PILL)) {
client.close();
System.out.println("Not accepting client messages anymore");
}
else {
buffer.flip();
client.write(buffer);
buffer.clear();
}
}
private static void register(Selector selector, ServerSocketChannel serverSocket)
throws IOException {
SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}
public static Process start() throws IOException, InterruptedException {
String javaHome = System.getProperty("java.home");
String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
String classpath = System.getProperty("java.class.path");
String className = EchoServer.class.getCanonicalName();
ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className);
return builder.start();
}
}
|
客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
public class EchoClient {
private static SocketChannel client;
private static ByteBuffer buffer;
private static EchoClient instance;
public static EchoClient start() {
if (instance == null)
instance = new EchoClient();
return instance;
}
public static void stop() throws IOException {
client.close();
buffer = null;
}
private EchoClient() {
try {
client = SocketChannel.open(new InetSocketAddress("localhost", 5454));
buffer = ByteBuffer.allocate(256);
} catch (IOException e) {
e.printStackTrace();
}
}
public String sendMessage(String msg) {
buffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
try {
client.write(buffer);
buffer.clear();
client.read(buffer);
response = new String(buffer.array()).trim();
System.out.println("response=" + response);
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
return response;
}
}
|
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public class EchoTest {
Process server;
EchoClient client;
@Before
public void setup() throws IOException, InterruptedException {
server = EchoServer.start();
client = EchoClient.start();
}
@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
String resp1 = client.sendMessage("hello");
String resp2 = client.sendMessage("world");
assertEquals("hello", resp1);
assertEquals("world", resp2);
}
@After
public void teardown() throws IOException {
server.destroy();
EchoClient.stop();
}
}
|
AsynchronousServerSocketChannel 和 AsynchronousSocketChannel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
AsynchronousServerSocketChannel serverChannel
= AsynchronousServerSocketChannel.open();
server.bind(new InetSocketAddress("127.0.0.1", 4999));
while (true) {
serverChannel.accept(
null, new CompletionHandler<AsynchronousSocketChannel,Object>() {
@Override
public void completed(
AsynchronousSocketChannel result, Object attachment) {
if (serverChannel.isOpen()){
serverChannel.accept(null, this);
}
clientChannel = result;
if ((clientChannel != null) && (clientChannel.isOpen())) {
ReadWriteHandler handler = new ReadWriteHandler();
ByteBuffer buffer = ByteBuffer.allocate(32);
Map<String, Object> readInfo = new HashMap<>();
readInfo.put("action", "read");
readInfo.put("buffer", buffer);
clientChannel.read(buffer, readInfo, handler);
}
}
@Override
public void failed(Throwable exc, Object attachment) {
// process error
}
});
System.in.read();
}
class ReadWriteHandler implements
CompletionHandler<Integer, Map<String, Object>> {
@Override
public void completed(
Integer result, Map<String, Object> attachment) {
Map<String, Object> actionInfo = attachment;
String action = (String) actionInfo.get("action");
if ("read".equals(action)) {
ByteBuffer buffer = (ByteBuffer) actionInfo.get("buffer");
buffer.flip();
actionInfo.put("action", "write");
clientChannel.write(buffer, actionInfo, this);
buffer.clear();
} else if ("write".equals(action)) {
ByteBuffer buffer = ByteBuffer.allocate(32);
actionInfo.put("action", "read");
actionInfo.put("buffer", buffer);
clientChannel.read(buffer, actionInfo, this);
}
}
@Override
public void failed(Throwable exc, Map<String, Object> attachment) {
//
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
Future<Void> future = client.connect(hostAddress);
public String sendMessage(String message) {
byte[] byteMsg = new String(message).getBytes();
ByteBuffer buffer = ByteBuffer.wrap(byteMsg);
Future<Integer> writeResult = client.write(buffer);
// do some computation
writeResult.get();
buffer.flip();
Future<Integer> readResult = client.read(buffer);
// do some computation
readResult.get();
String echo = new String(buffer.array()).trim();
buffer.clear();
return echo;
}
|