You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Kafka、Spark与ES集成测试的正确方法问询

测试Spark-Kafka-Protobuf-ES工作流的完整方案

我来给你拆解一个落地性强的测试方案,完全覆盖你提到的场景——不用Spring,Mock Kafka,Protobuf事件写入,验证偏移量提交和数据不丢失,咱们一步一步来:

1. 先搞定Mock Kafka环境

不用搭真实Kafka集群,直接用embedded-kafka(Java/Scala都支持),它会在测试进程内启动一个轻量的Kafka和ZooKeeper实例,用完自动销毁,非常适合单元/集成测试。

依赖配置(以Maven为例)

<dependency>
    <groupId>io.github.embeddedkafka</groupId>
    <artifactId>embedded-kafka_2.12</artifactId>
    <version>3.4.0</version>
    <scope>test</scope>
</dependency>

Scala用户可以对应调整成sbt依赖格式。

初始化Mock Kafka

在测试类里,你可以这样快速启动并创建测试主题:

import io.github.embeddedkafka.EmbeddedKafka;

public class SparkKafkaEsTest {
    private static final String TEST_TOPIC = "test-events";

    @BeforeEach
    void setUp() {
        // 启动嵌入式Kafka,默认端口9092
        EmbeddedKafka.start();
        // 创建测试主题
        EmbeddedKafka.createTopic(TEST_TOPIC);
    }

    @AfterEach
    void tearDown() {
        // 测试结束后自动关闭Kafka
        EmbeddedKafka.stop();
    }
}

2. 生成Protobuf测试事件并写入Mock Kafka

Protobuf消息要序列化成字节数组才能写入Kafka,这一步其实很简单:

第一步:构造Protobuf测试实例

假设你已经通过.proto文件生成了Java类,先构造一个测试用的Protobuf对象:

import com.yourpackage.UserEvent;

UserEvent testEvent = UserEvent.newBuilder()
    .setUserId("123")
    .setEventType("LOGIN")
    .setTimestamp(System.currentTimeMillis())
    .build();

第二步:写入Mock Kafka

用普通的KafkaProducer把序列化后的字节发送到测试主题:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

// 配置Producer,指向Mock Kafka的地址
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps)) {
    // 把Protobuf对象转成字节数组发送
    producer.send(new ProducerRecord<>(TEST_TOPIC, testEvent.toByteArray())).get();
}

这样Mock Kafka里就有了你的Protobuf测试事件啦。

3. 测试Spark任务的核心逻辑

为了方便测试,建议把Spark任务拆成纯转换逻辑IO/偏移量提交逻辑两部分:

拆分逻辑示例

把Protobuf转成ES文档的逻辑抽成单独的纯函数,方便单独做单元测试:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import com.google.protobuf.InvalidProtocolBufferException;

public class EventTransformer {
    // 纯函数:输入Protobuf字节数组,输出ES可接受的Row
    public static Row transformProtobufToEsDoc(byte[] protoBytes) {
        try {
            UserEvent event = UserEvent.parseFrom(protoBytes);
            return RowFactory.create(
                event.getUserId(),
                event.getEventType(),
                event.getTimestamp()
            );
        } catch (InvalidProtocolBufferException e) {
            // 根据业务需求处理解析异常,比如返回null或标记为无效事件
            throw new RuntimeException("Failed to parse Protobuf event", e);
        }
    }
}

集成测试Spark完整流程

用Spark的local模式连接Mock Kafka,执行完整的消费-转换-写入ES流程:

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import java.util.HashMap;

public void testSparkWorkflow() throws Exception {
    // 初始化SparkSession(local模式,至少2核支撑流处理)
    SparkSession spark = SparkSession.builder()
        .appName("SparkKafkaEsTest")
        .master("local[2]")
        .getOrCreate();

    // 读取Mock Kafka的流数据
    var kafkaStream = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", TEST_TOPIC)
        .option("startingOffsets", "earliest")
        .load();

    // 应用转换逻辑
    var esDocs = kafkaStream.selectExpr("value as protoBytes")
        .map(row -> EventTransformer.transformProtobufToEsDoc((byte[]) row.get(0)),
            org.apache.spark.sql.catalyst.encoders.RowEncoder.apply(
                org.apache.spark.sql.types.StructType.fromDDL("userId string, eventType string, timestamp long")
            ));

    // 配置ES写入(这里可以用嵌入式ES或者TestContainers的ES容器)
    Map<String, String> esOptions = new HashMap<>();
    esOptions.put("es.nodes", "localhost:9200");
    esOptions.put("es.resource", "test_index/_doc");
    esOptions.put("es.batch.size.entries", "1"); // 单事件提交,符合你的需求

    // 启动流查询,配置偏移量持久化的checkpoint
    StreamingQuery query = esDocs.writeStream()
        .format("org.elasticsearch.spark.sql")
        .options(esOptions)
        .option("checkpointLocation", "/tmp/spark-checkpoint") // 必须设置,用于偏移量持久化
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .start();

    // 等待流处理完成(测试时可以主动触发或等待固定时间)
    query.awaitTermination(30000);
    query.stop();
}

4. 验证数据一致性与偏移量提交

这一步是确保不丢数据的关键:

验证ES中的数据

用ES的Java客户端查询测试索引,确认写入的文档数量和内容和发送的测试事件一致:

import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.apache.http.HttpHost;

public void verifyEsData() throws IOException {
    RestHighLevelClient esClient = new RestHighLevelClient(
        RestClient.builder(new HttpHost("localhost", 9200, "http")));

    SearchRequest searchRequest = new SearchRequest("test_index");
    SearchResponse response = esClient.search(searchRequest, RequestOptions.DEFAULT);

    // 验证文档数量等于发送的测试事件数
    assertEquals(1, response.getHits().getTotalHits().value);
    // 验证文档内容正确
    String userId = (String) response.getHits().getHits()[0].getSourceAsMap().get("userId");
    assertEquals("123", userId);

    esClient.close();
}

验证偏移量提交

用KafkaConsumer读取消费组的偏移量,确认已经提交到正确的位置:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Properties;

public void verifyOffsets() {
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");
    consumerProps.put("group.id", "your-spark-consumer-group");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
        TopicPartition partition = new TopicPartition(TEST_TOPIC, 0);
        consumer.assign(Collections.singleton(partition));
        // 获取已提交的偏移量
        long committedOffset = consumer.committed(partition).offset();
        // 获取当前主题的最大偏移量
        consumer.seekToEnd(Collections.singleton(partition));
        long endOffset = consumer.position(partition);
        // 验证已提交偏移量等于最大偏移量(说明所有事件都被消费并提交)
        assertEquals(endOffset, committedOffset);
    }
}

额外的容错测试(避免丢数)

为了确保极端场景下不丢数据,你可以加这些测试:

  • 模拟任务重启:启动任务消费部分事件后手动停止,再重启,验证是否从上次提交的偏移量继续消费,没有重复或丢失。
  • 模拟转换异常:发送一个格式错误的Protobuf字节,验证任务是否能处理异常(比如跳过该事件并提交偏移量,或者重试),不会导致整个流挂掉。
  • 模拟ES写入失败:临时关闭ES,验证BulkProcessor是否会重试,偏移量是否在写入成功后才提交。

内容的提问来源于stack exchange,提问作者alina

火山引擎 最新活动