最近更新时间:2024.04.30 14:09:53
首次发布时间:2024.04.30 14:09:53
StarRocks 支持通过 Spark 读取或写入数据。您可以使用 Spark Connector 连接 Spark 与 StarRocks 实现数据导入,其原理是在内存中对数据进行攒批,按批次使用 Stream Load 将数据导入 StarRocks。Spark Connector 支持 DataFrame 和 SQL 接入形式,并支持 Batch 和 Structured Streaming 作业类型。
您可以从 Maven 中央仓库 中下载与您 Spark 版本匹配的最新的 spark-connector-starrocks.jar
文件,也可以使用由 EMR 团队提供的 Spark Connector 版本。
说明
EMR 团队提供的 Spark Connector Jar 文件随 Spark 安装包一同附送,您可以在支持部署 Spark 组件的 EMR 集群 /usr/lib/emr/current/spark/jars
路径下找到对应的 jar 文件。
Spark Connector 默认不包含 JDBC 驱动,您需要确保在 classpath 路径下包含 mysql-connector-java.jar
文件。
相对于开源版本的 Spark Connector,我们更加推荐您使用 EMR 团队提供的 Spark Connector 版本,相对而言优势如下:
能够与 EMR 集群,及其周边生态更好的集成。
增加一些 EMR 团队定制开发的竞争力特性。
更加及时修复一些已知的 bug。
不过兼容开源是 EMR 作为开源大数据平台的基本原则,您仍然可以坚持使用开源 Spark Connector 版本。
本小节以导入数据到 StarRocks 明细表 examples.tb_duplicate_key
为例,该表的建表语句如下:
CREATE TABLE IF NOT EXISTS tb_duplicate_key ( event_time BIGINT NOT NULL COMMENT 'timestamp of event', event_type INT NOT NULL COMMENT 'type of event', user_id INT NOT NULL COMMENT 'id of user', device VARCHAR(128) NULL COMMENT 'device' ) ENGINE = OLAP DUPLICATE KEY(event_time, event_type) DISTRIBUTED BY HASH(user_id) PROPERTIES ( 'replication_num' = '3' );
您可以直接通过 Spark SQL 形式将数据写入 StarRocks 对应数据表中,步骤如下:
进入 Spark SQL 交互终端,参考 Spark SQL Client 使用方式 进入 Spark SQL 交互终端。
通过 CREATE TABLE
创建一张 StarRocks tb_duplicate_key
表的映射表,不要求同名:
CREATE TABLE IF NOT EXISTS tb_duplicate_key USING starrocks OPTIONS ( "starrocks.fe.http.url"="{fe_ip}:8030", "starrocks.fe.jdbc.url"="jdbc:mysql://{fe_ip}:9030", "starrocks.table.identifier"="examples.tb_duplicate_key", "starrocks.user"="system_query_user", "starrocks.password"="******" );
INSERT INTO
操作将数据插入映射表:INSERT INTO tb_duplicate_key VALUES (1703128450, 1, 1001, 'PHONE'), (1703128451, 0, 1002, 'PAD'), (1703128452, 1, 1003, 'TV');
正常情况下,您可以在 StarRocks 中查询到刚刚由 Spark 侧写入的数据。
本小节以 Batch 任务为例,演示将内存中构造的数据通过 Spark DataFrame 方式导入 StarRocks 的 tb_duplicate_key
表。Scala 示例代码如下:
val spark = SparkSession .builder() .appName("load_data_example") .getOrCreate() import spark.implicits._ // 模拟数据 val data = Seq( (1703128450, 1, 1001L, "PHONE"), (1703128451, 0, 1002L, "PAD"), (1703128452, 1, 1003L, "TV"), ) // 将数据写入 StarRocks val df = data.toDF("event_time", "event_type", "user_id", "device") df.write .format("starrocks") .option("starrocks.fe.http.url", "{fe_ip}:8030") .option("starrocks.fe.jdbc.url", "jdbc:mysql://{fe_ip}:9030") .option("starrocks.table.identifier", "examples.tb_duplicate_key") .option("starrocks.user", "system_query_user") .option("starrocks.password", "******") .mode("append") .save()
关于如何提交 Spark 任务可以参考 Spark 使用文档。
参数 | 必须 | 参数值 | 说明 |
---|---|---|---|
starrocks.fe.jdbc.url | 是 | 配置 FE 节点 MySQL 服务器地址,格式为 jdbc:mysql://<fe_host>:<fe_query_port> 。 | |
starrocks.fe.http.url | 是 | 配置 FE 节点 HTTP 服务器,格式为 <fe_host>:<fe_http_port> ,多个以英文逗号 , 分隔。 | |
starrocks.table.identifier | 是 | 目标导入的 StarRocks 数据表,格式为: | |
starrocks.user | 是 | StarRocks 访问用户名称。 | |
starrocks.password | 是 | StarRocks 访问用户密码。 | |
starrocks.write.label.prefix | 否 |
| 用于配置 Stream Load 导入任务 label 的前缀,推荐为作业依据具体的业务场景配置 label 前缀。 |
starrocks.write.enable.transaction-stream-load | 否 |
| 用于配置导入操作是否使用 Stream Load 事务接口,相对于普通接口在内存占用和性能层面均有更好的表现。 如果配置了 |
starrocks.write.buffer.size | 否 |
| 用于配置缓存在内存中的数据量,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。 |
starrocks.write.buffer.rows | 否 | Integer.MAX_VALUE | 用于配置缓存在内存中的数据条数,当缓存数据量达到该阈值后会触发一次 Stream Load 导入。 |
starrocks.write.flush.interval.ms | 否 |
| 用于配置数据发送的时间间隔。 |
starrocks.write.max.retries | 否 |
| 用于配置最大失败重试次数。 如果该参数配置值大于 0,则忽略 |
starrocks.write.retry.interval.ms | 否 |
| 失败重试时间间隔。 |
starrocks.columns | 否 | 用于配置写入部分列,多个列名以英文逗号 , 分隔。 | |
starrocks.write.num.partitions | 否 | 用于配置 Spark 并行写入的分区数,默认由 Spark 决定。 | |
starrocks.write.partition.columns | 否 | 用于配置 Spark 分区的列,如果不指定则使用所有写入的列进行分区。 该参数仅在配置 | |
starrocks.timezone | 否 | 用于配置时区。 |
您可以访问 StarRocks 官方文档 了解关于使用 Spark Connector 向 StarRocks 导入数据的更多介绍,以及如何使用 Spark Connector 读取 StarRocks 中的数据。