자바 NIO로 논블로킹 네트워크 I/O 구현하기

자바 NIO를 사용해 논블로킹 네트워크 I/O를 구현해보자!

NIO 패키지

자바 IO 패키지

java.io 패키지에서 제공하는 소켓 통신은 블로킹 방식으로 동작한다. 클라이언트 소켓을 생성해 서버 소켓에 요청을 전송하는 상황을 가정해보자.

public void sendMessage(final Socket socket, final String message) throws IOException {
    // 서버에 요청을 전송한다
    OutputStream outputStream = socket.getOutputStream();
    PrintWriter writer = new PrintWriter(new OutputStreamWriter(outputStream));
    writer.print(message);
    writer.flush();
}
  • 클라이언트는 OutputStream을 생성해 서버에 요청을 전송한다
  • 소켓 전송 버퍼에 데이터를 저장할 공간이 없으면 print() 메서드는 블록된다

클라이언트는 서버에 요청을 전송한 후 응답을 받아올 것이다.

public String receiveMessage(final Socket socket) throws IOException {
    // 서버 응답을 받아온다
    InputStream serverInput = sock.getInputStream();
    BufferedReader reader = new BufferedReader(new InputStreamReader(serverInput));
    StringBuilder ServerResponse = new StringBuilder();

    // 데이터가 모두 도착할 때까지 받아온다
    String line;
    while (StringUtils.isNotBlank(line = reader.readLine())) {
        ServerResponse.append(line);
    }
}
  • InputStream을 생성해 서버에서 보낸 응답을 가져온다
  • 소켓에서 읽어올 데이터가 없으면 readLine() 메서드는 블록된다

정리하면 java.io의 특징은 다음과 같다

  • 데이터를 전송하거나 요청할 준비가 되지 않았다면 준비가 완료될 때까지 메서드는 블록된다
  • 데이터를 전송할 때와 읽어올 때 각각 단방향 스트림 객체를 생성해야 한다

java.io와 다르게 java.nio는 논블로킹 방식의 네트워크 I/O를 지원한다.

자바 NIO 패키지

NIO는 채널과 버퍼, 셀렉터 (Selector)로 구성된다.

SocketChannel

public void connectToServer() throws IOException {
    // 클라이언트 소켓 채널
    SocketChannel socketChannel = SocketChannel.open();
    sockChannel.configureBlocking(true);
    sockChannel.connect(address);
}
  • 스트림 기반의 IO와 다르게 NIO는 채널 기반으로 동작한다
  • IO는 데이터 입출력을 위해 입력 스트림과 출력 스트림을 생성했지만, 채널은 입출력이 모두 가능하다
  • open() 메서드를 호출해 소켓을 위한 소켓 채널을 생성한다
  • configureBlocking() 메서드를 호출해 논블로킹 설정이 가능하다

Buffer

public void read(final SelectionKey key) throws IOException {
    // 소켓 데이터를 저장하기 위한 버퍼 메모리 할당
    ByteBuffer buffer = ByteBuffer.allocate(256);
    SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
    // 소켓 데이터를 읽어 버퍼에 저장
    socketChannel.read(buffer);
    buffer.flip();
    return CHARSET.decode(buffer).toString();
}
  • 채널은 소켓에 입출력된 데이터를 버퍼 공간에 저장한다

Selector

public void createServerSocketChannel() throws IOException {
    // 논블로킹 서버 소켓 채널을 생성한다
    final serverSocketChannel = ServerSocketChannel.open();
    final serverSocketChannel.bind(new InetSocketAddress(PORT));
    final serverSocketChannel.configureBlocking(false);

    // 셀렉터에 작업을 등록한다
    final selector = Selector.open();
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    // 작업이 완료되길 기다린다
    final Set<SelectionKey> selectedKeys = selector.select();
}
  • 셀렉터는 등록된 작업 중에서 입출력 준비가 완료된 작업들을 알려준다
  • 셀렉터에 여러 개의 채널과 작업을 등록할 수 있다
  • select() 메서드는 등록된 작업 중에서 준비된 작업이 하나 이상 생성될 때까지 블로킹한다

Buffer

소켓 채널은 입출력 데이터를 버퍼 메모리 공간에 저장한다.

  • Buffer는 데이터를 읽고 쓸 수 있는 메모리 공간이다
  • 추상 클래스인 Buffer에는 ByteBuffer, CharBuffer, IntBuffer 등 타입별 구현체가 존재한다

버퍼 위치 정보

버퍼는 데이터를 추적하기 위해 내부적으로 위치 정보를 기록한다.

position
limit
capacity
  • position 은 현재 읽거나 쓰고 있는 인덱스 값으로, 0부터 시작한다
  • limit 은 position 이 도달할 수 있는 최대 값이다. position과 limit 이 같아지면 더이상 버퍼에 데이터를 저장할 수 없다
  • capacity 는 버퍼에 저장할 수 있는 데이터의 최대 개수이다
0 <= position <= limit <= capacity
  • 위치 정보 간에는 위와 같은 대소 관계가 성립한다

버퍼에 데이터를 저장하고, 이를 읽는 상황을 가정해보자.

// 소켓 데이터를 저장하기 위한 버퍼 메모리 할당
ByteBuffer buffer = ByteBuffer.allocate(256);
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
  • 소켓에서 읽어온 데이터를 저장하기 위해 버퍼를 할당했다
// 소켓 데이터를 읽어 버퍼에 저장
socketChannel.read(buffer);
  • 소켓에서 읽은 데이터를 버퍼에 저장했다

버퍼에 데이터를 저장했기 때문에 position 값이 이동했다. 만약 저장된 데이터를 읽고 싶다면 어떻게 해야할까? 저장된 데이터는 0번부터 position-1의 위치까지 저장되어 있을 것이다.

// 버퍼 데이터를 읽기 위해 포지션을 이동한다
buffer.flip();
return CHARSET.decode(buffer).toString();
  • flip() 메서드를 호출해 위치 정보를 조정해줘야 한다
  • flip() 호출로 position은 0의 위치로, limit은 기존 position의 위치로 이동된다
  • 따라서 0번부터 position-1 까지 저장된 데이터를 읽어올 수 있다

버퍼에 저장된 데이터를 전부 읽어왔다. 이번에는 버퍼에 새로운 데이터를 작성하려고 할 때, 어떻게 해야할까?

// 새로운 데이터 저장을 위해 포지션을 이동한다
buffer.compact();

// 소켓으로 전송할 데이터를 버퍼에 저장한다
socketChannel.write(buffer);
  • compact()를 호출해 이미 읽어온 데이터는 버퍼에서 제거한다
  • 아직 읽지 않은 데이터들은 0번 인덱스를 시작점으로 차례대로 복사된다
  • limit 값은 capacity로 이동한다

Selector

셀렉터는 소켓 채널에 발생한 이벤트를 감지해주는 일종의 이벤트 리스너라고 볼 수 있다. 셀렉터는 등록된 작업 유형에 대해, 해당 작업을 실행할 수 있는 준비가 완료되면 이를 알려주는 역할을 한다.

// 셀렉터에 작업을 등록한다
final selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  • 예를 들어, 셀렉터에 서버 소켓 채널과 ACCEPT 작업을 등록한다
  • 서버 소켓이 ACCEPT 작업을 실행할 준비가 완료되면 셀렉터를 통해 이를 전달받을 수 있다

셀렉터 작업 유형

셀렉터에 등록 가능한 작업 유형은 다음과 같다.

  • OP_ACCEPT - 서버 소켓 채널의 연결 수락 작업
  • OP_CONNECT - 소켓 채널의 연결 작업
  • OP_READ - 소켓 채널의 데이터 읽기 작업
  • OP_WRITE - 소켓 채널의 데이터 쓰기 작업

앞서 IO 패키지를 사용해 소켓에 읽기 요청을 전송했을 때, 데이터가 존재하지 않으면 메서드가 블록됐었다. 그러나 논블로킹 메서드와 셀렉터를 사용하면 이를 다음과 같이 변경할 수 있다.

  • 읽기 메서드는 데이터가 존재하는지 여부와 관계없이 일단 반환된다
  • 셀렉터는 소켓이 읽을 수 있는 데이터가 존재하면 이를 전달해준다

셀렉터 키셋

셀렉터는 준비가 완료된 작업들을 SelectionKey로 반환해준다.

selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
  • select() 메서드는 반환 가능한 키셋이 생성될 때까지 블록한다
  • 준비 완료된 작업이 있으면 키셋을 반환한다
for (SelectionKey key : selectedKeys) {

    // 서버 소켓 - 클라이언트 연결 수락 작업
  if (key.isAcceptable()) {
      register(selector, serverSocket);
  }

  // 서버 소켓 - 읽기 작업
  if (key.isReadable()) {
      response(buffer, key);
  }
}
  • 키셋을 순회하며 SelectionKey가 가리키는 작업 유형에 따라 적절히 처리해준다

NIO 사용하기

논블로킹 서버 구현

NIO를 사용해 논블로킹 서버를 구현해보자. 전체 코드는 다음과 같다.

public class NioServer {

    private static final int PORT = 8000;
    private static final Charset CHARSET = Charset.forName("UTF-8");

    public static void main(String[] args)  {
        try {
            Selector selector = Selector.open();
            ServerSocketChannel serverSocket = ServerSocketChannel.open();
            serverSocket.bind(new InetSocketAddress("localhost", PORT));
            serverSocket.configureBlocking(false);
            serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            ByteBuffer buffer = ByteBuffer.allocate(256);

            while (true) {
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                for (SelectionKey key : selectedKeys) {

                    if (key.isAcceptable()) {
                        accept(selector, serverSocket);
                    }

                    if (key.isReadable()) {
                        read(buffer, key);
                    }
                }

                selectedKeys.clear();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    private static void read(ByteBuffer buffer, SelectionKey key) throws IOException {
        // 클라이언트 요청 읽기
        SocketChannel client = (SocketChannel) key.channel();
        client.read(buffer);
        buffer.flip();
        log.info("client send request: {}", Charset.defaultCharset().decode(buffer));

        buffer.clear();
        key.cancel();
        client.close();
    }

    private static void accept(Selector selector, ServerSocketChannel serverSocket) throws IOException {
        SocketChannel client = serverSocket.accept();
        if (client != null) {
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        }
    }
}

클라이언트 소켓 연결 수락하기

Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost", PORT));
serverSocket.configureBlocking(false);
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  • 논블로킹 서버 소켓 채널을 생성한다
  • 클라이언트 연결 요청 작업인 OP_ACCEPT를 셀렉터에 등록한다
private static void accept(Selector selector, ServerSocketChannel serverSocket) throws IOException {
    SocketChannel client = serverSocket.accept();
    if (client != null) {
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }
}
  • 클라이언트 소켓의 연결 요청이 들어오면 accept() 메서드가 이를 처리한다
  • accept()를 호출해 연결 요청을 수락한 후 읽기 작업을 셀렉터에 등록한다

클라이언트 요청 읽기

private static void read(ByteBuffer buffer, SelectionKey key) throws IOException {
    SocketChannel client = (SocketChannel) key.channel();
    client.read(buffer);
    buffer.flip();
    log.info("client request: {}", CHARSET.decode(buffer));

    buffer.clear();
    key.cancel();
    client.close();
}
  • 클라이언트로부터 요청이 들어오면 read() 메서드가 이를 처리한다
  • 클라이언트 요청을 읽어 버퍼에 저장한 후 출력한다
  • 클라이언트와 연결을 종료한다