Flink SQL job runs without errors, but no data is written to the Elasticsearch cluster
I want to query data from MySQL or Hive with SQL and write it into an Elasticsearch (ES) cluster. The program runs successfully, but no data shows up in ES.
Software versions: Flink 1.11, ES 6.2.2, Hive 1.2.1, MySQL 5.7
My code:
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;

public class HiveExample {
    public static void main(String[] args) throws DatabaseNotExistException {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tabEnv = TableEnvironment.create(settings);

        String sql = "insert into user_action_es_sink " +
                "select 100123,5,11,1,'a','b','111','bbb',cast(11111 as bigint),cast('2020-11-11' as date) from dragonfly.web_page limit 10";

        String sporeUserAuthCreateTableSQL = "CREATE TABLE users (\n" +
                " `id` INT,\n" +
                " `userid` INT,\n" +
                " `type` INT,\n" +
                " PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://localhost:3306/spore',\n" +
                " 'table-name' = 'spore_user_auth',\n" +
                " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                " 'username' = 'xxxx',\n" +
                " 'password' = 'xxxx'\n" +
                ")";
        tabEnv.executeSql(sporeUserAuthCreateTableSQL);

        String esTable = "CREATE TABLE user_action_es_sink (\n" +
                " uid INT,\n" +
                " appid INT,\n" +
                " prepage_id INT,\n" +
                " page_id INT,\n" +
                " action_id STRING,\n" +
                " page_name STRING,\n" +
                " action_name STRING,\n" +
                " prepage_name STRING,\n" +
                " stat_time BIGINT,\n" +
                " dt DATE\n" +
                // " PRIMARY KEY (uid,dt) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'elasticsearch-6',\n" +
                " 'hosts' = 'http://localhost:9200',\n" +
                " 'index' = 'mytest',\n" +
                " 'document-type' = 'user_action'\n" +
                // " 'sink.bulk-flush.max-size' = '0',\n" +
                // " 'sink.bulk-flush.max-actions' = '0',\n" +
                // " 'sink.bulk-flush.interval' = '0'\n" +
                // " 'format' = 'json',\n" +
                // " 'json.fail-on-missing-field' = 'false',\n" +
                // " 'json.ignore-parse-errors' = 'true'\n" +
                ")";
        tabEnv.executeSql(esTable);

        tabEnv.executeSql("insert into user_action_es_sink select 100123,5,11,1,'a','b','111','bbb',cast(11111 as bigint),cast('2020-11-11' as date) from users limit 10").print();
    }
}
```
My pom.xml:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc-nohive_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.4.13</version>
    </dependency>
</dependencies>
```
Troubleshooting and solutions
Here are several likely causes; check them one at a time:
1. Dependency conflict (the most likely cause)
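Your pom declares flink-connector-elasticsearch6 twice: once with version 1.6.0 and once with 1.11.0. The 1.6.0 connector is not compatible with Flink 1.11; it predates the 'connector' = 'elasticsearch-6' DDL options, which were introduced in Flink 1.11. Depending on what ${scala.binary.version} resolves to, Maven either sees the same artifact declared twice and keeps only one of them (with a warning), or puts both jars on the classpath, in which case it is unpredictable which classes get loaded. Either way, the ES sink can misbehave without throwing an obvious exception. Note also that flink-orc-nohive hard-codes the Scala suffix _2.12 while the ES connector uses _2.11; all Scala-suffixed Flink artifacts must use the same Scala version.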
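Fix: delete the 1.6.0 dependency and keep a single ES connector whose version matches Flink. Referencing the project properties keeps it in step with your other Flink artifacts (this assumes ${flink.version} is 1.11.x); the declaration below replaces both existing ones. Likewise, declare flink-orc-nohive as flink-orc-nohive_${scala.binary.version} instead of hard-coding _2.12.

```xml
<!-- Delete this conflicting dependency -->
<!--
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
    <version>1.6.0</version>
</dependency>
-->
<!-- Keep exactly one connector that matches the Flink version and Scala suffix -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```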
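To confirm which connector jar actually ends up on the runtime classpath, you can run mvn dependency:tree, or list every classpath location that provides a connector class from a scratch main class in the same project. A minimal sketch of the second approach follows. It assumes org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink (the DataStream sink class, which as far as I know ships in both the 1.6.x and 1.11.x jars) is a good marker class; more than one URL in the output means more than one copy is on the classpath.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/** Lists every classpath entry that contains a given class, to spot duplicate connector jars. */
final class ClasspathDuplicates {

    /** Returns one URL per classpath location that contains the class file for className. */
    static List<URL> copies(String className) {
        String resource = className.replace('.', '/') + ".class";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed marker class: the DataStream ES6 sink shipped by the connector jar.
        String marker = "org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink";
        List<URL> found = copies(marker);
        System.out.println(found.size() + " location(s) provide " + marker);
        for (URL url : found) {
            System.out.println("  " + url);
        }
        if (found.size() > 1) {
            System.out.println("More than one Elasticsearch connector jar is on the classpath.");
        }
    }
}
```

Run it from the IDE with the same dependencies as the Flink job; zero locations means the connector is missing entirely, which is a different problem.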
2. ES sink bulk-flush settings
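You commented out every bulk-flush option, so the Elasticsearch 6 sink in Flink 1.11 falls back to its defaults: 'sink.bulk-flush.max-actions' = 1000, 'sink.bulk-flush.max-size' = 2mb and 'sink.bulk-flush.interval' = 1s. Ten rows reach neither the count nor the size threshold, so they stay in the sink's buffer until the 1 s interval fires or the sink is closed when the job ends. If the job stops before either happens, nothing reaches ES. A common way for that to happen in Flink 1.11: executeSql() submits an INSERT job asynchronously and returns right away, and calling print() on the returned TableResult does not wait for the job either. When you run from the IDE, main() may therefore return and tear down the local cluster before the job has finished. To wait for completion on 1.11, keep the TableResult and call result.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get().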
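The buffering described above can be illustrated with a toy model. This is a deliberately simplified, hypothetical sketch of the count and size triggers, not Flink's or Elasticsearch's BulkProcessor code; it assumes thresholds of 1000 actions and 2 MB, and it represents the interval timer and the sink's close() with an explicit flush() call.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model of bulk-flush triggers: flush when either the action count or the byte size is reached. */
final class BulkFlushModel {
    private final int maxActions;      // plays the role of sink.bulk-flush.max-actions
    private final long maxSizeBytes;   // plays the role of sink.bulk-flush.max-size
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private int flushCount = 0;

    BulkFlushModel(int maxActions, long maxSizeBytes) {
        this.maxActions = maxActions;
        this.maxSizeBytes = maxSizeBytes;
    }

    /** Buffers one document and flushes only if a count or size threshold is hit. */
    void add(String jsonDoc) {
        buffer.add(jsonDoc);
        bufferedBytes += jsonDoc.getBytes(StandardCharsets.UTF_8).length;
        if (buffer.size() >= maxActions || bufferedBytes >= maxSizeBytes) {
            flush();
        }
    }

    /** Stands in for the interval timer or close(): sends whatever is buffered. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushCount++;          // one bulk request would be sent here
        buffer.clear();
        bufferedBytes = 0;
    }

    int pending() { return buffer.size(); }
    int flushes() { return flushCount; }

    public static void main(String[] args) {
        String doc = "{\"uid\":100123,\"appid\":5,\"dt\":\"2020-11-11\"}";
        long twoMb = 2L * 1024 * 1024;
        BulkFlushModel defaults = new BulkFlushModel(1000, twoMb);
        BulkFlushModel perRow = new BulkFlushModel(1, twoMb);
        for (int i = 0; i < 10; i++) {
            defaults.add(doc);
            perRow.add(doc);
        }
        // With the defaults nothing has been sent yet; all 10 rows wait for the timer or close().
        System.out.println("defaults:      flushes=" + defaults.flushes() + ", pending=" + defaults.pending());
        System.out.println("max-actions=1: flushes=" + perRow.flushes() + ", pending=" + perRow.pending());
    }
}
```

Running main() reports 0 flushes and 10 pending rows for the default thresholds, versus 10 flushes and nothing pending when max-actions is 1.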
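Fix: uncomment the flush options and use test-friendly values, for example flushing after every row:

```sql
CREATE TABLE user_action_es_sink (
  uid INT,
  appid INT,
  prepage_id INT,
  page_id INT,
  action_id STRING,
  page_name STRING,
  action_name STRING,
  prepage_name STRING,
  stat_time BIGINT,
  dt DATE,
  PRIMARY KEY (uid, dt) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://localhost:9200',
  'index' = 'mytest',
  'document-type' = 'user_action',
  'sink.bulk-flush.max-actions' = '1',  -- flush after every row
  'sink.bulk-flush.interval' = '1s',    -- time-based fallback flush every second
  'format' = 'json'
);
```

The 'json.fail-on-missing-field' and 'json.ignore-parse-errors' options only affect reading (deserialization), so they do nothing on a sink and are left out; 'format' = 'json' is already the default. If you embed this DDL in a Java string as in your code, keep the \n at the end of each line, otherwise a -- comment swallows the rest of the statement.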
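Adding a primary key is also recommended. With a key, the sink runs in upsert mode and uses the key values as the document ID (joined with '_' by default, configurable via 'document-id.key-delimiter'), so re-running the job overwrites documents instead of duplicating them; without a key, the sink only appends and ES generates random IDs. Keep in mind that your test query writes 10 identical rows (uid = 100123, dt = 2020-11-11), so with this key you should expect a single document in mytest, not ten.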
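To see why ten identical rows collapse into one document, here is a hypothetical sketch of key-based IDs. It assumes the ID is the string form of each primary-key value joined with the 'document-id.key-delimiter' value (default '_'); Flink's own key extraction may format some types differently, so treat the exact ID strings as an illustration.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical illustration of composite document IDs built from primary-key values. */
final class DocIdDemo {

    /** Joins the string form of each key value with the delimiter, e.g. "100123_2020-11-11". */
    static String docId(List<?> keyValues, String delimiter) {
        return keyValues.stream()
                .map(value -> String.valueOf(value))
                .collect(Collectors.joining(delimiter));
    }

    /** Returns the distinct IDs that a batch of key tuples would map to. */
    static Set<String> distinctIds(List<? extends List<?>> keys, String delimiter) {
        Set<String> ids = new LinkedHashSet<>();
        for (List<?> key : keys) {
            ids.add(docId(key, delimiter));
        }
        return ids;
    }

    public static void main(String[] args) {
        // The test query emits 10 rows whose key (uid, dt) is always (100123, 2020-11-11).
        List<Object> key = Arrays.asList(100123, LocalDate.of(2020, 11, 11));
        List<List<Object>> rows = Collections.nCopies(10, key);
        System.out.println(distinctIds(rows, "_")); // prints [100123_2020-11-11]
    }
}
```

Every row maps to the same ID, so each write overwrites the previous one and the index ends up with one document.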
3. The data source is empty
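Your INSERT reads up to 10 rows from the users table (backed by the MySQL table spore_user_auth) and emits one output row per source row. If that table is empty, there is nothing to write to ES.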
To verify, run tabEnv.executeSql("select count(*) from users").print(); and check that the count is greater than 0. If it is 0, first make sure the MySQL table actually contains data.
4. Index missing or insufficient permissions
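- Check whether the index exists with curl http://localhost:9200/mytest. A 404 response means it does not exist. Flink itself does not create the index: Elasticsearch creates it automatically on the first write, unless automatic index creation (action.auto_create_index) is disabled on the cluster, in which case you have to create it manually. If the index is missing even though auto-creation is enabled, no bulk request ever reached the cluster.
- To create the index manually (adjust the mapping to your field types):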
```shell
curl -X PUT http://localhost:9200/mytest -H "Content-Type: application/json" -d '{
  "mappings": {
    "user_action": {
      "properties": {
        "uid":          {"type": "integer"},
        "appid":        {"type": "integer"},
        "prepage_id":   {"type": "integer"},
        "page_id":      {"type": "integer"},
        "action_id":    {"type": "keyword"},
        "page_name":    {"type": "keyword"},
        "action_name":  {"type": "keyword"},
        "prepage_name": {"type": "keyword"},
        "stat_time":    {"type": "long"},
        "dt":           {"type": "date"}
      }
    }
  }
}'
```
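- Also check whether your cluster requires authentication; your DDL passes no credentials. Note that the SQL connector's 'username' and 'password' options were only added in Flink 1.12, so on Flink 1.11 you would need to upgrade, or write through the DataStream ElasticsearchSink and set the credentials in a custom RestClientFactory.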
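If you need the mapping for a different schema, the type correspondence used in the curl example (INT to integer, BIGINT to long, STRING to keyword, DATE to date) can be captured in a small helper that prints the request body. This is an illustrative sketch: the table only covers the types in this question, and keyword versus text for strings is a design choice, not a requirement. The nesting under a document type name matches Elasticsearch 6.x.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Builds an Elasticsearch 6.x create-index body from Flink SQL column types (illustrative only). */
final class EsMappingBuilder {

    /** Maps the Flink SQL types used in this question to ES field types; extend as needed. */
    static String esType(String flinkType) {
        switch (flinkType) {
            case "INT":    return "integer";
            case "BIGINT": return "long";
            case "STRING": return "keyword";
            case "DATE":   return "date";
            default: throw new IllegalArgumentException("No ES type chosen for " + flinkType);
        }
    }

    /** ES 6.x still nests "properties" under a document type name such as "user_action". */
    static String mappingBody(String documentType, Map<String, String> columns) {
        StringJoiner properties = new StringJoiner(", ");
        for (Map.Entry<String, String> column : columns.entrySet()) {
            properties.add("\"" + column.getKey() + "\": {\"type\": \"" + esType(column.getValue()) + "\"}");
        }
        return "{\"mappings\": {\"" + documentType + "\": {\"properties\": {" + properties + "}}}}";
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();   // keeps the DDL column order
        columns.put("uid", "INT");
        columns.put("appid", "INT");
        columns.put("prepage_id", "INT");
        columns.put("page_id", "INT");
        columns.put("action_id", "STRING");
        columns.put("page_name", "STRING");
        columns.put("action_name", "STRING");
        columns.put("prepage_name", "STRING");
        columns.put("stat_time", "BIGINT");
        columns.put("dt", "DATE");
        // Use the printed JSON as the -d body of: curl -X PUT http://localhost:9200/mytest ...
        System.out.println(mappingBody("user_action", columns));
    }
}
```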
5. Check the Flink job logs
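Even if the program throws no exception, the job logs may contain warnings or errors such as failed bulk requests or connection timeouts. On a Flink cluster, look in the log/ directory of the Flink installation; when you run from the IDE there is no such directory, and logs go to the console (or wherever your logging configuration sends them), provided an SLF4J binding is on the classpath. Search the logs for elasticsearch-related keywords.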
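If the log is large, a small filter can pull out the relevant lines. This sketch simply keeps lines that mention any of a few keywords (an assumed list: elasticsearch, bulk, exception) and takes the log file path as its first argument; it is a rough equivalent of grep, not a Flink tool.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

/** Prints log lines that mention Elasticsearch, bulk requests or exceptions. */
final class EsLogScan {

    // Assumed keywords; adjust to what your logs actually contain.
    private static final List<String> KEYWORDS = Arrays.asList("elasticsearch", "bulk", "exception");

    /** Keeps lines containing any keyword, ignoring case. */
    static List<String> relevantLines(List<String> lines) {
        return lines.stream()
                .filter(line -> {
                    String lower = line.toLowerCase(Locale.ROOT);
                    return KEYWORDS.stream().anyMatch(lower::contains);
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to a Flink log file; ISO-8859-1 decoding never fails on unexpected bytes.
        List<String> lines = Files.readAllLines(Paths.get(args[0]), StandardCharsets.ISO_8859_1);
        relevantLines(lines).forEach(System.out::println);
    }
}
```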
The question was originally posted on Stack Exchange by 小墨鱼.




