You need to enable JavaScript to run this app.
导航
StarRocks Spark Connector
最近更新时间:2024.07.24 16:31:27首次发布时间:2024.04.30 14:09:53

StarRocks 作为高性能分析型数据库,支持通过 Spark 读取或写入数据,您可以使用 Spark Connector 连接 Spark 与 StarRocks 实现数据导入与导出。目前 EMR 提供的 Spark Connector 版本支持原生和旁路(Bypass)两种不同的模式读写 StarRocks 数据,区别如下表所示:

模式

数据导入

数据导出

原生模式

将数据在内存中攒批,然后以 StreamLoad 方式导入 StarRocks。

通过 RPC 请求 StarRocks 读取数据。

Bypass 模式

绕过 StarRocks 服务层,直接写 StarRocks 存储在对象存储系统中的数据文件。

绕过 StarRocks 服务层,直接读 StarRocks 存储在对象存储系统中的数据文件。

Bypass 模式相对于原生模式具备如下优势:

  • 实现了 Spark StarRocks Catalog,无需事先在 Spark 中建外表,即可在 Spark 中直接访问 StarRocks 库表元数据信息。
  • 对于 StarRocks 存算分离集群,支持通过 Spark 直接读写 StarRocks 存储在对象存储系统中的数据文件,避免占用 StarRocks 集群计算资源,影响在线查询。

Bypass 模式

Bypass 模式目前已在 EMR on ECS、EMR on VKE,以及 Serverless Spark 等产品形态中提供开箱即用。

配置参数

参数

必须

默认值

说明

spark.sql.extensions

设置为 com.starrocks.connector.spark.StarRocksExtensions,用于改写 Spark 执行计划。

spark.sql.catalog.starrocks

设置为 com.starrocks.connector.spark.catalog.StarRocksCatalog,支持 Spark 直接访问 StarRocks 库表元数据。

spark.sql.catalog.starrocks.writer.mode

设置为 BYPASS,表示直写 StarRocks 数据文件。

spark.sql.catalog.starrocks.reader.mode

设置为 BYPASS,表示直读 StarRocks 数据文件。

spark.sql.catalog.starrocks.fe.http.url

StarRocks FE 的 HTTP 地址,支持输入多个 FE 地址,使用逗号 , 分隔。格式为 http://<fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>

spark.sql.catalog.starrocks.fe.jdbc.url

StarRocks FE 的 MySQL Server 连接地址,格式为 jdbc:mysql://<fe_host>:<fe_query_port>

spark.sql.catalog.starrocks.user

StarRocks 集群账号的用户名。

spark.sql.catalog.starrocks.password

StarRocks 集群账号的用户密码。

spark.sql.catalog.starrocks.fs.s3a.endpoint

StarRocks 存算分离集群对应的 TOS 对象存储 S3 访问 endpoint。

spark.sql.catalog.starrocks.fs.s3a.endpoint.region

StarRocks 存算分离集群对应的 TOS 对象存储 S3 访问 region。

spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled

设置为 true

spark.sql.catalog.starrocks.fs.s3a.path.style.access

设置为 false

spark.sql.catalog.starrocks.fs.s3a.access.key

访问 TOS 对象存储的静态 AccessKey。

spark.sql.catalog.starrocks.fs.s3a.secret.key

访问 TOS 对象存储的静态 SecretKey。

spark.sql.catalog.starrocks.fs.s3a.retry.limit

5

访问 TOS 对象存储的失败重试次数。

spark.sql.catalog.starrocks.batch.size

4096

执行写入操作时的攒批粒度,适当增大该参数可以优化写入性能。

使用说明

EMR on ECS

说明

在 EMR on ECS 中使用 Bypass 模式,您需要确保:

  • StarRocks 版本需要选择 3.2.3 版本及以上。
  • Hadoop 集群需要选择 EMR-3.11.0 版本及以上。
  • 确保 Hadoop 集群与 StarRocks 集群位于同一个 VPC 内。

Spark SQL 方式

启动 Spark SQL 客户端需要设置的参数示例如下:

spark-sql \
    --conf spark.sql.extensions="com.starrocks.connector.spark.StarRocksExtensions" \
    --conf spark.sql.catalog.starrocks="com.starrocks.connector.spark.catalog.StarRocksCatalog" \
    --conf spark.sql.catalog.starrocks.batch.size=8092 \
    --conf spark.sql.catalog.starrocks.writer.mode="BYPASS" \
    --conf spark.sql.catalog.starrocks.reader.mode="BYPASS" \
    --conf spark.sql.catalog.starrocks.fe.http.url="http://192.168.6.7:8030" \
    --conf spark.sql.catalog.starrocks.fe.jdbc.url="jdbc:mysql://192.168.6.7:9030" \
    --conf spark.sql.catalog.starrocks.user="root" \
    --conf spark.sql.catalog.starrocks.password="******" \
    --conf spark.sql.catalog.starrocks.fs.s3a.endpoint="https://tos-s3-cn-beijing.ivolces.com" \
    --conf spark.sql.catalog.starrocks.fs.s3a.endpoint.region="cn-beijing" \
    --conf spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled=true \
    --conf spark.sql.catalog.starrocks.fs.s3a.path.style.access=false \
    --conf spark.sql.catalog.starrocks.fs.s3a.retry.limit=27 \
    --conf spark.sql.catalog.starrocks.fs.s3a.access.key="******" \
    --conf spark.sql.catalog.starrocks.fs.s3a.secret.key="******" \
    --jars /opt/emr/current/spark/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar

进入 Spark SQL 终端后,您可以切换到 StarRocks Catalog,然后执行查询语句:

USE starrocks.demo;
SELECT * FROM example_table;

pySpark 方式

准备 demo.py 程序示例:

import sys
import time
from operator import add

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("Bypass Read")\
        .config("spark.shuffle.service.enabled", "true")\
        .config("spark.sql.extensions", "com.starrocks.connector.spark.StarRocksExtensions")\
        .config("spark.sql.catalog.starrocks", "com.starrocks.connector.spark.catalog.StarRocksCatalog")\
        .config("spark.sql.catalog.starrocks.fe.http.url", "http://xxx:8030")\
        .config("spark.sql.catalog.starrocks.fe.jdbc.url", "jdbc:mysql://xxx:9030")\
        .config("spark.sql.catalog.starrocks.user", "root")\
        .config("spark.sql.catalog.starrocks.password", "******")\
        .config("spark.sql.catalog.starrocks.writer.mode","BYPASS") \
        .config("spark.sql.catalog.starrocks.reader.mode","BYPASS") \
        .config("spark.sql.catalog.starrocks.fs.s3a.endpoint", "https://tos-s3-cn-beijing.ivolces.com")\
        .config("spark.sql.catalog.starrocks.fs.s3a.endpoint.region", "cn-beijing")\
        .config("spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled", "true")\
        .config("spark.sql.catalog.starrocks.fs.s3a.path.style.access", "false")\
        .config("spark.sql.catalog.starrocks.fs.s3a.retry.limit", "27")\
        .config("spark.sql.catalog.starrocks.fs.s3a.access.key", "******")\
        .config("spark.sql.catalog.starrocks.fs.s3a.secret.key", "******")\
        .getOrCreate()
    spark.sql("use starrocks.demo").show()   
    spark.sql("show tables").show()
    spark.sql("select count(1) from tb_all_primitivetype_read_duplicate ").show()
    spark.sql("insert into tb_all_primitivetype_read_duplicate select * from tb_all_primitivetype_read_duplicate limit 10 ").show()
    spark.sql("select count(1) from tb_all_primitivetype_read_duplicate ").show()
    spark.stop()
  • 以 Client 模式提交执行
spark-submit \
    --conf spark.app.name="bypass-test" \
    --jars /opt/emr/current/spark/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar \
    --conf "spark.yarn.jars=hdfs://master-1-1.emr-58048cfb501d90d6411d.cn-beijing.emr-volces.com:8020/user/application/label-pipeline/spark-jars-v2/*,hdfs://master-1-1.emr-58048cfb501d90d6411d.cn-beijing.emr-volces.com:8020/user/application/label-pipeline/app-jars/*" \
    demo.py
  • 以 Cluster 模式提交执行
spark-submit \
    --deploy-mode cluster \
    --conf spark.app.name="bypass-test" \
    --conf "spark.yarn.jars=hdfs://master-1-1.emr-58048cfb501d90d6411d.cn-beijing.emr-volces.com:8020/user/application/label-pipeline/spark-jars-v2/*,hdfs://master-1-1.emr-58048cfb501d90d6411d.cn-beijing.emr-volces.com:8020/user/application/label-pipeline/app-jars/*" \
    demo.py

EMR on VKE

说明

在 EMR on VKE 中使用 Bypass 模式,您需要确保:

  • StarRocks 版本需要选择 3.2.3 版本及以上。
  • EMR on VKE 集群需要选择 EMR-1.6.0 版本及以上,且包含 Spark 组件。
  • 确保 EMR on VKE 集群与 StarRocks 集群位于同一个 VPC 内。
  • 阅读“使用 kubectl 管理作业” 了解如何向 VKE 集群提交 Spark 作业。

在掌握了如何向 EMR on VKE 集群提交 Spark 作业之后,您可以修改作业 yaml 文件,添加如下配置以实现通过 Spark 读写 StarRocks 数据文件:

spec:
    deps:
      jars:
        # starrocks-spark-connector jar 文件,VKE 集群镜像已内置
        - local:///opt/spark/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar
    sparkConf:
      spark.sql.extensions: "com.starrocks.connector.spark.StarRocksExtensions"
      spark.sql.catalog.starrocks: "com.starrocks.connector.spark.catalog.StarRocksCatalog"
      # 开启直读直写模式
      spark.sql.catalog.starrocks.writer.modeL: "BYPASS"
      spark.sql.catalog.starrocks.reader.mode: "BYPASS"
      # 对应 StarRocks 集群的连接配置,即 FE 各节点的内网 IP 地址
      spark.sql.catalog.starrocks.fe.http.url: "http://192.168.0.180:8030,192.168.0.181:8030,192.168.0.178:8030"
      spark.sql.catalog.starrocks.fe.jdbc.url: "jdbc:mysql://192.168.0.180:9030,192.168.0.181:9030,192.168.0.178:9030"
      # 对应 StarRocks 集群的认证配置
      spark.sql.catalog.starrocks.user: "root"
      spark.sql.catalog.starrocks.password: "******"
      # 对应 StarRocks 存算分离集群 TOS 连接和认证配置
      spark.sql.catalog.starrocks.fs.s3a.endpoint: "https://tos-s3-cn-beijing.ivolces.com"
      spark.sql.catalog.starrocks.fs.s3a.endpoint.region: "cn-beijing"
      spark.sql.catalog.starrocks.fs.s3a.connection.ssl.enabled: "true"
      spark.sql.catalog.starrocks.fs.s3a.path.style.access: "false"
      spark.sql.catalog.starrocks.fs.s3a.access.key: "******"
      spark.sql.catalog.starrocks.fs.s3a.secret.key: "******"
      # 【推荐】指定加载 so 文件路径,如果不指定则会尝试自解压,有 30s 左右的时间开销
      spark.driver.extraJavaOptions: "-Dcom.starrocks.format.jni.lib.path=/opt/starrocks/native"
      spark.executor.extraJavaOptions: "-Dcom.starrocks.format.jni.lib.path=/opt/starrocks/native"
      # 【可选】用于设置写入时攒批粒度
      spark.sql.catalog.starrocks.batch.size: "8092"

EMR Serverless Spark

参考 Serverless Spark 读写 StarRocks 操作手册

原生模式

配置参数

参数

必须

参数值

说明

starrocks.fe.jdbc.url

配置 FE 节点 MySQL 服务器地址,格式为 jdbc:mysql://<fe_host>:<fe_query_port>

starrocks.fe.http.url

配置 FE 节点 HTTP 服务器,格式为 <fe_host>:<fe_http_port>,多个以英文逗号 , 分隔。

starrocks.table.identifier

目标导入的 StarRocks 数据表,格式为:<database_name>.<table_name>

starrocks.user

StarRocks 访问用户名称。

starrocks.password

StarRocks 访问用户密码。

starrocks.write.label.prefix

  • 默认值:spark-

用于配置 Stream Load 导入任务 label 的前缀,推荐为作业依据具体的业务场景配置 label 前缀。

starrocks.write.enable.transaction-stream-load

  • 默认值:true

用于配置导入操作是否使用 Stream Load 事务接口,相对于普通接口在内存占用和性能层面均有更好的表现。

说明

如果配置了 starrocks.write.max.retries > 0,则该参数不生效。

starrocks.write.buffer.size

  • 默认值:104857600
  • 单位:默认为字节,支持 kbmbgb

用于配置缓存在内存中的数据量,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。

starrocks.write.buffer.rows

Integer.MAX_VALUE

用于配置缓存在内存中的数据条数,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。

starrocks.write.flush.interval.ms

  • 默认值:300000
  • 单位:毫秒

用于配置数据发送的时间间隔。

starrocks.write.max.retries

  • 默认值:3

用于配置最大失败重试次数。

说明

如果该参数配置值大于 0,则忽略 starrocks.write.enable.transaction-stream-load 配置

starrocks.write.retry.interval.ms

  • 默认值:10000
  • 单位:毫秒

失败重试时间间隔。

starrocks.columns

用于配置写入部分列,多个列名以英文逗号 , 分隔。

starrocks.write.num.partitions

用于配置 Spark 并行写入的分区数,默认由 Spark 决定。

starrocks.write.partition.columns

用于配置 Spark 分区的列,如果不指定则使用所有写入的列进行分区。

说明

该参数仅在配置starrocks.write.num.partitions 后生效。

starrocks.timezone

用于配置时区。

说明

Spark Connector 原生模式在导入数据时底层基于 Stream Load 实现,除了本小节列出的配置参数外,您也可以通过 starrocks.write.properties.{stream load 参数名} 的形式直接设置 Stream Load 的导入行为。例如通过 starrocks.write.properties.format 设置导入的数据格式,对应 Stream Load 的 format 参数。

使用说明

本小节以导入数据到 StarRocks 明细表 examples.tb_duplicate_key 为例,该表的建表语句如下:

CREATE TABLE IF NOT EXISTS tb_duplicate_key
(
    event_time BIGINT       NOT NULL COMMENT 'timestamp of event',
    event_type INT          NOT NULL COMMENT 'type of event',
    user_id    INT          NOT NULL COMMENT 'id of user',
    device     VARCHAR(128) NULL     COMMENT 'device'
) ENGINE = OLAP DUPLICATE KEY(event_time, event_type)
DISTRIBUTED BY HASH(user_id)
PROPERTIES (
    'replication_num' = '3'
);

Spark SQL 方式

您可以直接通过 Spark SQL 形式将数据写入 StarRocks 对应数据表中,步骤如下:

  1. 进入 Spark SQL 交互终端,参考 Spark SQL Client 使用方式 进入 Spark SQL 交互终端,需要额外添加如下参数:
--jars /opt/emr/current/spark/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar
  1. 通过 CREATE TABLE 创建一张 StarRocks tb_duplicate_key 表的外表,不要求同名:
CREATE TABLE IF NOT EXISTS tb_duplicate_key 
USING starrocks 
OPTIONS
(
    "starrocks.fe.http.url"="192.168.10.2:8030",
    "starrocks.fe.jdbc.url"="jdbc:mysql://192.168.10.2:9030",
    "starrocks.table.identifier"="examples.tb_duplicate_key",
    "starrocks.user"="system_query_user",
    "starrocks.password"="******"
);
  1. 通过 INSERT INTO 操作将数据插入外表:
INSERT INTO tb_duplicate_key
VALUES (1703128450, 1, 1001, 'PHONE'),
       (1703128451, 0, 1002, 'PAD'),
       (1703128452, 1, 1003, 'TV');

正常情况下,您可以在 StarRocks 中查询到刚刚由 Spark 侧写入的数据。

Spark DataFrame 方式

本小节以 Batch 任务为例,演示将内存中构造的数据通过 Spark DataFrame 方式导入 StarRocks 的 tb_duplicate_key 表。Scala 示例代码如下:

val spark = SparkSession
  .builder()
  .appName("load_data_example")
  .getOrCreate()

import spark.implicits._

// 模拟数据
val data = Seq(
  (1703128450, 1, 1001L, "PHONE"),
  (1703128451, 0, 1002L, "PAD"),
  (1703128452, 1, 1003L, "TV"),
)

// 将数据写入 StarRocks
val df = data.toDF("event_time", "event_type", "user_id", "device")
df.write
  .format("starrocks")
  .option("starrocks.fe.http.url", "192.168.10.2:8030")
  .option("starrocks.fe.jdbc.url", "jdbc:mysql://192.168.10.2:9030")
  .option("starrocks.table.identifier", "examples.tb_duplicate_key")
  .option("starrocks.user", "system_query_user")
  .option("starrocks.password", "******")
  .mode("append")
  .save()

关于如何提交 Spark 任务可以参考 Spark 使用文档,需要额外添加如下参数:

--jars /opt/emr/current/spark/starrocks-connector/starrocks-spark-connector-3.5_2.12-1.1.2-ve-bypass-2.jar

更多信息

您可以访问 StarRocks 官方文档 了解关于使用 Spark Connector 向 StarRocks 导入数据的更多介绍,以及如何使用 Spark Connector 读取 StarRocks 中的数据。