chap12. 웹소켓
웹소켓 소개
- WebSocket 프로토콜은 웹의 양방향 데이터 전송 문제에 대한 실용적인 솔루션을 제공하기 위해 클라이언트/서버가 언제든 메시지를 전송할 수 있게 허용
- 결과적으로 메시지 수신을 비동기적으로 처리하게 요구하도록 새롭게 설치
웹소켓 애플리케이션
- 채팅방 작동 방식
- 클라이언트가 서버에 연결하고 채팅에 참여
- 채팅 메시지가 웹소켓을 통해 교환되며 양방향으로 전송
- 서버가 모든 클라이언트를 처리
웹소켓 지원 추가
- 예제
- / 로 요청: index.html 전송
- /ws 로 요청: 서버가 웹소켓 업그레이드 수행, 연결이 업그래이드 되면 모든 데이터를 웹소켓을 이용해 전송
- 요청 방식
- 채팅방 클라이언트가 HTTP 요청(/ 또는 /ws)
- 채팅방 서버는 / 요청인 경우 HTTP 를 통해 index.html wjsthd
- 채팅방 서버는 /ws 요청인 경우 웹소켓 업그레이드 수행(HTTP를 통해 웹소켓 핸드셰이크 작업)
- 업그레이드 완료 후 서버가 웹소켓을 통해 메시지 전송
- HTTP 요청 처리시
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { // FullHttpRequest를 처리하기 위한 핸들러 private final String wsUri; private static final File INDEX; static { URL location = HttpRequestHandler.class.getProtectionDomain() .getCodeSource().getLocation(); try { String path = location.toURI() + "index.html"; path = !path.contains("file:") ? path : path.substring(5); INDEX = new File(path); } catch (URISyntaxException e) { throw new IllegalStateException("Unable to locate index.html", e); } } public HttpRequestHandler(String wsUri) { this.wsUri = wsUri; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { if(wsUri.equalsIgnoreCase(msg.getUri())) { ctx.fireChannelRead(msg.retain()); // /ws 를 참조하는 경우 FullHttpRequest retain을 호출한 후 fileChannelRead(msg)를 호출해 다음 ChannelInboundHandler 전달 } else { // / index.html 전달 if(HttpHeaders.is100ContinueExpected(msg)) { // 100-continue에 경우 100 Continue 응답 전송 send100Continue(ctx); } RandomAccessFile file = new RandomAccessFile(INDEX, "r"); // index.html을 읽는다 HttpResponse response = new DefaultHttpResponse(msg.getProtocolVersion(), HttpResponseStatus.OK); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain;charset=UTF-8"); if(HttpHeaders.isKeepAlive(msg)) { // keep-alive 요청이 있는 경우 필요한 헤더 추가 response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length()); response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); } ctx.write(response); // HttpResponse를 클라이언트로 기록 if(ctx.pipeline().get(SslHandler.class) == null) { ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length())); } else { ctx.write(new ChunkedNioFile(file.getChannel())); } ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); // LastHttpContent 를 클라이언트에 기록하고 플러시 if(!HttpHeaders.isKeepAlive(msg)) { // keep-alive 요청이 없는 경우 Channel을 닫음 future.addListener(ChannelFutureListener.CLOSE); } } } private void send100Continue(ChannelHandlerContext ctx) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
- 웹소켓 프레임 처리
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private final ChannelGroup channelGroup; public TextWebSocketFrameHandler(ChannelGroup channelGroup) { this.channelGroup = channelGroup; } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 커스텀 이벤트를 처리하기 위해 userEventTriggered() 재정의 if(evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { ctx.pipeline().remove(HttpRequestHandler.class); // 이벤트가 핸드세이크 성공을 의미하는 경우 HTTP Message는 더이상 수신하지 않으므로 ChannelPipeline에서 HttpRequestHandler 제거 channelGroup.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined.")); // 연결된 모든 웹소켓 클라이언트에 새로운 클라이언트가 연결된 것을 알림 channelGroup.add(ctx.channel()); // 모든 메시지를 수신할 수 있게 새로운 웹소켓 채널을 추가 } else { super.userEventTriggered(ctx, evt); } } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { channelGroup.writeAndFlush(msg.retain()); // 메시지의 참조 카운트를 증가시키고 ChannelGroup에 연결된 모든 클라이언트로 기록 } }
- WebSocketFrame 형식
- BinaryWebSocketFrame: 이진 데이터 포함
- TextWebSocketFrame: 텍스트 데이터 포함
- ContinuationWebSocketFrame: 이진 데이터 또는 텍스트 데이터 포함
- CloseWebSocketFrame: CLOSE 요청을 나타내며, 닫기 상태 코드와 구문 포함
- PingWebSocketFrame: PongWebSocketFrame 전송 요청
- PongWebSocketFrame: PingWebSocketFrame 에 대한 응답 전송
- WebSocketFrame 형식
- ChannelPipeline 초기화
public class ChatServerInitializer extends ChannelInitializer<Channel> { private final ChannelGroup channelGroup; public ChatServerInitializer(ChannelGroup channelGroup) { this.channelGroup = channelGroup; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(64 * 1024)); pipeline.addLast(new HttpRequestHandler("/ws")); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(new TextWebSocketFrameHandler(channelGroup)); } }
- pipeline 구성
- HttpServerCodec: 바이트를 HttpRequest, HttpContent, LastHttpContent로 디코딩, HttpRequest, HttpContent, LastHttpContent를 바이트로 인코딩
- ChunkedWriteHandler: 파일의 내용을 기록
- HttpObjectAggregator: HttpMessage 및 해당하는 후속 HttpContent를 집계해 요청이나 응답을 처리하는지에 따라 단일 FullHttpRequest, FullHttpResponse를 생성한다. 이는 파이프라인의 다음 ChannelHandler는 완전한 HTTP 요청만 받는다
- HttpRequestHandler: /ws URL로 보내지 않는 요청에 해당하는 FullHttpRequest를 처리
- WebSocketServerProtocolHandler: 웹소켓 사양에서 요구하는 대로 웹소켓 업그레이드 핸드셰이크 PingWebSocketFrame, PongWebSocketFrame, CloseWebSocketFrame 처리
- TextWebSocketFrameHandler: TextWebSocketFrame 및 핸드셰이크 완료 이벤트 처리
- pipeline 구성
- Bootstrap
public class ChatServer { private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); // 연결된 모든 웹소켓 채널을 포함할 DefaultChannelGroup 생성 private final EventLoopGroup group = new NioEventLoopGroup(); private Channel channel; public ChannelFuture start(InetSocketAddress address) { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(group) .channel(NioServerSocketChannel.class) .childHandler(createInitializer(channelGroup)); ChannelFuture future = serverBootstrap.bind(address); future.syncUninterruptibly(); channel = future.channel(); return future; } protected ChannelHandler createInitializer(ChannelGroup channelGroup) { // ChatServerInitializer 생성 return new ChatServerInitializer(channelGroup); } public void destroy() { if(channel != null) channel.close(); channelGroup.close(); group.shutdownGracefully(); } public static void main(String[] args) { if(args.length != 1) { System.err.println("Please give port as argument"); System.exit(1); } int port = Integer.parseInt(args[0]); final ChatServer endpoint = new ChatServer(); ChannelFuture future = endpoint.start(new InetSocketAddress(port)); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { endpoint.destroy(); } }); future.channel().closeFuture().syncUninterruptibly(); } }
암호화
- 암호화를 위해서 ChannelPipeline에 SslHandler 를 추가하면 된다.
- ChannelPipeline에 암호화 추가
public class SecureChatServerInitializer extends ChatServerInitializer { private final SslContext sslContext public SecureChatServerInitializer(ChannelGroup group, SslContext sslContext) { super(group); this.context = context; } @Override public void initChannel(Channel ch) throws Exception { super.initChanne(ch); SSLEngine engine = context.newEngine(ch.alloc()); ch.pipeline().addFirst(new SslHandler(engine)); } }
- ChatServer에 암호화 추가
public class SecureChatServer extends ChatServer { private final SslContext sslContext; public SecureChatServer(SslContext sslContext) { this.context = context; } @Override public ChannelInitializer<Channel> createInitializer(ChannelGroup gruop) { return new SecureChatServer(group, context); } public static void min(String[] args) throws Exception { if(args.length == 1) { System.err.println("Please give port as argument"); System.exit(1); } int port = Integer.parseInt(args[0]); SelfSignedCertificate cert = new SelfSignedCertificate(); SslContext context = SslContext.newServerContext(cert.certificate(), cert.privateKey()); final SecureChatServer endpoint = new SecureChatServer(context); ChannelFuture future = endpoint.start(new InetSocketAddress(port)); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { endpoint.destroy(); } }); future.channel().closeFuture().syncUninterruptibly(); } }
chap13. UDP를 이용한 이벤트 브로드캐스팅
UDP 기본사항
- UDP같은 비연결 프로토콜에는 안정적 연결(핸드셰이크)라는 개념이 없고 각 메시지(UDP Datagram) 이 독립적으로 전송된다
- UDP에는 TCP의 오류 수정 메커니즘(피어가 수신한 패킷을 승인하며 스인하지 않은 패킷을 전송자가 재전송함)이 없다
- 하지만 핸드셰이크, 메시지 관리가 배제되므로 TCP보다 훨씬 빠르며 메시지 손실을 처리하거나 용인할 수 있는 애플리케이션에 적합하다
UDP 브로드캐스트
- UDP는 메시지를 여러 수신자에게 전송하는 추가 전송 모드 제공 - 멀티캐스트: 정의된 호스트 그룹으로 전송 - 브로드캐스트: 네트워크 상의 모든 호스트로 전송
UDP 예제 애플리케이션
- 예제
- Broadcaster 가 새로운 파일 내용 수신
- UDP를 통해 이벤트를 브로드 캐스팅
- 이벤트 모니터가 내용을 수신하고 표시
- 메시지(LogEvent)
public final class LogEvent { public static final byte SEPERATOR = (byte)':'; private final InetSocketAddress source; private final String logfile; private final String msg; private final long received; public LogEvent(String logfile, String msg) { this(null, -1, logfile, msg); } public LogEvent(InetSocketAddress source, long received, String logfile, String msg) { this.source = source; this.received = received; this.logfile = logfile; this.msg = msg; } // getter }
- 브로드캐스트 작성
- interface AddressedEnvelope<M, A extends SocketAddress> extends ReferenceCounted
- 다른 메시지를 래핑하는 메시지를 발신자 및 수신자 주소와 함께 래핑한다. M은 message 형식, A는 Address 형식이다
- interface DefaultAddressedEnvelope<M,A extends SocketAddress> implements AddressedEnvelope<M,A>
- AddressEnvelope 인터페이스 기본 구현 제공
- interface DatagramPacket extends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> implements ByteBufHolder
- ByteBuf 를 메시지 데이터 컨테이너로 이용하기 위해 DefaultAddressedEnvelope 확장
- interface DatagramChannel extends Channel
- UDP 멀티캐스팅 그룹 관리를 지원하기 위해 네티의 Channel 추상화를 확장한다.
- interface NioDatagramChannel extends AbstractNioMessageChannel implements DatagramChannel
- AddressedEnvelope 메시지를 전송 및 수신할 수 있는 Channel 형식을 정의 - ChannelPipeline에서 LogEvent 흐름 - LogEvent -> LogEventEncoder -> DatagramPacket 으로 변환되어야 한다
- LogEventDecoder
public class LogEventEncoder extends MessageToMessageDecoder<LogEvent> { private final InetSocketAddress remoteAddress; public LogEventEncoder(InetSocketAddress remoteAddress) { this.remoteAddress = remoteAddress; } @Override public void encode(ChannelHandlerContext ctx, LogEvent logEvent, List<Object> out) throws Exception { byte[] file = logEvent.getLogfile().getBytes(CharsetUtils.UTF_8); byte[] msg = logEvent.getMsg().getBytes(CharsetUtils.UTF_8); ByteBuf buf = ctx.alloc().buffer(file.length + msg.length + 1); buf.writeBytes(file); // 파일이름 ByteBuf 에 기록 buf.writeByte(LogEvent.SEPERATOR); // SEPERATOR 추가 buf.writeBytes(msg); // 로그 메시지 추가 out.add(new DatagramPacket(buf, remoteAddress)); } }
- LogEventBroadcaster
public class LogEventBroadcaster { private final EventLoopGroup group; private final Bootstrap bootstrap; private final File file; public LogEventBroadcaster(InetSocketAddress, File file) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new LogEventEncoder(address)); this.file = file; } public void run() throws Exception { Channel channel = bootstrap.bind(0).sync().channel(); // 채널 바인딩 long pointer = 0; while(true) { // 주 처리 루프 시작 long length = file.length(); if(length < pointer) { // 파일 재설정됨 pointer = length; // 필요한 경우 파일의 마지막 바이트를 가리키도록 파일 포인터로 설정 } else if(length > pointer) { // 컨텐츠가 추가됨 RemoteAccessFile raf = new RandomAccessFile(file, "r"); raf.seek(pointer); // 이전 항목이 전송되지 않도록 현재 파일포인터로 설정 String line; while( (line = raf.readLine()) != null) { channel.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line)); // 각 로그 항목에 대해 LogEvent 하나를 채널에 기록 } pointer = raf.getFilePointer(); // 파일 내 현재 위치 저장 raf.close(); } try { Thread.sleep(1000); // 1초 대기하고 중단될 경우 루프 종료 } catch(InterruptedException e) { Thread.interrupted(); break; } } } public void stop() { group.shuntdownGracefully(); } public static void main(String[] args) throws Exception { if(args.length != 2) { throw new IllegalArgumentException(); } LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255"), Integer.parseInt(args[0]), new File(args[1])); try { broadcaster.run(); } finally { broadcaster.stop(); } } }
- interface AddressedEnvelope<M, A extends SocketAddress> extends ReferenceCounted
- 모니터 작성
- 역할
- LogEventBroadcaster 가 브로드캐스팅하는 UDP DatagramPacket 을 수신하는 역할을 한다
- 이를 LogEvent 메시지로 디코딩하는 역할을 한다
- LogEvent 메시지를 System.out으로 출력하는 역할을 한다
- ChannelPipeline에는 DatagramPacket, LogEvenDecoder, LogEvent, LogEventHandler 순으로 처리된다
- LogEventDecoder
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> { @Override public void decode(ChannelHandlerContextctx, DatagramPacket datagramPacket, List<Object> out) throws Exception { ByteBuf data = datagramPacket.data(); int index = data.indexOf(0, data.readableBytes(), LogEvent.SEPERATOR); String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8); String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8); LogEvent event = new LogEvent(datagramPacket.remoteAddress(), System.currentTimeMillis(), filename, logMsg); out.add(event); } }
- LogEventHandler
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception { StringBuilder builder = new StringBuilder(); builder.append(event.getReceivedTimestamp()).append(" [") .append(event.getSource().toString()).append("] [") .append(event.getLogfile()).append("] : ").append(event.getMsg()); System.out.println(builder.toString()); } }
- LogEventMonitor
public class LogEventMonitor { private final EventLoopGroup group; privte final Bootstrap bootstrap; public LogEventMonitor(InetSocketAddress address) { group = new NioEventGroup(); bootstrap.group(group) .channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROARDCAST, true) .handler(new ChannelInitializer<Channel>() { @Override proteted void initChannel(Channel channel) throws Exception{ ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LogEventDecoder()); pipeline.addLast(new LogEventHandler()); } }).localAddress(address); } public Channel bind() { return bootstrap.bind().sync().channel(); // 채널 바인딩. 여기서 DatagramChannel은 비연결 } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) thorws Exception { if(args.length == 1) throw new IllegalArgumentException("Usage: LogEventMonitor<Port>"); LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(args[0])); try { Channel channel = monitor.bind(); channel.closeFuture().sync(); } finally { monitor.stop(); } } }
출처
- 역할
- Netty in action(위키북스): 노먼 아운로, 마빈 알렌 울프탈 지음