카프카 스트림즈
- 토픽에 적재된 데이터를 상태기반(stateful) 또는 비상태기반(stateless)으로 실시간 변환하여 다른 토픽에 적재하는 라이브러리
- 자바 기반 스트림즈 애플리케이션은 카프카 클러스터와 완벽하게 호환되며 스트림 처리에 필요한 편리한 기능들(신규 토픽 생성, 상태 저장, 데이터 조인 등)을 제공
- 처리 안전성이 뛰어남
- exactly once
- fault tolerant system
- 스트림즈 애플리케이션은 내부적으로 스레드를 1개 이상 생성할 수 있으며, 스레드는 1개 이상의 태스크를 가진다
- 태스크는 스트림즈 애플리케이션을 실행할 때 생기는 데이터 처리 최소 단위
- 3개의 파티션으로 이루어진 토픽을 처리하는 애플리케이션을 실행하면 3개의 태스크가 생긴다
-
병렬처리를 위해서는 파티션과 스트림즈 스레드(또는 프로세스) 개수를 늘림으로써 처리량을 늘릴 수 있다
- 프로듀서/컨슈머 조합을 사용하지 않고 스트림즈를 사용하는 이유
- 스트림 데이터 처리에 필요한 다양한 기능을 스트림즈 DSL 로 제공하여 필요하다면 프로세서 API를 사용하여 기능을 확장할 수 있다
- 물론 프로듀서, 컨슈머를 조합하여 스트림즈가 제공하는 기능과 유사하게 만들 수 있다
- 하지만, 스트림즈 라이브러리를 통해 단 한 번의 데이터 처리, 장애 허용 시스템 등의 특징들은 프로듀서/컨슈머 조합으로 완벽하게 구현하기 어렵다
- 스트림즈가 제공하지 못하는 기능을 구현할 때에는 사용하는 것이 좋다
- 소스 토픽(사용하는 토픽)과 싱크 토픽(저장하는 토픽)의 카프카 클러스더가 서로 다른 경우
- 토폴로지
- 토폴로지란 2개 이상의 노드와 선으로 이루어진 집합을 뜻하며, 링형, 트리형, 성형 등이 있다
- 스트림즈에서 사용하는 토폴로지는 트리 형태와 유사
- 카프카 스트림즈에서는 토폴로지들을 이루는 노드를 하나의 프로세서라고 부르며 노드를 이은 선을 스트림이라고 부른다
- 프로세서에는 소스 프로세서, 스트림 프로세서, 싱크 프로세서 3가지가 있다
- 소스 프로세서: 데이터를 처리하기 위해 최초로 선언하는 노드로 하나 이상의 토픽에서 데이터를 가져오는 역할
- 스트림 프로세서: 다른 프로세서가 반환한 데이터를 처리하는 역할
- 싱크 프로세서: 데이터를 특정 카프카 토픽으로 저장하는 역할
- 토폴로지란 2개 이상의 노드와 선으로 이루어진 집합을 뜻하며, 링형, 트리형, 성형 등이 있다
- 개발 방법
- 스트림즈 DSL로 구현
- 메시지 값을 기반으로 토픽 분기처리
- 지난 10분 간 들어온 데이터 개수 집계
- 토픽과 다른 토픽의 결합으로 새로운 데이터 생성
- 프로세서 API 로 구현
- 메시지 값의 종류에 따라 토픽을 가변적으로 전송
- 일정한 시간 간격으로 데이터 처리
- 스트림즈 DSL로 구현
스트림즈 DSL
-
레코드 흐름 추상화 개념
-
이미지 출처: https://techannotation.wordpress.com/2019/06/24/apache-kafka-stream-dealing-with-data/
- KStream
- 레코드 스트림안에 있는 모든 레코드들은 INSERT로 해석된다
- 메시지 키와 값으로 구성되어 있으며 모든 레코드가 출력된다
- KTable
- 메시지 키를 기준으로 묶어서 사용하며 새로운 데이터를 적재할 때 동일한 메시지 키가 있을 때에는 데이트가 업데이트 된다
- record의 value는 동일한 키에 대한 마지막 값의 “UPDATE”로 해석된다. 만약에 키가 아직 존재하지 않는면 업데이트는 INSERT로 간주
- GlobalKTable
- KTable로 선언된 토픽은 1개 파티션이 1개 태스크에 할당되어 사용되고, GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용된다는 차이점이 있다
- Globalktable vs Ktable
- input 토픽이 5개의 파티션을 가지고 있다고 하자. 그리고 해당 토픽을 테이블로 읽는 5개의 어플리케이션을 실행시킨다고 하자.
- Input 토픽을 KTable을 이용해서 읽는 경우, 각 어플리케이션의 로컬 KTable 객체는 5개의 파티션 중에서 1개의 파티션의 데이터로 채워질 것이다
- Input 토픽을 GlobalKTable을 이용해서 읽는 경우, 각 어플리케이션의 로컬 GlobalKTable 객체는 해당 토픽의 모든 파티션의 데이터로 채워질 것이다
- 코파티셔닝
- KStream, KTable 데이터 조인할 때, 2개 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 동일하게 맞추는 작업
- 파티션 개수가 동일하고 파티셔닝 전략이 같은 경우, 동일한 메시지 키를 가진 데이터가 동일한 태스크에 들어가는 것을 보장한다, 이를 통해 조인을 수행할 수 있다
- 문제는 파티션 개수가 다르거나 파티션 전략이 다를 경우 조인을 수행할 수 없다
- 코파티셔닝이 되지 않은 2개의 토픽을 조인하는 로직이 담긴 스트림즈 애플리케이션을 실행하면 TopologyException 이 발생
- 리파티셔닝 과정을 거쳐 KStream, KTable로 사용하는 토픽이 코파티셔닝되도록 할 수 있다
- GlobalKTable은 코파티셔닝되지 않은 KStream과 데이터 조인을 할 수 있다
- KTable과 다르게 GlobalKTable로 정의된 데이터는 스트림즈 애플리케이션의 모든 태스크에 동일하게 공유되어 사용되기 때문이다
- GlobalKTable은 각 태스크마다 모든 데이터를 저장하기 때문에 스토리지 사용량 증가, 네트워크, 브로커 부하등에 단점이 있기 때문에 많은 양의 데이터를 가진 토픽으로 조인하는 경우 리파티셔닝을 통한 KTable 사용 권장
-
- 주요 옵션
- bootstrap.servers
- 카프카 클러스터에 속한 브로커의 호스트 이름: 포트를 1개 이상 작성
- application.id
- 스트림즈 애플리케이션을 구분하기 위한 고유 아이디 설정
- default.key.serde
- 레코드의 메시지 키 직렬화/역직렬화하는 클래스 지정
- 기본값은 Serdes.ByteArray().getClass().getName 이다
- default.value.serde
- 레코드의 메시지 값 직렬화/역직렬화하는 클래스 지정
- 기본값은 Serdes.ByteArray().getClass().getName 이다
- num.stream.threads
- 스트림 프로세싱 실행 시 실행될 스레드 개수 지정
- state.dir
- rocksDB 저장소 위치할 디렉토리 지정
- 카프카 스트림즈 상태기반 데이터를 처리할 때 로컬 저장소로 사용
- 기본값으로는 tmp/kafka-streams 이다
- bootstrap.servers
- 주요 메서드
- stream(), to()
- 특정 토픽을 KStream형태로 가져오려면 스트림즈 DSL의 stream()을 사용
- 특정 토픽으로 저장하려면 스트림즈DSL의 to() 메서드를 사용
StreamBuilder builder = new StreamBuilder(); KStream<String, String> streamLog = builder.stream(STREAM_LOG); streamLog.to(STREAM_LOG_COPY); KafkaStream streams = new KafkaStreams(builder.build(), props); streams.start();
- 특정 토픽을 KStream형태로 가져오려면 스트림즈 DSL의 stream()을 사용
- 특정 토픽으로 저장하려면 스트림즈DSL의 to() 메서드를 사용
- filter()
- 메시지 키 또는 메시지 값을 필터링하여 특정 조건에 맞는 데이터를 골라낼 때 사용
StreamBuilder builder = new StreamBuilder(); KStream<String, String> streamLog = builder.stream(STREAM_LOG); KStream<String, String> filteredStream == streamLog .filter((key, value) -> value.length() > 5); filteredStream.to(STREAM_LOG_FILTER); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
- join()
- KTable, KStream을 조인
- 조인하기 전에 코파티셔닝이 되어 있는지 확인하자. 코파티셔닝이 되어 있지 않으면 TopologyException 이 발생한다
- KTable로 사용할 토픽과 KStream으로 사용할 토픽을 생성할 때 동일한 파티션 개수, 동일한 파티셔닝을 사용하는 것이 중요
- 토픽 생성과는 다르게 KTable, KStream, GlobalKStream 모두 동일한 토픽이고, 스트림즈 애플리케이션 내부에서 사용할 때 메시지 키와 값을 사용하는 형태를 구분하는 것 ```java StreamBuilder builder = new StreamBuilder(); KTable<String, String> addressTable = builder.table(ADDRESS_TABLE); KStream<String, String> orderStream = builder.stream(ORDER_STREAM); orderStream.join(addressTable, (order, address) -> order + “ send to “ + address) .to(ORDER_JOIN_STREAM);
KafkaStreams stream = new KafkaStreams(builder.build(), props); stream.start(); ```
- GlobalKTable, KStream을 조인
- 코파티셔닝이 되어 있지 않을 경우 토픽 조인을 하기 위해서는 GlobalKTable 을 사용해야 한다 ```java StreamBuilder builder = new StreamBuilder(); GloablKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE); KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
orderStream.join(); ```
- KTable, KStream을 조인
- stream(), to()
프로세서 API
- 스트림즈 DSL은 데이터 처리, 분기, 조인을 위한 다양한 메소드를 제공하지만, 추가적인 상세 로직 구현이 필요하면 프로세서 API를 활용
- KStream, KTable, GlobalKTable 과 같은 개념은 없다
- 예제
- FilterProcessor.java: 문자열 길이가 5이상인 데이터를 필터링
public class FilterProcessor implements Processor<String, String> { private ProcessorContext context; @Override public void init(ProcessorContext context) { this.context = context; } @Override public void process(String key, String value) { // 프로세스 처리 로직 -> 1개의 레코드를 처리하는 것으로 생각하며 // forward()는 다음 토폴로지로 넘어가며, 완료된 이후에는 commit() 호출하여 처리 선언 if(value.length() > 5) { context.forward(key, value); } context.commit(); } @Override public void close() {} }
- SimpleProcessor.java: FilterProcessor을 사용하여 스트림 진행
Topology topology = new Topology(); topology.addSource("Source", STREAM_LOG) .addProcess("Process", () -> new FilterProcessor(), "Source) .addSink("Sink", STREAM_LOG_FILTER, "Process"); KafkaStream stream = new KafkaStream(topology, props); stream.start();
- FilterProcessor.java: 문자열 길이가 5이상인 데이터를 필터링
Spring Cloud Stream 과 Kafka Stream
- Spring Cloud Stream 2.0
- 메세지 기반 마이크로서비스
- Kafka 같은 Message Broker 기반에 pub/sub 구조, Consumer Group, Parititon 등의 개념으로 스트림 처리 응용 프로그램을 만들기 위한 기반 제공
- 이벤트 중심 마이크로 서비스
- 외부 미들에어와의 통신을 하기 위한 통합 컴포넌트를 제공
- 메세지 기반 마이크로서비스
- 통합 컴포넌트
-
- 출처: https://docs.spring.io/spring-cloud-stream/docs/1.0.0.RC1/reference/html/_spring_cloud_stream_main_concepts.html
- Binder
- Spring Cloud Stream에서 제공하는 미들에워와 통신 컴포넌트로 producer/consumer의 연결, 위임 및 라우팅 등을 담당한다
- Binding
- Binder의 입/출력을 미들웨어에 연결하기 위한 Bridge 역할로 @EnableBinding 을 사용하여 Application에서 사용할 바인더 인터페이스를 추가 바인딩이 적용된다
-
- 여러 애노테이션
@SpringBootApplication @EnableBinding(Sink.class) public class StreamApplication { // main @StreamListener(Sink.INPUT) public void processVote(Vote vote) { votingService.recordVote(votd); } }
- @EnableBinding을 메인 어플리케이션에 추가하면 @StreamListener 애노테이션을 붙인 메서드와 메시지 브로커간에 연결을 하며, stream processing을 받게 된다
- @EnableBinding에는 여러 인터페이스를 추가할 수 있는 데, Sink/Source/Processor 로 선언된 인터페이스를 추가할 수 있다.
- Source: output(outbound)
- Sink: input(inbound)
- Processor: input/output (both)
- Binder는 MessageChannel, SubscribableChannel 인터페이스를 이용하여 정의할 수 있고, @Input, @Output 을 사용하여 채널의 이름을 정의할 수 있다
public interface CustomizingSink { String PROCESS_IN = "process_in"; @Input(PROCESS_IN) SubscribableChannel messagePush(); } public interface CustomizingSource { String PROCESS_OUT = "process_out"; @Output(PROCESS_OUT) MessageChannel createMessage(); } public interface CustomizingProcessor extends CustomizingSink, CustomizingSource {}
- @StreamListener 와 message 기반 애노테이션도 같이 사용 가능하다
@EnableBinding(Sink.class) public class StreamApplication {} @Component @Slf4j public class KafkaListener { @StreamListener(CustomizingProcessor.PROCESS_INPUT) @SendTo(CustomizingProcessor.PROCESS_OUTPUT) public Data listeneMessage(@Payload Data data, @Header("contentType") String contentType) { log.info("Data: {}, Content-Type: {}, data.toString(), contentType); return data; } }
- @StreamListener는 Input, @SendTo 는 Output을 지정할 수 있다
- Configuration Options
- Spring Cloud Stream Properties
- spring.cloud.stream.instanceCount: 카프카의 파티션같은 인스턴스 수
- spring.cloud.stream.instanceIndex: 0보다 큰 값에 경우 인스턴스 색인 정의
- spring.cloud.stream.dynamicDestinations: 동적 바인딩인 경우 바인더 리스트
- spring.cloud.stream.defaultBinder: 여러개의 바인더를 사용할 경우, 기본 바인더 지정
- Binding properties
- spring.cloud.stream.bindings.
. = - channelName: 설정할 채널 이름
- destination
- group
- contentType
- binder
- channelName: 설정할 채널 이름
- spring.cloud.stream.bindings.
.consumer - spring.cloud.stream.bindings.
.producer
- spring.cloud.stream.bindings.
- Kafka Stream
- binder: spring.cloud.stream.kafka.binder.
- consumer: spring.cloud.stream.kafka.
.consumer - producer: spring.cloud.stream.kafka.
.producer
- binder: spring.cloud.stream.kafka.binder.
- Spring Cloud Stream Properties
- Spring Cloud Stream 3.0
- Spring Cloud Stream 3.0 부터는 @EnableBinding, @Input, @Output, @StreamListener, @StreamMessageConverter 등이 모두 deprecated 되었다
- 이제는 함수형 프로그래밍 방식으로 설정해야 한다
- 예제
- 이제는 Function을 빈으로 생성하면 메시지를 처리할 수 있는 Spring Cloud Stream 애플리케이션이 된다
@Bean public Function<String, String> uppercase() { return value -> value.toUppercase(); }
- application.yml 등록
- input:
+ -in + - output:
+ -out + spring: cloud: stream: bindings: uppercase-in-0: gruop: my-topic-group destantion: my-topic
- input:
- 이제는 Function을 빈으로 생성하면 메시지를 처리할 수 있는 Spring Cloud Stream 애플리케이션이 된다
- Kafka 설정과 관련 참고
- https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties
출처
- 아파치 카프카 애플리케이션 프로그래밍 with 자바 책 발췌
- Spring Cloud Stream: https://docs.spring.io/spring-cloud-stream/docs/1.0.0.RC1/reference/html/_introducing_spring_cloud_stream.html
- https://coding-start.tistory.com/139
- https://shuiky.tistory.com/1971