네트워크 확장성

멀티스레드 서버
  • 요청을 위한 별도의 스레드를 사용하는 것은 접속과 관련된 요청이 동시에 처리될 수 있다
  • 스레드 구성 방법
    • 요청별 스레드
      • 서버에 도착하는 각 요청을 새 스레드로 할당되며 각 요청은 종종 새로운 연결이 생성되는 것을 의미한다
      • 요청이 유지될 필요가 없는 환경에서 잘 작동한다
    • 연결별 스레드
      • 클라이언트 연결은 세션 기간 동안 유지된다
      • 세션은 요청 및 응답으로 구성되며 특정 커멘드, 타임아웃 기간 이후에 종료된다
    • 객체별 스레드
      • 관련 요청을 처리할 수 있는 특정 객체와 함께 대기열에 넣는다
      • 객체와 그 메소드는 한 번에 하나의 요청을 처리하는 스레드에 위치하며, 요청은 스레드에서 대기한다.
      • 스레드 풀에서 종종 사용된다.
  • 요청별 스레드
    • 병렬을 접속을 지원하는 ConcurrentHashMap 사용하며 애플리케이션의 성능을 개선하는 데 유용할 수 있다
    • main 메소드에서 소버 소켓이 클라이언트 요청을 대기하고 클라이언트 소켓이 처리를 위한 스레드로 전달하는 새로운 스레드로 생성
      public class SimpleMultiThreadServer implements Runnable {
        private static ConcurrentHashMap<String, Float> map;
        private Socket clientSocket;
        static {
            map = new ConcurrentHashMap<>();
            map.put("Axle", 238.50f);
            map.put("Gear", 45.55f);
            map.put("Wheel", 86.30f);
            map.put("Rotor", 8.50f);
        }
        public SimpleMultiThreadServer(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }
        @Override
        public void run() {
            System.out.println("Client Thread Started");
            try(BufferedReader br = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                PrintStream out = new PrintStream(clientSocket.getOutputStream())) {
      
                String partName = br.readLine();
                out.println(map.get(partName));
                System.out.println("Request for " + partName + "and returned price is " + map.get(partName));
                clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("Client Thread Terminated");
        }
        public static void main(String[] args) {
            System.out.println("Multi-Threaded Server Started");
            try {
                ServerSocket serverSocket = new ServerSocket(5000);
                while (true) {
                    System.out.println("Listeneing for a client connection");
                    Socket clientSocket = serverSocket.accept();
                    new Thread(new SimpleMultiThreadServer(clientSocket)).start();;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("Multi-Threaded Server Terminated");
        }
      }
      
  • 클라이언트
    System.out.println("Client started");
    Scanner scanner = new Scanner(System.in);
    try {
        Socket socket = new Socket("127.0.0.1", 5000);
        BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        PrintStream ps = new PrintStream(socket.getOutputStream());
    
        System.out.print("Part Name: ");
        String partName = scanner.nextLine();
        ps.println(partName);;
        System.out.println(partName + " request sent. Response: " + br.readLine());
        socket.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    System.out.println("Client Terminated");
    
  • 연결별 스레드 접근법
    • 서버
      • 타임아웃은 충분한 시간이 경과한 후 자동으로 클라이언트의 연결을 해제하게 설정될 필요가 잆다
      • 현재 quit 이 요청되면 루프는 종료되며 그 외 요청은 이전과 동일하게 처리된다
        @Override
        public void run() {
          System.out.println("Client Thread Started");
          Socket socket = serverSocket.accept();
          BufferedReader br = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
          PrintStream out = new PrintStream(clientSocket.getOutputStream())
          while(true) {
              String partName = br.readLine();
              if("quit".equals(partName) {
                  break; 
              }
              out.println(map.get(partName));
              System.out.println("Request for " + partName + "and returned price is " + map.get(partName));
              clientSocket.close();
          }
          System.out.println("Client Thread Terminated");
        }
        
        스레드풀
  • 풀은 많은 스레드가 생성되는 방법을 제어하며 비용이 많이 드는 작업에서 반복적으로 요청되는 스레드를 생성하고 파괴하는 작업을 생략할 수 있다
  • 요청은 풀에서 스레드에게 할당되며, 스레드가 없다면 일부 스레드 풀은 새 스레드를 생성할 수도 있고, 그 외에는 이용가능한 스레드 수를 제한한다. 일부 요청에서 블록된 결과가 발생할 수 있다
  • ThreadPoolExecutor 클래스는 스레드 실행에 대한 상태 정보를 전달하는 메소드를 제공한다
    • 생성 메소드
      • newCachedThreadPool(생성된 풀은 스레드를 재사용하며 필요한 경우 새로운 스레드를 생성)
      • newFixedThreadPool(고정된 크기의 풀을 생성)
    • 풀 종료
      • ThreadPoolExecutor 클래스의 인스턴스가 생성되면 스레드 풀에 전달되는 새로운 작업을 수락할 것이나 풀은 자동으로 닫히지 않는다.
      • 유휴상태의 경우 새 작업이 전송될 때까지 기다리며 풀을 종료하기 위해서는 shutdown, shutdownNow 메소드가 호출 되어야 한다
      • shutdownNow 는 바로 풀을 종료하고 대기 중인 작업을 실행하지 않는다
    • 스레드 풀 정보
      • getPoolSize: 풀에서 현재 스레드의 수를 반환
      • getActiveCount: 활성 스레드의 수를 반환
      • getLargePoolSize: 한번에 풀에서의 최대 스레드의 수를 반환
  • 서버
    • ThreadPoolExecutor 생성하여 요청 스레드 생성
      System.out.println("Thread Pool Server Started");
      ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
      
      try {
          ServerSocket serverSocket = new ServerSocket(5000);
          while(true) {
              System.out.println("Listening for a client connection");
              Socket clientSocket = serverSocket.accept(); // 연결 요청이 올 때까지 블록된다
      
              executor.execute(new WorkerThread(clientSocket));
          }
      } catch (IOException e) {
          e.printStackTrace();
      }
      
      executor.shutdown();
      System.out.println("Thread Pool Server Terminated");
      
    • 요청 스레드(WorkerThread)
      public class WorkerThread implements Runnable {
          private static ConcurrentHashMap<String, Float> map;
          private Socket clientSocket;
          static {
              map = new ConcurrentHashMap<>();
              map.put("Axle", 238.50f);
              map.put("Gear", 45.55f);
              map.put("Wheel", 86.30f);
              map.put("Rotor", 8.50f);
          }
          public WorkerThread(Socket clientSocket) {
              this.clientSocket = clientSocket;
          }
          @Override
          public void run() {
              System.out.println("Worker Thread Started");
              try(BufferedReader br = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                  PrintStream out = new PrintStream(clientSocket.getOutputStream())) {
      
                  String partName = br.readLine();
                  out.println(map.get(partName));
                  System.out.println("Request for " + partName + ", price: " + map.get(partName));
                  clientSocket.close();
              } catch (IOException e) {
                  e.printStackTrace();
              }
              System.out.println("Worker Thread Started");
          }
      }
      
    • 클라이언트는 요청별 스레드와 동일
  • Callable 과 스레드 풀
    • Callable, Future 인터페이스는 멀티스레드를 지원하는 또 다른 접근 방법을 제공한다
      • Callable은 스레드가 결과를 반환해야 하는 스레딩을 지원하며 Runnable 인터페이스에 run 메소드를 대신해 사용되는 call 메소드 보유
      • Future 인터페이스는 Callable 객체와 함께 사용
    • Callable
      • 다른 스레드가 생성되는 동안 call 메소드가 반환될 때까지 블록되며 Future 인터페이스를 통해 기술을 향상시킬 수 있다 ```java public class WorkerThread implements Runnable { private Socket clientSocket; public WorkerThread(Socket clientSocket) { this.clientSocket = clientSocket; } @Override public void run() { System.out.println(“Worker Thread Started”); try(BufferedReader br = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintStream out = new PrintStream(clientSocket.getOutputStream())) {

              String partName = br.readLine();
              float price = new WorkerCallable(partName).call();
              out.println(price);
              System.out.println("Request for " + partName + ", price: " + price);
              clientSocket.close();
          } catch (IOException e) {
              e.printStackTrace();
          } catch (Exception e) {
              e.printStackTrace();
          }
          System.out.println("Worker Thread Started");   } }
        

      public class WorkerCallable implements Callable { private static ConcurrentHashMap<String, Float> map; private String partName; static { map = new ConcurrentHashMap<>(); map.put("Axle", 238.50f); map.put("Gear", 45.55f); map.put("Wheel", 86.30f); map.put("Rotor", 8.50f); } public WorkerCallable(String partName) { this.partName = partName; } @Override public Float call() throws Exception { return map.get(partName); } } ```

    • Future
      • 완료된 call 매소드의 결과를 나타낸다
      • Future 인터페이스를 사용해 Callable 객체를 호출하고 반환될 떄까지 기다리지 않을 수 없다
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        Future<Float> future1 = executor.submit(() -> 1.0f);
        Future<Float> future2 = executor.submit(() -> 2.0f);
        try {
          Float price1 = future1.get(); // 1.0
          Float price2 = future2.get(); // 2.0
        } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
        }
        
    • CompletableFuture
    • CompletionStage
  • HTTPServer 에서 스레드 풀 사용하기
    • HTTPServer 에서는 기본적으로는 start 메소드가 호출될 때 생성된 스레드 사용
    • setExecutor 메소드를 사용하여 요청을 스레드에 할당하는 방법을 지정할 수 있다.
      HttpServer server = HttpServer.create(new InetSocketAddress(80), 0);
      server.createContext("/index", new OtherHandler();
      server.setExecutor(Executors.newCachedThreadPool());
      server.start();
      
셀렉터 사용
  • Selector 는 NIO 애플리케이션에서 사용되고 하나의 스레드가 여러 채널을 처리하게 하기 때문에 스레드 전환과정에서 발생하는 오버헤드의 일부를 방지할 수 있다.
  • 여러 채널과 그 이벤트를 조정하며 처리를 위해 준비된 채널을 식별한다.
  • 주요 클래스
    • Selector: 기본 기능 제공
    • SelectionKey: 처리를 위해 준비된 이벤트의 타입 식별
  • 셀렉터 사용 단계 1. 셀렉터 생성
    • 생성자가 없고 static 메소드인 open() 메소드 사용
    • 셀렉터를 열고 더이상 필요하지 않을 때 닫기 위한 close() 사용, isOpen() 메소드도 있다
  • 셀렉터 사용 단계 2. 채널을 셀렉터에 등록
    • register 메소드로 채널을 등록하며 등록되는 모든 채널은 논블로킹 모드여야 한다
      socketChannel.configureBlocking(false); // 셀렉터에 채널을 등록하기 위해서는 반드시 해당 채널이 non-blocking 모드로 변환
      // 파라미터: 등록을 위한 셀렉터, 관심있는 이벤트 타입, 채널과 관련된 데이터
      socketChannel.register(selector, SelectionKey.OP_WRITE, null);
      
    • SelectionKey
      • 이벤트 타입(interst sets: 관심 세트)
        • Connect(SelectionKey.OP_CONNECT): 채널이 성공적으로 서버에 연결되었음을 나타낸다.
        • Accept(SelectionKey.OP_ACCEPT): 서버 소켓 채널은 클라이언트의 연결 요청을 받아들일 준비가 돼 있음을 나타낸다.
        • Read(SelectionKey.OP_READ): 채널은 읽기 준비가 된 데이터를 갖고 있음을 나타낸다
        • Write(SelectionKey.OP_WRITE): 채널이 쓰기 작업을 위한 준비가 돼 있음을 나타낸다.
      • OR 연산자를 통해 조합하여 생성할 수 있으며 register 메소드를 통해 다수의 유용한 속성을 포함하는 SelectionKey 인스턴스를 반환한다.
        int interstSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        SelectionKey key = socketChannel.register(selector, interstSet);
        
      • 속성
        • interst set: 관심있는 이벤트 포함
        • ready set: 채널이 처리할 준비된 작업들의 집합
        • Channel: channel 메소드는 셀렉션 키와 연관된 채널을 반환
        • Selector: selector 메소드는 셀렉션 키와 연결된 셀렉터를 반환
        • Attached objects: 자세한 정보는 attach 메소드를 사용해 첨부될 수 있으며 attachment 메소드는 이 객체에 접속을 위해 나중에 사용된다
      • 이벤트가 준비된 것을 확인하기 위한 메소드
        • readOps: 준비 이벤트를 포함하는 정수 반환
        • interstOps: 관심있는 이벤트를 나타내는 정수 반환
        • isAcceptable: 수락 이벤트가 준비돼 있음을 나타낸다
        • isConnectable: 연결 이벤트가 준비돼 있음을 나타낸다
        • isReadable: 읽기 이벤트가 준비돼 있음을 나타낸다
        • isWritable: 쓰기 이벤트가 준비돼 있음을 나타낸다
      • 셀렉터를 이용해 채널 선택
        • 셀렉터에 하나 이상의 채널을 등록한 후에는 select() 메소드를 호출할 수 있다
        • accept, connect, read, write 이벤트에 대해 준비(ready) 되어 있는 채널을 반환
        • select(): 등록한 이벤트에 대해 하나 이상의 채널이 준비 될 때까지 block. 몇개의 채널이 준비되었는지 준비된 채널의 수를 반환합니다.
        • select(long timeout): 최대 timeout(ms) 동안만 block한다는 점을 제외하면 select()와 동일
        • selectNow(): select와 달리 block하지 않고 준비된 채널이 있으면 즉시 반환됩니다.
  • 셀렉터 사용 단계 3. 사용 가능할 때 사용하기 위한 채널 선택
    • 서버
      public class SelectorTimeServer {
          private static Selector selector;
      
          public static void main(String[] args) {
              System.out.println("Time Server Started");
              try {
                  ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                  serverSocketChannel.socket().bind(new InetSocketAddress(5000));
      
                  selector = Selector.open(); // 셀렉터 생성 및 스레드 실행
                  while(true) {
                      SocketChannel socketChannel = serverSocketChannel.accept();
                      System.out.println("Socket Channel Accepted - " + socketChannel);
                      if(socketChannel != null) {
                          // 스레드는 select 메소드에 의해 블록될 수 있다 - wakeup 메소드를 사용하면 select 메소드가 즉시 반환되므로 register 메소드가 차단을 해제할 수 있다.
                          socketChannel.configureBlocking(false); // 해당 메소드 호출 필요
                          selector.wakeup(); // 셀렉터는 시작
                          socketChannel.register(selector, SelectionKey.OP_WRITE, null); // 채널 셀렉터에 등록
      
                          new Thread(new SelectorHandler()).start();
                      }
                  }
              } catch (ClosedChannelException e) {
                  e.printStackTrace();
              } catch (IOException e) {
                  e.printStackTrace();
              }
              System.out.println("Time Server Terminated");
          }
      
          private static class SelectorHandler implements Runnable {
              @Override
              public void run() {
                  while(true) {
                      try {
                          System.out.println("About to select ...");
                          int readyChannels = selector.select();
                          if(readyChannels == 0) {
                              continue;
                          }
      
                          Set<SelectionKey> selectionKeys = selector.selectedKeys();
                          Iterator<SelectionKey> iterator = selectionKeys.iterator();
                          while (iterator.hasNext()){
                              SelectionKey selectionKey = iterator.next();
                             if(selectionKey.isAcceptable()) {
                                 // 연결 가능
                             } else if(selectionKey.isConnectable()) {
                                 // 연결 성립
                             } else if(selectionKey.isReadable()) {
                                 // 읽을 준비가 된 채널
                             } else if(selectionKey.isWritable()) {
                                 sendMessage(selectionKey);
                             }
                             Thread.sleep(1000);
                             iterator.remove();
                          }
      
                      } catch (IOException | InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              }
      
              private void sendMessage(SelectionKey key) throws IOException {
                  String message = "Date: " + LocalDateTime.now().toString();
                  ByteBuffer buffer = ByteBuffer.allocate(64);
                  buffer.put(message.getBytes());
                  buffer.flip();
                  SocketChannel socketChannel = null;
                  while(buffer.hasRemaining()) {
                      socketChannel = (SocketChannel) key.channel();
                      socketChannel.write(buffer);
                  }
                  System.out.println("SENT Message: " + message);
              }
          }
      }
      
    • 클라이언트
      Random random = new Random();
      SocketAddress address = new InetSocketAddress("127.0.0.1", 5000);
      try(SocketChannel socketChannel = SocketChannel.open(address)) {
          while(true) {
              ByteBuffer buffer = ByteBuffer.allocate(64);
              int byteRead = socketChannel.read(buffer);
              while(byteRead > 0) {
                  buffer.flip();
                  while(buffer.hasRemaining()) {
                      System.out.print((char) buffer.get());
                  }
                  System.out.println();
                  byteRead = socketChannel.read(buffer);
              }
              Thread.sleep(random.nextInt(1000) + 1000);
          }
      } catch (ClosedChannelException e) {
          e.printStackTrace();
      } catch (IOException | InterruptedException e) {
          e.printStackTrace();
      }
      
네트워크 타임아웃 설정
  • 소켓 통신의 일부 제어를 여러 소켓 옵션이 있다.
  • SO_TIMEOUT 옵션은 읽기 작업에 대한 타임아웃을 설정하는 데 사용되며 지정된 시간이 경과하면 SocketTimeoutException 예외 발생
  • 아래와 같은 방법으로 타임아웃 시간을 지정할 수 있다.
    socket.setTimeout(3000); // 3
    
    출처
  • 에이콘 출판사. 자바 네트워크 프로그래밍. 리차드 리스 지음, 유연재 옮김 (http://www.yes24.com/Product/Goods/34894821)