如何通过Java代码使用Kafka Connector?问询独立/分布式模式启动方法
Great questions! Let's break this down with practical examples and clear explanations:
1. How to Operate Kafka Connectors with Java Code?
Kafka Connect provides a REST API as the standard way to manage connectors programmatically. While there’s no official dedicated Java SDK, you can easily interact with this API using common HTTP client libraries like Java 11+ HttpClient or Spring’s RestTemplate.
Here are some common operations with code snippets:
Example 1: List All Running Connectors
import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; public class ConnectorManager { private static final String CONNECT_REST_URL = "http://localhost:8083"; // Default Connect REST port public static void main(String[] args) throws Exception { HttpClient client = HttpClient.newHttpClient(); HttpRequest request = HttpRequest.newBuilder() .uri(URI.create(CONNECT_REST_URL + "/connectors")) .GET() .build(); HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString()); System.out.println("Running connectors: " + response.body()); } }
Example 2: Create a SpoolDir Source Connector
Define your connector config as JSON, then send a POST request to register it:
import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.charset.StandardCharsets; public class SpoolDirConnectorCreator { private static final String CONNECT_REST_URL = "http://localhost:8083"; public static void main(String[] args) throws Exception { String connectorConfig = """ { "name": "spool-dir-source", "config": { "connector.class": "org.apache.kafka.connect.file.SpoolDirSourceConnector", "tasks.max": "1", "topic": "spool-dir-topic", "input.dir": "/path/to/your/input", "finished.dir": "/path/to/your/finished", "file.filter": "*.txt" } } """; HttpClient client = HttpClient.newHttpClient(); HttpRequest request = HttpRequest.newBuilder() .uri(URI.create(CONNECT_REST_URL + "/connectors")) .header("Content-Type", "application/json") .POST(HttpRequest.BodyPublishers.ofString(connectorConfig, StandardCharsets.UTF_8)) .build(); HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString()); System.out.println("Creation response: " + response.statusCode() + " - " + response.body()); } }
Other Useful Operations
- Get connector status:
GET /connectors/{connector-name}/status - Pause a connector:
PUT /connectors/{connector-name}/pause - Resume a connector:
PUT /connectors/{connector-name}/resume - Delete a connector:
DELETE /connectors/{connector-name}
2. Can You Start Connectors Directly via Java Code (Standalone/Distributed Mode)?
Yes! You can invoke Kafka Connect’s core main classes directly from your Java code, just like running the shell scripts. Here’s how to do it for both modes:
Prerequisites
First, add these dependencies to your project (adjust versions to match your Kafka setup):
<!-- Maven dependencies --> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-connect-api</artifactId> <version>2.8.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>connect-file</artifactId> <version>2.8.1</version> <!-- For SpoolDir connector --> </dependency> </dependencies>
Standalone Mode
Call ConnectStandalone.main() with your configuration file paths as arguments:
public class StartStandaloneConnector { public static void main(String[] args) { // Mimic the shell command: kafka/bin/connect-standalone.sh connect-standalone.properties file-source.properties String[] connectArgs = { "/absolute/path/to/connect-standalone.properties", "/absolute/path/to/file-source.properties" }; // Launch the standalone connector org.apache.kafka.connect.cli.ConnectStandalone.main(connectArgs); } }
Distributed Mode
Similarly, invoke ConnectDistributed.main() with your distributed config file:
public class StartDistributedConnector { public static void main(String[] args) { // Mimic the shell command: kafka/bin/connect-distributed.sh connect-distributed.properties String[] connectArgs = { "/absolute/path/to/connect-distributed.properties" }; // Launch a distributed connector node org.apache.kafka.connect.cli.ConnectDistributed.main(connectArgs); } }
Key Notes
- Configuration Paths: Use absolute paths for your properties files to avoid classpath issues.
- Classpath: Ensure all required connectors (like
connect-filefor SpoolDir) and dependencies are included in your project’s classpath. - Distributed Cluster: When starting multiple nodes, make sure they share the same
group.idin their config to form a unified cluster.
内容的提问来源于stack exchange,提问作者Ishani Vij




