Data Not Syncing to Elasticsearch When Using the Kafka-Elasticsearch Connector

Troubleshooting Kafka Connect Elasticsearch Sink Sync Issues

Hey there, let's work through why your chat app's Kafka messages aren't making it to Elasticsearch. I'll walk you through the most common checks tailored to your setup (Confluent Kafka, Elasticsearch Sink Connector, Kafka REST API with JSON):

1. First, Confirm Kafka Connect is Up and Running

If Connect isn't running, nothing will sync. Check its status with:

curl http://<your-connect-host>:<connect-port>/connectors

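A connection error means Connect isn't running, so start it first. An empty array ([]) means Connect is up but no connectors are registered yet; in that case, move on to step 2. For local Confluent dev environments, start Connect with: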

confluent local services connect start
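
If you'd rather script this check, here is a minimal Python sketch of the same idea. It assumes the Connect REST API is at http://localhost:8083 (the usual default, but substitute your own host and port); the function names are mine, not part of any Kafka tooling.

```python
import json
import urllib.error
import urllib.request


def list_connectors(base_url, timeout=5.0):
    """Return the connector names from GET /connectors, or None if Connect is unreachable."""
    try:
        with urllib.request.urlopen(f"{base_url}/connectors", timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except (urllib.error.URLError, OSError, ValueError):
        # Connection refused, timeout, HTTP error, or a body that is not JSON.
        return None


def describe(connectors):
    """Turn the result of list_connectors() into a one-line diagnosis."""
    if connectors is None:
        return "Connect is not reachable: start the worker first"
    if not connectors:
        return "Connect is running, but no connectors are registered yet"
    return "Connect is running with connectors: " + ", ".join(connectors)


if __name__ == "__main__":
    # Assumed address; replace it with your worker's host and port.
    print(describe(list_connectors("http://localhost:8083")))
```

The point of separating the two cases is that an empty list is not a failure: it only tells you the connector still has to be created.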

2. Validate Your Elasticsearch Sink Connector Configuration

This is where most folks hit snags. Double-check these critical settings for your JSON/no-Schema Registry use case:

  • connector.class: Must be io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  • topics: Exact name of the Kafka topic your chat app uses (case-sensitive!)
  • connection.url: Your Elasticsearch endpoint (e.g., http://localhost:9200) — ensure Connect can reach this (no firewall blocks, ES is listening on the right interface)
  • key.ignore: Set to true if your chat messages don't include a Kafka key (super common for chat apps)
  • schema.ignore: Set to true since you're using raw JSON via Kafka REST (no Schema Registry)
  • value.converter: Use org.apache.kafka.connect.json.JsonConverter with value.converter.schemas.enable set to false, so the converter accepts plain JSON instead of expecting a schema/payload envelope

Here's a working config example for your scenario:

{
  "name": "chat-es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "chat-messages",
    "connection.url": "http://localhost:9200",
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
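
To catch typos before you submit the config, you could run a quick sanity check along these lines. This is only a sketch of the checklist above for the keyless, schemaless JSON scenario; check_sink_config is a hypothetical helper, and it looks at the inner "config" object, not the whole file.

```python
# Values that must match exactly (class names are case-sensitive).
EXACT = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
}
# Boolean-style flags expected for raw JSON messages without keys or schemas.
FLAGS = {
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter.schemas.enable": "false",
}


def check_sink_config(config):
    """Return a list of problems found in the connector's "config" mapping."""
    problems = []
    for key, want in EXACT.items():
        if config.get(key) != want:
            problems.append(f"{key} should be {want}")
    for key, want in FLAGS.items():
        if str(config.get(key, "")).strip().lower() != want:
            problems.append(f"{key} should be {want}")
    if not str(config.get("topics", "")).strip():
        problems.append("topics is empty or missing")
    if not str(config.get("connection.url", "")).startswith(("http://", "https://")):
        problems.append("connection.url should start with http:// or https://")
    return problems
```

Load your file with json.load, pass its "config" value to check_sink_config, and an empty list means the settings discussed here are in place. It cannot tell you whether the topic name or the URL is actually correct, only that they are present and well-formed.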

Create the connector with:

curl -X POST -H "Content-Type: application/json" --data @es-sink-config.json http://<connect-host>:<connect-port>/connectors

3. Check Your Kafka Topic's Message Format

Since you're using the Kafka REST API, make sure your messages are valid JSON objects (not arrays or schema-wrapped data). Use the console consumer to inspect messages (on Confluent Platform the script is named kafka-console-consumer, without the .sh suffix):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic chat-messages --from-beginning --property print.value=true

Each message should look something like this:

{"sender": "user123", "content": "Hey everyone!", "timestamp": "2024-05-20T14:30:00"}

If messages are wrapped in extra schema fields or aren't valid JSON, the sink connector will fail to parse them.
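
If you have a lot of messages to eyeball, a small script can do the classification for you. The sketch below is an assumption-laden helper (classify_value and its labels are made up for illustration): it treats a top-level object with exactly the keys "schema" and "payload" as schema-wrapped, which is the envelope JsonConverter uses when schemas are enabled.

```python
import json


def classify_value(raw):
    """Classify one message value as it appears in the console consumer output."""
    try:
        value = json.loads(raw)
    except ValueError:
        return "invalid-json"
    if not isinstance(value, dict):
        # Arrays, strings, numbers and null are valid JSON but not objects.
        return "not-an-object"
    if set(value) == {"schema", "payload"}:
        return "schema-wrapped"
    return "ok"


if __name__ == "__main__":
    samples = [
        '{"sender": "user123", "content": "Hey everyone!"}',
        '[{"sender": "user123"}]',
        '{"schema": {"type": "string"}, "payload": "hi"}',
        "sender=user123",
    ]
    for line in samples:
        print(classify_value(line), "<-", line)
```

Anything other than "ok" is worth fixing on the producer side before you look further at the connector.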

4. Inspect Connector Status and Logs

After creating the connector, check its health:

curl http://<connect-host>:<connect-port>/connectors/chat-es-sink/status

If tasks show FAILED, the response includes the error in each failed task's trace field. For deeper context, check the Connect worker log; its location depends on your installation (for a Confluent local setup, confluent local services connect log prints it). Common errors to look for:

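  • Connection refused: Connect can't reach Elasticsearch. Verify ES is running and check network.host in elasticsearch.yml; it must bind to an address the Connect host can reach (0.0.0.0 binds to all interfaces)
  • document mapping type name can't be empty: This usually points to a mismatch between the connector's type.name setting and your Elasticsearch version. Mapping types are deprecated in Elasticsearch 7.x, so if you set type.name, try removing it; on older connector versions that still require the setting, _doc is the usual value
  • Schema not found: The connector is looking for a schema that raw JSON doesn't carry. Confirm schema.ignore is true and value.converter.schemas.enable is false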
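
The status response is JSON with a top-level "tasks" array, and each failed task carries a "trace" string, so pulling out just the failures is easy to script. Here is a rough sketch; failed_tasks is a hypothetical helper, and the sample trace text is invented for illustration rather than copied from a real log.

```python
def failed_tasks(status):
    """Return (task_id, first_trace_line) for every task whose state is FAILED."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            first_line = trace.splitlines()[0] if trace else "(no trace reported)"
            failures.append((task.get("id"), first_line))
    return failures


if __name__ == "__main__":
    # Illustrative response shape; the trace below is made up.
    sample = {
        "name": "chat-es-sink",
        "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
        "tasks": [
            {
                "id": 0,
                "state": "FAILED",
                "worker_id": "127.0.0.1:8083",
                "trace": "org.apache.kafka.connect.errors.ConnectException: example\n\tat ...",
            }
        ],
    }
    for task_id, reason in failed_tasks(sample):
        print(f"task {task_id} failed: {reason}")
```

Note that the connector itself can report RUNNING while one of its tasks is FAILED, which is why the sketch looks at the tasks array and not the connector state.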

5. Verify Elasticsearch Index and Health

Check if the sink connector created an index in ES (defaults to your Kafka topic name, lowercase):

curl "http://localhost:9200/_cat/indices?v"

Also confirm ES is healthy:

curl http://localhost:9200/_cluster/health

Status should be green or yellow — if it's red, fix Elasticsearch issues first (e.g., disk space, node failures).
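
Both checks can be automated once you have the JSON responses in hand. The sketch below assumes you fetched _cluster/health and _cat/indices?format=json yourself and parsed them with json.loads; the helper names are hypothetical, and the lowercase lookup mirrors the default index naming described above.

```python
def health_is_usable(health):
    """True when the _cluster/health response reports green or yellow."""
    return health.get("status") in ("green", "yellow")


def find_topic_index(indices, topic):
    """Return the _cat/indices row whose index name is the lowercased topic, or None."""
    wanted = topic.lower()
    for row in indices:
        if row.get("index") == wanted:
            return row
    return None


if __name__ == "__main__":
    # Illustrative responses, trimmed to the fields used here.
    health = {"cluster_name": "docker-cluster", "status": "yellow"}
    indices = [{"health": "yellow", "index": "chat-messages", "docs.count": "12"}]

    print("cluster usable:", health_is_usable(health))
    row = find_topic_index(indices, "chat-messages")
    print("index found:", row is not None)
```

If the index exists but docs.count stays at 0 while messages keep arriving on the topic, the problem is more likely in the connector or the message format than in Elasticsearch itself.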

6. Confirm Kafka REST API Message Delivery

Make sure you're sending messages correctly via Kafka REST. A valid POST request looks like this:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
  --data '{"records": [{"value": {"sender": "user456", "content": "Testing sync!"}}]}' \
  http://<kafka-rest-host>:<kafka-rest-port>/topics/chat-messages

If you're sending messages with extra schema fields or to the wrong topic, they won't be picked up by the sink connector.
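
If your chat app sends these requests from code, here is roughly what the same POST looks like in Python with only the standard library. Treat it as a sketch: build_produce_request is a made-up helper, and http://localhost:8082 is an assumed REST Proxy address that you should replace with your own.

```python
import json
import urllib.request

# Content type for JSON-embedded records in the v2 REST Proxy API.
CONTENT_TYPE = "application/vnd.kafka.json.v2+json"


def build_produce_request(rest_url, topic, values):
    """Build a POST request that wraps each value in the {"records": [...]} envelope."""
    body = json.dumps({"records": [{"value": v} for v in values]}).encode("utf-8")
    return urllib.request.Request(
        f"{rest_url}/topics/{topic}",
        data=body,
        headers={"Content-Type": CONTENT_TYPE},
        method="POST",
    )


if __name__ == "__main__":
    req = build_produce_request(
        "http://localhost:8082",  # assumed address
        "chat-messages",
        [{"sender": "user456", "content": "Testing sync!"}],
    )
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
    # To actually send it: urllib.request.urlopen(req)
```

Each value goes in as a plain JSON object, so what lands on the topic is exactly the shape the sink connector expects in step 3.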

Walk through these steps one by one — odds are one of these checks will uncover the issue. If you're still stuck, share the connector's error logs or status response, and we can dig deeper!

The question in this content comes from Stack Exchange; it was asked by 223seneca.
