Help: Flink SQL runs normally but no data is written to the Elasticsearch cluster

Troubleshooting a Flink batch write to Elasticsearch that produces no data and no exception

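I want to query data from MySQL or Hive with SQL and write it to an Elasticsearch (ES) cluster. The program runs successfully, but no data shows up in ES.
Software versions: Flink 1.11, ES 6.2.2, Hive 1.2.1, MySQL 5.7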

My code

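import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;

public class HiveExample {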
    public static void main(String[] args) throws DatabaseNotExistException { 
        EnvironmentSettings settings = EnvironmentSettings.newInstance() 
            .useBlinkPlanner() 
            .inBatchMode() 
            .build(); 
        TableEnvironment tabEnv = TableEnvironment.create(settings); 
        String sql = "insert into user_action_es_sink " + 
            "select 100123,5,11,1,'a','b','111','bbb',cast(11111 as bigint),cast('2020-11-11' as date) from dragonfly.web_page limit 10" ; 
        String sporeUserAuthCreateTableSQL = "CREATE TABLE users (\n" + 
            " `id` INT,\n" + 
            " `userid` INT,\n" + 
            " `type` INT,\n" + 
            " PRIMARY KEY (id) NOT ENFORCED" + 
            ") WITH (\n" + 
            " 'connector' = 'jdbc',\n" + 
            " 'url' = 'jdbc:mysql://localhost:3306/spore',\n" + 
            " 'table-name' = 'spore_user_auth',\n" + 
            " 'driver' = 'com.mysql.jdbc.Driver',\n" + 
            " 'username' = 'xxxx',\n" + 
            " 'password' = 'xxxx'\n" + 
            ")"; 
        tabEnv.executeSql(sporeUserAuthCreateTableSQL); 
        String esTable = "CREATE TABLE user_action_es_sink (\n" + 
            " uid INT,\n" + 
            " appid INT,\n" + 
            " prepage_id INT,\n" + 
            " page_id INT,\n" + 
            " action_id STRING,\n" + 
            " page_name STRING,\n" + 
            " action_name STRING,\n" + 
            " prepage_name STRING,\n" + 
            " stat_time BIGINT,\n" + 
            " dt DATE\n" + 
            // " PRIMARY KEY (uid,dt) NOT ENFORCED\n" + 
            ") WITH (\n" + 
            " 'connector' = 'elasticsearch-6',\n" + 
            " 'hosts' = 'http://localhost:9200',\n" + 
            " 'index' = 'mytest',\n" + 
            " 'document-type' = 'user_action'\n" + 
            // " 'sink.bulk-flush.max-size' = '0',\n" + 
            // " 'sink.bulk-flush.max-actions' = '0',\n" + 
            // " 'sink.bulk-flush.interval' = '0'\n"+ 
            // " 'format' = 'json',\n" + 
            // " 'json.fail-on-missing-field' = 'false',\n"+ 
            // " 'json.ignore-parse-errors' = 'true'\n" + 
            ")"; 
        tabEnv.executeSql(esTable); 
        tabEnv.executeSql("insert into user_action_es_sink select 100123,5,11,1,'a','b','111','bbb',cast(11111 as bigint),cast('2020-11-11' as date) from users limit 10").print(); 
    } 
}

My pom file

<dependencies> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-java</artifactId> 
        <version>${flink.version}</version> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> 
        <version>${flink.version}</version> 
        <!--<scope>provided</scope>--> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-clients_${scala.binary.version}</artifactId> 
        <version>${flink.version}</version> 
        <!--<scope>provided</scope>--> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> 
        <version>${flink.version}</version> 
        <!--<scope>provided</scope>--> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId> 
        <version>${flink.version}</version> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> 
        <version>${flink.version}</version> 
        <!--<scope>provided</scope>--> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> 
        <version>${flink.version}</version> 
        <!--<scope>provided</scope>--> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> 
        <version>${flink.version}</version> 
        <!--<scope>provided</scope>--> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> 
        <version>${flink.version}</version> 
        <!--<scope>provided</scope>--> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.hive</groupId> 
        <artifactId>hive-exec</artifactId> 
        <version>${hive.version}</version> 
        <!--<scope>provided</scope>--> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.hadoop</groupId> 
        <artifactId>hadoop-client</artifactId> 
        <version>${hadoop.version}</version> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> 
        <version>${flink.version}</version> 
    </dependency> 
    <dependency> 
        <groupId>mysql</groupId> 
        <artifactId>mysql-connector-java</artifactId> 
        <version>${mysql.version}</version> 
    </dependency> 
    <dependency> 
        <groupId>junit</groupId> 
        <artifactId>junit</artifactId> 
        <version>${junit.version}</version> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-orc-nohive_2.12</artifactId> 
        <version>${flink.version}</version> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId> 
        <version>1.6.0</version> 
    </dependency> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId> 
        <version>1.11.0</version> 
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json --> 
    <dependency> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-json</artifactId> 
        <version>${flink.version}</version> 
        <!--<scope>test</scope>--> 
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore --> 
    <dependency> 
        <groupId>org.apache.httpcomponents</groupId> 
        <artifactId>httpcore</artifactId> 
        <version>4.4.13</version> 
    </dependency> 
</dependencies>

Troubleshooting and solutions

Here are the most likely causes. You can check them one by one:

1. Dependency conflict (the most likely cause)

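Your pom declares flink-connector-elasticsearch6 twice: once at version 1.6.0 and once at version 1.11.0. The two versions are not compatible with each other, and having both declared can lead to class-loading confusion: the program may throw no obvious exception while the ES sink logic never actually runs as intended.
Fix: remove the 1.6.0 dependency and keep only the 1.11.0 one, so that the connector version matches the Flink version: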

<!-- Remove this conflicting dependency -->
<!--
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId> 
    <version>1.6.0</version> 
</dependency>
-->
<!-- Keep this one, which matches the Flink version -->
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId> 
    <version>1.11.0</version> 
</dependency>
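
To confirm which connector jars actually end up on the runtime classpath, a small check along the following lines may help. It is only a sketch that uses the JDK alone: the class name ConnectorJarCheck and the idea of matching on the file-name fragment flink-connector-elasticsearch6 are assumptions made for illustration, not part of Flink. Running mvn dependency:tree is the more thorough check.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink): lists classpath entries whose file
// name contains a given fragment, so duplicate connector versions stand out.
final class ConnectorJarCheck {

    // Returns the file names of all classpath entries that contain `fragment`.
    static List<String> findJars(String classpath, String separator, String fragment) {
        List<String> matches = new ArrayList<>();
        for (String entry : classpath.split(Pattern.quote(separator))) {
            String name = new File(entry).getName();
            if (name.contains(fragment)) {
                matches.add(name);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> jars = findJars(
                System.getProperty("java.class.path"),
                File.pathSeparator,
                "flink-connector-elasticsearch6");
        jars.forEach(System.out::println);
        if (jars.size() > 1) {
            System.out.println("More than one Elasticsearch 6 connector jar is on the classpath.");
        }
    }
}
```

Run it with the same classpath as the job (for example from the same IDE run configuration). If more than one jar is printed, the conflict is real at runtime and not only in the pom.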

2. ES sink bulk-flush settings are not in effect

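You commented out all of the bulk-flush options, so the sink runs with its defaults. For the Flink ES6 SQL sink these are sink.bulk-flush.max-actions = 1000, sink.bulk-flush.max-size = 2mb, and sink.bulk-flush.interval = 1s. Writing only 10 rows stays far below the action and size thresholds, so the rows remain in the buffer until the interval flush fires or the job finishes and the sink is closed. If the job does not run to completion, for example because the local program exits while the asynchronously submitted INSERT job is still running, the data never reaches ES. Note also that the values in your commented-out lines are '0', which disables the corresponding flush trigger instead of flushing immediately.
Fix: uncomment the flush options and set values suited to testing, for example forcing a flush after every row: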

CREATE TABLE user_action_es_sink (
 uid INT,
 appid INT,
 prepage_id INT,
 page_id INT,
 action_id STRING,
 page_name STRING,
 action_name STRING,
 prepage_name STRING,
 stat_time BIGINT,
 dt DATE,
 PRIMARY KEY (uid,dt) NOT ENFORCED
) WITH (
 'connector' = 'elasticsearch-6',
 'hosts' = 'http://localhost:9200',
 'index' = 'mytest',
 'document-type' = 'user_action',
 'sink.bulk-flush.max-actions' = '1', -- flush after every row
 'sink.bulk-flush.interval' = '1000', -- fallback: flush at least once per second
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
);

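I also recommend declaring the primary key. The connector uses the primary key as the ES document ID, so re-running the job overwrites existing documents instead of writing duplicates.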
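
As a rough illustration of why ten rows never trigger a threshold-based flush, here is a deliberately simplified model of the two size-based triggers. It is not the connector's actual code: the class BulkFlushModel, the method name, and the 2,000-byte payload estimate are assumptions chosen only to make the arithmetic visible.

```java
// Simplified model of threshold-based bulk flushing; NOT the connector's real code.
final class BulkFlushModel {

    // True when either the buffered-action or the buffered-byte threshold is reached.
    static boolean thresholdReached(int bufferedActions, long bufferedBytes,
                                    int maxActions, long maxBytes) {
        return bufferedActions >= maxActions || bufferedBytes >= maxBytes;
    }

    public static void main(String[] args) {
        long assumedMaxBytes = 2L * 1024 * 1024; // assumed size limit of 2 MB
        long assumedPayload = 2_000L;            // assumed total size of 10 small rows

        // 10 rows against a threshold of 1000 actions: no flush is triggered.
        System.out.println(thresholdReached(10, assumedPayload, 1000, assumedMaxBytes));

        // Same rows with 'sink.bulk-flush.max-actions' = '1': the threshold is met.
        System.out.println(thresholdReached(10, assumedPayload, 1, assumedMaxBytes));
    }
}
```

In this model the first call yields false and the second yields true, which matches the reasoning above: with small test inputs only the interval or the final flush at job end can write the data unless max-actions is lowered.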

3. The source table is empty

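Your INSERT statement selects 10 rows from the users table (backed by the MySQL table spore_user_auth). If that table is empty, nothing is written to ES.
How to verify: run tabEnv.executeSql("select count(*) from users").print(); and check whether the printed count is greater than 0. If it is 0, first confirm that the MySQL table contains data.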

4. The ES index was not created, or there is a permission problem

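  • First check whether the ES index exists: run curl http://localhost:9200/mytest. A 404 response means the index does not exist. Normally the index is created automatically on the first write, but if automatic index creation is disabled on the ES cluster you have to create it by hand.
  • Example of creating the index manually (adjust the mapping to your field types):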
curl -X PUT http://localhost:9200/mytest -H "Content-Type: application/json" -d '{
  "mappings": {
    "user_action": {
      "properties": {
        "uid": {"type": "integer"},
        "appid": {"type": "integer"},
        "prepage_id": {"type": "integer"},
        "page_id": {"type": "integer"},
        "action_id": {"type": "keyword"},
        "page_name": {"type": "keyword"},
        "action_name": {"type": "keyword"},
        "prepage_name": {"type": "keyword"},
        "stat_time": {"type": "long"},
        "dt": {"type": "date"}
      }
    }
  }
}'
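  • Also check that the Flink program is allowed to write to ES, for example whether security/authentication is enabled on the cluster. Your configuration contains no user name or password. If ES requires authentication, the credentials have to be supplied in the WITH clause, for example 'username' = 'xxx', 'password' = 'xxx'. These options may not be available in every connector version, so check the connector documentation for your Flink version.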
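
To see whether any documents arrived, the index can also be queried from Java with nothing but the JDK. The sketch below calls the ES _count endpoint for the mytest index on localhost:9200, as used in the question. The class EsIndexCheck and its regex-based parseCount are illustrative assumptions, not a real client API; a proper JSON parser would be more robust.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: asks Elasticsearch how many documents an index holds.
final class EsIndexCheck {
    private static final Pattern COUNT = Pattern.compile("\"count\"\\s*:\\s*(\\d+)");

    // Extracts the "count" field from a _count response body, or -1 if absent.
    static long parseCount(String json) {
        Matcher m = COUNT.matcher(json);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    // Issues a GET and returns the body; throws IOException on a non-2xx status.
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        int status = conn.getResponseCode();
        if (status < 200 || status >= 300) {
            throw new IOException("HTTP " + status + " for " + url);
        }
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        String body = get("http://localhost:9200/mytest/_count");
        System.out.println("documents in mytest: " + parseCount(body));
    }
}
```

An IOException reporting HTTP 404 points to a missing index, while 401 or 403 points to the authentication or permission issue described above.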

5. Check the Flink job logs

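Even if the program throws no exception, the Flink job logs may contain warnings or errors that are easy to miss, such as failed bulk requests or connection timeouts. Look at the job logs in Flink's log directory (log/ by default) and search for keywords related to elasticsearch to see whether anything was reported.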
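
A small filter like the following can speed up that search. It is a sketch under assumptions: the class EsLogScan is hypothetical, the default path log/flink.log is a placeholder for whatever log file your setup writes, and the WARN/ERROR markers assume the usual log4j-style level names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical helper: keeps only log lines that look like Elasticsearch problems.
final class EsLogScan {

    // Returns WARN/ERROR lines that mention Elasticsearch (case-insensitive).
    static List<String> suspicious(List<String> lines) {
        return lines.stream()
                .filter(l -> l.contains("WARN") || l.contains("ERROR"))
                .filter(l -> l.toLowerCase(Locale.ROOT).contains("elasticsearch"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Placeholder path; pass the real log file as the first argument.
        Path log = Paths.get(args.length > 0 ? args[0] : "log/flink.log");
        suspicious(Files.readAllLines(log)).forEach(System.out::println);
    }
}
```

When the job is started from an IDE there may be no log file at all unless a logging configuration is on the classpath, in which case the console output is the place to look.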

The question in this post comes from Stack Exchange; it was asked by 小墨鱼.
