카프카 브로커와 클러스터
- 브로커는 카프카 클라이언트와 데이터를 주고받기 위해 사용하는 주체
- 브로커 1대로도 기본 기능이 실행되지만, 데이터를 안전하게 보관하고 처리하기 위해 3대 이상의 브로커 서버를 1개의 클러스터로 묶어서 운영
- 데이터 저장, 전송
- 프로듀서로부터 전달된 데이터는 파일 시스템에 저장된다
- 카프카는 메모리나 데이터베이스에 저장하지 않으며 따로 캐시메모리를 구현하여 사용하지 않는다
- 파일입출력에 대한 이슈에 대한 고민
- 페이지 캐시를 사용하여 디스크 입출력 속도를 높여서 해결
- 프로듀서로부터 전달된 데이터는 파일 시스템에 저장된다
- Page Cache와 Flush
- 메세지는 토픽 파티션에 저장되며, 파티션은 Log Segment file로 구성되어 있다.(기본값으로 1GB마다 새로운 Segment 구성)
- 성능을 위해 Log Segment는 OS Page Cache에 기록(write)되며 Zero-Copy 가능
- Zero Copy: 데이터를 User Space에 복사하지 않고 CPU 개입없이 Page Cache와 Network Buffer 사이에 직접 전송되는 것으로 메모리 절약 및 처리량 제공
- Page Cache에 있는 데이터는 디스크로 Flush됨
- Broker가 종료될 때 Flush Thread가 실행되며 디스크로 Flush된다.
- 만약, Flush가 되기 전에 장애가 발생한다면, 이를 예방하기 위해 Replication하는 것, 아니면 데이터는 영구 손실될 수 있다.
- 데이터 복제, 싱크
- 데이터 복제(replication)의 이유는 클러스터로 묶인 브로커 중 일부가 장애가 발생하더라도 데이터를 유실하지 않고 안전하게 사용하기 위함이다
- 카프카의 데이터 복제는 파티션 단위로 이뤄진다
- 복제된 파티션은 리더와 팔로워로 구성
- 팔로워는 리더 파티션의 오프셋을 확인하여 현재 자신이 가지고 있는 오프셋과 차이나는 경우 리더로부터 데이터를 가져와 자신의 파티션에 저장
- 브로커 장애로 리더 파티션을 사용할 수 없게 되면 팔로워 중 리더를 선출한다.
- 운영에서는 데이터 종류마다 복제 개수를 설정하고, 상황에 따라 토픽마다 복제 개수를 다르게 운영한다
- 데이터 속도가 중요하다면 1 또는 2 설정, 데이터 유실이 고민인 경우 복제 개수를 3으로 설정하여 2개의 브로커에 장애가 발생해도 데이터를 안정적으로 처리할 수 있도록 유지한다
- 컨트롤러
- 클러스터 내의 다수 브로커 중 한대가 컨트롤러 역할을 한다
- 다른 브로커들의 상태를 체크하고 브로커가 클러스터에서 빠지는 경우 해당 브로커에 존재하는 리더 파티션을 재분배한다
- 데이터 삭제
- 컨슈머가 데이터를 가져가더라도 토픽의 데이터는 삭제되지 않는다
- 브로커만이 삭제할 수 있으며, 삭제는 파일 단위로 이루어지고, 이를 로그 세그먼트라 한다
- log.segment.bytes, log.segment.ms 옵션 값으로 용량을 설정할 수 있다
- 컨슈머 오프셋 저장
- 컨슈머는 특정 파티션으로부터 데이터를 가져가서 처리하고, 레코드를 가져간 값을 확인하기 위해 오프셋을 커밋한다
- 커밋한 오프셋은 _consumer_offsets 토픽에 저장
- 코디네이터
- 클러스터 내의 다수 브로커 중 한대는 코디네이터 역할 수행
- 코디네이터는 컨슈머 그룹의 상태를 체크하고 파티션을 컨슈머와 매칭되도록 분배하는 역할을 한다
- 리밸런스
- 파티션을 컨슈머로 재할당하는 과정
- 주키퍼
- 주키퍼는 카프카의 메타데이터를 관리하는 데 사용
- 카프카 클러스터로 묶인 브로커들은 동일한 경로의 주키퍼 경로로 선언해야 같은 카프카 브로커 묶음이 된다
토픽과 파티션
- 토픽은 카프카에서 데이터를 구분하기 위해 사용하는 단위이며, 저장되는 데이터를 레코드라고 한다
- 파티션은 카프카의 병렬처리의 핵심으로 그룹으로 묶인 컨슈머들이 레코드를 병렬로 처리할 수 있도록 매칭
- 파티션은 큐와 비슷한 구조지만, pop으로 레코드를 삭제하지 않고 파일시스템에 저장된다
레코드
- 레코드는 타임스탬프, 메시지 키, 메시지 값, 오프셋, 헤더로 구성되어 있다
- 타임스탬프는 프로듀서에서 해당 레코드가 생성된 시점으로 토픽 설정에 따라 브로커에 적재된 시간으로 설정될수도 있다
- 메시지 키는 메시지 값을 순서대로 처리하거나 메시지 값의 종류를 나타내기 위해 사용
- 메시지값은 실질적으로 처리할 데이터가 들어 있다
- 헤더는 레코드의 추가적인 정보를 담는 메타데이터 저장소로 사용
프로듀서
- 프로듀서는 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송
-
중요 개념
- 프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 Serializer, 파티셔너, 배치 생성 단계를 거친다
- 전송하고자 하는 데이터는 ProducerRecord 클래스를 통해 인스턴스를 생성
- 파티션 번호를 직접 지정하거나 타임스탬프를 설정, 메시지 키를 설정할 수 있다
- KafkaProducer 인스턴스가 send() 호출 -> ProducerRecord는 파티셔너에서 토픽의 어느 파티션으로 전송될 것인지 정해진다
- 파티션 별 Record를 모아두는 Record batch가 존재하고 Network Thread가 batch를 클러스터로 전송한다
- batch.size: Record Batch 사이즈
- Network thread가 Broker로 전송하는 속도보다 Accumulator에 쌓이는 속도가 더 빠르다면 record batch는 계속 추가되고 전체 record batch 크기가 buffer.memory 만큼 쌓이게 되면 KafkaProducer.send 함수는 블락이 되고 max.block.ms 시간이 넘어가면 exception이 발생
- linger.ms: 0 이상으로 설정하게 되면 Network Thread가 Accumulator에서 바로 record batch를 가져가지 않고 기다린다. 대기 시간이 있으므로 record batch가 채워지지만 그만큼 처리 속도는 감소하게 된다.
- batch.size: Record Batch 사이즈
- 기본값인 DefaultPartitioner로 설정되어 파티션이 저장
- 파티셔너에 의해 구분된 레코드는 데이터를 전송하기 전에 어큐뮬레이터(accumulator)에 데이터를 버퍼로 쌓아놓고 발송
- 버퍼로 쌓인 데이터는 배치로 묶어 전송함으로써 처치량 향상
- 프로듀서 API를 사용하면 UniformStickyPartioner, RoundRobinPartitioner를 제공
- UniformStickyPartioner는 RoundRobinPartitioner 에 단점 개선
- 단순하고 깔끔하지만 메시지 쌓이는 속도가 많지 않다면 record batch를 꽉꽉 채우지 못하고 보내진다.
- UniformStickyPartioner 는 프로듀서 동작에 특화되어 높은 처리량과 낮은 리소스 사용률을 가지는 특성이 있다
- record batch를 꽉 채워서 전송한다.
- 파티션 별 Record를 모아두는 Record batch가 존재하고 Network Thread가 batch를 클러스터로 전송한다
- Partitioner 인터페이스
- 인터페이스를 상속받아 메시지 키, 값에 따라 파티션을 지정할 수 있도록 적용할 수 있다.
- 파티셔너를 통해 파티션이 지정된 데이터는 어큐뮬레이터에 버퍼로 쌓인다
- 압축 옵션을 통해 브로커로 데이터를 전송할 수 있다
- 기본 옵션
- bootstrap.servers
- 카프카 클러스터에 속한 브로커의 호스트이름:포트 를 1개 이상 작성
- key.serializer
- 레코드의 메시지 키를 직렬화하는 클래스 지정
- value.serializer
- 레코드의 메시지 값을 직렬화하는 클래스 지정
- acks
- 브로커에 레코드를 정상 저장되었는지 전송 성공 여부 확인
- 0: 프로듀서가 전송한 즉시 브로커에 데이터 저장 여부와 상관없이 성공으로 판단
- -1(all): 토픽의 miin.insync.replicas 개수에 해당하는 리더 파티션과 팔로워 파티션에 데이터가 저장되면 성공한 것으로 판단
- 1(default): 리더 파티션에 데이터가 저장되면 성공 판단
- buffer.memory
- 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리 양
- retries
- 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수
- batch.size
- 배치로 전송할 레코드 최대 용량 지정
- linger.ms
- 배치를 전송하기 전까지 기다리는 최소 시간, 기본은 0
- partitioner.class
- 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스 지정
- enable.idempotence
- 멱등성 프로듀서 동작 여부 설정
- transation.id
- 레코드를 전송할 때 트랜잭션 단위로 묶을 지 여부 설정
- 이 값을 설정하면 트랜잭션 프로듀서로 동작
- bootstrap.servers
- 메시지 키를 가진 데이터
- ProducerRecord를 생성할 때, 토픽 이름, 메시지 키, 값 을 넣고 생성
ProducerRecord<String, String> record = new ProducerRecord<>("test", "Pangyo", "23"); // TOMIC_NAME, messageKey, messageValue
- ProducerRecord를 생성할 때, 토픽 이름, 메시지 키, 값 을 넣고 생성
- 파티션 번호를 가진 데이터
- ProducerRecord를 생성할 때, 토픽 이름, 파티션 번호, 메시지 키, 값을 넣고 생성
ProducerRecord<String, String> record = new ProducerRecord<>("test", 0, "Pangyo", "23"); // TOMIC_NAME, partitionNo, messageKey, messageValue
- ProducerRecord를 생성할 때, 토픽 이름, 파티션 번호, 메시지 키, 값을 넣고 생성
- 커스텀 파티셔너를 갖는 프로듀서
- 프로듀서 사용환경에 따라 특정 데이터를 가지는 레코드를 특정 파티션으로 보내야 하는 경우
- Partitoner 인터페이스를 사용하여 사용자 정의 파티셔너를 생성하여 지정할 수 있다
// Pangyo 라는 키가 들어오면 무조건 0번 파티션으로 보내도록 설정 public class CustomPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { if(keyBytes == null { throw new InvalidRecordException("Need message key"); } if((String)key.equals("Pangyo")) { return 0; } List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); return Utils.toPosivice(Utils.murmur2(keyBytes)) % numberPartitions; } @Override public void configure(Map<String, ?> configs) {} @Override public void close() {} }
- partitioner.class 설정해 주어야 한다
- 브로커 정상 전송 여부 확인
- Callback을 생성하여, producer.send(data, callback) 형식으로 전송한다.
@Slf4j public class ProducerCallback implemments Callback { @Override public void onComplete(RecordMetadata recordMetadata, Exception e) { if(e != null) { logger.error(e.getMessage(), e); }else { logger.info(recordMetadata.toString()); } } }
KafkaProducer<String, String> producer = new KafkaProducer(configs); ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue); producer.send(record, new ProducerCallback());
- Callback을 생성하여, producer.send(data, callback) 형식으로 전송한다.
컨슈머
- 중요 개념
- 컨슈머 운영 방법
- 한개 이상의 컨슈머로 컨슈머 그룹을 운영
- 특정 파티션만 구독하는 컨슈머 운영
- 컨슈머 그룹을 운영할 때에는 파티션에 개수와 같거나 작아야 한다
- 컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 갖고 있다
- 다양한 저장소에 저장하는 파이프라인을 구축할 때 각기 다른 저장소에 저장하는 컨슈머 그룹을 구성하면 격리되어 운영할 수 있다
- 리밸런싱
- 컨슈머 그룹으로 이루어진 컨슈머 중 일부 장애가 발생하면, 장애가 발생한 컨슈머에 할당된 파티션은 다른 컨슈머에 소유권을 넘긴다.
- 컨슈머가 추가되거나 제외될 때 사용
- 커밋
- 컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋을 통해 기록(토픽: __consumer_offsets)
- 컨슈머는 그룹에 처음 조인할 때 __consumer_offsets 참고해서 값을 읽어온다. 이후에는 Consumer는 요청해야 할 Offset 상태값을 가지고 있고, 이 값을 바탕으로 Fetch API를 요청해서 받아온다.
- auto commit
- poll()을 호출할 때, auto.commit.interval.ms에 정해진 주기마다 Consumer가 자동으로 Commit을 수행함.
- poll() 메서드가 수행될 때 일정 간격마다 오프셋을 커밋하도록 enable.auto.commit=true 로 설정
- auto.commit.interval.ms 에 설정된 값을 통해 간격을 지정
- 메세지 유실 가능
- 자동 커밋 주기보다 길게 메세지를 처리하는 경우에 장애가 발생하면 커밋된 다음 메세지를 가져오기 때문에 해당 메세지는 유실된다.
- Consumer의 장애 / 재기동 및 리밸런싱 후 브로커에서 이미 읽어온 메세지를 다시 읽어와서 중복 처리가 될 수 있다
- 34~40번까지 읽어온 후, 다음 poll()에서 40번을 Commit 하려는 순간 Consumer 장애가 발생한다면 __consumer_offsets은 Commit 받지 않았기 때문에 여전히 34를 가리키기 때문에 다시 34~40을 읽음.
- Manual Commit(동기, 비동기방식 존재)
- 동기방식 : CommitSync()
- CommitSync()를 할 경우, 해당 라인에서 Main Thread가 Blocking 됨.
- Commit에 실패하면, 성공할 때까지 Commit을 요청함. 더 이상 Commit을 시도할 수 없는 경우 CommitFailedException이 발생함.
- 이 때, NetworkClient를 통해서 메세지는 Batch에 적재되지만, commitSync()에 Block 되어서 다음 메세지를 가져오지 못한다.
- 비동기방식 : CommitAsync()
- 메세지 배치를 poll()을 통해 읽어오고 해당 메세지의 마지막 offset을 브로커에 commit 요청함.
- 브로커에 Commit 적용이 성공적으로 되었음을 기다리지 않고 (Non Blocking) 계속 다음 메세지를 읽어옴.
- 브로커에 Commit 적용이 실패해도 다시 Commit을 시도하지 않는다.
- 200까지 읽고 201을 Commit 했으나 비동기로 실패함.
- 이 때 240까지 읽고 241을 Commit하면 __consumer_offsets는 241이 됨. 즉, 이전에 읽어온 값이 Commit에 실패했어도 문제 없는 상황이 된다.
- CallBack() 함수 사용가능하며 동기 방식 대비 더 빠른 수행시간
- 동기와 비동기 방식은 일관성과 성능의 차이로 고민할 수 있다.
- 비동기 방식 또한 데이터 유실 및 중복이 발생할 수 있다.
ConsumerRecords<String, String> records = consumer.poll(INTERVAL_MS); // 1. 메세지 처리 -> 메세지 처리 후 commit을 하게 되면 데이터 유실 가능 consumer.commitAsync(callback); // 2. 메세지 처리 -> 메세지 처리하기 전 commit을 하게 되면 데이터 중복 가능
- 비동기 방식 또한 데이터 유실 및 중복이 발생할 수 있다.
- 동기방식 : CommitSync()
- KafkaConsumer의 읽기 Offset Commit / 중복 읽기 및 누락 상황 이해
- 카프카는 분산 처리 시스템이고 Consumer / Producer / Broker가 Loose하게 연결되어 있기 때문에 장애가 발생하면 중복 읽기 및 누락 상황이 발생할 수 있다.
- 장애가 발생하지 않도록 설계하고 구조를 최대한 심플하게 가져가서 데이터 정합성을 맞추는 것이 중요
-
컨슈머 내부 구조
- Fetcher
- poll() 메서드를 이용해서 브로커로부터 메세지를 받아올 때 Fetcher와 ConsumerNetworkClient 객체가 상호 협력하며 값을 불러온다.
- Fetcher에서 fetch를 해올 때 ConsumerNetworkClient에 send() 메서드를 호출하는 데, 이 때 해당 메소드는 KafkaClient를 통해 요청을 받아와 Future객체를 리턴한다. 응답을 받은 Fetch는 성공 콜백으로 응답에 대하여ConcurrentLinkedQueue
값 저장
- ConsumerNetworkClient
- Kafka Consumer의 모든 네트워크 통신을 담당하며 요청은 비동기로 동작한다. 결과에 대해서 RequestFuture 클래스로 확인
- HeartBeatThread
- HeartBeat Thread는 백그라운드로 동작하며 끊임없이 Broker에게 Consumer의 상태를 알려주며 첫번째 poll()을 수행할 때 생성
- ConsumerCoordinator
- Consumer Group의 상태를 관리하기 위해 사용되며 상태값으로 리더인지를 확인할 수 있고 리밸런싱에 대한 설정값을 가지고 있다.
- 리밸런싱에서는 JoinGroupResponseHandler, SyncGroupResponseHandler 사용
- 해당 내부에 HeartBeat Thread가 존재하며 Heartbeat Thread는 주기적으로 heartbeat을 GroupCoordinator에게 전송
- Consumer Group의 상태를 관리하기 위해 사용되며 상태값으로 리더인지를 확인할 수 있고 리밸런싱에 대한 설정값을 가지고 있다.
- SubscriptionState
- 현재 Consumer가 구독하고 있는 토픽의 상태를 전반적으로 알려주는 객체로 토픽, 파티션, 오프셋 정보 관리를 담당한다.
- assign 메소드
- KafkaConsumer에 토픽, 파티션 할당은 assign 메서드를 통해 이루어진다.
- 컨슈머의 그룹 관리 기능을 사용하지 않고 사용자가 assign 메서드를 직접 호출하여 수동으로 토픽, 파티션을 할당할 수 있는데 이 경우에는 컨슈머 리밸런스가 일어나지 않는다.
- subscribe 메소드
- 구독 요청은 KafkaConsumer의 subscribe 메서드를 통해 한다.
- 사용자가 구독을 요청한 토픽 정보는 SubscriptionState의 subscription에 저장된다.
- subscription에 저장된 토픽 정보는 컨슈머 리밸런스 과정에서 사용된다.
- 그룹 관리 기능을 사용한 경우에는 컨슈머 리밸런스 과정에서 코디네이터에 의해 토픽, 파티션이 할당된다.
- seek 메소드
- assign 메서드를 통해 할당된 파티션은 초기 오프셋 값 설정이 필요하다. 초기 오프셋 값이 없으면 Fetch가 불가능한 파티션으로 분류된다.
- seek 메서드를 통해 초기 오프셋 값을 설정한다. 초기 오프셋 설정은 오프셋 초기화 과정을 통해 이루어진다.
- Fetcher
- 컨슈머 운영 방법
- 컨슈머 주요 옵션
- boostrap.servers
- 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성
- key.deserializer
- 레코드의 메시지 키를 역직렬화하는 클래스 지정
- value.deserializer
- 레코드의 메시지 값을 역직렬화하는 클래스 지정
- group.id
- 컨슈머 그룹 아이디 지정
- subscribe() 메서드로 토픽을 구독하여 사용할 때 해당 옵션을 필수로 넣어야 한다
- auto.offset.reset
- 컨슈머 그룹이 특정 파티션을 읽을 때 지정된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을 지 선택하는 옵션
- latest: 가장 높은 오프셋부터 읽기 시작
- earilest: 가장 낮은 오프셋부터 읽기 시작
- none: 컨슈머 그룹이 커밋한 기록이 있는지 찾아본다. 없으면 오류, 있으면 기록 이후 오프셋부터 읽기 시작
- enable.auto.commit
- 자동, 수동 커밋 여부 선택
- auto.commit.interval.ms
- 자동 커밋 간격 지정
- max.poll.records
- poll() 메서드를 통해 반환되는 레코드 개수 지정
- session.timeout.ms
- 컨슈머가 브로커와 연결이 끊기는 최대 시간
- 이 시간내에 heartbeat를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱 진행
- heartbeat.interval.ms
- 하트비트 전송 시간 간격
- max.poll.interval.ms
- poll() 메서드를 호출하는 간격의 최대 시간 지정
- isolation.level
- 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용
- read_commited: 커밋이 완료된 레코드만 읽는다
- read_uncommited: 커밋 여부와 관계없이 파티션에 있는 모든 레코드를 읽는다(default)
- boostrap.servers
- Polling method
- 일반적인 Messaging Queue들은 Queue에서 Record를 Push 하는 방식을 사용합니다.
- Push 방식의 단점은 Queue가 Consumer의 처리 성능을 미리 염두해둬야 합니다
- Kafka는 Consumer가 Broker로부터 Record를 요청하는 Polling 구조
- KafkaConsumer.poll 호출 시
- 먼저 Fetcher의 fetchedRecords 메서드가 호출된다.
- fetchedRecords 메서드는 내부 캐시인 nextInLineRecords와 completedFetches를 확인
- 브로커로부터 이미 가져온 데이터가 있는 경우에는 max.poll.records 설정 값만큼 레코드를 반환한다. max.poll.records의 기본값은 500이다.
- 브로커에서 가져온 데이터가 없는 경우에는 KafkaConsumer는 Fetcher의 sendFetches 메서드를 호출한다.
- Fetcher의 sendFetches 메서드는 Fetch API 요청을 파티션 리더가 위치한 각 브로커에게 보낸다.
- KafkaConsumer는 Fetcher가 브로커로부터 응답을 받을 때까지 대기한다.
- 먼저 Fetcher의 fetchedRecords 메서드가 호출된다.
- 일반적인 Messaging Queue들은 Queue에서 Record를 Push 하는 방식을 사용합니다.
- 오프셋 커밋
- 동기 오프셋 커밋
- commitSync() 메서드를 호출하여 오프셋 커밋을 명시적으로 수행할 수 있으며 poll() 메서드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋한다.
- 동기 커밋의 경우 브로커로 커밋을 요청한 이후 커밋이 완료되기까지 기다리며 완료되었음을 받을 때까지 대기한다.
- commitSync는 파라미터로 Map<TopicPartition, OffsetAndMetadata>가 필요하다
- 비동기 오프셋 커밋
- 동기 오프셋 커밋에 경우 응답을 기다리는 동안 처리가 일시적으로 중단되기 때문에 더 빠르게 처리하기 위해서는 비동기 오프셋 커밋을 사용할 수 있다.
- commitAsync() 메서드를 호출하면 poll() 메서드로 리턴된 가장 마지막 레코드를 기준으로 오프셋을 커밋한다.
- commitSync() 동작 방식
- KafkaConsumer.commitSync를 호출하면 ConsumerCoordinator.commitOffsetSync(offset) 가 호출된다
- ConmerCoordinator는 응답 핸들러(OffsetCommitResponseHandler) 를 생성한후, GroupCoordinator에 요청 및 응답을 받는다
- 동기 오프셋 커밋
- 리밸런스 리스너를 가진 컨슈머
- poll() 메서드를 통해 반환된 데이터를 모두 처리하기 전에 리밸런스가 발생하면 데이터를 중복 처리할 수 있다
- 리밸런스 발생 시 데이터를 중복 처리하지 않게 하기 위해서는 리밸런스 발생 시 처리한 데이터를 기준으로 커밋 시도
- 리밸런스 발생을 감지하기 위해 카프카 라이브러리는 ConsumerRebalanceListener 인터페이스 지원
- onPartitionAssigned(), onPartitionRevoked() 메서드로 이루어진다
- onPartitionAssigned(): 리밸런스가 끝난 뒤 파티션이 할당 완료되면 호출
-
onPartitionRevoked(): 리밸런스가 시작되기 직전에 호출 ```java public class RebalanceListener implements ConsumerRebalanceListener { public void onPartitionsAssigned(Collection
partitions){ }
public void onPartitionRevoked(Collection
partitions { consumer.commitSync(currentOffsets); } }
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs); consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener()); ```
- Static Group Membership
- Consumer는 Consumer에 변동이 있거나, 토픽이 새로 생기는 경우 Leader Consumer에 의해 리밸런싱이 일어나는 데 이때 컨슈머는 메세지를 가져오지 않기 때문에 리밸런싱을 너무 자주하는 것은 좋지 않다.
- 리밸런싱을 적게 하는 방법 중 하나로 Static Group MemberShip을 사용할 수 있다
- 개념
- Consumer Group내의 Consumer들에게 고정된 Id를 부여한다.
- Consumer가 shotdown 된 후, session.timeout.ms(Max Wait Heartbeat) 이내에 재기동한다면 파티션은 리밸런싱 되지 않고 기존 파티션을 그대로 사용한다.
- Consumer가 session.timeout.ms 이내에 재기동하지 못하면 리밸런싱이 발생한다.
- 컨슈머 컨피그 내에 gropu.instance.id 설정으로 지정할 수 있다.
- 리밸런싱 Eager/Cooperative 모드
- Eager(default) : 전체 Stop -> 리밸런스 -> 수행
- 리밸런싱이 일어나면 모든 파티션의 분배를 취소하고, 다시 파티션을 재할당한다. 따라서 리밸런싱은 딱 한번만 일어난다.
- 모든 파티션 분배가 취소 되기 때문에 메세지 Lag가 발생함.
- 파티션 분배전략은 다음이 존재한다.
- Range(Default, 토픽별로 동일한 파티션을 특정 Consumer에게 할당) / RoundRobin(파티션 별로 Consumer들이 균등하게 부하를 분배) / Sticky(최초에 할당된 파티션과 Consumer의 매핑을 리밸런스가 수행되어도 가급적 유지)
- Cooperative : (일부 stop -> 일부 리밸런스) 반복
- 리밸런스 수행 시, 리밸런스 대상이 되는 Consumer들만 선택해서 점진적으로 리밸런싱을 수행
- 점진적으로 리밸런싱 한다는 것은 작은 리밸런싱을 여러 번 반복
- 이 때, 리밸런스 대상이 아닌 기존 Consumer는 메세지를 계속 Consume 한다.
- Eager(default) : 전체 Stop -> 리밸런스 -> 수행
- 파티션 할당 컨슈머
- 컨슈머를 운영할 때 subscribe() 메서드를 사용하는 것 외에도 직접 파티션을 컨슈머에 명시적으로 할당하여 운영할 수 있다
- 컨슈머가 어떤 토픽, 파티션을 할당할 지 명시적으로 선언할 때에는 assign() 메서드 사용
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs); consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER);
- 컨슈머 할당된 파티션 확인 방법
- 컨슈머에 할당된 토픽과 파티션 정보는 assignments() 메서드를 통해 확인할 수 있다
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs); consumer.subscribe(Arrays.asList(TOPIC_NAME)); Set<TopciPartition> assignedTopicPartition = consumer.assignment();
- 컨슈머에 할당된 토픽과 파티션 정보는 assignments() 메서드를 통해 확인할 수 있다
- 컨슈머의 안전한 종료
- 정상적으로 종료하지 않은 컨슈머는 세션 타임아웃이 발생할 때까지 컨슈머 그룹에 남게 된다.
- 실제로 종료되었지만, 더는 동작하지 않는 컨슈머가 존재하는 경우, 컨슈머 랙이 늘어나게 된다.
- 컨슈머 랙이 늘어나면 데이터 처리 지연이 발생하게 된다
- 컨슈머를 안전하게 종료하기 위해 KafkaConsumer는 wakeup() 메서드를 제공한다.
- wakeup() 메서드를 실행하여 KafkaConsumer 인스턴스를 안전하게 종료할 수 있다.
- wakeup() 메서드 호출 후, poll() 메서드가 호출되면 WakeupException 예외 발생
- WakeupException 을 받은 뒤에 close() 메서드를 호출하여 클러스터에 컨슈머가 안전하게 종료되었음을 알려주어야 한다
try { while(true) { ConsumerRecord<String, String> records = consumer.poll(Duration.ofSeconds(1)); for(ConsumerRrcord record: records) { logger.info("{}, record); } } } catch(WakeupException e) { log.error("WakeupException"); } finally { consumer.close(); }
- wakeup() 메서드는 shutdown hook을 구현하여 안전한 종료를 명시적으로 구현할 수 있다.
- 셧다운 훅이란 사용자 또는 OS로부터 종료 요청을 받으면 실행하는 스레드이다
public static void main(String[] args) { Runtime.getRuntime().addShutdownHook(new ShutdownThread()); // ... } static class ShutdownThread extends Thread { public void run() { consumer.wakeup(); } }
출처
- 책: 아파치 카프카 애플리케이션 프로그래밍 with 자바