You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何将Map<String, Map<MetricName, ? extends Metric>>转换为JsonObject?

解决Kafka Metrics转JsonObject的空指针与序列化问题

先帮你拆解下两种方法踩的坑,再给出实用的解决方案:

问题分析

1. ObjectMapper返回null的原因

Kafka的MetricNameMetric是框架内部类,默认没有Jackson的序列化支持,直接用writeValueAsString会因为无法识别这些类型导致序列化失败,最终返回null。另外你代码里的StringBuilder sb全程没往里面写入内容,最后返回的是空字符串,这也是一个容易忽略的小问题。

2. JsonObject触发空指针的原因

你直接把Metric强转为JsonElement,这完全不成立——Metric根本不是JsonElement的子类,强转会抛出类型转换异常,进而导致空指针(或者直接崩溃)。而且如果metrics本身为null的话,调用metrics.forEach也会触发空指针。

最优解决方案

推荐两种思路,选适合你的场景:

思路1:手动转换为简单Map后再序列化(最直接)

先把嵌套的复杂Map转换成Jackson/Gson能直接处理的简单键值对(比如Map<String, Map<String, Object>>),再转JSON。这样不需要自定义序列化器,代码更直观:

@GetMapping("/showRawKafkaMetrics")
public String getMetrics() throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    
    // 用来存转换后的所有指标
    Map<String, Map<String, Object>> processedMetrics = new HashMap<>();

    for (MessageListenerContainer container : kafkaListenerEndpointRegistry.getListenerContainers()) {
        Map<String, Map<MetricName, ? extends Metric>> metrics = container.metrics();
        // 先判断metrics是否为空,避免空指针
        if (metrics == null || metrics.isEmpty()) {
            continue;
        }

        metrics.forEach((clientId, metricMap) -> {
            Map<String, Object> simpleMetricMap = new HashMap<>();
            metricMap.forEach((metricName, metric) -> {
                // 提取MetricName的名称作为键,Metric的实际值作为值
                simpleMetricMap.put(metricName.name(), metric.metricValue());
                // 如果需要更多信息(比如metric的标签),可以补充:
                // simpleMetricMap.put(metricName.name() + "_tags", metricName.tags());
            });
            processedMetrics.put(clientId, simpleMetricMap);
        });
    }

    // 把处理后的Map转成JSON字符串
    return objectMapper.writeValueAsString(processedMetrics);
}

思路2:自定义Jackson序列化器(更优雅,适合重复使用)

如果需要多次序列化Kafka的Metric类型,可以自定义序列化器,注册到ObjectMapper中:

// 自定义MetricName序列化器
public class MetricNameSerializer extends StdSerializer<MetricName> {
    public MetricNameSerializer() {
        super(MetricName.class);
    }

    @Override
    public void serialize(MetricName metricName, JsonGenerator gen, SerializerProvider provider) throws IOException {
        // 把MetricName序列化为包含名称、组、标签的JSON对象
        gen.writeStartObject();
        gen.writeStringField("name", metricName.name());
        gen.writeStringField("group", metricName.group());
        gen.writeObjectField("tags", metricName.tags());
        gen.writeEndObject();
    }
}

// 自定义Metric序列化器
public class MetricSerializer extends StdSerializer<Metric> {
    public MetricSerializer() {
        super(Metric.class);
    }

    @Override
    public void serialize(Metric metric, JsonGenerator gen, SerializerProvider provider) throws IOException {
        // 直接序列化Metric的实际值
        gen.writeObject(metric.metricValue());
    }
}

然后在代码中注册使用:

@GetMapping("/showRawKafkaMetrics")
public String getMetrics() throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    
    // 注册自定义序列化器
    SimpleModule module = new SimpleModule();
    module.addSerializer(MetricName.class, new MetricNameSerializer());
    module.addSerializer(Metric.class, new MetricSerializer());
    objectMapper.registerModule(module);

    StringBuilder sb = new StringBuilder();
    for (MessageListenerContainer container : kafkaListenerEndpointRegistry.getListenerContainers()) {
        Map<String, Map<MetricName, ? extends Metric>> metrics = container.metrics();
        if (metrics == null || metrics.isEmpty()) {
            continue;
        }

        metrics.forEach((clientId, metricMap) -> {
            try {
                String json = objectMapper.writeValueAsString(metricMap);
                sb.append("Client ID: ").append(clientId).append("\n");
                sb.append("Metrics: ").append(json).append("\n\n");
            } catch (JsonProcessingException e) {
                log.error("Failed to serialize metrics for client {}", clientId, e);
            }
        });
    }

    return sb.toString();
}

额外注意点

  • 一定要先判断container.metrics()是否为null,避免调用forEach时触发空指针;
  • 如果需要展示Metric的更多元数据(比如分组、标签),可以在转换时把这些信息也加入到简单Map中;
  • 最终返回的JSON可以直接用于监控面板的数据源,比如ECharts、Grafana等。

内容的提问来源于stack exchange,提问作者TechGeek

火山引擎 最新活动