Kafka、Spark与ES集成测试的正确方法问询
测试Spark-Kafka-Protobuf-ES工作流的完整方案
我来给你拆解一个落地性强的测试方案,完全覆盖你提到的场景——不用Spring,Mock Kafka,Protobuf事件写入,验证偏移量提交和数据不丢失,咱们一步一步来:
1. 先搞定Mock Kafka环境
不用搭真实Kafka集群,直接用embedded-kafka(Java/Scala都支持),它会在测试进程内启动一个轻量的Kafka和ZooKeeper实例,用完自动销毁,非常适合单元/集成测试。
依赖配置(以Maven为例)
<dependency> <groupId>io.github.embeddedkafka</groupId> <artifactId>embedded-kafka_2.12</artifactId> <version>3.4.0</version> <scope>test</scope> </dependency>
Scala用户可以对应调整成sbt依赖格式。
初始化Mock Kafka
在测试类里,你可以这样快速启动并创建测试主题:
import io.github.embeddedkafka.EmbeddedKafka; public class SparkKafkaEsTest { private static final String TEST_TOPIC = "test-events"; @BeforeEach void setUp() { // 启动嵌入式Kafka,默认端口9092 EmbeddedKafka.start(); // 创建测试主题 EmbeddedKafka.createTopic(TEST_TOPIC); } @AfterEach void tearDown() { // 测试结束后自动关闭Kafka EmbeddedKafka.stop(); } }
2. 生成Protobuf测试事件并写入Mock Kafka
Protobuf消息要序列化成字节数组才能写入Kafka,这一步其实很简单:
第一步:构造Protobuf测试实例
假设你已经通过.proto文件生成了Java类,先构造一个测试用的Protobuf对象:
import com.yourpackage.UserEvent; UserEvent testEvent = UserEvent.newBuilder() .setUserId("123") .setEventType("LOGIN") .setTimestamp(System.currentTimeMillis()) .build();
第二步:写入Mock Kafka
用普通的KafkaProducer把序列化后的字节发送到测试主题:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; // 配置Producer,指向Mock Kafka的地址 Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "localhost:9092"); producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps)) { // 把Protobuf对象转成字节数组发送 producer.send(new ProducerRecord<>(TEST_TOPIC, testEvent.toByteArray())).get(); }
这样Mock Kafka里就有了你的Protobuf测试事件啦。
3. 测试Spark任务的核心逻辑
为了方便测试,建议把Spark任务拆成纯转换逻辑和IO/偏移量提交逻辑两部分:
拆分逻辑示例
把Protobuf转成ES文档的逻辑抽成单独的纯函数,方便单独做单元测试:
import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import com.google.protobuf.InvalidProtocolBufferException; public class EventTransformer { // 纯函数:输入Protobuf字节数组,输出ES可接受的Row public static Row transformProtobufToEsDoc(byte[] protoBytes) { try { UserEvent event = UserEvent.parseFrom(protoBytes); return RowFactory.create( event.getUserId(), event.getEventType(), event.getTimestamp() ); } catch (InvalidProtocolBufferException e) { // 根据业务需求处理解析异常,比如返回null或标记为无效事件 throw new RuntimeException("Failed to parse Protobuf event", e); } } }
集成测试Spark完整流程
用Spark的local模式连接Mock Kafka,执行完整的消费-转换-写入ES流程:
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.Trigger; import java.util.HashMap; public void testSparkWorkflow() throws Exception { // 初始化SparkSession(local模式,至少2核支撑流处理) SparkSession spark = SparkSession.builder() .appName("SparkKafkaEsTest") .master("local[2]") .getOrCreate(); // 读取Mock Kafka的流数据 var kafkaStream = spark.readStream() .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", TEST_TOPIC) .option("startingOffsets", "earliest") .load(); // 应用转换逻辑 var esDocs = kafkaStream.selectExpr("value as protoBytes") .map(row -> EventTransformer.transformProtobufToEsDoc((byte[]) row.get(0)), org.apache.spark.sql.catalyst.encoders.RowEncoder.apply( org.apache.spark.sql.types.StructType.fromDDL("userId string, eventType string, timestamp long") )); // 配置ES写入(这里可以用嵌入式ES或者TestContainers的ES容器) Map<String, String> esOptions = new HashMap<>(); esOptions.put("es.nodes", "localhost:9200"); esOptions.put("es.resource", "test_index/_doc"); esOptions.put("es.batch.size.entries", "1"); // 单事件提交,符合你的需求 // 启动流查询,配置偏移量持久化的checkpoint StreamingQuery query = esDocs.writeStream() .format("org.elasticsearch.spark.sql") .options(esOptions) .option("checkpointLocation", "/tmp/spark-checkpoint") // 必须设置,用于偏移量持久化 .trigger(Trigger.ProcessingTime("10 seconds")) .start(); // 等待流处理完成(测试时可以主动触发或等待固定时间) query.awaitTermination(30000); query.stop(); }
4. 验证数据一致性与偏移量提交
这一步是确保不丢数据的关键:
验证ES中的数据
用ES的Java客户端查询测试索引,确认写入的文档数量和内容和发送的测试事件一致:
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.apache.http.HttpHost; public void verifyEsData() throws IOException { RestHighLevelClient esClient = new RestHighLevelClient( RestClient.builder(new HttpHost("localhost", 9200, "http"))); SearchRequest searchRequest = new SearchRequest("test_index"); SearchResponse response = esClient.search(searchRequest, RequestOptions.DEFAULT); // 验证文档数量等于发送的测试事件数 assertEquals(1, response.getHits().getTotalHits().value); // 验证文档内容正确 String userId = (String) response.getHits().getHits()[0].getSourceAsMap().get("userId"); assertEquals("123", userId); esClient.close(); }
验证偏移量提交
用KafkaConsumer读取消费组的偏移量,确认已经提交到正确的位置:
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import java.util.Collections; import java.util.Properties; public void verifyOffsets() { Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "localhost:9092"); consumerProps.put("group.id", "your-spark-consumer-group"); consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) { TopicPartition partition = new TopicPartition(TEST_TOPIC, 0); consumer.assign(Collections.singleton(partition)); // 获取已提交的偏移量 long committedOffset = consumer.committed(partition).offset(); // 获取当前主题的最大偏移量 consumer.seekToEnd(Collections.singleton(partition)); long endOffset = consumer.position(partition); // 验证已提交偏移量等于最大偏移量(说明所有事件都被消费并提交) assertEquals(endOffset, committedOffset); } }
额外的容错测试(避免丢数)
为了确保极端场景下不丢数据,你可以加这些测试:
- 模拟任务重启:启动任务消费部分事件后手动停止,再重启,验证是否从上次提交的偏移量继续消费,没有重复或丢失。
- 模拟转换异常:发送一个格式错误的Protobuf字节,验证任务是否能处理异常(比如跳过该事件并提交偏移量,或者重试),不会导致整个流挂掉。
- 模拟ES写入失败:临时关闭ES,验证BulkProcessor是否会重试,偏移量是否在写入成功后才提交。
内容的提问来源于stack exchange,提问作者alina




