네트워크 확장성
멀티스레드 서버
- 요청을 위한 별도의 스레드를 사용하는 것은 접속과 관련된 요청이 동시에 처리될 수 있다
- 스레드 구성 방법
- 요청별 스레드
- 서버에 도착하는 각 요청을 새 스레드로 할당되며 각 요청은 종종 새로운 연결이 생성되는 것을 의미한다
- 요청이 유지될 필요가 없는 환경에서 잘 작동한다
- 연결별 스레드
- 클라이언트 연결은 세션 기간 동안 유지된다
- 세션은 요청 및 응답으로 구성되며 특정 커멘드, 타임아웃 기간 이후에 종료된다
- 객체별 스레드
- 관련 요청을 처리할 수 있는 특정 객체와 함께 대기열에 넣는다
- 객체와 그 메소드는 한 번에 하나의 요청을 처리하는 스레드에 위치하며, 요청은 스레드에서 대기한다.
- 스레드 풀에서 종종 사용된다.
- 요청별 스레드
- 요청별 스레드
- 병렬을 접속을 지원하는 ConcurrentHashMap 사용하며 애플리케이션의 성능을 개선하는 데 유용할 수 있다
- main 메소드에서 소버 소켓이 클라이언트 요청을 대기하고 클라이언트 소켓이 처리를 위한 스레드로 전달하는 새로운 스레드로 생성
public class SimpleMultiThreadServer implements Runnable { private static ConcurrentHashMap<String, Float> map; private Socket clientSocket; static { map = new ConcurrentHashMap<>(); map.put("Axle", 238.50f); map.put("Gear", 45.55f); map.put("Wheel", 86.30f); map.put("Rotor", 8.50f); } public SimpleMultiThreadServer(Socket clientSocket) { this.clientSocket = clientSocket; } @Override public void run() { System.out.println("Client Thread Started"); try(BufferedReader br = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintStream out = new PrintStream(clientSocket.getOutputStream())) { String partName = br.readLine(); out.println(map.get(partName)); System.out.println("Request for " + partName + "and returned price is " + map.get(partName)); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } System.out.println("Client Thread Terminated"); } public static void main(String[] args) { System.out.println("Multi-Threaded Server Started"); try { ServerSocket serverSocket = new ServerSocket(5000); while (true) { System.out.println("Listeneing for a client connection"); Socket clientSocket = serverSocket.accept(); new Thread(new SimpleMultiThreadServer(clientSocket)).start();; } } catch (IOException e) { e.printStackTrace(); } System.out.println("Multi-Threaded Server Terminated"); } }
- 클라이언트
System.out.println("Client started"); Scanner scanner = new Scanner(System.in); try { Socket socket = new Socket("127.0.0.1", 5000); BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintStream ps = new PrintStream(socket.getOutputStream()); System.out.print("Part Name: "); String partName = scanner.nextLine(); ps.println(partName);; System.out.println(partName + " request sent. Response: " + br.readLine()); socket.close(); } catch (IOException e) { e.printStackTrace(); } System.out.println("Client Terminated");
- 연결별 스레드 접근법
- 서버
- 타임아웃은 충분한 시간이 경과한 후 자동으로 클라이언트의 연결을 해제하게 설정될 필요가 잆다
- 현재 quit 이 요청되면 루프는 종료되며 그 외 요청은 이전과 동일하게 처리된다
@Override public void run() { System.out.println("Client Thread Started"); Socket socket = serverSocket.accept(); BufferedReader br = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintStream out = new PrintStream(clientSocket.getOutputStream()) while(true) { String partName = br.readLine(); if("quit".equals(partName) { break; } out.println(map.get(partName)); System.out.println("Request for " + partName + "and returned price is " + map.get(partName)); clientSocket.close(); } System.out.println("Client Thread Terminated"); }
스레드풀
- 서버
- 풀은 많은 스레드가 생성되는 방법을 제어하며 비용이 많이 드는 작업에서 반복적으로 요청되는 스레드를 생성하고 파괴하는 작업을 생략할 수 있다
- 요청은 풀에서 스레드에게 할당되며, 스레드가 없다면 일부 스레드 풀은 새 스레드를 생성할 수도 있고, 그 외에는 이용가능한 스레드 수를 제한한다. 일부 요청에서 블록된 결과가 발생할 수 있다
- ThreadPoolExecutor 클래스는 스레드 실행에 대한 상태 정보를 전달하는 메소드를 제공한다
- 생성 메소드
- newCachedThreadPool(생성된 풀은 스레드를 재사용하며 필요한 경우 새로운 스레드를 생성)
- newFixedThreadPool(고정된 크기의 풀을 생성)
- 풀 종료
- ThreadPoolExecutor 클래스의 인스턴스가 생성되면 스레드 풀에 전달되는 새로운 작업을 수락할 것이나 풀은 자동으로 닫히지 않는다.
- 유휴상태의 경우 새 작업이 전송될 때까지 기다리며 풀을 종료하기 위해서는 shutdown, shutdownNow 메소드가 호출 되어야 한다
- shutdownNow 는 바로 풀을 종료하고 대기 중인 작업을 실행하지 않는다
- 스레드 풀 정보
- getPoolSize: 풀에서 현재 스레드의 수를 반환
- getActiveCount: 활성 스레드의 수를 반환
- getLargePoolSize: 한번에 풀에서의 최대 스레드의 수를 반환
- 생성 메소드
- 서버
- ThreadPoolExecutor 생성하여 요청 스레드 생성
System.out.println("Thread Pool Server Started"); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); try { ServerSocket serverSocket = new ServerSocket(5000); while(true) { System.out.println("Listening for a client connection"); Socket clientSocket = serverSocket.accept(); // 연결 요청이 올 때까지 블록된다 executor.execute(new WorkerThread(clientSocket)); } } catch (IOException e) { e.printStackTrace(); } executor.shutdown(); System.out.println("Thread Pool Server Terminated");
- 요청 스레드(WorkerThread)
public class WorkerThread implements Runnable { private static ConcurrentHashMap<String, Float> map; private Socket clientSocket; static { map = new ConcurrentHashMap<>(); map.put("Axle", 238.50f); map.put("Gear", 45.55f); map.put("Wheel", 86.30f); map.put("Rotor", 8.50f); } public WorkerThread(Socket clientSocket) { this.clientSocket = clientSocket; } @Override public void run() { System.out.println("Worker Thread Started"); try(BufferedReader br = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintStream out = new PrintStream(clientSocket.getOutputStream())) { String partName = br.readLine(); out.println(map.get(partName)); System.out.println("Request for " + partName + ", price: " + map.get(partName)); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } System.out.println("Worker Thread Started"); } }
- 클라이언트는 요청별 스레드와 동일
- ThreadPoolExecutor 생성하여 요청 스레드 생성
- Callable 과 스레드 풀
- Callable, Future 인터페이스는 멀티스레드를 지원하는 또 다른 접근 방법을 제공한다
- Callable은 스레드가 결과를 반환해야 하는 스레딩을 지원하며 Runnable 인터페이스에 run 메소드를 대신해 사용되는 call 메소드 보유
- Future 인터페이스는 Callable 객체와 함께 사용
- Callable
-
다른 스레드가 생성되는 동안 call 메소드가 반환될 때까지 블록되며 Future 인터페이스를 통해 기술을 향상시킬 수 있다 ```java public class WorkerThread implements Runnable { private Socket clientSocket; public WorkerThread(Socket clientSocket) { this.clientSocket = clientSocket; } @Override public void run() { System.out.println(“Worker Thread Started”); try(BufferedReader br = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintStream out = new PrintStream(clientSocket.getOutputStream())) {
String partName = br.readLine(); float price = new WorkerCallable(partName).call(); out.println(price); System.out.println("Request for " + partName + ", price: " + price); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Worker Thread Started"); } }
public class WorkerCallable implements Callable
{ private static ConcurrentHashMap<String, Float> map; private String partName; static { map = new ConcurrentHashMap<>(); map.put("Axle", 238.50f); map.put("Gear", 45.55f); map.put("Wheel", 86.30f); map.put("Rotor", 8.50f); } public WorkerCallable(String partName) { this.partName = partName; } @Override public Float call() throws Exception { return map.get(partName); } } ``` -
- Future
- 완료된 call 매소드의 결과를 나타낸다
- Future 인터페이스를 사용해 Callable 객체를 호출하고 반환될 떄까지 기다리지 않을 수 없다
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); Future<Float> future1 = executor.submit(() -> 1.0f); Future<Float> future2 = executor.submit(() -> 2.0f); try { Float price1 = future1.get(); // 1.0 Float price2 = future2.get(); // 2.0 } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
- CompletableFuture
- CompletionStage
- Callable, Future 인터페이스는 멀티스레드를 지원하는 또 다른 접근 방법을 제공한다
- HTTPServer 에서 스레드 풀 사용하기
- HTTPServer 에서는 기본적으로는 start 메소드가 호출될 때 생성된 스레드 사용
- setExecutor 메소드를 사용하여 요청을 스레드에 할당하는 방법을 지정할 수 있다.
HttpServer server = HttpServer.create(new InetSocketAddress(80), 0); server.createContext("/index", new OtherHandler(); server.setExecutor(Executors.newCachedThreadPool()); server.start();
셀렉터 사용
- Selector 는 NIO 애플리케이션에서 사용되고 하나의 스레드가 여러 채널을 처리하게 하기 때문에 스레드 전환과정에서 발생하는 오버헤드의 일부를 방지할 수 있다.
- 여러 채널과 그 이벤트를 조정하며 처리를 위해 준비된 채널을 식별한다.
- 주요 클래스
- Selector: 기본 기능 제공
- SelectionKey: 처리를 위해 준비된 이벤트의 타입 식별
- 셀렉터 사용 단계 1. 셀렉터 생성
- 생성자가 없고 static 메소드인 open() 메소드 사용
- 셀렉터를 열고 더이상 필요하지 않을 때 닫기 위한 close() 사용, isOpen() 메소드도 있다
- 셀렉터 사용 단계 2. 채널을 셀렉터에 등록
- register 메소드로 채널을 등록하며 등록되는 모든 채널은 논블로킹 모드여야 한다
socketChannel.configureBlocking(false); // 셀렉터에 채널을 등록하기 위해서는 반드시 해당 채널이 non-blocking 모드로 변환 // 파라미터: 등록을 위한 셀렉터, 관심있는 이벤트 타입, 채널과 관련된 데이터 socketChannel.register(selector, SelectionKey.OP_WRITE, null);
- SelectionKey
- 이벤트 타입(interst sets: 관심 세트)
- Connect(SelectionKey.OP_CONNECT): 채널이 성공적으로 서버에 연결되었음을 나타낸다.
- Accept(SelectionKey.OP_ACCEPT): 서버 소켓 채널은 클라이언트의 연결 요청을 받아들일 준비가 돼 있음을 나타낸다.
- Read(SelectionKey.OP_READ): 채널은 읽기 준비가 된 데이터를 갖고 있음을 나타낸다
- Write(SelectionKey.OP_WRITE): 채널이 쓰기 작업을 위한 준비가 돼 있음을 나타낸다.
- OR 연산자를 통해 조합하여 생성할 수 있으며 register 메소드를 통해 다수의 유용한 속성을 포함하는 SelectionKey 인스턴스를 반환한다.
int interstSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; SelectionKey key = socketChannel.register(selector, interstSet);
- 속성
- interst set: 관심있는 이벤트 포함
- ready set: 채널이 처리할 준비된 작업들의 집합
- Channel: channel 메소드는 셀렉션 키와 연관된 채널을 반환
- Selector: selector 메소드는 셀렉션 키와 연결된 셀렉터를 반환
- Attached objects: 자세한 정보는 attach 메소드를 사용해 첨부될 수 있으며 attachment 메소드는 이 객체에 접속을 위해 나중에 사용된다
- 이벤트가 준비된 것을 확인하기 위한 메소드
- readOps: 준비 이벤트를 포함하는 정수 반환
- interstOps: 관심있는 이벤트를 나타내는 정수 반환
- isAcceptable: 수락 이벤트가 준비돼 있음을 나타낸다
- isConnectable: 연결 이벤트가 준비돼 있음을 나타낸다
- isReadable: 읽기 이벤트가 준비돼 있음을 나타낸다
- isWritable: 쓰기 이벤트가 준비돼 있음을 나타낸다
- 셀렉터를 이용해 채널 선택
- 셀렉터에 하나 이상의 채널을 등록한 후에는 select() 메소드를 호출할 수 있다
- accept, connect, read, write 이벤트에 대해 준비(ready) 되어 있는 채널을 반환
- select(): 등록한 이벤트에 대해 하나 이상의 채널이 준비 될 때까지 block. 몇개의 채널이 준비되었는지 준비된 채널의 수를 반환합니다.
- select(long timeout): 최대 timeout(ms) 동안만 block한다는 점을 제외하면 select()와 동일
- selectNow(): select와 달리 block하지 않고 준비된 채널이 있으면 즉시 반환됩니다.
- 이벤트 타입(interst sets: 관심 세트)
- register 메소드로 채널을 등록하며 등록되는 모든 채널은 논블로킹 모드여야 한다
- 셀렉터 사용 단계 3. 사용 가능할 때 사용하기 위한 채널 선택
- 서버
public class SelectorTimeServer { private static Selector selector; public static void main(String[] args) { System.out.println("Time Server Started"); try { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(5000)); selector = Selector.open(); // 셀렉터 생성 및 스레드 실행 while(true) { SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("Socket Channel Accepted - " + socketChannel); if(socketChannel != null) { // 스레드는 select 메소드에 의해 블록될 수 있다 - wakeup 메소드를 사용하면 select 메소드가 즉시 반환되므로 register 메소드가 차단을 해제할 수 있다. socketChannel.configureBlocking(false); // 해당 메소드 호출 필요 selector.wakeup(); // 셀렉터는 시작 socketChannel.register(selector, SelectionKey.OP_WRITE, null); // 채널 셀렉터에 등록 new Thread(new SelectorHandler()).start(); } } } catch (ClosedChannelException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } System.out.println("Time Server Terminated"); } private static class SelectorHandler implements Runnable { @Override public void run() { while(true) { try { System.out.println("About to select ..."); int readyChannels = selector.select(); if(readyChannels == 0) { continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); if(selectionKey.isAcceptable()) { // 연결 가능 } else if(selectionKey.isConnectable()) { // 연결 성립 } else if(selectionKey.isReadable()) { // 읽을 준비가 된 채널 } else if(selectionKey.isWritable()) { sendMessage(selectionKey); } Thread.sleep(1000); iterator.remove(); } } catch (IOException | InterruptedException e) { e.printStackTrace(); } } } private void sendMessage(SelectionKey key) throws IOException { String message = "Date: " + LocalDateTime.now().toString(); ByteBuffer buffer = ByteBuffer.allocate(64); buffer.put(message.getBytes()); buffer.flip(); SocketChannel socketChannel = null; while(buffer.hasRemaining()) { socketChannel = (SocketChannel) key.channel(); socketChannel.write(buffer); } System.out.println("SENT Message: " + message); } } }
- 클라이언트
Random random = new Random(); SocketAddress address = new InetSocketAddress("127.0.0.1", 5000); try(SocketChannel socketChannel = SocketChannel.open(address)) { while(true) { ByteBuffer buffer = ByteBuffer.allocate(64); int byteRead = socketChannel.read(buffer); while(byteRead > 0) { buffer.flip(); while(buffer.hasRemaining()) { System.out.print((char) buffer.get()); } System.out.println(); byteRead = socketChannel.read(buffer); } Thread.sleep(random.nextInt(1000) + 1000); } } catch (ClosedChannelException e) { e.printStackTrace(); } catch (IOException | InterruptedException e) { e.printStackTrace(); }
- 서버
네트워크 타임아웃 설정
- 소켓 통신의 일부 제어를 여러 소켓 옵션이 있다.
- SO_TIMEOUT 옵션은 읽기 작업에 대한 타임아웃을 설정하는 데 사용되며 지정된 시간이 경과하면 SocketTimeoutException 예외 발생
- 아래와 같은 방법으로 타임아웃 시간을 지정할 수 있다.
socket.setTimeout(3000); // 3
출처
- 에이콘 출판사. 자바 네트워크 프로그래밍. 리차드 리스 지음, 유연재 옮김 (http://www.yes24.com/Product/Goods/34894821)