본문 바로가기

프로그래밍/카프카

Spring-Kafka Consumer Validation

반응형

들어가기 전에

Consumer는 Kafka에 Topic, Partition의 Record를 읽어오는 역할을 합니다. 하지만 Kafka에 잘못된 데이터가 들어오는 경우 Consumer에서는 데이터를 Validation 처리하여 올바른 데이터만 처리해야합니다. 

이번 글에서는 Spring-Kafka를 사용하는 Consumer에서 어떻게 Validation 처리를 하는지 작성해보겠습니다.

 

목차

  1. Validation 방식
  2. Validation 구성
  3. Consumer Validation 구현
  4. Validation 테스트
  5. @KafkaListener ErrorHandler
  6. 결론

Validation 방식

Spring-Kafka는 Version 2.2 부터 @KafkaListener의 @Payload 인자값을 쉽게 Validate하는 Validator 기능이 추가되었습니다. 

@Configuration
@EnableKafka
class KafkaValidatorConfig: KafkaListenerConfigurer {

    override fun configureKafkaListeners(registrar: KafkaListenerEndpointRegistrar) {
        registrar.setValidator(ConsumerValidator())  // 원하는 Validator를 설정할 수 있다.
    }
    
}

위와 같이 원하는 Validator를 생성하여 등록할 수 있지만 이 글에서는 Spring-Boot에서 지원하는 validation 기능을 사용해보겠습니다.

 

Validation 구성

먼저 build.gradle.kts 파일에 spring-boot-starter-validation 의존성을 주입합니다.

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-validation")
}

 

다음으로 LocalValidatorFactoryBean을 validator로 설정합니다.

@Configuration
@EnableKafka
class KafkaValidatorConfig(
    /** spring validation */
    private val validator: LocalValidatorFactoryBean,
) : KafkaListenerConfigurer {
    
    // validation
    override fun configureKafkaListeners(registrar: KafkaListenerEndpointRegistrar) {
        registrar.setValidator(this.validator)  // validator 설정
    }
}
LocalValidatorFactoryBean은 spring-boot-starter-validation 의존성 주입하면 자동으로 Bean에 등록됩니다.

 

다음으로 Consumer에서 JsonDeserializer방식을 이용하여 Member 객체 형태로 Read합니다. 

@Configuration
@EnableKafka
class KafkaValidatorConfig(
    /** spring validation */
    private val validator: LocalValidatorFactoryBean,
) : KafkaListenerConfigurer {
    
    @Bean
    fun memberFactory(): ConcurrentKafkaListenerContainerFactory<String, Member> {
        return ConcurrentKafkaListenerContainerFactory<String, Member>().also {
            it.consumerFactory =
                DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Member::class.java))
            it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        }
    }

    private fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",    // 마지막 읽은 부분부터 Read
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
        )
}

 

Member 객체는 아래를 참조해주세요.

data class Member(

   @field:NotNull
   val name: String? = null,

   @field:Min(1)
   @field:Max(20)
   val age: Int? = null
)

 


아래는 Validation을 사용하기 위한 Config 전체 코드입니다.

@Configuration
@EnableKafka
class KafkaValidatorConfig(
    /** spring validation */
    private val validator: LocalValidatorFactoryBean,
) : KafkaListenerConfigurer {

    @Value("\${spring.kafka.consumer.bootstrap-servers}")
    private lateinit var BOOTSTRAP_SERVER: String
    
    // validation
    override fun configureKafkaListeners(registrar: KafkaListenerEndpointRegistrar) {
        registrar.setValidator(this.validator)  // validator 설정
    }
    
    // factory
    @Bean
    fun memberFactory(): ConcurrentKafkaListenerContainerFactory<String, Member> {
        return ConcurrentKafkaListenerContainerFactory<String, Member>().also {
            it.consumerFactory =
                DefaultKafkaConsumerFactory(getConfig(), StringDeserializer(), JsonDeserializer(Member::class.java))
            it.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        }
    }

    private fun getConfig(): Map<String, Any> =
        mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to BOOTSTRAP_SERVER,
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",    // 마지막 읽은 부분부터 Read
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false,
        )
}

 

Consumer Validation 구현

Consumer에서 Read하는 Member 객체 (@Payload) 에 @Valid를 선언하여 사용합니다.

Controller에서 @Valid 사용하는 방식과 동일하게 작성해주시면 됩니다.
@Component
class MemberValidationTestConsumer {

    @KafkaListener(
        id = "member_validate",
        topics = ["insert_member"],
        containerFactory = "memberFactory",
        groupId = "m_group"
    )
    fun memberListener(@Payload @Valid member: Member, meta: ConsumerRecordMetadata, acknowledgment: Acknowledgment) {
        println("memberListener data:: $member")
        println("memberListener offset:${meta.offset()} partition:${meta.partition()}")

        acknowledgment.acknowledge()
    }
    
}

 

Validation 테스트

Member 객체에 age 필드를 Max값을 초과하는 값으로 세팅 후 Kafka에 저장시켜 Consumer가 Read하도록 해보겠습니다.

{
    "name": "yong",
    "age": 100
}

 

결과는 아래와 같은 에러가 발생합니다.

2021-10-12 18:36:07.613 ERROR 10093 --- [ber_retry-0-C-1] o.s.k.l.SeekToCurrentErrorHandler        : Backoff none exhausted for insert_member-0@327

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer.memberListener2(me.example.kotlinkafka.member.domain.dto.Member,org.springframework.kafka.listener.adapter.ConsumerRecordMetadata,org.springframework.kafka.support.Acknowledgment)]
Bean [me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer@1458548]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer.memberListener2(me.example.kotlinkafka.member.domain.dto.Member,org.springframework.kafka.listener.adapter.ConsumerRecordMetadata,org.springframework.kafka.support.Acknowledgment): 1 error(s): [Field error in object 'member' on field 'age': rejected value [100]; codes [Max.member.age,Max.age,Max.java.lang.Integer,Max]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [member.age,age]; arguments []; default message [age],20]; default message [20 이하여야 합니다]] , failedMessage=GenericMessage [payload=Member(name=yong, age=100), headers={kafka_offset=327, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3df5cb5e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=yong, kafka_receivedTopic=insert_member, kafka_receivedTimestamp=1634031367458, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = insert_member, partition = 0, leaderEpoch = 2, offset = 327, CreateTime = 1634031367458, serialized key size = 4, serialized value size = 25, headers = RecordHeaders(headers = [], isReadOnly = false), key = yong, value = Member(name=yong, age=100)), kafka_groupId=m_group}]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void me.example.kotlinkafka.member.consumer.listener.MemberValidationTestConsumer.memberListener2(me.example.kotlinkafka.member.domain.dto.Member,org.springframework.kafka.listener.adapter.ConsumerRecordMetadata,org.springframework.kafka.support.Acknowledgment): 1 error(s): [Field error in object 'member' on field 'age': rejected value [100]; codes [Max.member.age,Max.age,Max.java.lang.Integer,Max]; arguments [org.springframework.context.support.DefaultMessageSourceResolvable: codes [member.age,age]; arguments []; default message [age],20]; default message [20 이하여야 합니다]] , failedMessage=GenericMessage [payload=Member(name=yong, age=100), headers={kafka_offset=327, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3df5cb5e, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=yong, kafka_receivedTopic=insert_member, kafka_receivedTimestamp=1634031367458, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = insert_member, partition = 0, leaderEpoch = 2, offset = 327, CreateTime = 1634031367458, serialized key size = 4, serialized value size = 25, headers = RecordHeaders(headers = [], isReadOnly = false), key = yong, value = Member(name=yong, age=100)), kafka_groupId=m_group}]
// Exception Trace Log

에러를 요약하자면 age에서 Max값을 초과했다는 에러입니다. 

 

여기에 추가로 발생한 Error를 핸들링하고 싶다면 ErrorHandler을 사용할 수 있습니다.

@KafkaListener ErrorHandler

@KafkaListener(
    id = "member_retry",
    topics = ["insert_member"],
    containerFactory = "memberFactory",
    groupId = "m_group",
    errorHandler = "validationErrorHandler"  // errorHandler 추가
)
fun memberListener2(@Payload @Valid member: Member, meta: ConsumerRecordMetadata, acknowledgment: Acknowledgment) {
    println("memberListener data:: $member")
    println("memberListener offset:${meta.offset()} partition:${meta.partition()}")
    
    acknowledgment.acknowledge()
}

@Bean
fun validationErrorHandler(): KafkaListenerErrorHandler {
    return KafkaListenerErrorHandler { message, exception ->
        // error 처리
        message 
    }
}

ErrorHandler를 Bean에 등록 후 @KafkaListener에서 등록하여 사용할 수 있습니다. 

 

결론

Spring-Kafka는 이렇게 간단한 방법으로 Validation 처리를 할 수 있습니다. 이와 같은 처리방식으로 인해 발생한 Exception 종류에 따라 Retry 정책을 분기처리하여 관리할 수 있을 것 입니다. 

다음 글에서는 Consumer에서 실패 시 Spring-Kafka에서 지원하는 Reply, Retry 기능에 대해서 작성해보겠습니다.

반응형