chap12. 웹소켓

웹소켓 소개
  • WebSocket 프로토콜은 웹의 양방향 데이터 전송 문제에 대한 실용적인 솔루션을 제공하기 위해 클라이언트/서버가 언제든 메시지를 전송할 수 있게 허용
  • 결과적으로 메시지 수신을 비동기적으로 처리하게 요구하도록 새롭게 설치
웹소켓 애플리케이션
  • 채팅방 작동 방식
    • 클라이언트가 서버에 연결하고 채팅에 참여
    • 채팅 메시지가 웹소켓을 통해 교환되며 양방향으로 전송
    • 서버가 모든 클라이언트를 처리
웹소켓 지원 추가
  • 예제
    • / 로 요청: index.html 전송
    • /ws 로 요청: 서버가 웹소켓 업그레이드 수행, 연결이 업그래이드 되면 모든 데이터를 웹소켓을 이용해 전송
    • 요청 방식
      • 채팅방 클라이언트가 HTTP 요청(/ 또는 /ws)
      • 채팅방 서버는 / 요청인 경우 HTTP 를 통해 index.html wjsthd
      • 채팅방 서버는 /ws 요청인 경우 웹소켓 업그레이드 수행(HTTP를 통해 웹소켓 핸드셰이크 작업)
      • 업그레이드 완료 후 서버가 웹소켓을 통해 메시지 전송
  • HTTP 요청 처리시
    public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { // FullHttpRequest를 처리하기 위한 핸들러
        private final String wsUri;
        private static final File INDEX;
    
        static {
            URL location = HttpRequestHandler.class.getProtectionDomain()
                    .getCodeSource().getLocation();
            try {
                String path = location.toURI() + "index.html";
                path = !path.contains("file:") ? path : path.substring(5);
                INDEX = new File(path);
            } catch (URISyntaxException e) {
                throw new IllegalStateException("Unable to locate index.html", e);
            }
        }
    
        public HttpRequestHandler(String wsUri) {
            this.wsUri = wsUri;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
            if(wsUri.equalsIgnoreCase(msg.getUri())) {
                ctx.fireChannelRead(msg.retain()); // /ws 를 참조하는 경우 FullHttpRequest retain을 호출한 후 fileChannelRead(msg)를 호출해 다음 ChannelInboundHandler 전달
            } else { // / index.html 전달
                if(HttpHeaders.is100ContinueExpected(msg)) { // 100-continue에 경우 100 Continue 응답 전송
                    send100Continue(ctx);
                }
                RandomAccessFile file = new RandomAccessFile(INDEX, "r"); // index.html을 읽는다
                HttpResponse response = new DefaultHttpResponse(msg.getProtocolVersion(), HttpResponseStatus.OK);
                response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain;charset=UTF-8");
                if(HttpHeaders.isKeepAlive(msg)) { // keep-alive 요청이 있는 경우 필요한 헤더 추가
                    response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                    response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
                }
                ctx.write(response); // HttpResponse를 클라이언트로 기록
                if(ctx.pipeline().get(SslHandler.class) == null) {
                    ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
                } else {
                    ctx.write(new ChunkedNioFile(file.getChannel()));
                }
                ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); // LastHttpContent 를 클라이언트에 기록하고 플러시
                if(!HttpHeaders.isKeepAlive(msg)) { // keep-alive 요청이 없는 경우 Channel을 닫음
                    future.addListener(ChannelFutureListener.CLOSE);
                }
            }
        }
        private void send100Continue(ChannelHandlerContext ctx) {
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
            ctx.writeAndFlush(response);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
  • 웹소켓 프레임 처리
    public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        private final ChannelGroup channelGroup;
        public TextWebSocketFrameHandler(ChannelGroup channelGroup) {
            this.channelGroup = channelGroup;
        }
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 커스텀 이벤트를 처리하기 위해 userEventTriggered() 재정의
            if(evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                ctx.pipeline().remove(HttpRequestHandler.class); // 이벤트가 핸드세이크 성공을 의미하는 경우 HTTP Message는 더이상 수신하지 않으므로 ChannelPipeline에서 HttpRequestHandler 제거
                channelGroup.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined.")); // 연결된 모든 웹소켓 클라이언트에 새로운 클라이언트가 연결된 것을 알림
                channelGroup.add(ctx.channel()); // 모든 메시지를 수신할 수 있게 새로운 웹소켓 채널을 추가
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            channelGroup.writeAndFlush(msg.retain()); // 메시지의 참조 카운트를 증가시키고 ChannelGroup에 연결된 모든 클라이언트로 기록
        }
    }
    
    • WebSocketFrame 형식
      • BinaryWebSocketFrame: 이진 데이터 포함
      • TextWebSocketFrame: 텍스트 데이터 포함
      • ContinuationWebSocketFrame: 이진 데이터 또는 텍스트 데이터 포함
      • CloseWebSocketFrame: CLOSE 요청을 나타내며, 닫기 상태 코드와 구문 포함
      • PingWebSocketFrame: PongWebSocketFrame 전송 요청
      • PongWebSocketFrame: PingWebSocketFrame 에 대한 응답 전송
  • ChannelPipeline 초기화
    public class ChatServerInitializer extends ChannelInitializer<Channel> {
        private final ChannelGroup channelGroup;
        public ChatServerInitializer(ChannelGroup channelGroup) {
            this.channelGroup = channelGroup;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpServerCodec());
            pipeline.addLast(new ChunkedWriteHandler());
            pipeline.addLast(new HttpObjectAggregator(64 * 1024));
            pipeline.addLast(new HttpRequestHandler("/ws"));
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
            pipeline.addLast(new TextWebSocketFrameHandler(channelGroup));
        }
    }
    
    • pipeline 구성
      • HttpServerCodec: 바이트를 HttpRequest, HttpContent, LastHttpContent로 디코딩, HttpRequest, HttpContent, LastHttpContent를 바이트로 인코딩
      • ChunkedWriteHandler: 파일의 내용을 기록
      • HttpObjectAggregator: HttpMessage 및 해당하는 후속 HttpContent를 집계해 요청이나 응답을 처리하는지에 따라 단일 FullHttpRequest, FullHttpResponse를 생성한다. 이는 파이프라인의 다음 ChannelHandler는 완전한 HTTP 요청만 받는다
      • HttpRequestHandler: /ws URL로 보내지 않는 요청에 해당하는 FullHttpRequest를 처리
      • WebSocketServerProtocolHandler: 웹소켓 사양에서 요구하는 대로 웹소켓 업그레이드 핸드셰이크 PingWebSocketFrame, PongWebSocketFrame, CloseWebSocketFrame 처리
      • TextWebSocketFrameHandler: TextWebSocketFrame 및 핸드셰이크 완료 이벤트 처리
  • Bootstrap
    public class ChatServer {
        private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); // 연결된 모든 웹소켓 채널을 포함할 DefaultChannelGroup 생성
        private final EventLoopGroup group = new NioEventLoopGroup();
        private Channel channel;
    
        public ChannelFuture start(InetSocketAddress address) {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(group)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(createInitializer(channelGroup));
            ChannelFuture future = serverBootstrap.bind(address);
            future.syncUninterruptibly();
            channel = future.channel();
            return future;
        }
    
        protected ChannelHandler createInitializer(ChannelGroup channelGroup) { // ChatServerInitializer 생성
            return new ChatServerInitializer(channelGroup);
        }
    
        public void destroy() {
            if(channel != null) channel.close();
            channelGroup.close();
            group.shutdownGracefully();
        }
    
        public static void main(String[] args) {
            if(args.length != 1) {
                System.err.println("Please give port as argument");
                System.exit(1);
            }
            int port = Integer.parseInt(args[0]);
            final ChatServer endpoint = new ChatServer();
            ChannelFuture future = endpoint.start(new InetSocketAddress(port));
            Runtime.getRuntime().addShutdownHook(new Thread(){
                @Override
                public void run() {
                    endpoint.destroy();
                }
            });
            future.channel().closeFuture().syncUninterruptibly();
        }
    }
    
암호화
  • 암호화를 위해서 ChannelPipeline에 SslHandler 를 추가하면 된다.
  • ChannelPipeline에 암호화 추가
    public class SecureChatServerInitializer extends ChatServerInitializer {
        private final SslContext sslContext
        public SecureChatServerInitializer(ChannelGroup group, SslContext sslContext) {
            super(group);
            this.context = context;
        }
        @Override
        public void initChannel(Channel ch) throws Exception {
            super.initChanne(ch);
            SSLEngine engine = context.newEngine(ch.alloc());
            ch.pipeline().addFirst(new SslHandler(engine));
        }
    }
    
  • ChatServer에 암호화 추가
    public class SecureChatServer extends ChatServer {
        private final SslContext sslContext;
        public SecureChatServer(SslContext sslContext) {
            this.context = context;
        }
        @Override
        public ChannelInitializer<Channel> createInitializer(ChannelGroup gruop) {
            return new SecureChatServer(group, context);
        }
        public static void min(String[] args) throws Exception {
            if(args.length == 1) {
                System.err.println("Please give port as argument");
                System.exit(1);
            }
            int port = Integer.parseInt(args[0]);
            SelfSignedCertificate cert = new SelfSignedCertificate();
            SslContext context = SslContext.newServerContext(cert.certificate(), cert.privateKey());
            final SecureChatServer endpoint = new SecureChatServer(context);
            ChannelFuture future = endpoint.start(new InetSocketAddress(port));
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() { endpoint.destroy(); }
            });
            future.channel().closeFuture().syncUninterruptibly();
        }
    }
    

chap13. UDP를 이용한 이벤트 브로드캐스팅

UDP 기본사항
  • UDP같은 비연결 프로토콜에는 안정적 연결(핸드셰이크)라는 개념이 없고 각 메시지(UDP Datagram) 이 독립적으로 전송된다
  • UDP에는 TCP의 오류 수정 메커니즘(피어가 수신한 패킷을 승인하며 스인하지 않은 패킷을 전송자가 재전송함)이 없다
  • 하지만 핸드셰이크, 메시지 관리가 배제되므로 TCP보다 훨씬 빠르며 메시지 손실을 처리하거나 용인할 수 있는 애플리케이션에 적합하다
UDP 브로드캐스트
  • UDP는 메시지를 여러 수신자에게 전송하는 추가 전송 모드 제공   - 멀티캐스트: 정의된 호스트 그룹으로 전송   - 브로드캐스트: 네트워크 상의 모든 호스트로 전송
UDP 예제 애플리케이션
  • 예제
    • Broadcaster 가 새로운 파일 내용 수신
    • UDP를 통해 이벤트를 브로드 캐스팅
    • 이벤트 모니터가 내용을 수신하고 표시
  • 메시지(LogEvent)
    public final class LogEvent {
        public static final byte SEPERATOR = (byte)':';
        private final InetSocketAddress source;
        private final String logfile;
        private final String msg;
        private final long received;
    
        public LogEvent(String logfile, String msg) {
            this(null, -1, logfile, msg);
        }
        public LogEvent(InetSocketAddress source, long received, String logfile, String msg) {
            this.source = source; this.received = received; this.logfile = logfile; this.msg = msg;
        }
    
        // getter 
    }
    
  • 브로드캐스트 작성
    • interface AddressedEnvelope<M, A extends SocketAddress> extends ReferenceCounted
      • 다른 메시지를 래핑하는 메시지를 발신자 및 수신자 주소와 함께 래핑한다. M은 message 형식, A는 Address 형식이다
    • interface DefaultAddressedEnvelope<M,A extends SocketAddress> implements AddressedEnvelope<M,A>
      • AddressEnvelope 인터페이스 기본 구현 제공
    • interface DatagramPacket extends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> implements ByteBufHolder
      • ByteBuf 를 메시지 데이터 컨테이너로 이용하기 위해 DefaultAddressedEnvelope 확장
    • interface DatagramChannel extends Channel
      • UDP 멀티캐스팅 그룹 관리를 지원하기 위해 네티의 Channel 추상화를 확장한다.
    • interface NioDatagramChannel extends AbstractNioMessageChannel implements DatagramChannel
      • AddressedEnvelope 메시지를 전송 및 수신할 수 있는 Channel 형식을 정의   - ChannelPipeline에서 LogEvent 흐름     - LogEvent -> LogEventEncoder -> DatagramPacket 으로 변환되어야 한다
    • LogEventDecoder
      public class LogEventEncoder extends MessageToMessageDecoder<LogEvent> {
          private final InetSocketAddress remoteAddress;
          public LogEventEncoder(InetSocketAddress remoteAddress) {
              this.remoteAddress = remoteAddress;
          }
          @Override
          public void encode(ChannelHandlerContext ctx, LogEvent logEvent, List<Object> out) throws Exception {
              byte[] file = logEvent.getLogfile().getBytes(CharsetUtils.UTF_8);
              byte[] msg = logEvent.getMsg().getBytes(CharsetUtils.UTF_8);
              ByteBuf buf = ctx.alloc().buffer(file.length + msg.length + 1);
              buf.writeBytes(file); // 파일이름 ByteBuf 에 기록
              buf.writeByte(LogEvent.SEPERATOR); // SEPERATOR 추가
              buf.writeBytes(msg); // 로그 메시지 추가
              out.add(new DatagramPacket(buf, remoteAddress));
          }
      }
      

        

    • LogEventBroadcaster
      public class LogEventBroadcaster {
        private final EventLoopGroup group;
        private final Bootstrap bootstrap;
        private final File file;
            
        public LogEventBroadcaster(InetSocketAddress, File file) {
            group = new NioEventLoopGroup();
            bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new LogEventEncoder(address));
            this.file = file;
        }
              
        public void run() throws Exception {
            Channel channel = bootstrap.bind(0).sync().channel(); // 채널 바인딩
            long pointer = 0;
            while(true) { // 주 처리 루프 시작
                long length = file.length(); 
                if(length < pointer) { // 파일 재설정됨
                    pointer = length; // 필요한 경우 파일의 마지막 바이트를 가리키도록 파일 포인터로 설정
                } else if(length > pointer) { // 컨텐츠가 추가됨
                    RemoteAccessFile raf = new RandomAccessFile(file, "r");
                    raf.seek(pointer); // 이전 항목이 전송되지 않도록 현재 파일포인터로 설정
                    String line;
                    while( (line = raf.readLine()) != null) {
                        channel.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line)); // 각 로그 항목에 대해 LogEvent 하나를 채널에 기록
                    }
                    pointer = raf.getFilePointer(); // 파일 내 현재 위치 저장
                    raf.close();
                }
                try {
                    Thread.sleep(1000); // 1초 대기하고 중단될 경우 루프 종료
                } catch(InterruptedException e) {
                    Thread.interrupted(); break;
                }
            }
        }
        public void stop() {
            group.shuntdownGracefully();
        }  
        public static void main(String[] args) throws Exception {
            if(args.length != 2) {
                throw new IllegalArgumentException();
            }
            LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255"), Integer.parseInt(args[0]), new File(args[1]));
           try {
                broadcaster.run();
            } finally {
                broadcaster.stop();
            }
        }
      }
      
  • 모니터 작성
    • 역할
      • LogEventBroadcaster 가 브로드캐스팅하는 UDP DatagramPacket 을 수신하는 역할을 한다
      • 이를 LogEvent 메시지로 디코딩하는 역할을 한다
      • LogEvent 메시지를 System.out으로 출력하는 역할을 한다
    • ChannelPipeline에는 DatagramPacket, LogEvenDecoder, LogEvent, LogEventHandler 순으로 처리된다
    • LogEventDecoder
      public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
          @Override
          public void decode(ChannelHandlerContextctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
              ByteBuf data = datagramPacket.data();
              int index = data.indexOf(0, data.readableBytes(), LogEvent.SEPERATOR);
              String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8);
              String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);
              LogEvent event = new LogEvent(datagramPacket.remoteAddress(), System.currentTimeMillis(), filename, logMsg);
              out.add(event);
          }
      }
      
    • LogEventHandler
      public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
              cause.printStackTrace();
              ctx.close();
          }
          @Override
          public void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
              StringBuilder builder = new StringBuilder();
              builder.append(event.getReceivedTimestamp()).append(" [")
                  .append(event.getSource().toString()).append("] [")
                  .append(event.getLogfile()).append("] : ").append(event.getMsg());
              System.out.println(builder.toString());
          }
      }
      
    • LogEventMonitor
      public class LogEventMonitor {
          private final EventLoopGroup group;
          privte final Bootstrap bootstrap;
          public LogEventMonitor(InetSocketAddress address) {
              group = new NioEventGroup();
              bootstrap.group(group)
                               .channel(NioDatagramChannel.class)
                               .option(ChannelOption.SO_BROARDCAST, true)
                               .handler(new ChannelInitializer<Channel>() {
                                    @Override
                                    proteted void initChannel(Channel channel) throws Exception{
                                        ChannelPipeline pipeline = channel.pipeline();
                                        pipeline.addLast(new LogEventDecoder());
                                        pipeline.addLast(new LogEventHandler());
                                    }
                                }).localAddress(address);
          }
          public Channel bind() {
              return bootstrap.bind().sync().channel(); // 채널 바인딩. 여기서 DatagramChannel은 비연결
          }
          public void stop() {
              group.shutdownGracefully();
          }
          public static void main(String[] args) thorws Exception {
              if(args.length == 1)
                  throw new IllegalArgumentException("Usage: LogEventMonitor<Port>");
              LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(args[0]));
              try {
                  Channel channel = monitor.bind();
                  channel.closeFuture().sync();
              } finally {
                  monitor.stop();
              }
          }
      }
      

      출처

  • Netty in action(위키북스): 노먼 아운로, 마빈 알렌 울프탈 지음