MessageConversionException failed to resolve class name

2023, Nov 05    
kafka MessageConversionException

error log

11:27:42.751 ERROR [,] [r#10-0-C-1] m.c.s.KafkaFailedDeserializationFunction: 
12 apply           topic [event.test]
org.springframework.messaging.converter.MessageConversionException: 
failed to resolve class name. Class not found [test.com.TestService]; 
nested exception is java.lang.ClassNotFoundException: test.com.TestService
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:142)
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572)

cause

  • kafka_serializationException/#solved4 방식처럼 spring.json.type.mapping를 통해서 메시지 역직렬화 클래스를 지정할 때, 해당 topic으로 전달되는 모든 json데이터를 정의하지 않으면, MessageConversionException가 발생하게 된다.
  • 하지만 해당 토픽에서 발생하는 모든 이벤트 클래스를 맵핑하는 것에 대해 비지니스 처리가 필요하지 않는다면 전부를 맵핑용 클래스로 생성할 필요가 없기 때문에 방법이 필요

solve 1.

  • Object클래스로 맵핑을 시키고, 해당 클래스로 캐스팅하는 방법이 있다.
@KafkaListener(
    id = CtalkAppChannelEventSubscriber.ID,
    topics = KafkaTenant.PREFIX + CtalkTopics.EVENT_APP_CHANNEL,
    groupId = CtalkTopics.EVENT_APP_CHANNEL + "-messengerconnector-ctalk-appsetting-subscriber",
    properties = {
        "spring.json.type.mapping:" +
            "test.com.messaging.event.MessageCreated:java.lang.Object," +
            "test.com.messaging.event.TestAService:test.com.TestAService," +
            "test.com.messaging.event.TestBService:test.com.TestBService"
    },
    autoStartup = "false"
)
@KafkaHandler
public void handle(Object object) {
    try {
        if (object instanceof MessageCreated) {
            log.debug("MessageCreated event");
        } else if (object instanceof MessageDeleted) {
            log.debug("MessageDeleted event");
        } else {
            log.debug("else!!!");
        }
    } catch (Exception e) {
       
    }
}

@KafkaHandler
public void handle(TestAService testAService) {
    try {
        // TestAService 
    } catch (Exception e) {
       
    }
}
  • 위의 방법은 Object로 맵핑시킬 class path는 일치해야하는 단점이 있다.
  • DefaultJackson2JavaTypeMapper 에서 일치하지 않으면, MessageConversionException throw한다.
private JavaType getClassIdType(String classId) {
        if (this.getIdClassMapping().containsKey(classId)) {
            return TypeFactory.defaultInstance().constructType((Type)this.getIdClassMapping().get(classId));
        } else {
            try {
                if (!this.isTrustedPackage(classId)) {
                    throw new IllegalArgumentException("The class '" + classId + "' is not in the trusted packages: " + this.trustedPackages + ". If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).");
                } else {
                    return TypeFactory.defaultInstance().constructType(ClassUtils.forName(classId, this.getClassLoader()));
                }
            } catch (ClassNotFoundException var3) {
                throw new MessageConversionException("failed to resolve class name. Class not found [" + classId + "]", var3);
            } catch (LinkageError var4) {
                throw new MessageConversionException("failed to resolve class name. Linkage error [" + classId + "]", var4);
            }
        }
    }
  • 그래서 이 부분은 어쩔 수 없지만, 실제로 클래스를 생성하지 않아도 되니 일단 이 방법으로 적용했다.

better than solve1.

  • Object말고 별도로 제네릭 타입을 받도록 디폴트 클래스를 생성해서 처리할 수 있다.
@KafkaListener(
    id = CtalkAppChannelEventSubscriber.ID,
    topics = KafkaTenant.PREFIX + CtalkTopics.EVENT_APP_CHANNEL,
    groupId = CtalkTopics.EVENT_APP_CHANNEL + "-messengerconnector-ctalk-appsetting-subscriber",
    properties = {
        "spring.json.type.mapping:" +
            "test.com.messaging.event.MessageCreated:test.com.Ignore," +
            "test.com.messaging.event.TestAService:test.com.TestAService," +
            "test.com.messaging.event.TestBService:test.com.TestBService"
    },
    autoStartup = "false"
)
public class CtalkAppChannelEventSubscriber {
	@KafkaHandler
	public void handle(TestAService testAService) {
	    try {
	        // TestAService 
	    } catch (Exception e) {
	       
	    }
	}
	
	@KafkaHandler(isDefault = true)
	public void ignore(Ignore ignore) {
	    log.trace("Received ignore: {}", ignore);
	}
}