프로그래밍/카프카

[kafka] @KafkaListener를 이용한 Consumer 구현

Ericlee 2022. 1. 22. 02:00
반응형

목차

  1. Consumer
  2. KafkaListener Annotation 설정
  3. KafkaListener Annotation 사용방법
    1. Simple POJO Listeners
    2. 파티션 할당
    3. 특정 파티션 InitialOffset 설정
    4. 수동 확인 ( Manual Acknowledgment )
    5. Consumer Record Metadata
    6. Batch Listeners
    7. GroupId

1. Consumer

컨슈머는 카프카 클러스터( 파티션 )에서 저장된 데이터를 읽어오는 역할을 합니다. 구현 방법은 Message Listeners, @KafkaListener 두 가지로 가능합니다. 이번 글에서는 @KafkaListener 를 이용하여 구현한 내용을 정리해보겠습니다.

 

 

2. KafkaListener Annotation 설정

개인적으로 MessageListener보단 @KafkaListener를 선호하는 편입니다. 이유는 @KafkaListener를 사용하면 Config와 명확히 분류되며 Config에서 Bean에 등록한 Factory 또한 사용할 수 있기 때문입니다. 

 

@KafkaListener를 사용하기 위한 필수 조건입니다. ( KafkaConsumerConfig에서 설정 )

  • @Configuration
  • @EnableKafka
  • kafkaListenerContainerFactory Bean 객체 
@EnableKafka
@Configuration
class KafkaConsumerConfig {

    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVER: String

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()

        factory.setConcurrency(2) // Consumer Process Thread Count
        factory.consumerFactory = DefaultKafkaConsumerFactory(getConfig())
        factory.containerProperties.pollTimeout = 500

        return factory
    }

    private fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",    // 마지막 읽은 부분부터 Read
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class
        )
}
여러 Partition을 생성하여 테스트할 예정이라 ConcurrentKafkaListerContainer를 사용하겠습니다.

 

3. KafkaListener Annotation 사용방법

Kafka 클러스터는 Topic 1개, Partition 3개로 구성되어있습니다.

 

1. Simple POJO Listeners

<Sample1>

@Component
class MemberTestConsumer {

    @KafkaListener(id = "mem", topics = ["insert_member"], clientIdPrefix = "memClientId")
    fun listen(data: String) {
        println("data:: $data")
    }

}

// result 
// Send Message = [ test message ] 
//  - Offset = [ 37 ], Topic = [ insert_member ], Partition = [ 2 ]
  
// data:: test message

 

<Sample2>

@Component
class MemberTestConsumer {

    @KafkaListener(
        id = "mem", 
        topics = ["insert_member"], 
        autoStartup = "\${listen.auto.start:true}", 
        concurrency = "\${listen.concurrency:3}"
    )
    fun listen(data: String) {
        println("data:: $data")
    }
}

// result 
// Send Message = [ test message ] 
//  - Offset = [ 38 ], Topic = [ insert_member ], Partition = [ 0 ]
  
// data:: test message

concurrency 설정 기준에 대해서 궁금하시면 여기를 참조해주세요.

 

2. 파티션 할당

0번 파티션에 Recode만 Listen 합니다.

@Component
class MemberTestConsumer {

    @KafkaListener(
        id = "thing2",
        topicPartitions = [
            TopicPartition(topic = "insert_member", partitions = ["0"])
        ]
    )
    fun listen(data: String) {
        println("data:: $data")
    }
    
}


/**
 Send Message = [ test message ] 
   - Offset = [ 43 ], Topic = [ insert_member ], Partition = [ 1 ]
 Send Message = [ test message ] 
   - Offset = [ 38 ], Topic = [ insert_member ], Partition = [ 2 ]
 Send Message = [ test message ] 
   - Offset = [ 44 ], Topic = [ insert_member ], Partition = [ 1 ]
 // 0번 파티션  
 Send Message = [ test message ] 
   - Offset = [ 39 ], Topic = [ insert_member ], Partition = [ 0 ]
 data:: test message
 Send Message = [ test message ] 
   - Offset = [ 45 ], Topic = [ insert_member ], Partition = [ 1 ]
 Send Message = [ test message ] 
   - Offset = [ 39 ], Topic = [ insert_member ], Partition = [ 2 ]
 // 0번 파티션  
 Send Message = [ test message ] 
   - Offset = [ 40 ], Topic = [ insert_member ], Partition = [ 0 ]
 data:: test message
 */

 

3. 특정 파티션 InitialOffset 설정

@Component
class MemberTestConsumer {

    @KafkaListener(
        id = "mem",
        topicPartitions = [
            TopicPartition(
                topic = "insert_member",
                partitions = ["1"],
                partitionOffsets = [PartitionOffset(partition = "0", initialOffset = "5000")]
            )
        ]
    )
    fun listen(data: String) {
        println("data:: $data")
    }

}

 

4. 수동 확인( Manual Acknowledgment )

AckMode를 Manaul로 설정한 경우입니다. 다시 말해서 카프카 Offset Commit을 수동으로 설정하고 Listener에서 데이터를 받고 처리를 완료한 후 로직에서 Commit 명령어를 호출하는 방식입니다.

 

KafkaConsumerConfig.kt

컨슈머 설정 클래스에서 ackMode를 Manual로 설정합니다.

factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
@EnableKafka
@Configuration
class KafkaConsumerConfig {

    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVER: String

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()

        factory.setConcurrency(2) // Consumer Process Thread Count
        factory.consumerFactory = DefaultKafkaConsumerFactory(getConfig())
        factory.containerProperties.pollTimeout = 500
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL  // 추가

        return factory
    }

    private fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",    // 마지막 읽은 부분부터 Read
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class
        )
}

 

@KafkaListener에서 Bean에 등록한 kafkaListenerContainerFactory를 containerFactory로 설정하면 Acknowledgment 객체를 파라미터로 받을 수 있습니다. 

@Component
class MemberTestConsumer {

    @KafkaListener(id = "mem", topics = ["insert_member"], containerFactory = "kafkaListenerContainerFactory")
    fun listen(data: String, ack: Acknowledgment) {
        println("data:: $data")
        ack.acknowledge()
    }

}

Listner에서 로직 수행이 완료되면 ack.acknowledge() 메서드를 호출하여 offset을 커밋하는 명령어를 날립니다. 

 

5. Consumer Record Metadata

컨슈머에서 Record의 메타데이터를 가져올 수 있습니다. 

  • @Payload, @Header를 이용한 방법
  • ConsumerRecordMetadata 객체를 이용한 방법

@Payload, @Header를 이용한 방법

@Component
class MemberTestConsumer {

    @KafkaListener(id = "mem", topicPattern = "insert_member")
    fun listen(
        @Payload data: String,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) key: Int?,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) partition: Int,
        @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) ts: Long
    ) {
        println("data:: $data key:: $key partition:: $partition topic:: $topic ts:: $ts")
    }
    
}


/**
 * result 
 * 1 CALL
 * Send Message = [ test message ] 
 *  - Offset = [ 118 ], Topic = [ insert_member ], Partition = [ 2 ]
 * data:: test message key:: null partition:: 2 topic:: insert_member ts:: 1629873469337
 *
 * 2 CALL
 * Send Message = [ test message ] 
 *  - Offset = [ 123 ], Topic = [ insert_member ], Partition = [ 1 ]
 * data:: test message key:: null partition:: 1 topic:: insert_member ts:: 1629873481904
 */

 

ConsumerRecordMetadata 객체를 이용한 방법

@Component
class MemberTestConsumer {    

    @KafkaListener(id = "mem", topicPattern = "insert_member")
    fun listen(data: String, meta: ConsumerRecordMetadata) {
        println("data:: $data partition:: ${meta.partition()} topic:: ${meta.topic()} ts:: ${meta.timestamp()}")
    }
    
}

 

6. Batch Listeners

Batch 설정을 적용한 Factory를 Bean에 등록합니다.

@Bean
fun batchFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()

    factory.setConcurrency(3) // Consumer Process Thread Count
    factory.consumerFactory = DefaultKafkaConsumerFactory(getConfig())
    factory.containerProperties.pollTimeout = 500
    factory.isBatchListener = true    // Batch 적용!!!!!!!!!

    return factory
}

 

BatchFactory를 이용한 KafkaListener를 사용한 Consumer

@Component
class MemberTestConsumer {

    @KafkaListener(id = "mem", topics = ["insert_member"], containerFactory = "batchFactory")
    fun listen(data: List<String>) {
        println("data:: $data")
    }

}


/**
 * Result
 * data:: [test message, test message, test message, test message, test message, test message]
 * data:: [test message, test message, test message, test message]
 * data:: [test message, test message]
 */

 

그 외의 Batch Listeners 활용법

@KafkaListener(id = "listenBatch1", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch1(data: List<String>) {
    println("batch1 data:: $data")
}

@KafkaListener(id = "listenBatch2", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch2(data: List<Message>?) {
    println("batch2 data:: $data")
}

@KafkaListener(id = "listenBatch3", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch3(data: List<Message>?, ack: Acknowledgment) {
    println("batch3 data:: $data")
}

@KafkaListener(id = "listenBatch4", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch4(data: List<Message>?, ack: Acknowledgment, consumer: Consumer<String, String>) {
    println("batch4 data:: $data")
}

@KafkaListener(id = "listenBatch5", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch5(data: List<ConsumerRecord<String, String>>) {
    data.forEach {
        println("batch5 data:: ${it.value()} topic=${it.topic()} partition=${it.partition()} offset=${it.offset()}")
    }

}

@KafkaListener(id = "listenBatch6", topics = ["insert_member"], containerFactory = "batchFactory")
fun listenBatch6(data: List<ConsumerRecord<String, String>>, ack: Acknowledgment) {
    data.forEach {
        println("batch6 data:: ${it.value()} topic=${it.topic()} partition=${it.partition()} offset=${it.offset()}")
    }
}

/**
 * Result
 * batch5 data:: test message topic=insert_member partition=2 offset=152
 * batch3 data:: [test message]
 * batch6 data:: test message topic=insert_member partition=2 offset=152
 * batch2 data:: [test message]
 * batch4 data:: [test message]
 * batch1 data:: [test message]
 */

 

GroupId

Consumer는 GroupId로 그룹화할 수 있습니다. 그룹화된 Consumer별로 데이터를 읽어옵니다. 반대로 같은 그룹에 2개 이상의 컨슈머가 있다면 1개의 컨슈머만 Read하고 나머지 컨슈머는 놀고 있는 상태가 됩니다.

 

코드로 살펴보겠습니다. 

2개의 컨슈머는 group1, 2개의 컨슈머는 group2로 설정
@KafkaListener(id = "listener1", topics = ["insert_member"], groupId = "group1")
fun listener1(data: String) {
    println("listener1 data:: $data")
}

@KafkaListener(id = "listener2", topics = ["insert_member"], groupId = "group1")
fun listener2(data: String) {
    println("listener2 data:: $data")
}

@KafkaListener(id = "listener3", topics = ["insert_member"], groupId = "group2")
fun listener3(data: String) {
    println("listener3 data:: $data")
}

@KafkaListener(id = "listener4", topics = ["insert_member"], groupId = "group2")
fun listener4(data: String) {
    println("listener4 data:: $data")
}

 

결과는 그룹별 하나의 컨슈머만 리스닝을 처리합니다.

// Send..
Send Message = [ test message ] 
  - Offset = [ 159 ], Topic = [ insert_member ], Partition = [ 0 ]
  
// Response  
listener3 data:: test message
listener2 data:: test message

 

Group별 Consumer 작동

반응형