Java NIO - 选择器



众所周知,Java NIO 支持与通道和缓冲区之间进行多重事务处理。因此,为了检查一个或多个 NIO 通道,并确定哪些通道已准备好进行数据事务处理(即读取或写入),Java NIO 提供了选择器。

使用选择器,我们可以创建一个线程来了解哪些通道已准备好进行数据写入和读取,并可以处理该特定通道。

我们可以通过调用其静态方法 `open()` 来获取选择器实例。打开选择器后,我们必须向其注册一个非阻塞模式通道,这将返回一个 SelectionKey 实例。

SelectionKey 本质上是可以在通道上执行的操作集合,或者可以说我们可以借助选择键了解通道的状态。

选择键表示的主要操作或通道状态如下:

  • SelectionKey.OP_CONNECT - 准备好连接到服务器的通道。

  • SelectionKey.OP_ACCEPT - 准备好接受传入连接的通道。

  • SelectionKey.OP_READ - 准备好读取数据的通道。

  • SelectionKey.OP_WRITE - 准备好写入数据的通道。

注册后获得的选择键有一些重要的方法,如下所示:

  • attach() - 此方法用于将对象附加到键。将对象附加到通道的主要目的是识别同一个通道。

  • attachment() - 此方法用于从通道中保留附加的对象。

  • channel() - 此方法用于获取为其创建特定键的通道。

  • selector() - 此方法用于获取为其创建特定键的选择器。

  • isValid() - 此方法返回键是否有效。

  • isReadable() - 此方法表明键的通道是否准备好读取。

  • isWritable() - 此方法表明键的通道是否准备好写入。

  • isAcceptable() - 此方法表明键的通道是否准备好接受传入连接。

  • isConnectable() - 此方法测试此键的通道是否已完成或未能完成其套接字连接操作。

  • isAcceptable() - 此方法测试此键的通道是否准备好接受新的套接字连接。

  • interestOps() - 此方法检索此键的兴趣集。

  • readyOps() - 此方法检索就绪集,即通道已准备好执行的操作集。

我们可以通过调用其静态方法 `select()` 从选择器中选择一个通道。选择器的 select 方法是重载的,如下所示:

  • select() - 此方法阻塞当前线程,直到至少有一个通道已准备好处理其注册的事件。

  • select(long timeout) - 此方法与 select() 执行相同的操作,但它最多阻塞线程 timeout 毫秒(参数)。

  • selectNow() - 此方法根本不会阻塞。它立即返回任何已准备好的通道。

此外,为了离开调用 select 方法的阻塞线程,可以在选择器实例中调用 `wakeup()` 方法,之后在 select() 中等待的线程将立即返回。

最后,我们可以通过调用 `close()` 方法来关闭选择器,该方法还会使与此选择器注册的所有 SelectionKey 实例失效,以及关闭选择器。

示例

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class SelectorDemo {
   public static void main(String[] args) throws IOException {
      String demo_text = "This is a demo String";	
      Selector selector = Selector.open();
      ServerSocketChannel serverSocket = ServerSocketChannel.open();
      serverSocket.bind(new InetSocketAddress("localhost", 5454));
      serverSocket.configureBlocking(false);
      serverSocket.register(selector, SelectionKey.OP_ACCEPT);
      ByteBuffer buffer = ByteBuffer.allocate(256);
      while (true) {
         selector.select();
         Set<SelectionKey> selectedKeys = selector.selectedKeys();
         Iterator<SelectionKey> iter = selectedKeys.iterator();
         while (iter.hasNext()) {
            SelectionKey key = iter.next();
            int interestOps = key.interestOps();
            System.out.println(interestOps);
            if (key.isAcceptable()) {
               SocketChannel client = serverSocket.accept();
               client.configureBlocking(false);
               client.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
               SocketChannel client = (SocketChannel) key.channel();
               client.read(buffer);
               if (new String(buffer.array()).trim().equals(demo_text)) {
                  client.close();
                  System.out.println("Not accepting client messages anymore");
               }
               buffer.flip();
               client.write(buffer);
               buffer.clear();
            }
            iter.remove();
         }
      }
   }
}
广告