Flink: "Could not initialize class org.apache.flink.runtime.util.HadoopUtils" when creating an Iceberg JDBC Catalog, help needed

I've recently been building a simple Flink Java job. The goal is to:

  • Create an Iceberg JDBC Catalog backed by PostgreSQL
  • Use a Hadoop file system (HDFS) as the Iceberg warehouse

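The Maven build and packaging succeed without any errors, but when I submit the jar to my local Flink 1.20.2 cluster, the job fails as soon as it executes the CREATE CATALOG statement, throwing java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.runtime.util.HadoopUtils. I've spent quite a while on this without finding the root cause, so I'm asking for help!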


I. My project configuration files

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>iceberg-s3-setup</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>Iceberg S3 Setup Job</name>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.20.2</flink.version>
        <iceberg.version>1.10.0</iceberg.version>
        <hadoop.version>3.3.6</hadoop.version>
    </properties>

    <dependencies>
        <!-- Flink core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Table API bridge (required for StreamTableEnvironment) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Iceberg Flink runtime (matches Flink 1.20) -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-1.20</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <!-- Iceberg AWS module (contains S3FileIO) -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-aws</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <!-- Hadoop AWS (for s3a://) -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Shade plugin to create an uber-jar (optional but recommended) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <!-- Optional: make the jar runnable directly -->
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.IcebergS3SetupJava</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Main class code (IcebergS3SetupJava.java)

package com.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IcebergS3SetupJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L); // 10 seconds

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Create an Iceberg JDBC catalog: metadata in PostgreSQL, warehouse on HDFS.
        // Iceberg's Flink catalog factory rejects 'catalog-type' and 'catalog-impl' together,
        // so only 'catalog-impl' is set. JdbcCatalog reads the JDBC URL from 'uri';
        // 'jdbc.*' keys (user, password) are passed through to the JDBC driver.
        tEnv.executeSql(
            "CREATE CATALOG iceberg_catalog WITH (\n" +
            " 'type' = 'iceberg',\n" +
            " 'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog',\n" +
            " 'uri' = 'jdbc:postgresql://<postgres_ip>:<postgres_port>/iceberg_catalog',\n" +
            " 'jdbc.user' = 'user',\n" +
            " 'jdbc.password' = 'pass',\n" +
            " 'warehouse' = 'hdfs://<my_ip>:8020/user/warehouse'\n" + // no comma after the last property
            ")"
        );

        // Create a sample table; `default` is a reserved keyword in Flink SQL and must be quoted
        tEnv.executeSql(
            "CREATE TABLE iceberg_catalog.`default`.sample (\n" +
            " id BIGINT COMMENT 'unique id',\n" +
            " data STRING\n" +
            ") WITH (\n" +
            " 'format-version' = '2'\n" +
            ")"
        );

        System.out.println("Iceberg catalog and sample table created successfully.");
        // executeSql() runs DDL immediately. env.execute() is not called: with no DataStream
        // operators defined it would throw "No operators defined in streaming topology".
    }
}
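One way to avoid hand-concatenation mistakes in the DDL, such as a stray comma after the last property, is to generate the WITH clause from an ordered map. The sketch below is a hypothetical helper (CatalogDdl.createCatalog is not part of Flink or Iceberg); it assumes the JdbcCatalog convention that the JDBC URL goes in 'uri' and driver properties use the 'jdbc.' prefix, and it only builds the SQL string without talking to Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class CatalogDdl {

    // Hypothetical helper: renders CREATE CATALOG <name> WITH ('k' = 'v', ...).
    // Joining entries with ",\n" means no comma can follow the last property.
    static String createCatalog(String name, Map<String, String> props) {
        String body = props.entrySet().stream()
                // Single quotes inside a SQL string literal are escaped by doubling them
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE CATALOG " + name + " WITH (\n" + body + "\n)";
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>(); // keeps insertion order
        props.put("type", "iceberg");
        props.put("catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog");
        props.put("uri", "jdbc:postgresql://<postgres_ip>:<postgres_port>/iceberg_catalog");
        props.put("jdbc.user", "user");
        props.put("jdbc.password", "pass");
        props.put("warehouse", "hdfs://<my_ip>:8020/user/warehouse");
        // In the Flink job this string would be passed to tEnv.executeSql(...)
        System.out.println(createCatalog("iceberg_catalog", props));
    }
}
```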

II. My environment

  • Flink version: 1.20.2
  • Iceberg version: 1.10.0
  • PostgreSQL version: 16
  • Java version: 11
  • Hadoop version: 3.3.6 (as specified in the pom)

III. Submit command

flink run -c com.example.IcebergS3SetupJava target/iceberg-s3-setup-1.0.0.jar

IV. Full error message

java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.runtime.util.HadoopUtils
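Note that "Could not initialize class" is not the original failure. The JVM reports it when a class's static initializer already failed on an earlier attempt; that first attempt throws either an ExceptionInInitializerError carrying the real cause or, if the initializer itself threw an Error (for example a NoClassDefFoundError for a missing Hadoop class), that Error directly. So an earlier stack trace in the client or JobManager log is usually the informative one. The minimal demo below reproduces this JVM behavior with a hypothetical Fragile class standing in for HadoopUtils; it does not involve Flink.

```java
public class InitFailureDemo {

    // Hypothetical stand-in for a class whose static setup needs a missing dependency
    static class Fragile {
        static final int VALUE = compute();

        private static int compute() {
            throw new IllegalStateException("simulated missing dependency");
        }
    }

    // Touches Fragile twice and returns the error thrown by each attempt
    static Throwable[] touchTwice() {
        Throwable[] errors = new Throwable[2];
        for (int i = 0; i < errors.length; i++) {
            try {
                int unused = Fragile.VALUE; // first use triggers class initialization
            } catch (Throwable t) {
                errors[i] = t;
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        Throwable[] errors = touchTwice();
        // 1st attempt: ExceptionInInitializerError whose cause is the real problem
        System.out.println(errors[0] + " caused by " + errors[0].getCause());
        // 2nd attempt: NoClassDefFoundError "Could not initialize class ...", cause not in the message
        System.out.println(errors[1]);
    }
}
```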

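V. What I've already checked

  1. Verified that Hadoop 3.3.6 is compatible with Flink 1.20.2; according to the official documentation it is supported.
  2. Inspected the jar produced by the Maven Shade plugin: the uber-jar does contain Hadoop-related class files.
  3. The HADOOP_HOME environment variable is set for the local Flink cluster and points to my Hadoop installation directory.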
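Regarding point 3: Flink's Hadoop integration documentation recommends exposing Hadoop to Flink through the HADOOP_CLASSPATH environment variable (for example export HADOOP_CLASSPATH=$(hadoop classpath) before starting the cluster and client), rather than relying on HADOOP_HOME alone. To see what a given JVM can actually load, a small probe such as the hypothetical HadoopClasspathCheck below may help. The class list is an assumption (hadoop-common's Configuration and FileSystem, plus the HDFS client class an hdfs:// warehouse would need), and the result only reflects the classpath the probe itself runs with.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HadoopClasspathCheck {

    // Assumed list of Hadoop classes worth probing; illustrative, not exhaustive
    static final List<String> HADOOP_CLASSES = List.of(
            "org.apache.hadoop.conf.Configuration",
            "org.apache.hadoop.fs.FileSystem",
            "org.apache.hadoop.hdfs.DistributedFileSystem");

    // Maps each class name to whether the loader can find it (without running static init)
    static Map<String, Boolean> check(List<String> names, ClassLoader loader) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String name : names) {
            try {
                Class.forName(name, false, loader);
                result.put(name, true);
            } catch (ClassNotFoundException | LinkageError e) {
                result.put(name, false);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("HADOOP_HOME      = " + System.getenv("HADOOP_HOME"));
        System.out.println("HADOOP_CLASSPATH = " + System.getenv("HADOOP_CLASSPATH"));
        check(HADOOP_CLASSES, HadoopClasspathCheck.class.getClassLoader())
                .forEach((name, ok) -> System.out.println((ok ? "found   " : "MISSING ") + name));
    }
}
```

For example, running it once with java -cp "$(hadoop classpath):." HadoopClasspathCheck and once with only the shaded jar on the classpath shows whether the Hadoop classes come from the installation or from the uber-jar.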

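But I'm still stuck on this initialization error. Could anyone help me figure out where the problem is, or suggest some concrete directions to investigate? Thanks!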
