kafka SerializationException

2022, Sep 20    
kafka kafdrop serialization

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition

error log

Error deserializing key/value for partition event.message-0 at offset 62. 
If needed, please seek past the record to continue consumption.

Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. 

Class not found [AAA]; nested exception is java.lang.ClassNotFoundException: AAA at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:138) 

at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:99) 
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:342) 
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1041) 
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110) 
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1223) 
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1072) 
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:562) 
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:523) 
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1230) 
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187) 
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) 
at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:78) 
at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:72) 
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:743) 
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) 
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
at java.base/java.lang.Thread.run(Thread.java:835)

cause

kafka consumer가 producer쪽이랑 header가 달라서, 즉 패키지 path가 달라서 그런거임.

solved1

kafkalistener에 spring.json.use.type.headers:false property를 추가한다.

@KafkaListener(    
    topics = KafkaTenant.PREFIX + BusinessTopics.EVENT_MESSAGE,    
    groupId = BusinessTopics.EVENT_MESSAGE + "-crema-business-message-subscriber",    
    properties = {        
        "spring.json.value.default.type:messaging.event.MessageCreated",        
        "spring.json.use.type.headers:false"   
        }
    )

ℹ️ 주의

  • 카프카 이벤트를 배치로 수신받을 때, 여러 유형의 메시지를 한번에 받으면 안된다. 배치 대상에 중복으로 들어가게 되는 위험이 발생해서, 중복으로 처리하게 된다.
  • 즉, 배치로 구현해야할 경우는 json header를 구분해서 받아야한다.

solved2

consumer config에

props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);

를 추가한다.

solved3

yml에 class를 맵핑시킨다. 단점은 사용하지 않는 이벤트의 헤더까지 맵핑되어야한다.

spring:
  kafka:
    consumer:
      properties:
        spring.json.type.mapping: messaging.event.BtalkCreated:messaging.event.TicketCreated,
messaging.event.BtalkUpdated:messaging.event.TicketUpdated

단, 해당 이벤트의 모든 헤더를 정의하지 않으면 아까와 동일한 SerializationException, MessageConversionException 에러 발생

solved4

또 다른 방법은 이벤트를 수신하는 클래스의 @KafkaListener에 spring.json.type.mapping을 설정하는 것

@KafkaListener(
    topics = {
        "TestTopic"
    },
    groupId = "test-event-subscriber",
    properties = {
        "spring.json.type.mapping:"
            + "messaging.event.BtalkCreated:messaging.event.TicketCreated,"
            + "messaging.event.BtalkUpdated:messaging.event.TicketUpdated"
    }
)
  • 만약에 이렇게 했는데 class not found exception이 발생한다면 spring.json.type.mapping로 설정한 여러 클래스들을 한 줄로 만든담에 개행을 해보도록 한다.

reference