Last updated: 2022.09.30 17:21:39
First published: 2022.09.30 17:21:39
ByteHouse is Volcano Engine's enterprise-grade analytical database built on open-source ClickHouse. It is a self-service data analytics platform that supports both real-time and offline ingestion and can analyze PB-scale data efficiently.
This article describes how to submit Flink SQL and Flink JAR jobs on an E-MapReduce (EMR) cluster to write data into a ByteHouse cluster.
A Volcano Engine EMR cluster has been created. For details, see E-MapReduce Quick Start (Volcano Engine).
A Volcano Engine ByteHouse cluster has been created. For details, see ByteHouse Quick Start (Volcano Engine).
An access key has been generated. On the Volcano Engine Key Management page, look up the access key (Access Key ID and Secret Access Key) of the corresponding user.
Writing data to ByteHouse is done through the ByteHouse Gateway: the Region parameter is set to different values depending on the usage scenario. In addition, every node of the EMR cluster must be able to communicate with the gateway. There are currently two ways to achieve this:
1. Set Region to VOLCANO and bind a public IP to each node of the EMR cluster.
2. The ByteHouse Gateway also supports access over the Volcano Engine private network. This requires ByteHouse to add the EMR cluster to its allowlist, which you can arrange by contacting customer support.
When running the Flink SQL client, specify the connector JAR at the following path:
```shell
cd /usr/lib/emr/current
./bin/sql-client.sh --jar connectors/flink-connector-bytehouse-cdw-assembly-x.x.x-x.x.jar
```
You can run the following SQL statements as a test:
```sql
CREATE TABLE random_source (
    test_key   STRING,
    test_value BIGINT,
    ts         BIGINT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

CREATE TABLE cnch_table (
    test_key   STRING,
    test_value BIGINT,
    ts         BIGINT
) WITH (
    'connector' = 'bytehouse-cdw',
    'database' = '<ByteHouse database name>',
    'table-name' = '<ByteHouse table name>',
    'bytehouse.gateway.region' = 'VOLCANO',
    'bytehouse.gateway.access-key-id' = '<your actual AK>',
    'bytehouse.gateway.secret-key' = '<your actual SK>'
);

INSERT INTO cnch_table SELECT * FROM random_source;
```
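Once the INSERT job is running, rows should arrive in ByteHouse at roughly one per second (the rate set on the datagen source). As a minimal sanity check, not part of the official steps, you can count rows from the ByteHouse query console, assuming the same database and table used above:
```sql
-- Row count should keep growing while the Flink job runs
-- (the datagen source emits about 1 row/s).
SELECT count(*) FROM <ByteHouse database name>.<ByteHouse table name>;
```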
Visit the ByteHouse Connector download page and download the file from the directory matching your version.
The downloaded file is named in the format flink-connector-bytehouse-cdw-assembly-#.#.#-#.#.jar.
Note
EMR 1.4.0 clusters ship with Flink 1.15; EMR 1.3.1 clusters ship with Flink 1.11.
Add the configuration below to the pom.xml file of your local Maven project to import the corresponding dependencies. First, install the downloaded connector JAR into your local Maven repository as flink-cnch-connector:
```shell
mvn install:install-file -Dfile=/xxx/xxx/flink-connector-bytehouse-cdw-assembly-1.11.4-1.15.jar -DgroupId=com.bytedance -DartifactId=flink-cnch-connector -Dversion=1.0.0 -Dpackaging=jar
```
Then reference it in pom.xml:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinkTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.11.3</flink.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.bytedance</groupId>
            <artifactId>flink-cnch-connector</artifactId>
            <!-- Must match the version used in mvn install:install-file above -->
            <version>1.0.0</version>
        </dependency>
    </dependencies>
</project>
```
Example source class that generates synthetic RowData:
```java
// Flink imports; the RowDataConversion helper used below ships with the
// flink-connector-bytehouse-cdw JAR, so import it from the connector package.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Synthetic {@code RowData} generator mimicking the feed of crime cases reported by Neighbourhood
 * Police Centres (NPCs) in Singapore.
 */
public class DummyRowDataSource extends RichParallelSourceFunction<RowData> {

    private static final AtomicLong pullCount = new AtomicLong();
    static Map<String, Timer> map = new ConcurrentHashMap<>(2, 0.9f, 1);

    private final List<String> offences = Arrays.asList("Unlicensed Moneylending", "Harassment");
    private final AtomicInteger caseNo = new AtomicInteger();

    private volatile boolean cancelled = false;
    private Random random;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        random = new Random();
        // Register a single daemon timer that periodically logs the pull count.
        map.computeIfAbsent(
                "holder",
                s -> {
                    final Timer timer = new Timer("RandomStringSource ", true);
                    timer.schedule(
                            new TimerTask() {
                                @Override
                                public void run() {
                                    System.out.printf("source is pulled %s times\n", pullCount.get());
                                }
                            },
                            5000,
                            5000);
                    return timer;
                });
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        while (!cancelled) {
            Thread.sleep(random.nextInt(10) + 5);
            synchronized (ctx.getCheckpointLock()) {
                final GenericRowData genericRowData = new GenericRowData(RowKind.INSERT, 4);
                genericRowData.setField(0, RowDataConversion.fieldDataOf(2000 + random.nextInt(20)));
                genericRowData.setField(1, RowDataConversion.fieldDataOf(generateRandomWord(4)));
                genericRowData.setField(2, RowDataConversion.fieldDataOf(randomOffences()));
                genericRowData.setField(3, RowDataConversion.fieldDataOf(caseNo.incrementAndGet()));
                ctx.collect(genericRowData);
                pullCount.incrementAndGet();
            }
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }

    private String generateRandomWord(int wordLength) {
        StringBuilder sb = new StringBuilder(wordLength);
        for (int i = 0; i < wordLength; i++) { // For each letter in the word
            char tmp = (char) ('a' + random.nextInt('z' - 'a')); // Generate a letter between a and z
            sb.append(tmp); // Add it to the String
        }
        return sb.toString();
    }

    private String randomOffences() {
        return offences.get(random.nextInt(2));
    }
}
```
Fill in your actual AK and SK, and the ByteHouse database name and table name, as indicated below.
```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.data.RowData;
// AbstractClickHouseSinkFunction, CnchSinkFunctionBuilder, and InsertBatch
// ship with the flink-connector-bytehouse-cdw JAR; import them from the
// connector package.

public class CnchSinkDataStreamExample {

    public static void main(String[] args) throws Exception {
        final String region = "VOLCANO";
        final String ak = "<your actual AK>";
        final String sk = "<your actual SK>";
        final String dbName = "<ByteHouse database name>";
        final String tableName = "<ByteHouse table name>";

        // Create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Add the synthetic source defined above
        DataStream<RowData> dataStream =
                env.addSource(new DummyRowDataSource()).returns(TypeInformation.of(RowData.class));

        // Schema of the target ByteHouse table
        List<TableColumn> columns =
                Arrays.asList(
                        TableColumn.of("year", DataTypes.INT()),
                        TableColumn.of("npc", DataTypes.STRING()),
                        TableColumn.of("offence", DataTypes.STRING()),
                        TableColumn.of("case_no", DataTypes.INT()));

        try (AbstractClickHouseSinkFunction<RowData, InsertBatch<RowData>, ?> cnchSink =
                new CnchSinkFunctionBuilder(dbName, tableName)
                        .withSchema(columns)
                        .withGatewayConnection(region)
                        .withGatewayCredentials(ak, sk)
                        .withFlushInterval(Duration.ofSeconds(1))
                        .build()) {
            // Add the sink
            dataStream.addSink(cnchSink);

            // Trigger the pipeline
            env.execute(CnchSinkDataStreamExample.class.getSimpleName());
        }
    }
}
```
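The example writes four columns (year, npc, offence, case_no), so a table with a matching schema must already exist in ByteHouse. A minimal sketch of such a table is shown below; the CnchMergeTree engine and the ORDER BY clause are assumptions about a typical ByteHouse setup, not part of this guide, so adjust them to your cluster:
```sql
-- Illustrative target table for the DataStream example above.
-- Engine and ORDER BY are assumptions; adapt them to your ByteHouse cluster.
CREATE TABLE <ByteHouse database name>.<ByteHouse table name>
(
    year    Int32,
    npc     String,
    offence String,
    case_no Int32
)
ENGINE = CnchMergeTree()
ORDER BY case_no;
```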
To reduce potential dependency conflicts, it is recommended to build a fat JAR that bundles the relevant dependencies.
Add the following build configuration to the project's pom.xml to produce the fat JAR.
```xml
<build>
    <finalName>fat-jar-example</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.4.2</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>assemble-all</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
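With this configuration, running mvn clean package should produce the bundled artifact under target/ (with the jar-with-dependencies suffix that the assembly plugin appends to the finalName, i.e. fat-jar-example-jar-with-dependencies.jar in this example); this is the JAR to upload in the next step.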
You can upload the JAR to the EMR cluster with the scp command:
```shell
scp xxxx.jar root@xxx.xxx.xxx.xxx:/{path}/
```
Command to submit the job:
```shell
/usr/lib/emr/current/flink/bin/flink run-application -t yarn-application <JAR file name>
```
Note:
You can run the command as the flink user.
Before running, execute export HADOOP_CONF_DIR=/etc/emr/hadoop/conf.
For how to access the YARN ResourceManager UI, see the access-link documentation for E-MapReduce (Volcano Engine).