Skip to main content
 首页 » 编程设计

Spring Boot/Kafka Json 反序列化之可信包

2025年02月15日35wuhuacong

我刚刚开始在 Spring Boot 中使用 Kafka 并想发送和使用 JSON 对象。

当我尝试使用来自 Kafka 主题的消息时,出现以下错误:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition dev.orders-0 at offset 9903. If needed, please seek past the record to continue consumption. 
Caused by: java.lang.IllegalArgumentException: The class 'co.orders.feedme.feed.domain.OrderItem' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*). 
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE] 
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE] 
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:218) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE] 
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.1.jar:na] 
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93) ~[kafka-clients-1.0.1.jar:na] 

我试图通过在 application.properties 中定义以下属性将我的包添加到受信任包列表中:
spring.kafka.consumer.properties.spring.json.trusted.packages = co.orders.feedme.feed.domain 

这似乎没有任何区别。将我的包添加到 Spring 的 Kafka JsonDeserializer 的受信任包列表的正确方法是什么?

请您参考如下方法:

好的,我已经更详细地阅读了文档并找到了我问题的答案。我正在使用 Kotlin,所以我的消费者的创建看起来像这样

@Bean 
fun consumerFactory(): ConsumerFactory<String, FeedItem> { 
    val configProps = HashMap<String, Any>() 
    configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers 
    configProps[ConsumerConfig.GROUP_ID_CONFIG] = "feedme" 
    configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java 
    configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java 
    configProps[JsonDeserializer.TRUSTED_PACKAGES] = "co.orders.feedme.feed.domain" 
    return DefaultKafkaConsumerFactory(configProps) 
} 

现在我只需要一种方法来覆盖 JsonDeserializer 中 Jackson ObjectMapper 的创建,以便它可以与我没有零参数构造函数的 Kotlin 数据类一起使用 :)