대충 넘어가지 않는 습관을 위한 기록

Kafka 이벤트 수신 및 연동 기능 구현 기록 (ClassNotFoundException, SerializationException 에러 기록..)

uhyvn 2025. 1. 21. 00:38

이번 MSA 기반 서비스에서 대규모 트래픽 처리를 위해 카프카를 도입하였다.

처음 구현해 보는 거라 미숙할 수도 있지만, 기록해 놓고 부족하거나 보완해야 할 점을 또 찾게 되면 다른 포스팅으로 추가하려 한다.

 

 

간단한 개념 정리,,

더보기

우선 카프카에 대한 기본적인 정의와 역할은 아래와 같다.

 

  • Kafka는 분산 스트리밍 플랫폼으로, 주로 실시간 데이터 피드의 빅 데이터 처리를 목적으로 사용됩니다.
  • Kafka는 메시지 큐와 유사하지만, 대용량 데이터 스트림을 저장하고 실시간으로 분석하거나 처리하는 데 중점을 둡니다.

 

역할

  • 실시간 데이터 처리: 대용량 데이터를 실시간으로 처리하고 분석합니다.
  • 데이터 통합: 다양한 소스에서 데이터를 수집하고 이를 통합하여 분석합니다.
  • 내결함성: 데이터 손실 없이 안정적으로 데이터를 저장하고 전송합니다.

 

 

 

기본 구성 요소

메시지(Message)

  • 메시지는 Kafka를 통해 전달되는 데이터 단위입니다. 예를 들어, 로그 데이터나 이벤트 데이터가 메시지가 될 수 있습니다.
  • 메시지는 키(key), 값(value), 타임스탬프(timestamp), 그리고 몇 가지 메타데이터로 구성됩니다.

프로듀서(Producer)

  • 메시지를 생성하고 Kafka에 보내는 역할을 합니다. 예를 들어, 웹 애플리케이션이 로그 데이터를 Kafka에 보내는 경우 프로듀서가 됩니다.
  • 프로듀서는 특정 토픽(topic)에 메시지를 보냅니다.

토픽(Topic)

  • 메시지를 저장하는 장소입니다. 메시지는 토픽에 저장되었다가 소비자에게 전달됩니다.
  • 토픽은 여러 파티션(partition)으로 나누어질 수 있으며, 파티션은 메시지를 순서대로 저장합니다. 파티션을 통해 병렬 처리가 가능합니다.
  • 예: “user-activity”라는 토픽에 사용자의 활동 로그를 저장할 수 있습니다.

파티션(Partition)

  • 파티션은 토픽을 물리적으로 나눈 단위로, 각 파티션은 독립적으로 메시지를 저장하고 관리합니다.
  • 각 파티션은 메시지를 순서대로 저장하며, 파티션 내의 메시지는 고유한 오프셋(offset)으로 식별됩니다.
  • 파티션을 통해 데이터를 병렬로 처리할 수 있으며, 클러스터 내의 여러 브로커에 분산시켜 저장할 수 있습니다.

키(Key)

  • 키는 메시지를 특정 파티션에 할당하는 데 사용되는 값입니다.
  • 동일한 키를 가진 메시지는 항상 동일한 파티션에 저장됩니다.
  • 예를 들어, 특정 사용자 ID를 키로 사용하여 해당 사용자의 모든 이벤트가 동일한 파티션에 저장되도록 할 수 있습니다.

컨슈머(Consumer)

  • 토픽에서 메시지를 가져와 처리하는 역할을 합니다.
  • 컨슈머는 특정 컨슈머 그룹(consumer group)에 속하며, 같은 그룹에 속한 컨슈머들은 토픽의 파티션을 분산 처리합니다.
  • 기본적으로 컨슈머는 스티키 파티셔닝(Sticky Partitioning)을 사용합니다. 이는 특정 컨슈머가 특정 파티션에 붙어서 계속해서 데이터를 처리하는 방식으로, 이는 데이터 지역성을 높여 캐시 히트율을 증가시키고 전반적인 처리 성능을 향상시킵니다.

브로커(Broker)

  • Kafka 클러스터의 각 서버를 의미하며, 메시지를 저장하고 전송하는 역할을 합니다.
  • 하나의 Kafka 클러스터는 여러 브로커로 구성될 수 있으며, 각 브로커는 하나 이상의 토픽 파티션을 관리합니다.

주키퍼(Zookeeper)

  • Kafka 클러스터를 관리하고 조정하는 데 사용되는 분산 코디네이션 서비스입니다.
  • 주키퍼는 브로커의 메타데이터를 저장하고, 브로커 간의 상호작용을 조정합니다.

 

 


 

 

 

 

이번에 내가 카프카 기반으로 구현한 기능은,

1. 결제가 완료되면 유저 포인트를 충전시켜 주는 기능

2. 결제가 취소되면 유저가 충전한 포인트를 환수하는 기능

3. 경매 입찰이 완료되면 실시간으로 포인트를 차감하는 기능

4. 경매에서 다른 사람이 더 높은 가격으로 입찰하게 되면 실시간으로 포인트를 환수시켜 주는 기능

이렇게 네 가지가 있다.

 

 

모두 기록하지는 않겠지만, 카프카와 연관된 주요 코드는 기록하려고 한다.

 

 


 

 

 

 

우선 제일 첫 번째로, 당연하겠지만 의존성을 추가한다.

 

 

build.gradle

// Kafka
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

 

 

 

그리고 관련 설정을 추가해줘야 하는데, Config 클래스를 작성할 수도 있고, yml파일에 추가할 수도 있다.

나는 더 보기 좋은 설정을 위해 그리고 더 쉽게 관리하기 위해 정말 기본적인 설정을 제외하고는 모두 Config에서 관리했다.

 

yml

spring:
    kafka:
    	bootstrap-servers: {서버}:9092
    	consumer:
      		group-id: {지정 그룹 ID}

 

 

 

유저 도메인의 Consumer Config

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    String kafkaPort;

    @Value("${spring.kafka.consumer.group-id}")
    String groupId;

    private Map<String, Object> commonConsumerConfig() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPort);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return configProps;
    }

    @Bean
    public ConsumerFactory<String, PaymentEvent> paymentCompletedEventConsumerFactory() {
        Map<String, Object> configProps = new HashMap<>(commonConsumerConfig());
        configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, PaymentEvent.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConsumerFactory<String, UserEvent.Bidding> userEventBiddingConsumerFactory() {
        Map<String, Object> configProps = new HashMap<>(commonConsumerConfig());
        configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserEvent.Bidding.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PaymentEvent> paymentCompletedEventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PaymentEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(paymentCompletedEventConsumerFactory());
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserEvent.Bidding> userEventBiddingKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserEvent.Bidding> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userEventBiddingConsumerFactory());
        return factory;
    }

}

 

제네릭 타입을 사용하여 더 좋은 가독성을 설정할 수 있지만, 아직 avro를 사용하기 전이라서 avro를 사용하기 전 가독성 최적화는 무의미하다고 생각하여 기존 코드만 먼저 공유.

 

기본적인 설정들 뿐이다. 특이사항이라면

- 받아야 하는 이벤트(dto) 종류가 여러 개라서(더 나은 가독성을 위해 이 글에는 두 개만 작성) 밸류에 Object를 이용해 메소드를 하나로 사용하지 않았다.

- 그리고 받고 나서의 처리도 달랐기 때문에, 당연하겠지만 컨슈머팩토리와 컨테이너팩토리 모두 분리해 줬다.

 

 

 

 

다음으로 이벤트 송신 측(입찰 Producer) 설정을 보자면,

경매(입찰) 도메인의 ProducerConfig

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    String kafkaPort;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPort);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

간단하다! (기본적으로 yml을 작성해줘야 함)

 

 

 

 

 


 

 

 

 

이제 주요 비즈니스 로직이다.

UserConsumerService

@Slf4j
@Service
@RequiredArgsConstructor
public class UserConsumerService {

    private final UserJpaRepository userJpaRepository;

    @KafkaListener(topics = "payment-completed", groupId = "user-service-group", containerFactory = "paymentCompletedEventKafkaListenerContainerFactory")
    @Transactional
    public void completePayment(PaymentEvent event) {
        log.info("결제 완료 이벤트 수신. userId={}, amount={}", event.getUserId(), event.getAmount());
        User user = getUser(event.getUserId());

        user.updatePoint(event.getUserId(), event.getAmount());
    }

    @KafkaListener(topics = "user-events", groupId = "user-service-group", containerFactory = "userEventBiddingKafkaListenerContainerFactory")
    @Transactional
    public void updatePoints(UserEvent.Bidding event) {
        log.info("입찰 완료 이벤트 수신. userId={}, amount={}", event.getUserId(), event.getAmount());
        User user = getUser(event.getUserId());

        user.updatePoint(event.getUserId(), event.getAmount());
    }

}

 

containerFactory를 나처럼 분리해 줬다면 꼭 위 KafkaListener 어노테이션 설정처럼 지정해줘야 한다!!

쉬운 이해를 위해 Producer 예시 코드를 먼저 같이 보여준 후에 설명하겠다. (비즈니스 로직 예시는 Payment 도메인으로 하겠다.)

 

PaymentProducerServiceImpl

@Service
@RequiredArgsConstructor
public class PaymentProducerServiceImpl implements PaymentProducerService {

  private final KafkaTemplate<String, PaymentEvent> kafkaTemplate;

  public void sendPaymentEvent(PaymentEvent event) {
    kafkaTemplate.send("payment-completed", event);
  }
}

 

다른 팀원이 작성한 코드를 기반으로 예시를 작성했다.

기본적으로 PaymentEvent 클래스를 작성하여, 전송하고 싶은 이벤트를 결정한다.

sendPaymentEvent는 토픽을 payment-completed 로 지정하며, 파라미터로 받아온 이벤트를 함께 발행한다.

 

이 메소드를 호출하는 로직은 예상하겠지만 짧게 기록하자면 아래와 같다.

PaymentServiceImpl

  @Override
  @Transactional
  public TossPaymentResDto confirmPayment(Long userId, ConfirmPaymentReqDto request) {
    
    // .. 결제 관련 로직

    PaymentEvent event = new PaymentEvent(userId, response.getTotalAmount());
    paymentProducer.sendPaymentEvent(event);

    // .. 결제 관련 로직

    return response;
  }

 

이벤트를 dto 생성하듯이 똑같이 생성하고, (방식은 Builder, new 등등 상관없다.)

paymentProducer 클래스의 sendPaymentEvent 메소드를 호출한다.

 

 

 

 

구현은 이렇게 보면 매우 간단하다.

1. 이벤트 생성

2. 이벤트 송신 메소드 호출

 

그러나 여기서 중요한 포인트가 있다.

 

 

 

 

  • 토픽명 일치:
    • 프로듀서와 컨슈머가 동일한 토픽을 사용해야 메시지가 정상적으로 전송되고 수신됩니다. 이름이 다르면 메시지를 제대로 전달하거나 수신할 수 없습니다.
  • 패키지 경로 일치:
    • 카프카 메시지에 대한 직렬화 및 역직렬화 과정에서 패키지 경로가 일치하지 않으면 데이터 변환에 오류가 발생할 수 있습니다. 예를 들어, JSON 데이터와 객체 간 변환이 제대로 되지 않을 수 있습니다.
  • 데이터 직렬화 및 역직렬화 규격 일치:
    • 프로듀서에서 메시지를 직렬화할 때와 컨슈머에서 메시지를 역직렬화할 때 사용하는 클래스와 규격이 일치해야 합니다. 예를 들어, JsonSerializer와 JsonDeserializer를 사용하면 직렬화/역직렬화 대상 클래스가 정확히 동일해야 합니다.

 

 

 

 

기본적인 정의지만, 사용하면서 에러를 만날 수 있다. 나처럼,,

이번에 에러를 하나 오랫동안 해결했는데, 패키지 경로가 올바르지 않아서 뜬 ClassNotFoundException이다.

 

 

이벤트를 수신하려면 위 Config처럼 

configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

이런 부분이 있다.

 

간단히 말하자면, 이 설정은 카프카가 받은 메시지를 JSON 형식으로 Java 객체로 변환하기 위해 JsonDeserializer를 사용하도록 지정하는 코드다.

문제는 여기서 발생한다.

 

 

 

내가 이번에 만난 에러 코드다.

2025-01-16T14:44:11.859+09:00 ERROR 2572 --- [user-service] [ntainer#0-0-C-1] [                                                 ] o.a.k.c.c.internals.CompletedFetch       : [Consumer clientId=consumer-user-service-group-1, groupId=user-service-group] Value Deserializers with error: Deserializers{keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer@7d139153, valueDeserializer=org.springframework.kafka.support.serializer.JsonDeserializer@fa35900}
2025-01-16T14:44:11.859+09:00 ERROR 2572 --- [user-service] [ntainer#0-0-C-1] [                                                 ] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
	at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:192) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1985) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1380) ~[spring-kafka-3.3.1.jar:3.3.1]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:842) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing VALUE for partition payment-completed-0 at offset 0. If needed, please seek past the record to continue consumption.
	at org.apache.kafka.clients.consumer.internals.CompletedFetch.newRecordDeserializationException(CompletedFetch.java:346) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:330) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:284) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:168) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:134) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:667) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:618) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:591) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874) ~[kafka-clients-3.8.1.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1685) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1660) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1438) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1328) ~[spring-kafka-3.3.1.jar:3.3.1]
	... 2 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.berry.payment.infrastructure.kafka.PaymentCompletedEvent]
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:136) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:97) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:587) ~[spring-kafka-3.3.1.jar:3.3.1]
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.8.1.jar:na]
	at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:327) ~[kafka-clients-3.8.1.jar:na]
	... 14 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.berry.payment.infrastructure.kafka.PaymentCompletedEvent
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[na:na]
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[na:na]
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[na:na]
	at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
	at java.base/java.lang.Class.forName(Class.java:467) ~[na:na]
	at org.springframework.util.ClassUtils.forName(ClassUtils.java:321) ~[spring-core-6.2.1.jar:6.2.1]
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:132) ~[spring-kafka-3.3.1.jar:3.3.1]
	... 18 common frames omitted

 

 

이 에러를 구글링 하다가 워낙 다른 설명들을 해서 GPT의 도움을 받아서 얻은 답변이 아래와 같다.

 

 

 

주어진 로그 메시지들을 분석해 보면, 문제는 user-service가 Kafka에서 수신한 메시지를 역직렬화(Deserialization)하는 과정에서 발생하고 있습니다. 다음과 같은 문제들이 발견됩니다:

주요 원인:

  1. ClassNotFoundException:
    • user-service에서 com.berry.payment.infrastructure.kafka.PaymentCompletedEvent 클래스를 찾을 수 없다는 예외가 발생하고 있습니다.
    • 이는 user-service가 해당 클래스를 로드하지 못했기 때문에 발생합니다. 이 클래스가 user-service의 클래스패스에 존재하지 않거나, JAR 파일에 포함되지 않았을 수 있습니다.
  2. 역직렬화 실패:
    • Kafka 메시지를 역직렬화할 때 MessageConversionException이 발생하고 있습니다. 이는 클래스 이름을 해결할 수 없기 때문에 발생한 것으로 보입니다.
    • 메시지를 직렬화한 payment-service와 역직렬화하려는 user-service 간에 메시지 클래스에 대한 동기화가 이루어지지 않은 것 같습니다.

해결 방법:

  1. 클래스패스 확인:
    • user-service의 클래스패스에 com.berry.payment.infrastructure.kafka.PaymentCompletedEvent 클래스가 포함되어 있는지 확인합니다.
    • 클래스가 포함된 JAR 파일이 누락되었거나, 빌드 설정에서 제외되었을 수 있으므로 빌드 설정을 검토하고 클래스가 포함되었는지 확인합니다.
  2. 공유 라이브러리 사용:
    • PaymentCompletedEvent 클래스가 payment-service와 user-service 간에 공유되어야 하므로, 공통 라이브러리로 분리하여 두 서비스가 모두 이 라이브러리를 의존하도록 설정하는 것이 좋습니다. 이를 통해 두 서비스가 동일한 클래스 정의를 사용할 수 있도록 합니다.
  3. 역직렬화 설정 확인:
    • user-service의 Kafka 역직렬화 설정이 올바른지 확인합니다. 예를 들어, spring.kafka.consumer.value-deserializer가 올바르게 설정되어 있는지 확인하고, spring.kafka.consumer.properties.spring.json.trusted.packages에 필요한 패키지를 추가하여 신뢰할 수 있는 패키지로 설정합니다.
    properties -> 
    spring.kafka.consumer.properties.spring.json.trusted.packages=*
    또는 필요한 패키지만 명시할 수 있습니다.
  4. 버전 호환성 확인:
    • spring-kafka와 Kafka 클라이언트의 버전이 서로 호환되는지 확인합니다. 호환되지 않는 버전을 사용하면 직렬화 및 역직렬화 과정에서 문제가 발생할 수 있습니다.

이러한 조치를 통해 user-service가 payment-service로부터 수신한 Kafka 메시지를 올바르게 역직렬화할 수 있도록 설정해야 합니다.

 

 

 

 

 

 

나는 무조건 역직렬화가 잘못되어서 나타나는 에러인 줄 알았다.

뭐 어떻게 보면 맞는 말이지만, 결국에는 시작부터 패키지 경로를 읽지 못해서 발생한 에러다.

그렇기 때문에 역직렬화 자체를 실행하지 못했던 것이다.

 

 

 

당연하겠지만 우리 팀은 MSA 기반이기 때문에 각각 다른 도메인 즉, 다른 패키지를 구성하고 있어서

유저 패키지에서는 Payment 패키지 안의 클래스는 읽을 수가 없다...

 

 

그래서 곧바로 공통모듈에 이벤트를 만들어, 두 도메인 모두 동일한 이벤트를 사용하게 했더니 ,

바로 해결되었다.

하지만 좋은 해결책은 아니었다.

 

 

이유로는,

 

  • 모듈 간 의존성: 공통 모듈에 의존하게 되면, 마이크로서비스 간 의존성이 생기고, 이로 인해 배포나 독립적인 개발에 제약이 생길 수 있습니다. 예를 들어, 하나의 서비스에서 공통 이벤트가 변경되면 이를 사용하는 다른 서비스들도 동시에 수정이 필요할 수 있습니다.
  • 유연성 부족: 각 마이크로서비스가 고유한 비즈니스 로직에 맞춰 이벤트를 변경하거나 확장할 필요가 있을 때, 공통 모듈의 변경이 모든 서비스에 영향을 미치게 되어 유연성이 떨어질 수 있습니다.
  • 버전 관리 문제: 여러 마이크로서비스가 서로 다른 버전의 공통 모듈을 사용하면, 이벤트의 구조나 버전이 달라져 호환성 문제가 발생할 수 있습니다. 이는 배포나 데이터 처리 시 어려움을 겪을 수 있습니다.

 

결론:

실무에서는 마이크로서비스의 독립성을 중요시하기 때문에, 이벤트를 각 서비스에 맞게 별도로 관리하는 경우가 더 많습니다. 각 서비스에서 발생하는 이벤트를 그 서비스 내에서 정의하고, 필요한 경우 공통 모듈에서 공유할 수 있는 구조로 이벤트를 처리하는 것이 좋습니다.

 

이렇게 구글링과 GPT에서 답변을 받았다.. 모두 맞는 얘기라 재검색할 필요도 없었다.

 

 

나는 또 다른 대안을 생각했다.

 

내가 발견한 해결책은 두 가지였다.

  1. 패키지 경로 자동 생성 억제
  2. avro를 사용하여 유연한 이벤트 관리

시간이 촉박하여 1번으로 해결했지만, 버저닝과 확장성을 고려한다면 무조건 2번이 맞다.

다음 카프카 관련 포스팅은 꼭 avro를 사용한 이벤트 관리를 하도록 하겠다..!

 

 

 

마지막으로 패키지 경로 자동 생성 억제 방법은 간단하다.

Producer쪽 yml을 수정해 준다.

spring:  
  kafka:
    bootstrap-servers: ${KAFKA_HOST}:${KAFKA_PORT}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.add.type.headers: false

 

Producer는 yml로 설정해 뒀어서 헷갈릴 수 있지만,

추가된 설정은

spring.json.add.type.headers: false

밖에 없다.

이 설정은 JSON 메시지에 타입 정보를 포함시키지 않도록 설정하는 것이다.

기본적으로 JsonDeserializer는 JSON 메시지에 포함된 클래스를 역직렬화하려고 할 때, 클래스 타입을 헤더에 포함시킨다.

하지만 이 설정을 false로 설정하면, 타입 정보를 헤더에서 제외하게 되어, 클래스 경로 문제로 인한 ClassNotFoundException과 같은 오류를 피할 수 있다.

 

 

간략하게 요약하자면!

 

해결 과정:

  • 문제의 원인: JsonDeserializer가 기본적으로 클래스 경로 정보를 헤더에 포함시켜, 이를 찾지 못한 경우 ClassNotFoundException이 발생했다.
  • 해결 방법: 이 설정을 통해 헤더에 클래스 정보를 포함시키지 않음으로써, MSA 환경에서 패키지 경로가 달라서 발생할 수 있는 ClassNotFoundException을 방지하게 되었다.

 

 

 

 

다음 포스팅은 avro로!