카프카 스트림즈

  • 토픽에 적재된 데이터를 상태기반(stateful) 또는 비상태기반(stateless)으로 실시간 변환하여 다른 토픽에 적재하는 라이브러리
  • 자바 기반 스트림즈 애플리케이션은 카프카 클러스터와 완벽하게 호환되며 스트림 처리에 필요한 편리한 기능들(신규 토픽 생성, 상태 저장, 데이터 조인 등)을 제공
  • 처리 안전성이 뛰어남
    • exactly once
    • fault tolerant system
  • 스트림즈 애플리케이션은 내부적으로 스레드를 1개 이상 생성할 수 있으며, 스레드는 1개 이상의 태스크를 가진다
    • 태스크는 스트림즈 애플리케이션을 실행할 때 생기는 데이터 처리 최소 단위
    • 3개의 파티션으로 이루어진 토픽을 처리하는 애플리케이션을 실행하면 3개의 태스크가 생긴다
  • 병렬처리를 위해서는 파티션과 스트림즈 스레드(또는 프로세스) 개수를 늘림으로써 처리량을 늘릴 수 있다

  • 프로듀서/컨슈머 조합을 사용하지 않고 스트림즈를 사용하는 이유
    • 스트림 데이터 처리에 필요한 다양한 기능을 스트림즈 DSL 로 제공하여 필요하다면 프로세서 API를 사용하여 기능을 확장할 수 있다
    • 물론 프로듀서, 컨슈머를 조합하여 스트림즈가 제공하는 기능과 유사하게 만들 수 있다
      • 하지만, 스트림즈 라이브러리를 통해 단 한 번의 데이터 처리, 장애 허용 시스템 등의 특징들은 프로듀서/컨슈머 조합으로 완벽하게 구현하기 어렵다
      • 스트림즈가 제공하지 못하는 기능을 구현할 때에는 사용하는 것이 좋다
        • 소스 토픽(사용하는 토픽)과 싱크 토픽(저장하는 토픽)의 카프카 클러스더가 서로 다른 경우
  • 토폴로지
    • 토폴로지란 2개 이상의 노드와 선으로 이루어진 집합을 뜻하며, 링형, 트리형, 성형 등이 있다
      • 스트림즈에서 사용하는 토폴로지는 트리 형태와 유사
    • 카프카 스트림즈에서는 토폴로지들을 이루는 노드를 하나의 프로세서라고 부르며 노드를 이은 선을 스트림이라고 부른다
    • 프로세서에는 소스 프로세서, 스트림 프로세서, 싱크 프로세서 3가지가 있다
      • 소스 프로세서: 데이터를 처리하기 위해 최초로 선언하는 노드로 하나 이상의 토픽에서 데이터를 가져오는 역할
      • 스트림 프로세서: 다른 프로세서가 반환한 데이터를 처리하는 역할
      • 싱크 프로세서: 데이터를 특정 카프카 토픽으로 저장하는 역할
  • 개발 방법
    • 스트림즈 DSL로 구현
      • 메시지 값을 기반으로 토픽 분기처리
      • 지난 10분 간 들어온 데이터 개수 집계
      • 토픽과 다른 토픽의 결합으로 새로운 데이터 생성
    • 프로세서 API 로 구현
      • 메시지 값의 종류에 따라 토픽을 가변적으로 전송
      • 일정한 시간 간격으로 데이터 처리

스트림즈 DSL

  • 레코드 흐름 추상화 개념

    image

    • 이미지 출처: https://techannotation.wordpress.com/2019/06/24/apache-kafka-stream-dealing-with-data/

    • KStream
      • 레코드 스트림안에 있는 모든 레코드들은 INSERT로 해석된다
      • 메시지 키와 값으로 구성되어 있으며 모든 레코드가 출력된다
    • KTable
      • 메시지 키를 기준으로 묶어서 사용하며 새로운 데이터를 적재할 때 동일한 메시지 키가 있을 때에는 데이트가 업데이트 된다
      • record의 value는 동일한 키에 대한 마지막 값의 “UPDATE”로 해석된다. 만약에 키가 아직 존재하지 않는면 업데이트는 INSERT로 간주
    • GlobalKTable
      • KTable로 선언된 토픽은 1개 파티션이 1개 태스크에 할당되어 사용되고, GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용된다는 차이점이 있다
      • Globalktable vs Ktable
        • input 토픽이 5개의 파티션을 가지고 있다고 하자. 그리고 해당 토픽을 테이블로 읽는 5개의 어플리케이션을 실행시킨다고 하자.
        • Input 토픽을 KTable을 이용해서 읽는 경우, 각 어플리케이션의 로컬 KTable 객체는 5개의 파티션 중에서 1개의 파티션의 데이터로 채워질 것이다
        • Input 토픽을 GlobalKTable을 이용해서 읽는 경우, 각 어플리케이션의 로컬 GlobalKTable 객체는 해당 토픽의 모든 파티션의 데이터로 채워질 것이다
      • 코파티셔닝
        • KStream, KTable 데이터 조인할 때, 2개 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 동일하게 맞추는 작업
        • 파티션 개수가 동일하고 파티셔닝 전략이 같은 경우, 동일한 메시지 키를 가진 데이터가 동일한 태스크에 들어가는 것을 보장한다, 이를 통해 조인을 수행할 수 있다
        • 문제는 파티션 개수가 다르거나 파티션 전략이 다를 경우 조인을 수행할 수 없다
        • 코파티셔닝이 되지 않은 2개의 토픽을 조인하는 로직이 담긴 스트림즈 애플리케이션을 실행하면 TopologyException 이 발생
        • 리파티셔닝 과정을 거쳐 KStream, KTable로 사용하는 토픽이 코파티셔닝되도록 할 수 있다
      • GlobalKTable은 코파티셔닝되지 않은 KStream과 데이터 조인을 할 수 있다
        • KTable과 다르게 GlobalKTable로 정의된 데이터는 스트림즈 애플리케이션의 모든 태스크에 동일하게 공유되어 사용되기 때문이다
      • GlobalKTable은 각 태스크마다 모든 데이터를 저장하기 때문에 스토리지 사용량 증가, 네트워크, 브로커 부하등에 단점이 있기 때문에 많은 양의 데이터를 가진 토픽으로 조인하는 경우 리파티셔닝을 통한 KTable 사용 권장
  • 주요 옵션
    • bootstrap.servers
      • 카프카 클러스터에 속한 브로커의 호스트 이름: 포트를 1개 이상 작성
    • application.id
      • 스트림즈 애플리케이션을 구분하기 위한 고유 아이디 설정
    • default.key.serde
      • 레코드의 메시지 키 직렬화/역직렬화하는 클래스 지정
      • 기본값은 Serdes.ByteArray().getClass().getName 이다
    • default.value.serde
      • 레코드의 메시지 값 직렬화/역직렬화하는 클래스 지정
      • 기본값은 Serdes.ByteArray().getClass().getName 이다
    • num.stream.threads
      • 스트림 프로세싱 실행 시 실행될 스레드 개수 지정
    • state.dir
      • rocksDB 저장소 위치할 디렉토리 지정
      • 카프카 스트림즈 상태기반 데이터를 처리할 때 로컬 저장소로 사용
      • 기본값으로는 tmp/kafka-streams 이다
  • 주요 메서드
    • stream(), to()
      • 특정 토픽을 KStream형태로 가져오려면 스트림즈 DSL의 stream()을 사용     - 특정 토픽으로 저장하려면 스트림즈DSL의 to() 메서드를 사용
        StreamBuilder builder = new StreamBuilder();
        KStream<String, String> streamLog = builder.stream(STREAM_LOG);
        streamLog.to(STREAM_LOG_COPY);
        KafkaStream streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
    • filter()    - 메시지 키 또는 메시지 값을 필터링하여 특정 조건에 맞는 데이터를 골라낼 때 사용
        StreamBuilder builder = new StreamBuilder();
        KStream<String, String> streamLog = builder.stream(STREAM_LOG);
        KStream<String, String> filteredStream == streamLog
                    .filter((key, value) -> value.length() > 5);
        filteredStream.to(STREAM_LOG_FILTER);
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
      
    • join()
      • KTable, KStream을 조인
        • 조인하기 전에 코파티셔닝이 되어 있는지 확인하자. 코파티셔닝이 되어 있지 않으면 TopologyException 이 발생한다
        • KTable로 사용할 토픽과 KStream으로 사용할 토픽을 생성할 때 동일한 파티션 개수, 동일한 파티셔닝을 사용하는 것이 중요
        • 토픽 생성과는 다르게 KTable, KStream, GlobalKStream 모두 동일한 토픽이고, 스트림즈 애플리케이션 내부에서 사용할 때 메시지 키와 값을 사용하는 형태를 구분하는 것 ```java StreamBuilder builder = new StreamBuilder(); KTable<String, String> addressTable = builder.table(ADDRESS_TABLE); KStream<String, String> orderStream = builder.stream(ORDER_STREAM); orderStream.join(addressTable, (order, address) -> order + “ send to “ + address) .to(ORDER_JOIN_STREAM);

        KafkaStreams stream = new KafkaStreams(builder.build(), props); stream.start(); ```

      • GlobalKTable, KStream을 조인
        • 코파티셔닝이 되어 있지 않을 경우 토픽 조인을 하기 위해서는 GlobalKTable 을 사용해야 한다 ```java StreamBuilder builder = new StreamBuilder(); GloablKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE); KStream<String, String> orderStream = builder.stream(ORDER_STREAM);

        orderStream.join(); ```

프로세서 API

  • 스트림즈 DSL은 데이터 처리, 분기, 조인을 위한 다양한 메소드를 제공하지만, 추가적인 상세 로직 구현이 필요하면 프로세서 API를 활용
  • KStream, KTable, GlobalKTable 과 같은 개념은 없다
  • 예제
    • FilterProcessor.java: 문자열 길이가 5이상인 데이터를 필터링
      public class FilterProcessor implements Processor<String, String> {
          private ProcessorContext context;
          @Override
          public void init(ProcessorContext context) {
              this.context = context;
          }
          @Override
          public void process(String key, String value) {
          // 프로세스 처리 로직 -> 1개의 레코드를 처리하는 것으로 생각하며
          // forward()는 다음 토폴로지로 넘어가며, 완료된 이후에는 commit() 호출하여 처리 선언
              if(value.length() > 5) {
                  context.forward(key, value);
              }
                  
              context.commit();
          }
          @Override
          public void close() {}
      }
      
    • SimpleProcessor.java: FilterProcessor을 사용하여 스트림 진행
      Topology topology = new Topology();
      topology.addSource("Source", STREAM_LOG)
              .addProcess("Process", () -> new FilterProcessor(), "Source)
              .addSink("Sink", STREAM_LOG_FILTER, "Process");
      KafkaStream stream = new KafkaStream(topology, props);
      stream.start();
      

Spring Cloud Stream 과 Kafka Stream

  • Spring Cloud Stream 2.0
    • 메세지 기반 마이크로서비스
      • Kafka 같은 Message Broker 기반에 pub/sub 구조, Consumer Group, Parititon 등의 개념으로 스트림 처리 응용 프로그램을 만들기 위한 기반 제공
    • 이벤트 중심 마이크로 서비스
      • 외부 미들에어와의 통신을 하기 위한 통합 컴포넌트를 제공
  • 통합 컴포넌트
    • image

      • 출처: https://docs.spring.io/spring-cloud-stream/docs/1.0.0.RC1/reference/html/_spring_cloud_stream_main_concepts.html
    • Binder
      • Spring Cloud Stream에서 제공하는 미들에워와 통신 컴포넌트로 producer/consumer의 연결, 위임 및 라우팅 등을 담당한다
    • Binding
      • Binder의 입/출력을 미들웨어에 연결하기 위한 Bridge 역할로 @EnableBinding 을 사용하여 Application에서 사용할 바인더 인터페이스를 추가 바인딩이 적용된다
  • 여러 애노테이션
    @SpringBootApplication
    @EnableBinding(Sink.class) 
    public class StreamApplication {
        // main    
        
        @StreamListener(Sink.INPUT)
        public void processVote(Vote vote) { votingService.recordVote(votd); }
    }
    
    • @EnableBinding을 메인 어플리케이션에 추가하면 @StreamListener 애노테이션을 붙인 메서드와 메시지 브로커간에 연결을 하며, stream processing을 받게 된다
    • @EnableBinding에는 여러 인터페이스를 추가할 수 있는 데, Sink/Source/Processor 로 선언된 인터페이스를 추가할 수 있다.
      • Source: output(outbound)
      • Sink: input(inbound)
      • Processor: input/output (both)
    • Binder는 MessageChannel, SubscribableChannel 인터페이스를 이용하여 정의할 수 있고, @Input, @Output 을 사용하여 채널의 이름을 정의할 수 있다
      public interface CustomizingSink {
          String PROCESS_IN = "process_in";
              
          @Input(PROCESS_IN)
          SubscribableChannel messagePush();
      }
          
      public interface CustomizingSource {
          String PROCESS_OUT = "process_out";
              
          @Output(PROCESS_OUT)
          MessageChannel createMessage();
      }
          
      public interface CustomizingProcessor extends CustomizingSink, CustomizingSource {}
      
    • @StreamListener 와 message 기반 애노테이션도 같이 사용 가능하다
      @EnableBinding(Sink.class) 
      public class StreamApplication {}
          
      @Component
      @Slf4j
      public class KafkaListener {
          
          @StreamListener(CustomizingProcessor.PROCESS_INPUT)
          @SendTo(CustomizingProcessor.PROCESS_OUTPUT)
          public Data listeneMessage(@Payload Data data, @Header("contentType") String contentType) {
              log.info("Data: {}, Content-Type: {}, data.toString(), contentType);
              return data;
          }
      }
      
      • @StreamListener는 Input, @SendTo 는 Output을 지정할 수 있다
  • Configuration Options
    • Spring Cloud Stream Properties
      • spring.cloud.stream.instanceCount: 카프카의 파티션같은 인스턴스 수
      • spring.cloud.stream.instanceIndex: 0보다 큰 값에 경우 인스턴스 색인 정의
      • spring.cloud.stream.dynamicDestinations: 동적 바인딩인 경우 바인더 리스트
      • spring.cloud.stream.defaultBinder: 여러개의 바인더를 사용할 경우, 기본 바인더 지정
    • Binding properties
      • spring.cloud.stream.bindings..=
        • channelName: 설정할 채널 이름
          • destination
          • group
          • contentType
          • binder
      • spring.cloud.stream.bindings..consumer
      • spring.cloud.stream.bindings..producer
    • Kafka Stream
      • binder: spring.cloud.stream.kafka.binder.
      • consumer: spring.cloud.stream.kafka..consumer
      • producer: spring.cloud.stream.kafka..producer
  • Spring Cloud Stream 3.0
    • Spring Cloud Stream 3.0 부터는 @EnableBinding, @Input, @Output, @StreamListener, @StreamMessageConverter 등이 모두 deprecated 되었다
    • 이제는 함수형 프로그래밍 방식으로 설정해야 한다
    • 예제
      • 이제는 Function을 빈으로 생성하면 메시지를 처리할 수 있는 Spring Cloud Stream 애플리케이션이 된다
        @Bean
        public Function<String, String> uppercase() {
          return value -> value.toUppercase();
        }
        
      • application.yml 등록
        • input: + -in +
        • output: + -out +
          spring:
          cloud:
            stream:
              bindings:
                uppercase-in-0:
                  gruop: my-topic-group
                  destantion: my-topic
          
    • Kafka 설정과 관련 참고
      • https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties

출처

  • 아파치 카프카 애플리케이션 프로그래밍 with 자바 책 발췌
  • Spring Cloud Stream: https://docs.spring.io/spring-cloud-stream/docs/1.0.0.RC1/reference/html/_introducing_spring_cloud_stream.html
  • https://coding-start.tistory.com/139
  • https://shuiky.tistory.com/1971