You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

Spring Boot Kafka消费者信任包配置异常求助

问题排查:Kafka JsonDeserializer 信任包配置错误

看到你的错误提示和配置,问题其实很明显:你把具体的类名当成了信任包路径来配置,而JsonDeserializertrusted-packages参数需要的是包名,不是单个类的全限定名。

先看你的错误信息:

Caused by: java.lang.IllegalArgumentException: The class 'com.example.kw.dtos.Classdata' is not in the trusted packages: [java.util, java.lang, com.example.kw.dtos.Classdata]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

这里的信任包列表里出现了com.example.kw.dtos.Classdata,这是类名,不是包名,所以JsonDeserializer会认为这个类不在任何信任的包下(因为它把类名当成了包路径去匹配)。

修正方案

1. 修改YAML配置

spring.json.trusted.packages的值改成包名com.example.kw.dtos,而不是类名:

spring:
  kafka:
    bootstrap-servers: ip_here
    topic:
      json: topic_here
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'com.example.kw.dtos'

2. 修改Java配置类

同样,把JsonDeserializer.TRUSTED_PACKAGES的值改成包名:

@Configuration
@EnableKafka
public class MsgListener {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 这里改成包名,不是类名
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.kw.dtos");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Classdata> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(Classdata.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Classdata> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Classdata> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

额外说明

如果你想快速测试(或者确定所有消息来源都是可信的),也可以直接设置信任所有包:

# YAML里这样写
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
// Java配置里这样写
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

这样修改后,JsonDeserializer就能正确识别com.example.kw.dtos.Classdata属于信任的包,不会再抛出这个异常了。

内容的提问来源于stack exchange,提问作者HeyItsMe

火山引擎 最新活动