Spark 围绕着 RDD 的概念展开,RDD 是可以并行操作的元素的容错集合。Spark 支持通过集合来创建 RDD 和通过外部数据集构建 RDD 两种方式来创建 RDD。例如,共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据集。
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
val distFile = sc.textFile("data.txt")
RDD 构建成功后,可以对其进行一系列操作,例如 Map 和 Reduce 等操作。
例如,运行以下代码,首先从外部存储系统读一个文本文件构造了一个 RDD,然后通过 RDD 的 Map 算子计算得到了文本文件中每一行的长度,最后通过 Reduce 算子计算得到了文本文件中各行长度之和。
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)
通常,Spark RDD 的常用操作有两种,分别为 Transform 操作和 Action 操作。Transform 操作并不会立即执行,而是到了 Action 操作才会被执行。
操作 | 描述 |
---|---|
map() | 参数是函数,函数应用于 RDD 每一个元素,返回值是新的 RDD。 |
flatMap() | 参数是函数,函数应用于 RDD 每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的 RDD。 |
filter() | 参数是函数,函数会过滤掉不符合条件的元素,返回值是新的 RDD。 |
distinct() | 没有参数,将 RDD 里的元素进行去重操作。 |
union() | 参数是 RDD,生成包含两个 RDD 所有元素的新 RDD。 |
intersection() | 参数是 RDD,求出两个 RDD 的共同元素。 |
subtract() | 参数是 RDD,将原 RDD 里和参数 RDD 里相同的元素去掉。 |
cartesian() | 参数是 RDD,求两个 RDD 的笛卡尔积。 |
操作 | 描述 |
---|---|
collect() | 返回 RDD 所有元素。 |
count() | 返回 RDD 中的元素个数。 |
countByValue() | 返回各元素在 RDD 中出现的次数。 |
reduce() | 并行整合所有 RDD 数据,例如求和操作。 |
fold(0)(func) | 和 reduce()功能一样,但是 fold 带有初始值。 |
aggregate(0)(seqOp,combop) | 和 reduce()功能一样,但是返回的 RDD 数据类型和原 RDD 不一样。 |
foreach(func) | 对 RDD 每个元素都是使用特定函数。 |
Spark SQL 支持直接通过 SQL 语句操作数据,而 Spark 会将 SQL 进行解析、优化并执行。
以下示例展示了如何使用 Spark SQL 进行读取文件。示例如下:
val peopleDF = spark.read.json("examples/src/main/resources/people.json") peopleDF.write.parquet("people.parquet")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show()
LAS Spark SQL 引擎兼容开源 SparkSQL 语法,以下对基本的库表操作做一个说明,其他详细指南可以参考开源 SparkSQL 语法说明。
0: jdbc:hive2://las-master-1:10001> create database db_demo; +---------+ | Result | +---------+ +---------+ No rows selected (0.285 seconds)
0: jdbc:hive2://las-master-1:10001> desc database db_demo; +----------------------------+----------------------------------------------------+ | database_description_item | database_description_value | +----------------------------+----------------------------------------------------+ | Database Name | db_demo | | Comment | | | Location | hdfs://las-master-1:8060/apps/spark/warehouse/db_demo.db | | Owner | hive | +----------------------------+----------------------------------------------------
0: jdbc:hive2://las-master-1:10001> drop database db_demo; +---------+ | Result | +---------+ +---------+ No rows selected (0.266 seconds)
0: jdbc:hive2://las-master-1:10001> create table tb_demo(id int, name string); +---------+ | Result | +---------+ +---------+ No rows selected (0.128 seconds)
0: jdbc:hive2://las-master-1:10001> desc table tb_demo; +-----------+------------+----------+ | col_name | data_type | comment | +-----------+------------+----------+ | id | int | NULL | | name | string | NULL | +-----------+------------+----------+
0: jdbc:hive2://las-master-1:10001> drop table tb_demo; +---------+ | Result | +---------+ +---------+
0: jdbc:hive2://las-master-1:10001> insert into tb_demo select 1,'name1'; +---------+ | Result | +---------+ +---------+
0: jdbc:hive2://las-master-1:10001> select * from tb_demo; +-----+-------+ | id | name | +-----+-------+ 1 + name1 + +-----+-------+ 1 rows selected (0.116 seconds)
PySpark 是 Spark 提供的 Python API。可以通过 PySpark 提供的 DataFrame 接口,完成各种计算逻辑。
操作步骤。
from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate()
from datetime import datetime, date import pandas as pd from pyspark.sql import Row df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)), (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)) ], schema='a long, b double, c string, d date, e timestamp')
df.show() df.printSchema()