chap 10. 코덱 프레임워크

코덱이란
  • 모든 네트워크 어플리케이션은 피어 간에 전송되는 원시 바이트를 대상 프로그램의 데이터 포맷으로 구문 분석하고 변환하는 방법을 정의해야 한다
  • 이러한 변환은 인코더와 디코더로 구성된 코덱애 의해 처리
  • 인코더는 메시지를 전송하기에 적합한 형식(대부분 바이트 스트림)으로 변환
  • 디코더는 네트워크 스트림을 다시 프로그램의 메시지 포맷으로 변환
디코더
  • 인바운드 데이터를 ChannelPipelines 내에 ChannelInboundHandler를 위해 변환할 때 이용
  • ByteToMessageDecoder
    public class ToIntegerDecoder extends ByteToMessageDecoder {
        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if(in.readByes() >= 4) {
                out.add(in.readInt());
            }
        }
    }
    
    • 바이트 스트림을 메시지로 디코딩하는 작업
    • 원격 피어가 완성된 메시지를 한번에 보낼지는 알 수 없으므로 이 클래스는 인바운드 데이터가 처리할 만큼 모일 떄까지 버퍼에 저장
    • decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
      • 구현해야 하는 유일한 추상 메서드로서 들어오는 데이터가 포함된 ByteBuf와 디코딩된 메시지가 추가될 List를 받는다
      • 이 호출은 더 이상 List에 추가할 새로운 항목이 없거나 ByteBuf 에 읽을 바이트가 없을 때까지 반복
      • 그 이후 List가 비어 있지 않은 경우 그 내용이 파이프라인의 다음 핸들러로 전달
    • decodeLast(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out)
      • Channel이 비활성화될 때 한번 호출
  • ReplayingDecoder
    • 바이트 스트림을 메시지로 디코딩
    • ByteToMessageDecoder 를 확장하며, readableBytes() 메서드를 호출할 필요를 없애준다.
    • 이를 위해 들어오는 ByteBuf 를 커스텀 ByteBuf 구현인 ReplayingDecoderBuffer로 매핑하여 내부적으로 호출 수행   - ReplayingDecoderBuffer
      • 모든 ByteBuf 작업이 지원되는 것은 아니며 지원되지 않는 메서드를 호출하면 UnsupportedOperationException 발생
      • ReplayingDecoder 는 ByteToMessageDecoder 보다 약간 느리다
  • MesssageToMessageDecoder<I>

    public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> throws Exception {
        @Override
        public void decode(ChannelHandlerContext ctx, Integer in, List<Object> out) {
            out.add(String.valueOf(in));
        }
    }
    
    • 메시지를 다른 메시지 유형으로 디코딩
    • decode(ChannelHandlerContext ctx, I msg, List<Object> out)
      • 인바운드 메시지를 다른 포맷으로 디코딩할 때마다 호출. 디코딩된 메시지는 파이프라인의 다음 ChannelInboundHandler로 전달
  • HttpObjectDecoder, LineBasedFrameDecoder 등 다양한 디코더등이 존재

  • TooLongFrameException
    • 디코딩할 수 있을때까지 바이트를 메모리 버퍼에 저장해야 하는데, 디코더가 메모리를 소진할 만큼 많은 데이터를 저장하지 않게 해야 한다   - 지정한 크기를 초과하면 발생하는 TooLongFrameException 발생
인코더
  • ChannelOutboundHandler 를 구현하고 아웃바운드 데이터를 다른 포맷으로 변환한다
  • MessageToByteEncoder<I>
    public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
        @Override
        public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
            out.write(msg);
        }
    } 
    
    • encode(ChannelHandlerContext ctx, I msg, ByteBuf out)     - encode 메서드는 구현해야 하는 유일한 추상 메서드이며, ByteBuf 로 인코딩할 아웃바운드 메시지를 전달하고 호출한다.     - 그런 다음 ByteBuf 는 파이프라인 내의 대음 ChannelOutboundHandler로 전달
  • MessageToMessageEncoder<I>
    public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {
        @Override
        public void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
            out.add(String.valueOf(msg));
        }
    }
    
    • encode(ChannelHandlerContext ctx, I msg, List<Object> out)
      • encode()는 구현해야 하는 유일한 메서드로 write() 로 기록한 메시지는 encode()로 전달된 후 하나 이상의 아웃바운드 메시지로 인코딩     - 그런 다음 파이프라인 내의 다음 ChannelOutboundHandler 로 전달
추상 코덱 클래스
  • 추상 코덱 클래스는 디코더/인코더를 한 쌍으로 묶어 두 작업을 함께 처리할 수 있다
  • ChannelInboundHandler, ChannelOutboundHandler 모두 구현한다
  • ByteToMessageCodec
    • 바이트를 일종의 메시지로 디코딩한 후 다시 인코딩해야 하는 경우, ByteToMessageDecoder, MessageToByteEncoder 결합
    • decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
    • decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
    • encode(ChannelHandlerContext ctx, I msg, ByteBuf out)
  • MessageToMessageCodec
    public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
        @Override
        protected void encode(ChannelHandlerContext ctx, WebSocketConvertHandler.MyWebSocketFrame msg, List<Object> out) throws Exception {
            ByteBuf payload = msg.getData().duplicate().retain();
            switch(msg.getType()) {
                case BINARY: out.add(new BinaryWebSocketFrame(payload)); break;
                //...
                default: throw new IllegalStateException("Unsupported websocket msg " + msg); 
            }
        }
          
        @Override
        protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
            ByteBuf payload = msg.getData().duplicate().retain();
            if(msg instanceOf BinaryWebSocketFrame) {
                out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, payload)); 
            } else if(...) {
              
            } else {
                throw new IllegalStateException("Unsupported websocket msg " + msg); 
            }
        }
    }
    
    • 메시지 포맷을 다른 포맷으로 변환하기 위해 MessageToMessageEncoder를 확장했다
    • MessageToMessageCodec을 이용하면 한 클래스에서 왕복 작업을 해결할 수 있다.
      public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
      
    • protected abstract decode(ChannelHandlerContext ctx, INBOUNT_IN msg, List<Object> out)
    • protected abstract encode(ChannelHandlerContext ctx, OUTOBOUND_IN msg, List<Object> out)
  • CombinedChannelDuplexHandler
    public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
    
    • decoder, encoder를 결합하면 재사용성이 저하되지만 CombinedChannelDuplexHandler를 이용하면 해결할 수 있다
    • 디코더와 인코더 클래스를 확장하는 형식을 제공함으로써 추상 코덱 클래스를 직접 확장하지 않고 코덱을 구현할 수 있다

chap 11. 네티에서 제공하는 ChannelHandler와 코덱

SSL/TLS를 이용한 네티 애플리케이션 보안
  • 자바는 SSL/TLS를 지원하기 위해 javax.net.ssl 패키지를 지원하며 해당 패키지의 SSLContext, SSLEngine 클래스를 이용할 수 있다
  • 네티는 SSLEngine 을 통해 실제 작업을 하는 SslHandler라는 ChannelHandler 구현을 통해 해당 API를 활용
    • 인바운드: SslHandler가 암호화된 인바운드 데이터를 가로채고, SslHandler 가 데이터를 복호화하여 인바운드로 보냄
    • 아웃바운드: 아웃바운드 데이터가 SslHandler를 통해 전달되며 SslHandler가 데이터를 암호화하고 아웃바운드로 보낸다
  • SslHandler를 ChannelPipeline에 추가하기 위해 ChannelInitializer를 이용하는 방법이 있다
    • SslHandler는 ChannelPipeline의 첫번째 ChannelHandler로 등록하여 모든 ChannelHandler에서 암호화가 수행할 수 있게 한다
      public class SslChannelInitializer extends ChannelInitializer<Channel> {
        private final SslContext context;
        private final boolean startTls;
            
        public SslChannelInitializer(SslContext context, boolean startTls) {
            this.context = context; // 이용할 SslContext 전달 받는다
            this.startTls = startTls; // true인 경우 처음 기록된 메시지가 암호화되지 않는다
        }
            
        @Override
        protected void initChannel(Channel ch) throws Exception {
            SSLEngine engine = context.newEngine(ch.alloc()); // 각 SslHandler 인스턴스마다 Channel의 ByteBufAllocator를 이용해 SslContext에서 새로운 SSLEnging을 얻음
            ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls)); // SslHandler를 첫번째 핸들러로서 파이프라인에 추가
        }
      }
      
  • 메소드
    • setHandshakeTimeout(long, TimeUnit), setHandshakeTimeoutMillis(long), getHandshakeTimeoutMillis()
      • 핸드셰이크 ChannelFuture에 작업 실패를 알릴 시간 만료 값을 설정하거나 가져온다
    • setCloseNotifyTimeout(long, TimeUnit), setCloseNotifyTimeoutMillis(long), getCloseNotifyTimeoutMillis()
      • 핸드셰이크가 완료되면 알릴 ChannelFuture 반환, 핸드셰이크가 이전에 실행될 경우 이전 핸드셰이크의 결과를 포함하는 ChannelFuture 반환
    • close(), close(ChannelPromise), close(ChannelHandlerContext, ChannelPromise)
      • 기반 SslEngine을 닫고 삭제하도록 요청하는 close_notify 전송
네티 HTTP/HTTPS 애플리케이션 개발
  • HTTP decoder, encoder, codec
    public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
        private final boolean isClient;
        public HttpPipelineInitializer(boolean isClient) {
            this.isClient = isClient;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            if(isClient) {
                pipeline.addLast("decoder", new HttpResponseDecoder());
                pipeline.addLast("encoder", new HttpRequestEncoder());
            } {
                pipeline.addLast("decoder", new HttpRequestDecoder());
                pipeline.addLast("encoder", new HttpResponseEncoder());
            }
        }
    }
    
    • HTTP 요청/응답은 둘 이상의 데이터 파트로 구성되며 LastHttpContext 파트로 끝나며 모든 HTTP Message 형식은 HttpObject 인터페이스를 구현한다
    • 요청(FullHttpRequest) 구조: HttpRequest(HTTP 요청의 첫번째 파트 헤더) + HttpContent(하나 이상이 올 수 있으며 데이터가 들어있다) + LastHttpContent(요청의 끝임을 표시하며 후행 헤더 포함할 수 있다)
    • 응답(FullHttpResponse) 구조: HttpResponse(HTTP 응답의 첫번째 파트 헤더) + HttpContent(하나 이상이 올 수 있으며 데이터가 들어있다) + LastHttpContent(요청의 끝임을 표시하며 후행 헤더 포함할 수 있다)
    • 디코더와 인코더
      • HttpRequestDecoder, HttpResponseDecoder: 바이트를 HttpRequest+HttpContent+LastHttpContent, HttpResponse+HttpContent+LastHttpContent를 디코딩한다
      • HttpRequestEncoder, HttpResponseEncoder: HttpRequest+HttpContent+LastHttpContent, HttpResponse+HttpContent+LastHttpContent를 바이트로 인코딩한다
  • HTTP Message 집합체
    public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
        private final boolean isClient;
        public HttpAggregatorInitializer(boolean isClient) {
            this.isClient = isClient;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            if(isClient) {
                pipeline.addLast("codec", new HttpClientCodec());
            } {
                pipeline.addLast("codec", new HttpServerCodec());
            }
            pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
        }
    }
    
    • HTTP 요청과 응답은 여러 파트로 구성될 수 있으므로 먼저 이런 파트를 연결하여 완성된 메시지를 만들어야 한다
    • 네티에서는 이런 반복 작업을 줄이기 위해 메시지 파트를 FullHttpRequest, FullHttpResponse 메시지로 병합하는 집계자(Aggregator) 제공
  • HTTP 압축
    public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
        private final boolean isClient;
        public HttpCompressionInitializer(boolean isClient) {
            this.isClient = isClient;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            if(isClient) {
                pipeline.addLast("codec", new HttpClientCodec());
                pipeline.addLast("decompressor", new HttpContentDecompressor());
            } {
                pipeline.addLast("codec", new HttpServerCodec());
                pipeline.addLast("compressor", new HttpContentCompressor());
            }
        }
    }
    
    • 압축을 적용해 전송되는 데이터의 크기를 최대한 줄이는 것이 좋다
    • CPU 사용률이 증가하지만 일반적으로는 이익이며 텍스트 데이터의 굥우 효과가 크다
    • gzip, deflate 인코딩을 지원하는 압축과 압축 해제를 위한 ChannelHandler 구현 제공
  • HTTPS 이용
    public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
        private final SslContext sslContext;
        private final boolean isClient;
    
        public HttpsCodecInitializer(SslContext sslContext, boolean isClient) {
            this.sslContext = sslContext;
            this.isClient = isClient;
        }
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            SSLEngine engine = sslContext.newEngine(ch.alloc());
            pipeline.addFirst("ssl", new SslHandler(engine));
    
            if(isClient) {
                pipeline.addLast("codec", new HttpClientCodec());
            } else {
                pipeline.addLast("codec", new HttpServerCodec());
            }
        }
    }
    
  • 웹소켓
    • 애플리케이션에 웹소켓 지원을 추가하기 위해서는 파이프라인에 적절한 클라이언트/서버 웹소켓 ChannelHandler 추가하면 된다
    • 해당 클래스는 웹소켓이 정의하는 프레임이라는 특수한 메시지 형식 처리
      • BinaryWebSocketFrame: 이진 데이터 프레임
      • TextWebSocketFrame: 텍스트 데이터 프레임
      • ContinuationWebSocketFrame: 이전 BinaryWebSocketFrame, TextWebSocketFrame 에 속하는 텍스트 또는 이진 데이터
      • CloseWebSocketFrame: CLOSE 요청이며 닫기 상태 코드와 구문을 포함하는 제어 프레임
      • PingWebSocketFrame: PongWebSocketFrame 요청을 포함하는 제어 프레임
      • PongWebSocketFrame: PingWebSocketFrame 요청에 대한 응답을 포함하는 제어 프레임
    public class WebSocketServerInitializer extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(
                    new HttpServerCodec(),
                    new HttpObjectAggregator(65536), // 핸드세이크를 위한 집계된 HttpRequest 지원
                    new WebSocketServerProtocolHandler("/websocket"), // 엔드포인트 요청이 "/websocket"인 경우 업그레이드 핸드세이크 처리
                    new TextFrameHandler(),
                    new BinaryFrameHandler(),
                    new ContinuationFrameHandler()
            );
        }
        private class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
                // text frame 처리
            }
        }
        private class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
                // binary frame 처리
            }
        }
        private class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {
                // 지속 프레임 처리
            }
        }
    }
    
    유휴 연결과 시간 만료
  • 리소스를 시기적절하게 해제하려면 유휴 연결을 감지하고 시간을 만료시키는 것이 중요하다
  • 유휴 연결과 시간 만료를 위한 ChannelHandler를 제공
    • IdleStateHandler
      • 연결이 너무 오랫동안 유휴 상태인 경우 IdleStateEvent를 생성
      • IdleStateEvent를 처리하기 위해서는 ChannelInboundHandler에서 userEventTriggered() 오버라이딩해야 한다
    • ReadTimeoutHandler
      • 지정한 기간동안 인바운드 데이터를 받지 못하는 경우 ReadTimeoutException 생성하고 Channel을 닫는다
      • ReadTimeoutException을 감지하려면 ChannelHandler에서 exceptionCaught() 오버라이딩해야 한다
    • WriteTimeoutHandler
      • 지정한 기간동안 아웃바운드 데이터를 받지 못하는 경우 WriteTimeoutException 생성하고 Channel을 닫는다
      • WriteTimeoutException을 감지하려면 ChannelHandler에서 exceptionCaught() 오버라이딩
  • IdleStateHandler 예제
    public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)); // IdleStateHandler는 트리거될 때 IdleStateEvent 전송
            pipeline.addLast(new HeartbeatHandler());
        }
    
        public static class HeartbeatHandler extends ChannelInboundHandlerAdapter {
            private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if(evt instanceof IdleStateEvent) {
                    ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                        .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    super.userEventTriggered(ctx, evt);
                }
            }
        }
    }
    
구분 기호 및 길이 기반 프로토콜의 디코딩
  • 구분 기호 프로토콜
    • 구분 기호 메시지 프로토콜은 프레임이나 메시지 시그먼트의 시작과 끝을 정의된 문자로 표시한다
    • 디코더
      • DelimiterBasedFrameDecoder: 사용자가 제공한 구분 기호를 이용해 프레임을 추출하는 범용 디코더
      • LineBasedFrameDecoder: 행종료 또는 캐리지리턴+줄바꿈 문자로 구분된 프레임을 추출하는 디코더로 DelimiterBasedFrameDecoder 보다 빠르다
        public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new LineBasedFrameDecoder(64 * 128));
            pipeline.addLast(new FrameHandler()); // 각 프레임을 처리할 FrameHandler 추가
        }
        
        public static class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                // 프레임에서 추출한 데이터로 필요한 작업 처리
            }
        }
        }
        
  • 길이기반 프로토콜
    • 특수한 구문 문자로 끝을 표시하지 않고 프레임의 헤더 세그먼트에 프레임의 길이를 인코딩하는 방법으로 프레임 정의
    • 디코더
      • FixedLengthFrameDecoder: 생성자를 호출할 때 지정한 고정 크기의 프레임 추출
      • LengthFieldBasedFrameDecoder: 프레임 헤더의 필드에 인코딩된 길이 값을 기준으로 프레임 추출. 필드의 오프셋과 길이는 생성자에서 지정
        public class LengthBasedHandlerInitializer extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 128, 0, 8)); // 처음 8바이트에 프레임 길이가 인코딩된 메시지 처리
            pipeline.addLast(new FrameHandler());
        }
        
        public static class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        
            }
        }
        }
        
대용량 데이터
  • 대용량 데이터를 기록하는 작업은 네트워크 포화 가능성 때문에 특수한 문제로 여겨지고 있다
  • 기록 작업은 논블로킹이므로 완료 시 반환되며 모든 데이터를 기록하지 않아도 ChannelFuture 에 알림을 전달한다
  • FileRegion을 이용한 파일 내용 전송
    • 애플리케이션의 데이터 처리를 생략하고 파일 내용을 직접 전송하는 경우에만 적용
      FileInputStream in = new FileInputStream(file);
      FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
      channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if(!future.isSuccess()) { // 필요한 적업 }
        }
      })
      
  • ChunkedInput<B> 인터페이스
    • 대용량 파일 전송에 중요한 역할을 하며 매개변수는 readChunk() 반환 형식이다
    • 구현
      • ChunkedFile: 플랫폼에서 제로 카피를 지원하지 않거나 데이터를 전송해야할 때 이용하기 위해 파일에서 청크 단위로 데이터를 가져옴
      • ChunkedNioFile: ChunkedFile과 비슷하지만 FileChannel 이용
      • ChunkedStream: InputStream에서 청크 단위로 내용 전송
      • ChunkedNioStream: ReadableByteChannel 에서 청크 단위로 내용 전송
  • ChunkedStream을 이용한 파일 내용 전송
    public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
        private final File file;
        private final SslContext sslContext;
    
        public ChunkedWriteHandlerInitializer(File file, SslContext sslContext) {
            this.file = file;
            this.sslContext = sslContext;
        }
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new SslHandler(sslContext.newEngine(ByteBufAllocator.DEFAULT))); // SSLHandler를 ChannelPipeline에 추가
            pipeline.addLast(new ChunkedWriteHandler()); // ChunkedInput 형태로 전달된 데이터를 처리할 ChunkedWriteHandler 추가
            pipeline.addLast(new WriteStreamHanlder()); // 연결이 설정되면 파일 데이터를 기록하기 시작
        }
        public final class WriteStreamHanlder extends ChannelInboundHandlerAdapter {
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception { // channelActive는 연결이 설정되면 ChunkedInput 형식을 이용해 파일 데이터 기록
                super.channelActive(ctx);
                ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
            }
        }
    }
    
데이터 직렬화
  • JDK는 네트워크를 통해 전송하는 기본형 데이터 형식과 POJO의 그래프를 직렬화/역직렬화기 위한 ObjectInputStream, ObjectOutputStream 제공
  • API는 복잡하지 않으며 java.io.Serializable을 구현하는 모든 객체에 적용할 수 있지만 효율적이지 않은 단점이 있다
  • JDK 직렬화
    • 네티가 JDK와의 상호 작용을 위해 제공하는 직렬화 클래스
      • CompatibleObjectDecoder: JDK 직렬화를 이용하는 비-네티 피어와의 상호작용을 위한 디코더
      • CompatibleObjectEncoder: JDK 직렬화를 이용하는 비-네티 피어와의 상호작용을 위한 인코더
      • ObjectDecoder: JDK 직렬화를 바탕으로 커스텀 직렬화를 디코딩에 이용하는 디코더로서 외부 의존성이 없는 경우 속도 향상 제공
        • 외부 의존성이 있다면 다른 직렬화 구현을 이용하는 것이 좋다.
      • ObjectEncoder: JDK 직렬화를 바탕으로 커스텀 직렬화를 인코딩에 이용하는 인코더로서 외부 의존성이 없는 경우 속도 향상 제공
        • 외부 의존성이 있다면 다른 직렬화 구현을 이용하는 것이 좋다.
  • 프로토콜 버퍼를 통한 직렬화
    • 구글이 개발한 프로토콜 버퍼(Protocol Buffer)를 활용하는 코덱을 직렬화 솔루션으로 제공
    • protobuf codec
      • ProtobufDecoder: protobuf를 이용해 메시지 디코₩
      • ProtobufEncoder: protobuf를 이용해 메시지 인코딩
      • ProtobufVarint32FrameDecoder: 수신한 ByteBuf 를 메시지의 구글 프로토콜 Base 128 Varints 정수 길이 필드의 값을 기준으로 동적 분할
        public class ProtoBufInitializer extends ChannelInitializer<Channel> {
        private final MessageLite lite;
        
        public ProtoBufInitializer(MessageLite lite) {
            this.lite = lite;
        }
        
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new ProtobufVarint32FrameDecoder()); // 프레임 분할하기 위해 추가
            pipeline.addLast(new ProtobufEncoder()); // 메시지 인코딩 처리
            pipeline.addLast(new ProtobufDecoder(lite)); // 메시지 디코딩 처리
            pipeline.addLast(new ObjectHandler()); // 디코딩된 메시지를 처리한 ObjectHandler 추가
        }
        
        public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                // 객체를 이용해 필요한 일 수행
            }
        }
        }
        

출처

  • Netty in action(위키북스): 노먼 아운로, 마빈 알렌 울프탈 지음