본문 바로가기

반응형

프로그래밍/카프카

(10)
Spring-Kafka Consumer 실패 시 재처리 ( Reply - @SendTo ) 들어가기 전에 Consumer를 통해 카프카 클러스터에서 데이터를 Read할 때 모두 정상처리된다면 좋겠지만 그렇치 못한 경우도 있습니다. 그럴 경우 Spring-Kafka에서 재처리를 편리하게 할 수 있도록 Reply(@SendTo), Retry 기능을 제공합니다. 이번 글에서는 재처리 하는 방법에대해서 정리해보겠습니다. 목차 재처리를 해야하는 이유 ReplyTemplate 구성 @SendTo를 사용한 ReplyTemplate 구현 Reply 처리 플로우 1. 재처리를 해야하는 이유 Consumer를 이용해 카프카 클러스터에서 Read 한 후 Repository에 저장할 수도 있고 메일 발송을 할 수도 있고 다양하게 사용할 수 있습니다. 하지만 Repository 서버가 다운된다거나 메일 발송 중 메일..
Spring-Kafka Consumer Offset 관리 들어가기 전에 Consumer를 AutoCommit을 false로 설정하고 사용하고 있습니다. Consumer에서 로직을 완료하고 partition에 commit을 처리하도록 작성하였습니다. 그런데 로직을 처리하던 중 Exception이 발생했고 partition에 commit을 하지 못했습니다. 그렇다면 이 Consumer는 무한 루프에 빠져 데이터를 계속 읽고 Exception을 계속 발생시킬까요? 정답은 NO! 입니다. Consumer 내부에서 Partition Offset과는 별개로 Consumer만을 위한 Offset을 관리하고 있기 때문입니다. Consumer Polling 방식 Consumer는 KafkaMessageListenerContainer에서 새로운 Thread를 생성하여 while..
Spring-Kafka Consumer Validation 들어가기 전에 Consumer는 Kafka에 Topic, Partition의 Record를 읽어오는 역할을 합니다. 하지만 Kafka에 잘못된 데이터가 들어오는 경우 Consumer에서는 데이터를 Validation 처리하여 올바른 데이터만 처리해야합니다. 이번 글에서는 Spring-Kafka를 사용하는 Consumer에서 어떻게 Validation 처리를 하는지 작성해보겠습니다. 목차 Validation 방식 Validation 구성 Consumer Validation 구현 Validation 테스트 @KafkaListener ErrorHandler 결론 Validation 방식 Spring-Kafka는 Version 2.2 부터 @KafkaListener의 @Payload 인자값을 쉽게 Validat..
Spring-Kafka Lifecycle 목차 Lifecycle Lifecycle Management 주의사항 1. Lifecycle @KafkaListener는 Application Context 안에 Bean이 아닙니다. @KafkaListener는 KafkaListenerEndpointRegistry에서 Bean으로 등록이 됩니다. 등록된 Bean은 framework에 의해 자동으로 선언되고 Container의 Lifecycle을 관리합니다. KafkaListenerEndpointRegistry.java 메서드 중빨간 블럭처리 되어있는 코드에서 container를 등록합니다. 등록된 Bean은 설정값 autoStartup이 true인 것을 자동으로 시작합니다. Listener Container들은 SmartLifeCycle을 Implem..
[kafka] @KafkaListener를 이용한 Consumer 구현 목차 Consumer KafkaListener Annotation 설정 KafkaListener Annotation 사용방법 Simple POJO Listeners 파티션 할당 특정 파티션 InitialOffset 설정 수동 확인 ( Manual Acknowledgment ) Consumer Record Metadata Batch Listeners GroupId 1. Consumer 컨슈머는 카프카 클러스터( 파티션 )에서 저장된 데이터를 읽어오는 역할을 합니다. 구현 방법은 Message Listeners, @KafkaListener 두 가지로 가능합니다. 이번 글에서는 @KafkaListener 를 이용하여 구현한 내용을 정리해보겠습니다. 2. KafkaListener Annotation 설정 개인적..
[kafka] Concurrency 설정 기준 (ConcurrentMessageListenerContainer) 목차 토픽이 1개인 경우 파티션1, Concurrency1, Call3 파티션2, Concurrency2, Call3 파티션3, Concurrency6, Call6 결론 토픽이 3개인 경우 Concurrency 무조건 많다고 좋은게 아니다. 잘못된 생각 컨슈머에서 Concurrency가 무조건 많으면 많은 Message를 Concurrency 만큼 Listen할 수 있겠구나! 컨슈머에서 Concurrency, 즉 Thread가 할당되는 조건은 파티션 단위였습니다. 실제 테스트를 통해 컨슈머가 어떻게 처리하는 지 확인해보겠습니다. 동시성을 확인하기 위해 처리 로직에 Sleep을 길게 주었습니다. @Component class MemberConsumer: AcknowledgingMessageListener ..
[kafka] Concurrency 설정 기준 (ConcurrentMessageListenerContainer) 목차 토픽이 1개인 경우 파티션1, Concurrency1, Call3 파티션2, Concurrency2, Call3 파티션3, Concurrency6, Call6 결론 토픽이 3개인 경우 Concurrency 무조건 많다고 좋은게 아니다. 잘못된 생각 컨슈머에서 Concurrency가 무조건 많으면 많은 Message를 Concurrency 만큼 Listen할 수 있겠구나! 컨슈머에서 Concurrency, 즉 Thread가 할당되는 조건은 파티션 단위였습니다. 실제 테스트를 통해 컨슈머가 어떻게 처리하는 지 확인해보겠습니다. 동시성을 확인하기 위해 처리 로직에 Sleep을 길게 주었습니다. @Component class MemberConsumer: AcknowledgingMessageListener ..
[kafka] Message Listeners를 이용한 Consumer 구현 목차 Consumer Message Listeners 이용한 구현 Listener 종류 MessageListenerContainers 종류 Committing Offset nack 1. Consumer 컨슈머는 카프카 클러스터( 파티션 )에서 저장된 데이터를 읽어오는 역할을 합니다. 구현 방법은 Message Listeners, @KafkaListener 두 가지로 가능합니다. 이번 글에서는 Message Listeners 를 이용하여 구현한 내용을 정리해보겠습니다. 2. Message Listeners 이용한 구현 Message Listener Container를 사용할 때 Receive Data를 listener에서 제공해야만 합니다. 2-1. Listener 종류 Listener When Consu..
[kafka] KafkaTemplate을 이용한 Producer 구현 목차 KafkaTemplate 구성 KafkaTemplate 사용하여 Producer 구현 KafkaTemplate 원초적으로 Producer를 통해 Kafka에 Message를 Send하려면 KafkaProducer 인스턴스를 사용하여 send() 메서드를 호출해야합니다. KafkaTemplate은 KafkaProducer 감싸고 있는 인스턴스라고 생각하시면 이해하시기 편합니다. KafkaTemplate.send() 는 내부에서 결국 KafkaProducer 인스턴스의 send()를 호출하고 있습니다. 1. KafkaTemplate 구성 KafkaProducerConfig.kt DefaultKafkaProducerFactory 인스턴스를 사용하여 KafkaTemplate 를 생성할 수 있습니다. @Co..
그림으로 이해하는 카프카란? 카프카는 실시간으로 기록 스트림을 게시, 구독, 저장 및 처리할 수 있는 분산 데이터 스트리밍 플랫폼입니다. 극단적인 예로 아래와 같은 MSA Service가 있다고 가정합니다. 대충봐도 너무 복잡하다... 아키텍처의 복잡도가 매우 높은 것을 한 눈에 알아볼 수 있습니다. 다음 그림은 카프카를 도입하여 아키텍처를 재설계한 그림입니다. 카프카를 통해 아키텍처를 수정 극단적인 예를 들기 위한 아키텍처 그림입니다. 실제로는 무조건 카프카를 쓰기보다 용도에 맞게 사용해야합니다. 이 것 말고도 다양하게 카프카를 사용할 수 있습니다. 다음으로 카프카 구성에 대해 알아보겠습니다. Name Description 카프카 클러스터 카프카 서버 브로커 카프카 클라이언트와 데이터를 주고받기 위해 사용하는 주체 데이..

반응형