토픽과 파티션
- 적정 파티션 개수
- 데이터 처리량
- 데이터 처리를 올리기 위해서는 컨슈머의 처리량을 늘리거나 컨슈머를 추가하여 병렬처리량을 늘리는 것
- 파티션 개수만큼 컨슈머 스레드를 운영한다면 해당 토픽의 병렬처리를 극대화할 수 있다
- 만약 전체 컨슈머 데이터 처리량이 프로듀서가 보내는 데이터보다 적다면 컨슈머 랙이 생기고, 데이터 처리 지연 발생
- 프로듀서 전송 데이터량 < 컨슈머 데이터 처리량 * 파티션 개수(만약 초당 프로듀서가 10000개를 발행하고, 컨슈머가 초당 10000개를 처리한다면 파티션은 최소 10개가 필요)
- 메시지 키를 사용
- 메시지 키 사용 여부와 관련해서는 메시지 키를 사용함과 동시에 데이터 처리 순서를 지켜야하는 경우 고려
- 프로듀서가 토픽으로 데이터를 보낼 때 메시지 키를 해시 변환하여 메시지 키를 파티션에 매칭하기 때문에 파티션 개수가 달라지면 매칭된 파티션과 메시지 키 매칭이 깨지고 다른 파티션에 할당된다
- 메시지 키를 사용하여 순서 보장되어야 한다면 최대한 파티션 변화는 일어나지 않도록 해야 하며 변화가 필요한 경우 커스텀 파티셔너를 개발하여 적용해야 한다
- 브로커, 컨슈머 영향도
- 카프카에서 파티션은 각 브로커와 파일 시스템을 사용하기 때문에 파티션이 늘어나는 만큼 브러커에서 접근하는 파일 개수가 많아진다
- 운영체제에서는 프로세스 당 열 수 있는 파일 최대 개수를 제한하고 있기 때문에 안정적으로 유지할 수 있는 브로커 당 파티션 개수를 모니터링 해야 한다
- 데이터 처리량
- 토픽 정리 정책 (cleanup.policy)
- 토픽의 데이터는 시간 또는 용량에 따라 삭제 규칙을 적용할 수 있다
- 클라우드를 활용하는 경우 저장소 용량이 늘어남에 따라 비용도 함께 늘어나기 때문에 삭제 정책을 지정할 필요가 있다
- cleanup.policy: delete (삭제)
- segment.bytes 옵션
- 세그먼트는 토픽의 데이터를 저장하는 명시적인 파일 시스템 단위
- 1개의 세그먼트 크기를 지정
- retention.ms 옵션
- 토픽 데이터의 유지하는 기간을 밀리초 단위로 지정할 수 있다
- retention.bytes 옵션
- 지정한 크기를 넘어간 세그먼트 파일 삭제
- segment.bytes 옵션
- cleanup.policy: compact (압축)
- zip, tar 압축과는 다른 개념
- 여기에서 압축이란 메시지 키별로 해당 메시지 키의 레코드 중 오래된 데이터를 삭제하는 정책을 뜻한다
- 1개 파티션에서 오프셋의 증가가 일정하지 않ㅇ르 수 있다
- KTable과 같이 메시지 키를 기반으로 데이터를 처리할 경우 유용
- 데이터의 흐름이 아닌 가장 마지막으로 업데이트 된 메시지 키의 데이터가 중요할 경우 가장 최신의 데이터를 제외한 나머지 데이터를 삭제할 수 있다
- min.cleanable.dirty.ratio 옵션
- 데이터의 압축 시작 시점은 min.cleanable.dirty.ratio 옵션값을 따른다
- 세그먼트에 남아 있는 데이터 테일 여역의 레코드 개수와 헤드 영역의 레코드 개수의 비율을 뜻한다
- 테일 영역의 레코드들은 클린 로그라고 부르고 압축이 완료됐기 때문에 테일 영역에는 중복된 메시지 키가 없다
- 헤드 영역의 레코드들은 더티 로그라고 부르고 압축이 되기 전 레코드들이 있으므로 중복된 메시지 키를 가진 레코드들이 있다
- 더티 비율: 더티 영역의 메시지 개수를 압축 대상 세그먼트에 남아있는 데이터의 총 레코드 수로 나눈 비율을 뜻한다
- 더티 레코드 개수 / (클린 레코드 개수 + 더티 레코드 개수)
- 더티 비율을 너무 작게 설정하면 압축이 더 자주 일어나기 때문에 계속해서 메시지 키의 최신 데이터만 유지할 수 있다
- ISR (In-Sync-Replicas)
- ISR은 리더 파티션과 팔로워 파티션이 모두 싱크가 된 상태(동기화 완료)를 뜻한다
- 팔로워 파티션이 리더 파티션으로부터 데이터를 복제하는 데 시간이 걸린다
- replica.lag.time.max.ms 옵션
- 설정한 옵션값만큼 데이터를 복제하는 지 확인
- 더 긴 시간동안 데이터를 가져가지 않는다면 해당 팔로워 파티션에 문제가 생긴것으로 간주하여 ISR 그룹에서 제외
- unclean.leader.election.enable 옵션
- ISR 에서 제외된 파티션은 리더 파티션으로 선출될 수 없는 데, 해당 옵션을 true로 하는 경우 동기화가 되지 않은 팔로우 파티션도 리더로 선출될 수 있다
- false에 경우 리더 파티션을 선출할 수 없는 경우 리더 파티션이 존재하는 브로커가 다시 시작할 때까지 기다리며, 그 동안 서비스가 중단된다
- 가용성(서비스 중단)이 중요하다면 true로 설정, 데이터 유실이 발생하지 않는것이 더 중요하다면 false로 설정
- 토픽 옵션 설정
bin/kafka-topic.sh --bootstrap-server my-kafka:9092 \ --create --topic my-topic \ -- config unclean.leader.election.enable=false
카프카 프로듀서
- acks 옵션
- acks 옵션으로 0, 1, all(또는 -1) 값을 가질 수 있다
- acks = 0
- 프로듀서가 리더 파티션으로 데이터를 전송했을 때 리더 파티션으로 데이터가 저장되었는 지 확인하지 않는다
- 리더 파티션은 데이터를 저장한 후에 데이터가 몇번째 오프셋에 저장했는지 리턴하는 데, acks가 0인 경우 응답 값을 받지 않는다
- retries 옵션 설정과 관련해서는 데이터 전송을 알 수 없기 때문에 무의미하다
- 데이터 유실보다 전송 속도가 중요한 경우 acks=0을 사용한다
- acks = 1
- 프로듀서는 보낸 데이터가 리더 파티션에만 정상 적재되었는지 확인
- 리더 파티션에 정상 적재될 때까지 재시도할 수 있다
- 데이터 유실이 발생할 수 있다.
- 복제 개수를 2 이상으로 운영할 경우 리더 파티션에 적재가 완료되어 있어도 팔로워 파티션에는 아직 데이터가 동기화되지 못한 일부 데이터가 유실 될 수 있기 때문이다
- acks = all or -1
- 보낸 데이터가 리더 파티션과 팔로워 파티션에 모두 정상적으로 적재되었는 지 확인
- 리더 파티션 뿐만 아니라 팔로워 파티션까지 적재되었는 지 확인하기 때문에 0 또는 1 옵션보다 속도가 느리다
- 일부 브로커에 장애가 발생하더라도 프로듀서는 안전하게 데이터를 전송하고 저장할 수 있음을 보장
- min.insync.replicas 옵션 값에 따라 데이터의 안정성이 달라진다
- ISR의 2개 이상의 파티션에 데이터가 정상 적재되었음을 확인
- 운영하는 카프카 브로커 개수 보다 작게 설정해야 한다 (배포 시 롤링 다운 타임 발생 가능)
- NotEnoughReplicasException or NotEnoughReplicasAfterAppendException 이 발생하여 토픽으로 데이터를 전송할 수 없게 된다
- 메세지 보증 전략
- at-most-once : 실패나 타임아웃 등이 발생하면 메시지를 버릴 수 있다. 데이터가 일부 누락되더라도 영향이 없는 경우엔 대량처리 및 짧은 주기의 전송 서비스에 유용할 수 있다.
- 프로듀서는 ACK를 기다리지 않고 다음 메세지 배치를 보내기 때문에 메세지 전송이 누락될 수 있음. ack(0) 으로 설정
- exactly-once : 메시지가 정확하게 한 번만 전달되는 것을 보장한다. 손실이나 중복 없이, 순서대로 메시지를 전송하는 것은 구현 난이도가 높고 비용이 많이 든다.
- 카프카 0.11.0 버전 이후부터 제공하며 enable.idempotence 옵션을 true로 설정하여 만들 수 있다.
- 멱등성 + 트랜잭션을 설정하여 모든 데이터에 대한 원자성 만족
- at-least-once : 메시지가 최소 1번 이상 전달되는 것을 보장한다. 실패나 타임아웃 등이 발생하면 메시지를 다시 전송하며, 이 경우엔 동일한 메시지가 중복으로 처리될 수 있다.
- Producer가 메시지를 전송하고 나서 Broker로부터 ack를 받는 수준(1, all)을 조절함으로써 at-least-once를 보장할 수 있다.
- at-most-once : 실패나 타임아웃 등이 발생하면 메시지를 버릴 수 있다. 데이터가 일부 누락되더라도 영향이 없는 경우엔 대량처리 및 짧은 주기의 전송 서비스에 유용할 수 있다.
- 멱등성 프로듀서
- 프로듀서는 PID와 시퀀스 넘버를 함께 전달하여 브로커는 여러번 요청이 오더라도 단 한번만 적재
- 여기서 PID(Producer unique ID)는 동일한 세션에서만 정확히 한번 전달이 보장된다.
- 만약 프로듀서가 이슈가 발생하여 종료되고 재시작하면 PID는 달라지며 동일한 데이터를 보내더라도 PID가 다르기 때문에 다른 데이터라고 판단한다.
- enable.idempotence= true를 설정하면 다른 값들도 강제로 설정된다.
- retries: Integer.MAX_VALUE, acks: -1(all)
- OutOfOrderSequenceException
- 프로듀서에서 보낸 데이터에 시퀀스번호와 브로커가 기대한 시퀀스번호가 다른 경우 해당 예외 발생
- 시퀀스 번호의 역전현상이 발생할 수 있기 때문에 순서가 중요한 데이터를 전송하는 프로듀서라면 대응 방안을 마련해야 한다.
- 트랜잭선 프로듀서
- 다수의 파티션에 데이터를 저장할 경우 모든 데이터에 대해 동일한 원자성을 만족시키기 위해 사용
- 프로듀서: enable.idempotence true, transaction.id 임의의 String 정의
- 컨슈머는 isolation.level read_commited 설정
- 원자성을 만족시키는 것은 다수의 데이터를 동일 트랜잭션으로 묶음으로 전체 데이터를 처리하거나 전체 데이터를 처리하지 않도록 하는 것을 의미
- 트랜잭션은 파티션의 레코드로 구분한다.
- 프로듀서는 트랜잭션의 시작과 끝을 표현하기 위한 COMMIT 레코드(트랜잭션 레코드)를 파티션별로 하나 더 보낸다.
- 컨슈머는 데이터를 파티션에 가져올 때 COMMIT 레코드(트랜잭션 레코드)가 존재하지 않으면 트랜잭션이 완료되지 않았다고 판단하여 데이터를 가져오지 않는다.
카프카 컨슈머
- 다수의 파티션에 데이터를 저장할 경우 모든 데이터에 대해 동일한 원자성을 만족시키기 위해 사용
- 멀티 스레드 컨슈머
- 파티션 개수가 N개면 컨슈머 그룹의 컨슈머 또한 최대 N개를 운영할 수 있고, 한개의 컨슈머에서 N개의 프로세스를 사용할 수 있다.
- 만약 멀티 스레드로 동작하는 컨슈머 스레드를 개발 적용하기 위해서는 고려할 사항이 많다
- 하나의 컨슈머 스레드에서 예외적인 상황이 발생할 경우 프로세스 자체가 종료될 수 있고 이는 다른 컨슈머 스레드에 영향을 미칠 수 있다
- 컨슈머 스레드가 비정상 종료되는 경우 데이터 처리에서 중복 또는 유실이 발생할 수 있다
- 멀티 스레드 활용 방식
- 컨슈머 스레드 1개만 실행하고 데이터 처리를 담당하는 워커 스레드를 여러개 실행하는 방법
- 컨슈머 인스턴스에서 poll() 메서드를 호출하는 스레드를 여러 개 띄워서 사용하는 컨슈머 멀티 스레드 전략
- 카프카 컨슈머 멀티 워커 스레드 전략
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs); consumer.subscribe(Arrays.asList(TOPIC_NAME)); ExecutorService executorService = Executors.newCachedThreadPool(); while(true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10)); for(ConsumerRecord<String, String> record: records) { ConsumerWorker worker = new ConsumerWorker(record.value()); executorService.execute(worker); } }
public class ConsumerWorker implements Runnbale { private String recordValue; ConsumerWorker(String recordValue) { this.recordValue = recordValue; } @Override public void run() { logger.info("thread: {}, trecord:{}", Thread.currentThread.getName(), recordValue); } }
- CachedThreadPool을 활용하여 작업 이후 스레드가 종료되도록 설정
- ConsumerWroker는 Runnable 인터페이스로 구현하여 run() 메서드가 실행되도록 했다
- poll() 메서드를 통해 리턴받은 레코드들을 처리하는 스레드를 레코드마다 개별 실행 - poll()을 통해 데이터를 병렬처리함으로써 속도 이점이 있다
- 주의 사항으로는 스레드를 사용함으로 써 데이터 처리가 끝나지 않았음에도 커밋하기 때문에 리밸런싱, 컨슈머 장애 시에 데이터 유실 발생 가능
- 주의 사항 두번째로는 레코드 처리 역전현상이다. 레코드의 순서가 중요한 경우, 역전이 발생할 수 있다
- 카프카 컨슈머 멀티 스레드 전략
public class ConsumerWorker implements Runnbale { private Properties props; private String topic; private String threadName; private KafkaConsumer<String, String> consumer; ConsumerWorker(Properties props, String topic, int number) { this.props = props; this.topic = topic; this.threadName = "Thread-consumer-" + number; } @Override public void run() { consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); while(true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for(ConsumerRecord<String, String> record: records) { logger.info("{}", record); } } } }
ExecutorService executorService = Executors.newCachedThreadPool(); for(int i = 0; i < CONSUMER_COUNT; i++) { ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i); executorService.execute(worker); }
- 컨슈머 스레드를 늘려서 운영하여 각 스레드에 각 파티션을 할당하고 파티션의 레코드들을 병렬처리할 수 있다
- 컨슈머 랙
- 토픽의 최신 오프셋과 컨슈머 오프셋 간의 차이로 컨슈머의 데이터 처리량이 적을 경우 컨슈머 랙은 늘어나며, 최소값 0은 지연이 없다는 뜻이다
- 컨슈머 랙을 모니터링함으로써 컨슈머의 장애를 확인할 수 있고 파티션의 개수를 정하는 데에 참고할 수 있다
- 컨슈머 랙을 확인하는 방법
- 방법 1. 카프카 명령어를 사용하여 컨슈머 랙 조회
$ bin/kafka-consumer-group.sh --bootstrap-server my-kafka:9092 \ --group my-group --describe TOPIC PARTITION CURRENT_OFFSET, LOG_END_OFFSET LAG CONSUMER_ID HOST CLIENT_ID
- 카프카 명령어를 통한 모니터링은 일회성에 그치고 지표를 지속적으로 기록하고 모니터링하기에는 부족하다
- 방법 2. 컨슈머 metrics() 메서드를 사용하여 컨슈머 랙 조회
for(Map.Entry<MetricName, ? extends Metric> entry: kafkaConsumer.metrics().entrySet()) { if("records-lag-max".equals(entry.getKey().getName()) || "records-lag".equals(entry.getKey().getName()) || "records-lag-avg".equals(entry.getKey().getName())) { Metric metric = entry.getValue(); logger.info("{}:{}", entry.getKey().getName(), metric.meticValue()); } }
- 단점 1. 컨슈머가 정상 동작할 경우에만 확인할 수 있다. 비정상 종료되는 경우 컨슈머 랙을 모니터링 할 수 없다
- 단점 2. 모든 컨슈머 애플리케이션에 컨슈머 랙 모니터링 코드를 중복해서 작성해야 한다
- 단점 3. 컨슈머 랙을 모니터링하는 코드를 추가할 수 없는 서드파티 애플리케이션에서는 불가능하다
- 방법 3. 외부 모니터링을 활용하여 컨슈머 랙 조회
- 데이터 독, 컨플루언트 컨트롤 센터와 같은 카프카 클러스터 종합 모니터링 툴을 이용하면 다양한 지표를 모니터링 할 수 있다
- 카프카 버러우
- 컨슈머 배포 프로세스
- 중단 배포
- 컨슈머 애플리케이션을 완전히 종료한 이후에 개선된 코드를 가진 애플리케이션을 배포하는 방식
- 컨슈머 애플리케이션이 완전히 종료된 이후에 신규 애플리케이션이 배포하기 때문에 그 동안 컨슈머 랙이 늘어난다
- 배포 전 후, 로직에 변경을 나눌 수 있다는 점이 장점이다
- 무중단 배포
- 무중단 배포는 인스턴스의 발급과 반환이 다소 유영ㄴ한 가상 서버를 사용하는 경우 유용
- 블루/그린
- 이전 버전 애플리케이션과 신규 버전 애플리케이션을 동시에 띄워놓고 트래픽을 전환하는 방법
- 파티션 개수와 컨슈머 개수를 동일하게 실행하는 애플리케이션을 운영할 때 유용하며 신규 버전 애플리케이션의 컨슈머들은 파티션 할당을 받지 못하고 유휴 상태로 기다릴 수 있기 때문이다
- 리밸런스가 한번만 발생하기 떄문에 많은 수의 파티션을 운영하는 경우에 짧은 리밸런스 시간으로 배포를 수행할 수 있다
- 롤링
- 인스턴스 할당과 반환으로 인한 리소스 낭비를 줄이면서 무중단 배포를 할 수 있다
- 롤링 업그레이드를 진행할 때 리밸런싱이 발생하기 때문에 파티션 개수가 많지 않은 경우 효과적이다
- 카나리 배포
- 100개의 파티션으로 운영하는 토픽이 있을 경우 1개 파티션에 컨슈머를 신규 버전의 애플리케이션으로 처리하는 방식으로 테스트 한 후, 나머지 파티션에 롤링, 블루/그린 방식으로 배포하여 무중단 배포가 가능하다
- 많은 데이터 중 일부분 신규 버전의 애플리케이션에 먼저 배포함으로써 이슈가 없는 지 사전에 탐지할 수 있다
- 중단 배포
- 리밸런싱
- 카프카 컨슈머 그룹 리밸런싱 발생 시점
- 컨슈머의 생성/삭제(배포할 때)
- 시간안에 Poll 요청 실패
- 컨슈머는 max.poll.records 설정의 개수만큼 메세지를 처리한 뒤 Poll 요청을 보내게 되는데, “max.poll.interval.ms” 설정 시간을 넘기게 되면 컨슈머에 문제가 있다고 판단하여 리밸런싱 진행
- 컨슈머 문제 발생(Heartbeat)
- GroupCoordinator
- 컨슈머 그룹을 관리하는 역할을 가진 브로커로 컨슈머 그룹 멤버에 변화가 발생했을 때 리밸런싱 과정에서 컨슈머 클라이언트와 그룹코디네이터 간의 요청/응답 메세지 전달이 이루어진다.
- 리밸런싱 과정
- Broker의 GroupCoordinator가 컨슈머 그룹내의 모든 컨슈머들의 파티션 소유권을 박탈한 후, 컨슈머들의 JoinGroup 요청을 일정시간 기다린다.
- 컨슈머(ConsumerGoordinator)는 FindCoordinator API를 사용하여 JoinGroup 요청을 보낼 GroupCoordinator를 찾는다.
- 제일 먼저 JoinGroup을 요청한 컨슈머를 리더로 지정, 리더에게 파티션 정보와 컨슈머 목록 전달
- JoinGroup 요청과 응답
- 리더는 전달받은 정보를 바탕으로 파티션 소유권을 재조정하고, 이를 그룹 코디네이터에게 전달하면 그룹 코디네이터는 재조정된 파티션 소유권을 각 컨슈머에게 알리고 리밸런싱 종료
- SyncGroup 요청과 응답
- 리밸런싱 발생 시 리스크 - 메세지 중복 처리
- 메세지를 poll 받아 처리하고 있는 도중에 처리가 늦어져 max.poll.interval.ms 시간이 지나게되어 리밸런싱이 된다면, 현재 consumer의 처리 offset이 커밋되지 않는다.
- 리밸런싱된 다른 컨슈머는 이전 컨슈머가 commit하지 않았기 때문에 중복된 데이터를 가져와 처리된다.
- 메세지 중복 처리 해결방법
- max.poll.records 조정
- 리밸런싱 시간 단축, Poll 지연에 의한 리밸런싱 발생 가능성 감소, 메세지 중복 컨슈밍 가능성 감소와 같은 장점이 있다.
- 하지만 max.poll.records 수가 적으면 많은 IO가 발생하기 때문에 성능에 영향이 있을 수 있기 때문에 적절한 수로 설정해야 한다.
- 수동 커밋 사용하기
- 중복 컨슈밍 방어 로직 작성
- max.poll.records 조정
- 파티션 할당 전략(partition.assignment.strategy)
- RangeAssigner, RoundRobinAssigner, StickyAssigner
- 읽기 작업을 중단하고 파티션 멤버십을 포기하기 떄문에 Stop The World(중단) 발생하며 모든 컨슈머가 다시 그룹에 합류해서 새로운 파티션을 무작위로 할당받는다.
- CoopertativeStickyAssigner
- 모든 파티션을 모든 컨슈머에게 재할당하는 게 아닌 파티션을 작은 그룹으로 나눠서 일부만 한 컨슈머에서 다른 컨슈머로 재할당하여 중단없이 리밸런싱이 일어난다.
- RangeAssigner, RoundRobinAssigner, StickyAssigner
- Static Group Membership
- 그룹 인스턴스 ID를 컨슈머 구성 값의 일부로 특정하게 되면 컨슈머는 정적 멤버가 된다.
- 이렇게 하면 정해진 시간동안 다른 컨슈머로 재할당되지 않고, 정적 멤버가 다시 합류하게 되면 해당 멤버에게 할단된다.
- 만약 시간안에 돌아오지 못하면 리밸런싱이 발생되어 파티션이 다른 컨슈머로 이동하게 된다.
- 카프카 컨슈머 그룹 리밸런싱 발생 시점
출처
- 아파치 카프카 애플리케이션 프로그래밍 with 자바 책 발췌