You need to enable JavaScript to run this app.
导航

StarRocks行存表数据接入

最近更新时间2024.02.18 19:02:52

首次发布时间2024.01.15 15:07:50

本文介绍行存表的接入方式。

1 Flink写入

StarRocks的行存表支持Flink Sink,使用方式同列存表一模一样,列存表的使用详情可以参考官方文档

说明

实时写入推荐使用StarRocks Sink。

示例

  1. 在StarRocks中创建表

    CREATE DATABASE if not exists test;
    CREATE TABLE test.usertable(
      `YCSB_KEY` String,
      `FIELD0` String,
      `FIELD1` String,
      `FIELD2` String,
      `FIELD3` String,
      `FIELD4` String,
      `FIELD5` String,
      `FIELD6` String,
      `FIELD7` String,
      `FIELD8` String,
      `FIELD9` String)
    ENGINE=ROW_STORE
    PRIMARY KEY(YCSB_KEY);
    
  2. 在Flink服务中操作如下步骤

    1. 下载flink-connector-starrocks-1.2.8_flink-1.16.jar

说明

该jar适用于火山EMR环境中提供的Flink 1.16.x版本。您还可以根据Flink环境的不同,在 Maven Central Repository 获取相应版本的 Flink connector JAR 文件

2. 将下载的Flink StarRocks Connector文件复制到Flink集群的 /usr/lib/emr/current/flink/lib目录下:
	
	```shell
	cp flink-* /usr/lib/emr/current/flink/lib
	```
	
3. 启动Flink sql 客户端
	
	```shell
	cd /usr/lib/emr/current/flink/
	# 启动集群
	./bin/yarn-session.sh --detached
	 
	 # 启动SQL
	./bin/sql-client.sh embedded
	```
	
4. 启动Flink作业,向StarRocks导入数据
	
	```sql
	SET execution.checkpointing.interval = 10s;
	
	-- 定义Source表, 这里用datagen代替
	CREATE TABLE datagen (
	  `YCSB_KEY` String,
	  `FIELD0` String,
	  `FIELD1` String,
	  `FIELD2` String,
	  `FIELD3` String,
	  `FIELD4` String,
	  `FIELD5` String,
	  `FIELD6` String,
	  `FIELD7` String,
	  `FIELD8` String,
	  `FIELD9` String
	) WITH (
	 'connector' = 'datagen',
	 'rows-per-second'='100'
	);
	
	-- 定义sink表
	CREATE TABLE UserTable
	(
	  `YCSB_KEY` String,
	  `FIELD0` String,
	  `FIELD1` String,
	  `FIELD2` String,
	  `FIELD3` String,
	  `FIELD4` String,
	  `FIELD5` String,
	  `FIELD6` String,
	  `FIELD7` String,
	  `FIELD8` String,
	  `FIELD9` String,
	   PRIMARY KEY (YCSB_KEY) NOT ENFORCED
	)
	WITH
	(
	    'connector' = 'starrocks',
	    'load-url' = '{fe host}:8030',
	    'jdbc-url' = 'jdbc:mysql://{fe host}:9030',
	    'username' = '{user name}',
	    'password' = '{password}',
	    'database-name' = 'test',
	    'table-name' = 'usertable',
	    'sink.parallelism' = '2'
	);
	
	INSERT INTO UserTable select YCSB_KEY,FIELD0,FIELD1,FIELD2,FIELD3,FIELD4,FIELD5,FIELD6,FIELD7,FIELD8,FIELD9 FROM datagen;
	```
  1. 在StarRocks集群上查看test.usertable表中数据

    SELECT * FROM test.usertable;
    

说明

列存表调优时,经常会调大以下Flush参数,提升导入吞吐:

  1. sink.buffer-flush.max-bytes

  2. sink.buffer-flush.max-rows

  3. sink.buffer-flush.interval-ms

但行存的表的逻辑正好相反,行存表能够获得更高的写入QPS和更高的实时性。但整体吞吐性能比列存表相差较大。因此在使用时,不需要调整这3个参数的默认值。

行存表支持绝大多是的DML的语法, 因此可以直接用Flink Mysql JDBC Sink写入到行存表中。

示例

  1. 在StarRocks中创建表

    CREATE DATABASE if not exists test;
    CREATE TABLE test.usertable(
      `YCSB_KEY` String,
      `FIELD0` String,
      `FIELD1` String,
      `FIELD2` String,
      `FIELD3` String,
      `FIELD4` String,
      `FIELD5` String,
      `FIELD6` String,
      `FIELD7` String,
      `FIELD8` String,
      `FIELD9` String)
    ENGINE=ROW_STORE
    PRIMARY KEY(YCSB_KEY);
    
  2. 在Flink服务中操作如下步骤

    1. 启动Flink sql 客户端

      cd /usr/lib/emr/current/flink/
      # 拷贝jdbc connect的jar
      cp connectors/flink-sql-connector-jdbc-1.16.1.jar lib/
      
      # 启动集群
      ./bin/yarn-session.sh --detached
       
       # 启动SQL
      ./bin/sql-client.sh embedded
      
    2. 启动Flink作业,向StarRocks导入数据

      SET execution.checkpointing.interval = 10s;
      
      -- source表
      CREATE TABLE datagen (
        `YCSB_KEY` String,
        `FIELD0` String,
        `FIELD1` String,
        `FIELD2` String,
        `FIELD3` String,
        `FIELD4` String,
        `FIELD5` String,
        `FIELD6` String,
        `FIELD7` String,
        `FIELD8` String,
        `FIELD9` String
      ) WITH (
       'connector' = 'datagen',
       'rows-per-second'='1'
      );
      
      
      CREATE TABLE UserTable (
        `YCSB_KEY` String,
        `FIELD0` String,
        `FIELD1` String,
        `FIELD2` String,
        `FIELD3` String,
        `FIELD4` String,
        `FIELD5` String,
        `FIELD6` String,
        `FIELD7` String,
        `FIELD8` String,
        `FIELD9` String
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://{fe host}:9030/test?useServerPrepStmts=true',
        'table-name' = 'usertable',
        'username' = '{user name}',
        'password' = '{password}',
        'sink.parallelism' = '80'
      );
      
      INSERT INTO UserTable 
      SELECT YCSB_KEY,FIELD0,FIELD1,FIELD2,FIELD3,FIELD4,FIELD5,FIELD6,FIELD7,FIELD8,FIELD9 
      FROM datagen;
      

说明

在定义JDBC Sink的时候,尽量不要指定Flink表中的PRIMARY KEY,否则会触发Flink的幂等写入,导致性能下降。

2 Insert方式写入

除了通过Flink实时写入之外,也可以通过Insert方式进行写入行存表。下面以JDBC代码为例介绍。

示例

采用Java程序编写JDBC代码,实现Insert方式将数据导入行存表:

// 设置 JDBC url
String mysqlUrl = "jdbc:mysql://${FE地址}:9030/";
String mysqlUser = "${mysqlUser}";
String mysqlPassword = "${mysqlPassword}";
    
DriverManager.registerDriver(((java.sql.Driver) Class.forName("com.mysql.cj.jdbc.Driver").getDeclaredConstructor().newInstance()));
Connection conn = DriverManager.getConnection(mysqlUrl, mysqlUser, mysqlPassword);

Statement statement = conn.createStatement();
statement.execute("CREATE DATABASE IF NOT EXISTS demo");
statement.execute("DROP TABLE IF EXISTS demo.tbl_point_query");
statement.execute("CREATE TABLE IF NOT EXISTS demo.tbl_point_query( \n" +
        "    `k1` int(11),\n" +
        "    `v1` decimal(27, 9) NULL,\n" +
        "    `v2` varchar(30) NULL,\n" +
        "    `v3` varchar(30) NULL\n" +
        "    )\n" +
        "ENGINE=ROW_STORE\n" +
        "PRIMARY KEY (k1);");
statement.execute("INSERT INTO demo.tbl_point_query VALUES(1, 1.1, 'a', 'a1'),\n" +
        "(2, 1.2, 'b', 'a2'),\n" +
        "(3, 1.3, 'c', 'a3'),\n" +
        "(4, 1.4, 'd', 'a4');");
String sql = "SELECT * FROM demo.tbl_point_query where k1 = ? and v3 = 'a1' and v2 = ?";
PreparedStatement pstmt = conn.prepareStatement(sql);
pstmt.setInt(1, 1);
pstmt.setString(2, "a");
ResultSet rs = pstmt.executeQuery();
int num = 0;
while (rs.next()) {
    num += 1;
    System.out.println(rs.getInt(1));
    System.out.println(rs.getInt(2));
    System.out.println(rs.getInt(3));
    System.out.println(rs.getInt(4));
}

说明

用例中${FE地址}、${mysqlUser}、${mysqlPassword}需要根据实际环境替换

如果您使用的是maven工程,需要在pom.xml文件引入mysql依赖:

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.28</version>
</dependency>

3 离线写入

3.1 小批量导入

行存表支持Stream Load的,用法也与列存表一致。列存表的详细用法直接参考开源StarRocks的StreamLoad章节。

示例

  1. 准备数据
    在本地文件系统中创建一个 CSV 格式的数据文件 example1.csv,内容如下:
1,Lily,23
2,Rose,23
3,Alice,24
4,Julia,25
  1. 在数据库 test_db 中创建目标表table1

    CREATE DATABASE IF NOT EXISTS test_db;
    USE test_db;
    CREATE TABLE `table1`
    (
        `id` int(11) NOT NULL COMMENT "用户 ID",
        `name` varchar(65533) NULL COMMENT "用户姓名",
        `score` int(11) NOT NULL COMMENT "用户得分"
    )
    ENGINE=ROW_STORE
    PRIMARY KEY(`id`);
    
  2. 创建导入作业

    curl --location-trusted -u <username>:<password> -H "label:123" \
        -H "Expect:100-continue" \
        -H "column_separator:," \
        -H "columns: id, name, score" \
        -T example1.csv -XPUT \
        http://<fe_host>:8030/api/test_db/table1/_stream_load
    

说明

column_separator参数:定义文件分隔符。example1.csv 文件中包含三列,跟 table1 表的 idnamescore 三列一一对应,并用逗号 (,) 作为列分隔符。

  1. 导入完成后,查询 table1 表的数据

    SELECT * FROM table1;
    

3.2 大批量导入

对于几十GB到上百GB级别的数据量,建议采用Broker Load。Broker Load 是一种异步的导入方式,提交导入作业后,可以通过 SHOW LOAD 语句或者 curl 命令来查看导入作业的结果。
行存表的Broker Load用法也与列存表一致。列存表的详细用法参考StarRocks社区从 HDFS 或外部云存储系统导入数据章节。

示例

  1. 创建一个名为 file1.csv 的数据文件,测试数据如下。并将该文件上传到TOS中

    1,Lily,23
    2,Rose,23
    3,Alice,24
    4,Julia,25
    
  2. 在StarRocks集群创建 StarRocks目标数据库和表

    创建一张名为 table1 的主键模型表。表包含 idnamescore 三列

    CREATE DATABASE IF NOT EXISTS test_db;
    USE test_db;
    CREATE TABLE `table1`
    (
        `id` int(11) NOT NULL COMMENT "用户 ID",
        `name` varchar(65533) NULL DEFAULT "" COMMENT "用户姓名",
        `score` int(11) NOT NULL DEFAULT "0" COMMENT "用户得分"
    )
    ENGINE=ROW_STORE
    PRIMARY KEY(`id`);
    
  3. 在StarRocks集群中将TOS文件file1.csv导入到table1表中

    TRUNCATE TABLE test_db.table1;
    LOAD LABEL test_db.lable_3
    (
        DATA INFILE("s3a://{bucket_name}/input")
        INTO TABLE table1
        COLUMNS TERMINATED BY ","
        (id, name, score)
    )
    WITH BROKER
    (
        "fs.s3a.access.key" = "xxx",
        "fs.s3a.secret.key" = "xxx",
        "fs.s3a.endpoint" = "xxx",
        "fs.s3a.path.style.access"="false",
        "fs.s3a.paging.maximum"="1000",
        "fs.s3a.aws.credentials.provider"="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
        "fs.s3a.connection.ssl.enabled"="false"
    )
    PROPERTIES
    (
        "timeout" = "36000"
    );
    
  4. 查看导入作业状态

    curl --location-trusted -u <username>:<password> \
        'http://<fe_host>:8030/api/test_db/_load_info?label=lable_3'
    

4 注意事项

  • 行存表并不支持多行导入的事务,通过幂等写入保障数据一致,因此行存建表时一定要明确主键属性。

  • 行存表的写入吞吐没有列存表高,因此需要控制导入的并发,防止大规模导入失败。