MessageListener

  • Message Listener Container를 사용할 때 데이터를 수신하기 위해 리스너를 사용해야 하는데, 8개의 Message Listener interface를 제공하고 있다.

    Listener Commit ConsumerRecord Consumer Access ETC
    MessageListener 자동 커밋 또는 container managed commit 개별 X -
    AcknowledgingMessageLsitener container managed commit 개별 X -
    ConsumerAwareMessageListener 자동 커밋 또는 container managed commit 개별 O -
    AcknowledgingConsumerAwareMessageListener container managed commit 개별 O -
    BatchMessageListener 자동 커밋 또는 container managed commit 배치 X AckMode.RECORD 지원하지 않음
    BatchAcknowledgingMessageListener container managed commit 배치 X -
    BatchConsumerAwareMessageListener 자동 커밋 또는 container managed commit 배치 O AckMode.RECORD 지원하지 않음
    BatchAcknowledgingConsumerAwareMessageListener container managed commit 배치 O -
  • MessageListenerContainer
    • MessageListenerContainer 구현체 두가지를 제공하고 있다.
      • KafkaMessageListenerContainer: 단일 스레드 환경에서 모든 토픽과 파티션으로부터 메세지를 수신
        @Bean
        public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer() {
            // properties
            ContainerProperties properties = new ContainerProperties("SAMPLE");
            properties.setGroupId("SAMPLE1");
            properties.setMessageListener(new MessageListener<>() {
                @Override
                public void onMessage(ConsumerRecord<Object, Object> data) {
                    System.out.println("Consume record: " + data.key() + " " + data.value() + " " + data.offset());
                }
            });
            // consumerFactory
            ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(getConfig());
          
            return new KafkaMessageListenerContainer<String, String>(consumerFactory, properties);
        }      
        
      • ConcurrentMessageListenerContainer: 멀티 스레드 소비를 제공하므로써 하나 또는 여러개의 kafkaMessageListenerContainer를 위임받아 사용
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConcurrency(3);
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(getConfig()));
            factory.getContainerProperties().setPollTimeout(500L);
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            return factory;
        }
        
        • concurrency 를 세팅하여 N개의 KafkaMessageListenerContainer 인스턴스 생성, 병렬처리가 가능하다.
  • Offset Commit
    • enable.auto.commit 이 false인 경우 여러 AckMode 설정을 제공하며 기본은 AckMode.BATCH 를 사용한다. |AckMode|Description| |—|—| |RECORD|레코드 처리 후 리스너가 리턴할 때 오프셋 커밋| |BATCH|poll() 메서드를 통해 반환된 전체 레코드 처리 후 오프셋 커밋| |TIME|특정 시간(ackTime) 이후 커밋| |COUNT|특정 개수(ackCount) 이후 커밋| |COUNT_TIME|TIME, COUNT 두 조건 중 하나가 만족하면 커밋| |MANUAL|Message Listener는 Acknowlegment에서 acknowledge()를 호출한 이후 poll()에서 커밋. 메서드 호출 후에는 BATCH와 동일| |MANUAL_IMMEDIATE|Acknowledgment.acknowledge() 호출되면 즉시 오프셋 커밋|
      • MANUAL, MANUAL_IMMEDIATE를 사용하기 위해서는 AcknowledgeMessageListener, BatchAcknowledgingMessageListener 중에 선택해야 한다.
      • 기본적으로 비동기커밋(commitAsync)로 지원한다.
    • 일반적으로 AckMode.MANUAL, MANUAL_IMMDIATE를 사용할 때 각각의 Record에 대한 상태를 유지하지 않고, 각 파티션 별 커밋된 오프셋을 유지하기 때문에 Acknowledgement에 대한 순서대로 승인해야 한다.
      • 2.8 버전 부터 asynckAcks 설정을 통해 폴리에서 반환된 레코드에 대한 승인에 순서가 상관없이 허용되도록 설정할 수 있다.
      • asyncAcks가 true인 경우 리스너 컨테이너는 누락된 커밋이 수신될 때까지 순서가 잘못된 커밋을 연기하며 이전 폴리에 대한 모든 오프셋이 커밋될 때까지 Consumer에게 새로운 Record를 전달하지 않는다.
        Set to true to support asynchronous record acknowledgments.
        Only applies with ContainerProperties.AckMode.MANUAL or ContainerProperties.AckMode.MANUAL_IMMEDIATE.
        Out of order offset commits are deferred until all previous offsets in the partition have been committed. The consumer is paused, if necessary, until all acks have been completed.
        Parameters: asyncAcks - true to use async acks.
        

@KafkaListener 설정

  • @KafkaListener 를 사용하기 위해서는 Configuration 설정을 해주어야 한다.
    • @EnableKafka, @Configuration + @Bean으로 ConcurrentMessageListenerContainer를 빈으로 등록해주어야 한다.
    • ConcurrentMessageListenerContainer concurrency 최적의 설정
      • kafka의 파티션 수와 동일한 값으로 concurrency를 설정해야 최적의 성능을 낼 수 있다.
      • 컨슈머 쓰레드가 1대일 때는 concurrency를 1로 하는 것이 좋다.
      • 파티션 수 > concurrency 일 경우는 하나의 컨슈머 쓰레드가 여러 개의 파티션을 처리한다.
      • 파티션 수 < concurrency 일 경우는 파티션 수보다 많은 컨슈머 쓰레드는 유휴상태가 된다. 즉, 아무작업도 하지 않는다는 의미다.
      • 파티션 수 = concurrency 일 경우는 하나의 컨슈머 쓰레드가 하나의 파티션을 처리한다. 즉 최적의 상태라는 의미이다.
  • 명시적 파티션 할당
    @KafkaListener(id = "thing2", topicPartitions =
            { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
              @TopicPartition(topic = "topic2", partitions = "0",
                 partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
            })
    public void listen(ConsumerRecord<?, ?> record) {
        ...
    }
    
    • @TopicPartition(topic = “topic1”, partitions = { “0”, “1” })
      • topci1에 대하여 0, 1 파티션 할당
    • partitionOffsets = @PartitionOffset(partition = “1”, initialOffset = “100”))
      • 1번 파티션에 대하여 initialOffset 설정
      • 리스너가 초기화될 때 initialOffset에 설정된 값부터 읽어오기 때문에 중복 처리될 수 있다.
  • 리스너 별 Container Factory 설정
    @KafkaListener(id = "cat", topics = "myTopic", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void listen(String data, Acknowledgment ack) {
        ack.acknowledge();
    }
    
    • AckMode.MANUAL을 사용한다면 특정 리스너를 사용해야 한다.
    • 해당 예제처럼 다른 Container Factory를 설정해줄 수 있다.
  • 레코드 메타데이터 사용
    • KafkaHeaders 클래스 내에 key들이 정의되어 있으며 리스너에 파라미터로 받아서 사용할 수 있다.
      @KafkaListener(id = "qux", topicPattern = "myTopic1")
      public void listen(@Payload String foo,
              @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
              @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
              @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
              @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
          ) {
      }
      
  • 배치모드 사용
    factory.setBatchListener(true);
    
    • 해당 ConcurrentKafkaListenerContainerFactory를 사용하면 배치 처리가 가능하다.

@KafkaListener Lifecycle Management

  • @KafkaListener는 Application Context에서 관리되는 빈이 아닌 KafkaListenerEndpointRegistry에 의해 등록되는 빈이다.
    • @KafkaListener 처리를 하고 있는 KafkaListenerAnnotationBeanPostProcessor 내에 processListener image

      • MethodKafkaListenerEndpoint 란 객체에 메타정보를 담은 후 KafkaListenerEndpointRegistry 를 통해 저장되는 것을 알 수 있다. image
      • Container ID 별로 Map<String, MessageListenerContainer> 형태의 listenerContainers 에 세팅
      • 빈이 관리되고 있지 않으면, appContext.getBeanFactory().registerSingleton을 호출하여 빈 등록
    • 프레임워크에 의해 자동으로 선언되며 컨테이너 라이프사이클을 관리(autoStartup 이 true로 설정)

  • autoStartup을 false로 설정하고, 필요할 때 리스너컨테이너를 다시 시작할 수 있다.
    @KafkaListener(id="testContainer", topics="testTopic", autoStartup=false)
    public void listen() {}
    
    private final KafkaListenerEndpointRegistry registry;
    
    // ...
    this.registry.getListenerContainer("testContainer").start();
    // ...
    

출처

  • Spring Docs[https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages.html]