카프카 브로커와 클러스터

  • 브로커는 카프카 클라이언트와 데이터를 주고받기 위해 사용하는 주체
  • 브로커 1대로도 기본 기능이 실행되지만, 데이터를 안전하게 보관하고 처리하기 위해 3대 이상의 브로커 서버를 1개의 클러스터로 묶어서 운영
  • 데이터 저장, 전송
    • 프로듀서로부터 전달된 데이터는 파일 시스템에 저장된다
      • 카프카는 메모리나 데이터베이스에 저장하지 않으며 따로 캐시메모리를 구현하여 사용하지 않는다
    • 파일입출력에 대한 이슈에 대한 고민
      • 페이지 캐시를 사용하여 디스크 입출력 속도를 높여서 해결
  • Page Cache와 Flush
    • 메세지는 토픽 파티션에 저장되며, 파티션은 Log Segment file로 구성되어 있다.(기본값으로 1GB마다 새로운 Segment 구성)
    • 성능을 위해 Log Segment는 OS Page Cache에 기록(write)되며 Zero-Copy 가능
      • Zero Copy: 데이터를 User Space에 복사하지 않고 CPU 개입없이 Page Cache와 Network Buffer 사이에 직접 전송되는 것으로 메모리 절약 및 처리량 제공
    • Page Cache에 있는 데이터는 디스크로 Flush됨
      • Broker가 종료될 때 Flush Thread가 실행되며 디스크로 Flush된다.
      • 만약, Flush가 되기 전에 장애가 발생한다면, 이를 예방하기 위해 Replication하는 것, 아니면 데이터는 영구 손실될 수 있다.
  • 데이터 복제, 싱크
    • 데이터 복제(replication)의 이유는 클러스터로 묶인 브로커 중 일부가 장애가 발생하더라도 데이터를 유실하지 않고 안전하게 사용하기 위함이다
    • 카프카의 데이터 복제는 파티션 단위로 이뤄진다
      • 복제된 파티션은 리더와 팔로워로 구성
      • 팔로워는 리더 파티션의 오프셋을 확인하여 현재 자신이 가지고 있는 오프셋과 차이나는 경우 리더로부터 데이터를 가져와 자신의 파티션에 저장
      • 브로커 장애로 리더 파티션을 사용할 수 없게 되면 팔로워 중 리더를 선출한다.
    • 운영에서는 데이터 종류마다 복제 개수를 설정하고, 상황에 따라 토픽마다 복제 개수를 다르게 운영한다
      • 데이터 속도가 중요하다면 1 또는 2 설정, 데이터 유실이 고민인 경우 복제 개수를 3으로 설정하여 2개의 브로커에 장애가 발생해도 데이터를 안정적으로 처리할 수 있도록 유지한다
  • 컨트롤러
    • 클러스터 내의 다수 브로커 중 한대가 컨트롤러 역할을 한다
    • 다른 브로커들의 상태를 체크하고 브로커가 클러스터에서 빠지는 경우 해당 브로커에 존재하는 리더 파티션을 재분배한다
  • 데이터 삭제
    • 컨슈머가 데이터를 가져가더라도 토픽의 데이터는 삭제되지 않는다
    • 브로커만이 삭제할 수 있으며, 삭제는 파일 단위로 이루어지고, 이를 로그 세그먼트라 한다
    • log.segment.bytes, log.segment.ms 옵션 값으로 용량을 설정할 수 있다
  • 컨슈머 오프셋 저장
    • 컨슈머는 특정 파티션으로부터 데이터를 가져가서 처리하고, 레코드를 가져간 값을 확인하기 위해 오프셋을 커밋한다
    • 커밋한 오프셋은 _consumer_offsets 토픽에 저장
  • 코디네이터
    • 클러스터 내의 다수 브로커 중 한대는 코디네이터 역할 수행
    • 코디네이터는 컨슈머 그룹의 상태를 체크하고 파티션을 컨슈머와 매칭되도록 분배하는 역할을 한다
    • 리밸런스
      • 파티션을 컨슈머로 재할당하는 과정
  • 주키퍼
    • 주키퍼는 카프카의 메타데이터를 관리하는 데 사용
    • 카프카 클러스터로 묶인 브로커들은 동일한 경로의 주키퍼 경로로 선언해야 같은 카프카 브로커 묶음이 된다

토픽과 파티션

  • 토픽은 카프카에서 데이터를 구분하기 위해 사용하는 단위이며, 저장되는 데이터를 레코드라고 한다
  • 파티션은 카프카의 병렬처리의 핵심으로 그룹으로 묶인 컨슈머들이 레코드를 병렬로 처리할 수 있도록 매칭
  • 파티션은 큐와 비슷한 구조지만, pop으로 레코드를 삭제하지 않고 파일시스템에 저장된다

레코드

  • 레코드는 타임스탬프, 메시지 키, 메시지 값, 오프셋, 헤더로 구성되어 있다
  • 타임스탬프는 프로듀서에서 해당 레코드가 생성된 시점으로 토픽 설정에 따라 브로커에 적재된 시간으로 설정될수도 있다
  • 메시지 키는 메시지 값을 순서대로 처리하거나 메시지 값의 종류를 나타내기 위해 사용
  • 메시지값은 실질적으로 처리할 데이터가 들어 있다
  • 헤더는 레코드의 추가적인 정보를 담는 메타데이터 저장소로 사용

프로듀서

  • 프로듀서는 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송
  • 중요 개념

    image

    • 프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 Serializer, 파티셔너, 배치 생성 단계를 거친다
    • 전송하고자 하는 데이터는 ProducerRecord 클래스를 통해 인스턴스를 생성
      • 파티션 번호를 직접 지정하거나 타임스탬프를 설정, 메시지 키를 설정할 수 있다
    • KafkaProducer 인스턴스가 send() 호출 -> ProducerRecord는 파티셔너에서 토픽의 어느 파티션으로 전송될 것인지 정해진다
      • 파티션 별 Record를 모아두는 Record batch가 존재하고 Network Thread가 batch를 클러스터로 전송한다
        • batch.size: Record Batch 사이즈
          • Network thread가 Broker로 전송하는 속도보다 Accumulator에 쌓이는 속도가 더 빠르다면 record batch는 계속 추가되고 전체 record batch 크기가 buffer.memory 만큼 쌓이게 되면 KafkaProducer.send 함수는 블락이 되고 max.block.ms 시간이 넘어가면 exception이 발생
        • linger.ms: 0 이상으로 설정하게 되면 Network Thread가 Accumulator에서 바로 record batch를 가져가지 않고 기다린다. 대기 시간이 있으므로 record batch가 채워지지만 그만큼 처리 속도는 감소하게 된다.
      • 기본값인 DefaultPartitioner로 설정되어 파티션이 저장
      • 파티셔너에 의해 구분된 레코드는 데이터를 전송하기 전에 어큐뮬레이터(accumulator)에 데이터를 버퍼로 쌓아놓고 발송
      • 버퍼로 쌓인 데이터는 배치로 묶어 전송함으로써 처치량 향상
      • 프로듀서 API를 사용하면 UniformStickyPartioner, RoundRobinPartitioner를 제공
      • UniformStickyPartioner는 RoundRobinPartitioner 에 단점 개선
        • 단순하고 깔끔하지만 메시지 쌓이는 속도가 많지 않다면 record batch를 꽉꽉 채우지 못하고 보내진다.
      • UniformStickyPartioner 는 프로듀서 동작에 특화되어 높은 처리량과 낮은 리소스 사용률을 가지는 특성이 있다
        • record batch를 꽉 채워서 전송한다.
    • Partitioner 인터페이스
      • 인터페이스를 상속받아 메시지 키, 값에 따라 파티션을 지정할 수 있도록 적용할 수 있다.
      • 파티셔너를 통해 파티션이 지정된 데이터는 어큐뮬레이터에 버퍼로 쌓인다
    • 압축 옵션을 통해 브로커로 데이터를 전송할 수 있다
  • 기본 옵션
    • bootstrap.servers
      • 카프카 클러스터에 속한 브로커의 호스트이름:포트 를 1개 이상 작성
    • key.serializer
      • 레코드의 메시지 키를 직렬화하는 클래스 지정
    • value.serializer
      • 레코드의 메시지 값을 직렬화하는 클래스 지정
    • acks
      • 브로커에 레코드를 정상 저장되었는지 전송 성공 여부 확인
      • 0: 프로듀서가 전송한 즉시 브로커에 데이터 저장 여부와 상관없이 성공으로 판단
      • -1(all): 토픽의 miin.insync.replicas 개수에 해당하는 리더 파티션과 팔로워 파티션에 데이터가 저장되면 성공한 것으로 판단
      • 1(default): 리더 파티션에 데이터가 저장되면 성공 판단
    • buffer.memory
      • 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리 양
    • retries
      • 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수
    • batch.size
      • 배치로 전송할 레코드 최대 용량 지정
    • linger.ms
      • 배치를 전송하기 전까지 기다리는 최소 시간, 기본은 0
    • partitioner.class
      • 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스 지정
    • enable.idempotence
      • 멱등성 프로듀서 동작 여부 설정
    • transation.id
      • 레코드를 전송할 때 트랜잭션 단위로 묶을 지 여부 설정
      • 이 값을 설정하면 트랜잭션 프로듀서로 동작
  • 메시지 키를 가진 데이터
    • ProducerRecord를 생성할 때, 토픽 이름, 메시지 키, 값 을 넣고 생성
      ProducerRecord<String, String> record = new ProducerRecord<>("test", "Pangyo", "23");
      // TOMIC_NAME, messageKey, messageValue
      
  • 파티션 번호를 가진 데이터
    • ProducerRecord를 생성할 때, 토픽 이름, 파티션 번호, 메시지 키, 값을 넣고 생성
      ProducerRecord<String, String> record = new ProducerRecord<>("test", 0, "Pangyo", "23");
      // TOMIC_NAME, partitionNo, messageKey, messageValue
      
  • 커스텀 파티셔너를 갖는 프로듀서
    • 프로듀서 사용환경에 따라 특정 데이터를 가지는 레코드를 특정 파티션으로 보내야 하는 경우
    • Partitoner 인터페이스를 사용하여 사용자 정의 파티셔너를 생성하여 지정할 수 있다
      // Pangyo 라는 키가 들어오면 무조건 0번 파티션으로 보내도록 설정
      public class CustomPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            if(keyBytes == null {
                throw new InvalidRecordException("Need message key");
            }
            if((String)key.equals("Pangyo")) {
                return 0;
            }
                
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            return Utils.toPosivice(Utils.murmur2(keyBytes)) % numberPartitions;
        }
            
        @Override
        public void configure(Map<String, ?> configs) {}
            
        @Override
        public void close() {}
      }
      
    • partitioner.class 설정해 주어야 한다
  • 브로커 정상 전송 여부 확인
    • Callback을 생성하여, producer.send(data, callback) 형식으로 전송한다.
      @Slf4j
      public class ProducerCallback implemments Callback {
        @Override
        public void onComplete(RecordMetadata recordMetadata, Exception e) {
            if(e != null) {
                logger.error(e.getMessage(), e);
            }else {
                logger.info(recordMetadata.toString());
            }
        }
      }
      
      KafkaProducer<String, String> producer = new KafkaProducer(configs);
      ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
      producer.send(record, new ProducerCallback());
      

컨슈머

  • 중요 개념
    • 컨슈머 운영 방법
      • 한개 이상의 컨슈머로 컨슈머 그룹을 운영
      • 특정 파티션만 구독하는 컨슈머 운영
    • 컨슈머 그룹을 운영할 때에는 파티션에 개수와 같거나 작아야 한다
    • 컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 갖고 있다
      • 다양한 저장소에 저장하는 파이프라인을 구축할 때 각기 다른 저장소에 저장하는 컨슈머 그룹을 구성하면 격리되어 운영할 수 있다
    • 리밸런싱
      • 컨슈머 그룹으로 이루어진 컨슈머 중 일부 장애가 발생하면, 장애가 발생한 컨슈머에 할당된 파티션은 다른 컨슈머에 소유권을 넘긴다.
      • 컨슈머가 추가되거나 제외될 때 사용
    • 커밋
      • 컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋을 통해 기록(토픽: __consumer_offsets)
      • 컨슈머는 그룹에 처음 조인할 때 __consumer_offsets 참고해서 값을 읽어온다. 이후에는 Consumer는 요청해야 할 Offset 상태값을 가지고 있고, 이 값을 바탕으로 Fetch API를 요청해서 받아온다.
      • auto commit
        • poll()을 호출할 때, auto.commit.interval.ms에 정해진 주기마다 Consumer가 자동으로 Commit을 수행함.
        • poll() 메서드가 수행될 때 일정 간격마다 오프셋을 커밋하도록 enable.auto.commit=true 로 설정
        • auto.commit.interval.ms 에 설정된 값을 통해 간격을 지정
        • 메세지 유실 가능
          • 자동 커밋 주기보다 길게 메세지를 처리하는 경우에 장애가 발생하면 커밋된 다음 메세지를 가져오기 때문에 해당 메세지는 유실된다.
        • Consumer의 장애 / 재기동 및 리밸런싱 후 브로커에서 이미 읽어온 메세지를 다시 읽어와서 중복 처리가 될 수 있다
          • 34~40번까지 읽어온 후, 다음 poll()에서 40번을 Commit 하려는 순간 Consumer 장애가 발생한다면 __consumer_offsets은 Commit 받지 않았기 때문에 여전히 34를 가리키기 때문에 다시 34~40을 읽음.
      • Manual Commit(동기, 비동기방식 존재)
        • 동기방식 : CommitSync()
          • CommitSync()를 할 경우, 해당 라인에서 Main Thread가 Blocking 됨.
          • Commit에 실패하면, 성공할 때까지 Commit을 요청함. 더 이상 Commit을 시도할 수 없는 경우 CommitFailedException이 발생함.
          • 이 때, NetworkClient를 통해서 메세지는 Batch에 적재되지만, commitSync()에 Block 되어서 다음 메세지를 가져오지 못한다.
        • 비동기방식 : CommitAsync()
          • 메세지 배치를 poll()을 통해 읽어오고 해당 메세지의 마지막 offset을 브로커에 commit 요청함.
          • 브로커에 Commit 적용이 성공적으로 되었음을 기다리지 않고 (Non Blocking) 계속 다음 메세지를 읽어옴.
          • 브로커에 Commit 적용이 실패해도 다시 Commit을 시도하지 않는다.
            • 200까지 읽고 201을 Commit 했으나 비동기로 실패함.
            • 이 때 240까지 읽고 241을 Commit하면 __consumer_offsets는 241이 됨. 즉, 이전에 읽어온 값이 Commit에 실패했어도 문제 없는 상황이 된다.
          • CallBack() 함수 사용가능하며 동기 방식 대비 더 빠른 수행시간
        • 동기와 비동기 방식은 일관성과 성능의 차이로 고민할 수 있다.
          • 비동기 방식 또한 데이터 유실 및 중복이 발생할 수 있다.
            ConsumerRecords<String, String> records = consumer.poll(INTERVAL_MS);
            // 1. 메세지 처리 -> 메세지 처리 후 commit을 하게 되면 데이터 유실 가능
            consumer.commitAsync(callback);
            // 2. 메세지 처리 -> 메세지 처리하기 전 commit을 하게 되면 데이터 중복 가능
            
      • KafkaConsumer의 읽기 Offset Commit / 중복 읽기 및 누락 상황 이해
        • 카프카는 분산 처리 시스템이고 Consumer / Producer / Broker가 Loose하게 연결되어 있기 때문에 장애가 발생하면 중복 읽기 및 누락 상황이 발생할 수 있다.
        • 장애가 발생하지 않도록 설계하고 구조를 최대한 심플하게 가져가서 데이터 정합성을 맞추는 것이 중요
    • 컨슈머 내부 구조

      image

      • Fetcher
        • poll() 메서드를 이용해서 브로커로부터 메세지를 받아올 때 Fetcher와 ConsumerNetworkClient 객체가 상호 협력하며 값을 불러온다.
        • Fetcher에서 fetch를 해올 때 ConsumerNetworkClient에 send() 메서드를 호출하는 데, 이 때 해당 메소드는 KafkaClient를 통해 요청을 받아와 Future객체를 리턴한다. 응답을 받은 Fetch는 성공 콜백으로 응답에 대하여ConcurrentLinkedQueue 값 저장
      • ConsumerNetworkClient
        • Kafka Consumer의 모든 네트워크 통신을 담당하며 요청은 비동기로 동작한다. 결과에 대해서 RequestFuture 클래스로 확인
      • HeartBeatThread
        • HeartBeat Thread는 백그라운드로 동작하며 끊임없이 Broker에게 Consumer의 상태를 알려주며 첫번째 poll()을 수행할 때 생성
      • ConsumerCoordinator
        • Consumer Group의 상태를 관리하기 위해 사용되며 상태값으로 리더인지를 확인할 수 있고 리밸런싱에 대한 설정값을 가지고 있다.
          • 리밸런싱에서는 JoinGroupResponseHandler, SyncGroupResponseHandler 사용
        • 해당 내부에 HeartBeat Thread가 존재하며 Heartbeat Thread는 주기적으로 heartbeat을 GroupCoordinator에게 전송
      • SubscriptionState
        • 현재 Consumer가 구독하고 있는 토픽의 상태를 전반적으로 알려주는 객체로 토픽, 파티션, 오프셋 정보 관리를 담당한다.
        • assign 메소드
          • KafkaConsumer에 토픽, 파티션 할당은 assign 메서드를 통해 이루어진다.
          • 컨슈머의 그룹 관리 기능을 사용하지 않고 사용자가 assign 메서드를 직접 호출하여 수동으로 토픽, 파티션을 할당할 수 있는데 이 경우에는 컨슈머 리밸런스가 일어나지 않는다.
        • subscribe 메소드
          • 구독 요청은 KafkaConsumer의 subscribe 메서드를 통해 한다.
          • 사용자가 구독을 요청한 토픽 정보는 SubscriptionState의 subscription에 저장된다.
            • subscription에 저장된 토픽 정보는 컨슈머 리밸런스 과정에서 사용된다.
            • 그룹 관리 기능을 사용한 경우에는 컨슈머 리밸런스 과정에서 코디네이터에 의해 토픽, 파티션이 할당된다.
        • seek 메소드
          • assign 메서드를 통해 할당된 파티션은 초기 오프셋 값 설정이 필요하다. 초기 오프셋 값이 없으면 Fetch가 불가능한 파티션으로 분류된다.
          • seek 메서드를 통해 초기 오프셋 값을 설정한다. 초기 오프셋 설정은 오프셋 초기화 과정을 통해 이루어진다.
  • 컨슈머 주요 옵션
    • boostrap.servers
      • 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성
    • key.deserializer
      • 레코드의 메시지 키를 역직렬화하는 클래스 지정
    • value.deserializer
      • 레코드의 메시지 값을 역직렬화하는 클래스 지정
    • group.id
      • 컨슈머 그룹 아이디 지정
      • subscribe() 메서드로 토픽을 구독하여 사용할 때 해당 옵션을 필수로 넣어야 한다
    • auto.offset.reset
      • 컨슈머 그룹이 특정 파티션을 읽을 때 지정된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을 지 선택하는 옵션
      • latest: 가장 높은 오프셋부터 읽기 시작
      • earilest: 가장 낮은 오프셋부터 읽기 시작
      • none: 컨슈머 그룹이 커밋한 기록이 있는지 찾아본다. 없으면 오류, 있으면 기록 이후 오프셋부터 읽기 시작
    • enable.auto.commit
      • 자동, 수동 커밋 여부 선택
    • auto.commit.interval.ms
      • 자동 커밋 간격 지정
    • max.poll.records
      • poll() 메서드를 통해 반환되는 레코드 개수 지정
    • session.timeout.ms
      • 컨슈머가 브로커와 연결이 끊기는 최대 시간
      • 이 시간내에 heartbeat를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱 진행
    • heartbeat.interval.ms
      • 하트비트 전송 시간 간격
    • max.poll.interval.ms
      • poll() 메서드를 호출하는 간격의 최대 시간 지정
    • isolation.level
      • 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용
      • read_commited: 커밋이 완료된 레코드만 읽는다
      • read_uncommited: 커밋 여부와 관계없이 파티션에 있는 모든 레코드를 읽는다(default)
  • Polling method
    • 일반적인 Messaging Queue들은 Queue에서 Record를 Push 하는 방식을 사용합니다.
      • Push 방식의 단점은 Queue가 Consumer의 처리 성능을 미리 염두해둬야 합니다
    • Kafka는 Consumer가 Broker로부터 Record를 요청하는 Polling 구조
    • KafkaConsumer.poll 호출 시
      • 먼저 Fetcher의 fetchedRecords 메서드가 호출된다.
        • fetchedRecords 메서드는 내부 캐시인 nextInLineRecords와 completedFetches를 확인
        • 브로커로부터 이미 가져온 데이터가 있는 경우에는 max.poll.records 설정 값만큼 레코드를 반환한다. max.poll.records의 기본값은 500이다.
      • 브로커에서 가져온 데이터가 없는 경우에는 KafkaConsumer는 Fetcher의 sendFetches 메서드를 호출한다.
        • Fetcher의 sendFetches 메서드는 Fetch API 요청을 파티션 리더가 위치한 각 브로커에게 보낸다.
        • KafkaConsumer는 Fetcher가 브로커로부터 응답을 받을 때까지 대기한다.
  • 오프셋 커밋
    • 동기 오프셋 커밋
      • commitSync() 메서드를 호출하여 오프셋 커밋을 명시적으로 수행할 수 있으며 poll() 메서드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋한다.
      • 동기 커밋의 경우 브로커로 커밋을 요청한 이후 커밋이 완료되기까지 기다리며 완료되었음을 받을 때까지 대기한다.
      • commitSync는 파라미터로 Map<TopicPartition, OffsetAndMetadata>가 필요하다
    • 비동기 오프셋 커밋
      • 동기 오프셋 커밋에 경우 응답을 기다리는 동안 처리가 일시적으로 중단되기 때문에 더 빠르게 처리하기 위해서는 비동기 오프셋 커밋을 사용할 수 있다.
      • commitAsync() 메서드를 호출하면 poll() 메서드로 리턴된 가장 마지막 레코드를 기준으로 오프셋을 커밋한다.
    • commitSync() 동작 방식
      • KafkaConsumer.commitSync를 호출하면 ConsumerCoordinator.commitOffsetSync(offset) 가 호출된다
      • ConmerCoordinator는 응답 핸들러(OffsetCommitResponseHandler) 를 생성한후, GroupCoordinator에 요청 및 응답을 받는다
  • 리밸런스 리스너를 가진 컨슈머
    • poll() 메서드를 통해 반환된 데이터를 모두 처리하기 전에 리밸런스가 발생하면 데이터를 중복 처리할 수 있다
    • 리밸런스 발생 시 데이터를 중복 처리하지 않게 하기 위해서는 리밸런스 발생 시 처리한 데이터를 기준으로 커밋 시도
    • 리밸런스 발생을 감지하기 위해 카프카 라이브러리는 ConsumerRebalanceListener 인터페이스 지원
    • onPartitionAssigned(), onPartitionRevoked() 메서드로 이루어진다
      • onPartitionAssigned(): 리밸런스가 끝난 뒤 파티션이 할당 완료되면 호출
      • onPartitionRevoked(): 리밸런스가 시작되기 직전에 호출 ```java public class RebalanceListener implements ConsumerRebalanceListener { public void onPartitionsAssigned(Collection partitions){

        }

        public void onPartitionRevoked(Collection partitions { consumer.commitSync(currentOffsets); } }

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs); consumer.subscribe(Arrays.asList(TOPIC_NAME), new RebalanceListener()); ```

  • Static Group Membership
    • Consumer는 Consumer에 변동이 있거나, 토픽이 새로 생기는 경우 Leader Consumer에 의해 리밸런싱이 일어나는 데 이때 컨슈머는 메세지를 가져오지 않기 때문에 리밸런싱을 너무 자주하는 것은 좋지 않다.
    • 리밸런싱을 적게 하는 방법 중 하나로 Static Group MemberShip을 사용할 수 있다
    • 개념
      • Consumer Group내의 Consumer들에게 고정된 Id를 부여한다.
      • Consumer가 shotdown 된 후, session.timeout.ms(Max Wait Heartbeat) 이내에 재기동한다면 파티션은 리밸런싱 되지 않고 기존 파티션을 그대로 사용한다.
      • Consumer가 session.timeout.ms 이내에 재기동하지 못하면 리밸런싱이 발생한다.
    • 컨슈머 컨피그 내에 gropu.instance.id 설정으로 지정할 수 있다.
  • 리밸런싱 Eager/Cooperative 모드
    • Eager(default) : 전체 Stop -> 리밸런스 -> 수행
      • 리밸런싱이 일어나면 모든 파티션의 분배를 취소하고, 다시 파티션을 재할당한다. 따라서 리밸런싱은 딱 한번만 일어난다.
      • 모든 파티션 분배가 취소 되기 때문에 메세지 Lag가 발생함.
      • 파티션 분배전략은 다음이 존재한다.
        • Range(Default, 토픽별로 동일한 파티션을 특정 Consumer에게 할당) / RoundRobin(파티션 별로 Consumer들이 균등하게 부하를 분배) / Sticky(최초에 할당된 파티션과 Consumer의 매핑을 리밸런스가 수행되어도 가급적 유지)
    • Cooperative : (일부 stop -> 일부 리밸런스) 반복
      • 리밸런스 수행 시, 리밸런스 대상이 되는 Consumer들만 선택해서 점진적으로 리밸런싱을 수행
      • 점진적으로 리밸런싱 한다는 것은 작은 리밸런싱을 여러 번 반복
      • 이 때, 리밸런스 대상이 아닌 기존 Consumer는 메세지를 계속 Consume 한다.
  • 파티션 할당 컨슈머
    • 컨슈머를 운영할 때 subscribe() 메서드를 사용하는 것 외에도 직접 파티션을 컨슈머에 명시적으로 할당하여 운영할 수 있다
    • 컨슈머가 어떤 토픽, 파티션을 할당할 지 명시적으로 선언할 때에는 assign() 메서드 사용
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
      consumer.assign(Collections.singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER);
      
  • 컨슈머 할당된 파티션 확인 방법
    • 컨슈머에 할당된 토픽과 파티션 정보는 assignments() 메서드를 통해 확인할 수 있다
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
      consumer.subscribe(Arrays.asList(TOPIC_NAME));
      Set<TopciPartition> assignedTopicPartition = consumer.assignment();
      
  • 컨슈머의 안전한 종료
    • 정상적으로 종료하지 않은 컨슈머는 세션 타임아웃이 발생할 때까지 컨슈머 그룹에 남게 된다.
    • 실제로 종료되었지만, 더는 동작하지 않는 컨슈머가 존재하는 경우, 컨슈머 랙이 늘어나게 된다.
    • 컨슈머 랙이 늘어나면 데이터 처리 지연이 발생하게 된다
    • 컨슈머를 안전하게 종료하기 위해 KafkaConsumer는 wakeup() 메서드를 제공한다.
      • wakeup() 메서드를 실행하여 KafkaConsumer 인스턴스를 안전하게 종료할 수 있다.
      • wakeup() 메서드 호출 후, poll() 메서드가 호출되면 WakeupException 예외 발생
      • WakeupException 을 받은 뒤에 close() 메서드를 호출하여 클러스터에 컨슈머가 안전하게 종료되었음을 알려주어야 한다
        try {
        while(true) {
            ConsumerRecord<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for(ConsumerRrcord record: records) {
                logger.info("{}, record);
            }
        }
        } catch(WakeupException e) {
        log.error("WakeupException");
        } finally {
        consumer.close();
        }
        
    • wakeup() 메서드는 shutdown hook을 구현하여 안전한 종료를 명시적으로 구현할 수 있다.
    • 셧다운 훅이란 사용자 또는 OS로부터 종료 요청을 받으면 실행하는 스레드이다
      public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());
        // ...
      }
      static class ShutdownThread extends Thread {
        public void run() { consumer.wakeup(); }
      }
      

출처

  • 책: 아파치 카프카 애플리케이션 프로그래밍 with 자바