You need to enable JavaScript to run this app.
导航
开发指南
最近更新时间:2025.04.01 20:13:41首次发布时间:2024.11.12 16:54:08
我的收藏
有用
有用
无用
无用

RDD 基础操作

Spark 围绕着 RDD 的概念展开,RDD 是可以并行操作的元素的容错集合。Spark 支持通过集合来创建 RDD 和通过外部数据集构建 RDD 两种方式来创建 RDD。例如,共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据集。

创建 RDD 示例:

  • 通过集合来创建 RDD。
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
  • 通过外部数据集构建 RDD。
val distFile = sc.textFile("data.txt")

RDD 构建成功后,可以对其进行一系列操作,例如 Map 和 Reduce 等操作。
例如,运行以下代码,首先从外部存储系统读一个文本文件构造了一个 RDD,然后通过 RDD 的 Map 算子计算得到了文本文件中每一行的长度,最后通过 Reduce 算子计算得到了文本文件中各行长度之和。

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

Spark RDD 常用操作

通常,Spark RDD 的常用操作有两种,分别为 Transform 操作和 Action 操作。Transform 操作并不会立即执行,而是到了 Action 操作才会被执行。

  • Transform 操作。

操作

描述

map()

参数是函数,函数应用于 RDD 每一个元素,返回值是新的 RDD。

flatMap()

参数是函数,函数应用于 RDD 每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的 RDD。

filter()

参数是函数,函数会过滤掉不符合条件的元素,返回值是新的 RDD。

distinct()

没有参数,将 RDD 里的元素进行去重操作。

union()

参数是 RDD,生成包含两个 RDD 所有元素的新 RDD。

intersection()

参数是 RDD,求出两个 RDD 的共同元素。

subtract()

参数是 RDD,将原 RDD 里和参数 RDD 里相同的元素去掉。

cartesian()

参数是 RDD,求两个 RDD 的笛卡尔积。

  • Action 操作。

操作

描述

collect()

返回 RDD 所有元素。

count()

返回 RDD 中的元素个数。

countByValue()

返回各元素在 RDD 中出现的次数。

reduce()

并行整合所有 RDD 数据,例如求和操作。

fold(0)(func)

和 reduce()功能一样,但是 fold 带有初始值。

aggregate(0)(seqOp,combop)

和 reduce()功能一样,但是返回的 RDD 数据类型和原 RDD 不一样。

foreach(func)

对 RDD 每个元素都是使用特定函数。

Spark SQL 基础操作

Spark SQL 支持直接通过 SQL 语句操作数据,而 Spark 会将 SQL 进行解析、优化并执行。
以下示例展示了如何使用 Spark SQL 进行读取文件。示例如下:

  • 示例 1:Spark 支持多种数据格式,本示例读取了 JSON 格式文件的数据,并输出为 Parquet 格式。
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
peopleDF.write.parquet("people.parquet")
  • 示例 2:通过 SQL 从 parquetFile 表中读出年龄在 13 岁到 19 岁之间的年轻人的名字,并转化为 DataFrame,随后通过 Map 操作将名字转化为一个可读的形式并输出。
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()

Spark SQL 库表操作

LAS Spark SQL 引擎兼容开源 SparkSQL 语法,以下对基本的库表操作做一个说明,其他详细指南可以参考开源 SparkSQL 语法说明

数据库操作

创建数据库

0: jdbc:hive2://las-master-1:10001> create database db_demo;
+---------+
| Result  |
+---------+
+---------+
No rows selected (0.285 seconds)

查看数据库信息

0: jdbc:hive2://las-master-1:10001> desc database db_demo;
+----------------------------+----------------------------------------------------+
| database_description_item  |             database_description_value             |
+----------------------------+----------------------------------------------------+
| Database Name              | db_demo                                            |
| Comment                    |                                                    |
| Location                   | hdfs://las-master-1:8060/apps/spark/warehouse/db_demo.db |
| Owner                      | hive                                               |
+----------------------------+----------------------------------------------------

删除数据库

0: jdbc:hive2://las-master-1:10001> drop database db_demo;
+---------+
| Result  |
+---------+
+---------+
No rows selected (0.266 seconds)

表操作

创建表

0: jdbc:hive2://las-master-1:10001> create table tb_demo(id int, name string);
+---------+
| Result  |
+---------+
+---------+
No rows selected (0.128 seconds)

描述表信息

0: jdbc:hive2://las-master-1:10001> desc table tb_demo;
+-----------+------------+----------+
| col_name  | data_type  | comment  |
+-----------+------------+----------+
| id        | int        | NULL     |
| name      | string     | NULL     |
+-----------+------------+----------+

删除表

0: jdbc:hive2://las-master-1:10001> drop table tb_demo;
+---------+
| Result  |
+---------+
+---------+

插入数据

0: jdbc:hive2://las-master-1:10001> insert into tb_demo select 1,'name1';
+---------+
| Result  |
+---------+
+---------+

查询表数据

0: jdbc:hive2://las-master-1:10001> select * from tb_demo;
+-----+-------+
| id  | name  |
+-----+-------+
1   + name1 +
+-----+-------+
1 rows selected (0.116 seconds)

PySpark 基础操作

PySpark 是 Spark 提供的 Python API。可以通过 PySpark 提供的 DataFrame 接口,完成各种计算逻辑。
操作步骤。

  1. 初始化 SparkSession。
  2. 初始化 SparkSession 作为 PySpark 的执行入口。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
  1. 创建 DataFrame。
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
  1. DataFrame 创建完成后,可以通过各种类型的 transform 算子完成数据计算。
  2. 打印 DataFrame 和 Schema。
df.show()
df.printSchema()