Data not syncing to Elasticsearch when using the Kafka-Elasticsearch Connector
Hey there, let's work through why your chat app's Kafka messages aren't making it to Elasticsearch. I'll walk you through the most common checks tailored to your setup (Confluent Kafka, Elasticsearch Sink Connector, Kafka REST API with JSON):
1. First, Confirm Kafka Connect is Up and Running
If Connect isn't running, nothing will sync. Check its status with:
curl http://<your-connect-host>:<connect-port>/connectors
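If the request fails with a connection error, Connect isn't running (or isn't reachable at that host and port), so start it first. An empty array ([]) means something different: Connect is up, but no connector is deployed yet, so continue with step 2. For local Confluent dev environments, start Connect with: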
confluent local services connect start
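If you'd rather script this check, here is a minimal sketch that tells the two outcomes above apart. It assumes Connect's REST API is on localhost:8083 (its usual default port). The function names are hypothetical helpers, not part of any Confluent tool.

```python
import json
import urllib.request

# Assumption: Connect's REST API is on localhost:8083 (its usual default port);
# substitute your own <connect-host>:<connect-port>.
CONNECT_URL = "http://localhost:8083"


def interpret_connectors_response(body: str) -> str:
    """Classify the JSON body returned by GET /connectors."""
    connectors = json.loads(body)
    if not connectors:
        # [] means the worker is up but nothing has been deployed yet.
        return "connect running, no connectors deployed"
    return "connect running, connectors: " + ", ".join(sorted(connectors))


def check_connect(base_url: str = CONNECT_URL) -> str:
    """Same check as the curl command, with 'not running' and 'empty' told apart."""
    try:
        with urllib.request.urlopen(f"{base_url}/connectors", timeout=5) as resp:
            return interpret_connectors_response(resp.read().decode("utf-8"))
    except OSError as exc:
        # URLError (connection refused, DNS failure), HTTPError and timeouts
        # all derive from OSError.
        return f"request to Connect failed: {exc}"
```

Calling check_connect() with no arguments probes the default URL. Pass base_url if your worker listens elsewhere.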
2. Validate Your Elasticsearch Sink Connector Configuration
This is where most folks hit snags. Double-check these critical settings for your JSON/no-Schema Registry use case:
connector.class: Must be io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics: The exact name of the Kafka topic your chat app uses (case-sensitive!)
connection.url: Your Elasticsearch endpoint (e.g., http://localhost:9200). Make sure Connect can reach it: no firewall blocks, and ES is listening on the right interface.
key.ignore: Set to true if your chat messages don't include a Kafka key (super common for chat apps)
schema.ignore: Set to true, since the raw JSON you send via Kafka REST carries no schema (no Schema Registry) and Elasticsearch should infer the mapping itself
value.converter: Use org.apache.kafka.connect.json.JsonConverter with value.converter.schemas.enable: false to disable schema checks
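The checklist above can be turned into a quick self-check before you deploy anything. This is a sketch only. find_config_problems is a hypothetical helper, and it assumes the scenario described here: keyless chat messages and plain JSON with no Schema Registry.

```python
from __future__ import annotations

# Values the checklist expects for a schemaless-JSON chat topic.
# Assumption: key.ignore=true because the chat messages carry no Kafka key.
EXPECTED = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
}


def _norm(value: object) -> str:
    """Normalise booleans (True, "True", " true ") to "true"/"false"; keep other values as-is."""
    text = str(value).strip()
    return text.lower() if text.lower() in ("true", "false") else text


def find_config_problems(config: dict) -> list[str]:
    """Return human-readable problems; an empty list means the checklist passes."""
    problems = []
    for key, expected in EXPECTED.items():
        if _norm(config.get(key, "")) != expected:
            problems.append(f"{key} should be {expected!r}, got {config.get(key)!r}")
    if not str(config.get("topics", "")).strip():
        problems.append("topics is empty: use the exact (case-sensitive) chat topic name")
    if not str(config.get("connection.url", "")).startswith(("http://", "https://")):
        problems.append("connection.url should be an http(s):// Elasticsearch endpoint")
    return problems
```

You could run it on the "config" object of your connector JSON, for example find_config_problems(json.load(open("es-sink-config.json"))["config"]). It cannot check reachability or topic existence. It only catches typos and wrong values.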
Here's a working config example for your scenario:
{
  "name": "chat-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "chat-messages",
    "connection.url": "http://localhost:9200",
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
Save it as es-sink-config.json and create the connector with:
curl -X POST -H "Content-Type: application/json" --data @es-sink-config.json http://<connect-host>:<connect-port>/connectors
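The same POST can be sent from Python if you want to script deployment. This sketch assumes Connect on localhost:8083 and the es-sink-config.json file from the curl command. prepare_request and create_connector_from_file are hypothetical helper names.

```python
import json
import urllib.request

# Assumption: Connect REST API on localhost:8083 (its usual default port).
CONNECT_URL = "http://localhost:8083"


def prepare_request(payload: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST /connectors request; fail early if the name/config wrapper is missing."""
    if not isinstance(payload.get("name"), str) or not isinstance(payload.get("config"), dict):
        raise ValueError("payload must have a string 'name' and a 'config' object")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector_from_file(path: str = "es-sink-config.json",
                               base_url: str = CONNECT_URL) -> dict:
    """Equivalent of: curl -X POST ... --data @es-sink-config.json .../connectors"""
    with open(path, encoding="utf-8") as fh:
        payload = json.load(fh)
    with urllib.request.urlopen(prepare_request(payload, base_url), timeout=10) as resp:
        # On success Connect echoes the new connector's name and config as JSON.
        return json.load(resp)
```

POST /connectors returns an HTTP 409 error (raised here as urllib.error.HTTPError) if a connector with that name already exists. When iterating on the config, PUT /connectors/chat-es-sink/config with just the inner "config" object creates or updates the connector in one call.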
3. Check Your Kafka Topic's Message Format
Since you're using the Kafka REST API, make sure your messages are valid JSON objects (not arrays or schema-wrapped data). Use the console consumer to inspect them (on Confluent Platform the script is named kafka-console-consumer, without the .sh suffix):
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic chat-messages --from-beginning --property print.value=true
Each message should look something like this:
{"sender": "user123", "content": "Hey everyone!", "timestamp": "2024-05-20T14:30:00"}
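If a message isn't valid JSON, the JsonConverter can't deserialize it and the task fails. If a message is wrapped in extra schema/payload fields, it won't fail with schemas.enable set to false. Instead, the envelope gets indexed in place of your flat chat fields.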
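To triage many consumed messages at once, a rough classifier like the one below can help. It is a sketch that assumes the schemaless JsonConverter setup described above. classify_message is a hypothetical helper, and it checks only structure, not your own field names.

```python
import json


def classify_message(raw: str) -> str:
    """Rough triage of one value printed by the console consumer."""
    try:
        value = json.loads(raw)
    except json.JSONDecodeError:
        return "invalid JSON: the JsonConverter will fail to deserialize it"
    if not isinstance(value, dict):
        return f"JSON {type(value).__name__}, not an object: not a usable document"
    if set(value) == {"schema", "payload"}:
        # With schemas.enable=false this envelope is not unwrapped; it is indexed literally.
        return "schema/payload envelope: indexed as-is instead of your chat fields"
    return "ok"
```

One way to use it: pipe the console consumer's output into a small script that calls classify_message on each line and prints anything that isn't "ok".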
4. Inspect Connector Status and Logs
After creating the connector, check its health:
curl http://<connect-host>:<connect-port>/connectors/chat-es-sink/status
If any task shows FAILED, its entry in the response includes a trace field with the error details. For deeper context, check the Connect worker log. On a Confluent local setup, confluent local services connect log prints it. Common errors to look for:
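Connection refused: Connect can't reach Elasticsearch. Verify ES is running and check network.host in elasticsearch.yml. By default ES binds only to localhost, so set it to 0.0.0.0 (or a specific interface) if Connect runs on another host or in a container.
document mapping type name can't be empty: This is a mapping-type problem. Mapping types are deprecated in Elasticsearch 7.x and removed in 8.x. On older connector versions that still require type.name, set it to _doc; on recent versions, which dropped the setting, remove it from your config.
Errors about a missing schema: schema.ignore is usually still false, so the connector tries to build an index mapping from a record schema that plain JSON doesn't carry. Set schema.ignore to true. A related message, JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields, means value.converter.schemas.enable is still true.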
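The status JSON can get long once a stack trace is attached. Here is a small sketch that pulls out only the failed parts. It assumes the standard shape of the /connectors/<name>/status response (a connector object plus a tasks list, each with a state and, when failed, a trace). failed_tasks is a hypothetical helper.

```python
import json


def failed_tasks(status_body: str) -> list:
    """Return (id, first trace line) for the connector and every task in FAILED state."""
    status = json.loads(status_body)
    failures = []
    connector = status.get("connector", {})
    if connector.get("state") == "FAILED":
        trace = connector.get("trace", "")
        failures.append(("connector", trace.splitlines()[0] if trace else ""))
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            # The first line of a Java stack trace names the top-level exception.
            failures.append((task.get("id"), trace.splitlines()[0] if trace else ""))
    return failures
```

The first line is only the top-level exception. Look further down the full trace for "Caused by:" lines to find the root cause, which often matches one of the errors listed above.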
5. Verify Elasticsearch Index and Health
Check if the sink connector created an index in ES (defaults to your Kafka topic name, lowercase):
curl "http://localhost:9200/_cat/indices?v"
Also confirm ES is healthy:
curl http://localhost:9200/_cluster/health
Status should be green or yellow — if it's red, fix Elasticsearch issues first (e.g., disk space, node failures).
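Both Elasticsearch checks can be scripted as well. This sketch assumes ES on localhost:9200 and relies on the lowercased-topic default index name mentioned above. It requests _cat/indices with format=json so the output is easy to parse. es_get, index_exists and cluster_usable are hypothetical helpers.

```python
import json
import urllib.request

# Assumption: Elasticsearch on localhost:9200, as in the curl commands above.
ES_URL = "http://localhost:9200"


def es_get(path: str, base_url: str = ES_URL) -> str:
    """Fetch a path from Elasticsearch and return the body as text."""
    with urllib.request.urlopen(base_url + path, timeout=5) as resp:
        return resp.read().decode("utf-8")


def index_exists(cat_indices_json: str, topic: str) -> bool:
    """Parse GET /_cat/indices?format=json and look for the topic's lowercased index."""
    indices = {row["index"] for row in json.loads(cat_indices_json)}
    return topic.lower() in indices


def cluster_usable(health_json: str) -> bool:
    """GET /_cluster/health reports "green", "yellow" or "red" in its status field."""
    return json.loads(health_json).get("status") in ("green", "yellow")
```

Usage: index_exists(es_get("/_cat/indices?format=json"), "chat-messages") and cluster_usable(es_get("/_cluster/health")).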
6. Confirm Kafka REST API Message Delivery
Make sure you're sending messages correctly via Kafka REST. A valid POST request looks like this:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
  --data '{"records": [{"value": {"sender": "user456", "content": "Testing sync!"}}]}' \
  http://<kafka-rest-host>:<kafka-rest-port>/topics/chat-messages
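The sink connector won't index your messages as expected in three cases: you wrap them in extra schema fields; you produce them with a different embedded format (for example application/vnd.kafka.avro.v2+json, whose Avro-encoded values the JsonConverter can't read); or you post them to the wrong topic.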
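If your chat backend produces through REST from code, here is a minimal sketch of the same v2 request. It assumes REST Proxy on localhost:8082 (its usual default port). build_produce_request and send are hypothetical helpers. The key point is that each chat message goes under "value" as a plain JSON object, with no schema envelope.

```python
import json
import urllib.request

# Assumption: Kafka REST Proxy (v2 API) on localhost:8082.
REST_URL = "http://localhost:8082"


def build_produce_request(topic: str, messages: list,
                          base_url: str = REST_URL) -> urllib.request.Request:
    """Wrap plain JSON objects as v2 'records' with the JSON embedded format."""
    for msg in messages:
        if not isinstance(msg, dict):
            raise ValueError("each message must be a JSON object (dict)")
    body = {"records": [{"value": msg} for msg in messages]}
    return urllib.request.Request(
        f"{base_url}/topics/{topic}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
        method="POST",
    )


def send(topic: str, messages: list) -> dict:
    with urllib.request.urlopen(build_produce_request(topic, messages), timeout=10) as resp:
        # The v2 response has an "offsets" list with one partition/offset (or error) per record.
        return json.load(resp)
```

Checking the "offsets" entries in the response confirms that each record actually landed in chat-messages before you go looking for it in Elasticsearch.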
Walk through these steps one by one — odds are one of these checks will uncover the issue. If you're still stuck, share the connector's error logs or status response, and we can dig deeper!
The question originally comes from Stack Exchange; it was asked by 223seneca.




