Kafka 和 Kafka Connect 是用于处理实时数据流的强大工具,但它们的术语可能会让人感到困惑。特别是在源和汇以及生产者和消费者之间。
在 Kafka 中,生产者(Producers)向主题(Topics)发布消息(Messages),消费者(Consumers)从主题中拉取这些消息并进行处理。但是,在 Kafka Connect 中,我们使用的术语是源(Sources)和汇(Sinks),他们的作用是将数据从其他系统引入到Kafka、或者将数据从Kafka推送到其他系统中。
以下是一个Kafka生产者示例:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) throws Exception{
String topicName = "Hello-Kafka";
String key = "Key1";
String value = "Hello World";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost1:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
producer.send(record);
producer.close();
System.out.println("SimpleProducer Completed.");
}
}
接下来是一个 Kafka Connect 汇示例:
import org.apache.kafka.connect.sink.*;
import org.apache.kafka.connect.sink.SinkRecord;
import java.util.Collection;
import java.util.Map;
public class KafkaConnectSinkExample extends SinkTask {
public KafkaConnectSinkExample() {
// 准备一些初始化代码,例如创建连接到传出数据的目标系统
}
public String version() {
return "1.0";
}
public void put(Collection<SinkRecord> records) throws Exception {
for (SinkRecord record : records)
{
// 在这里处理每个传入的记录
String key = record.key();
String value = record.value();
// 将记录推送到外部
}
}
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
// 将缓冲的数据推送到外部系统
}
public void stop() {
// 执行一些清理操作,例如关闭与外部系统的连接