本文将为您介绍Spark支持弹性分布式数据集(RDD)、Spark SQL、PySpark和数据库表的基础操作示例。
已创建E-MapReduce(简称“EMR”)集群,详见:创建集群。
Spark围绕着 RDD 的概念展开,RDD是可以并行操作的元素的容错集合。Spark支持通过集合来创建RDD和通过外部数据集构建RDD两种方式来创建RDD。例如,共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据集。
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
val distFile = sc.textFile("data.txt")
RDD构建成功后,可以对其进行一系列操作,例如Map和Reduce等操作。
例如,运行以下代码,首先从外部存储系统读一个文本文件构造了一个RDD,然后通过RDD的Map算子计算得到了文本文件中每一行的长度,最后通过Reduce算子计算得到了文本文件中各行长度之和。
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)
通常,Spark RDD的常用操作有两种,分别为Transform操作和Action操作。Transform操作并不会立即执行,而是到了Action操作才会被执行。
操作 | 描述 |
---|---|
map() | 参数是函数,函数应用于RDD每一个元素,返回值是新的RDD。 |
flatMap() | 参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD。 |
filter() | 参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD。 |
distinct() | 没有参数,将RDD里的元素进行去重操作。 |
union() | 参数是RDD,生成包含两个RDD所有元素的新RDD。 |
intersection() | 参数是RDD,求出两个RDD的共同元素。 |
subtract() | 参数是RDD,将原RDD里和参数RDD里相同的元素去掉。 |
cartesian() | 参数是RDD,求两个RDD的笛卡尔积。 |
操作 | 描述 |
---|---|
collect() | 返回RDD所有元素。 |
count() | 返回RDD中的元素个数。 |
countByValue() | 返回各元素在RDD中出现的次数。 |
reduce() | 并行整合所有RDD数据,例如求和操作。 |
fold(0)(func) | 和reduce()功能一样,但是fold带有初始值。 |
aggregate(0)(seqOp,combop) | 和reduce()功能一样,但是返回的RDD数据类型和原RDD不一样。 |
foreach(func) | 对RDD每个元素都是使用特定函数。 |
Spark SQL支持直接通过SQL语句操作数据,而Spark会将SQL进行解析、优化并执行。
以下示例展示了如何使用Spark SQL进行读取文件。示例如下:
val peopleDF = spark.read.json("examples/src/main/resources/people.json") peopleDF.write.parquet("people.parquet")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show()
PySpark是Spark提供的Python API。可以通过PySpark提供的DataFrame接口,完成各种计算逻辑。
操作步骤
初始化SparkSession。
初始化SparkSession作为PySpark的执行入口。
from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate()
from datetime import datetime, date import pandas as pd from pyspark.sql import Row df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)), (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)) ], schema='a long, b double, c string, d date, e timestamp')
DataFrame创建完成后,可以通过各种类型的transform算子完成数据计算。
打印DataFrame和Schema。
df.show() df.printSchema()
EMR SparkSQL完全兼容开源SparkSQL语法,以下对基本的库表操作做一个说明,其他详细指南可以参考开源SparkSQL语法说明。
0: jdbc:hive2://emr-master-1:10005> create database db_demo; +---------+ | Result | +---------+ +---------+ No rows selected (0.285 seconds)
0: jdbc:hive2://emr-master-1:10005> desc database db_demo; +----------------------------+----------------------------------------------------+ | database_description_item | database_description_value | +----------------------------+----------------------------------------------------+ | Database Name | db_demo | | Comment | | | Location | hdfs://emr-master-1:8020/apps/spark/warehouse/db_demo.db | | Owner | hive | +----------------------------+----------------------------------------------------
0: jdbc:hive2://emr-master-1:10005> drop database db_demo; +---------+ | Result | +---------+ +---------+ No rows selected (0.266 seconds)
0: jdbc:hive2://emr-master-1:10005> create table tb_demo(id int, name string); +---------+ | Result | +---------+ +---------+ No rows selected (0.128 seconds)
0: jdbc:hive2://emr-master-1:10005> desc table tb_demo; +-----------+------------+----------+ | col_name | data_type | comment | +-----------+------------+----------+ | id | int | NULL | | name | string | NULL | +-----------+------------+----------+
0: jdbc:hive2://emr-master-1:10005> drop table tb_demo; +---------+ | Result | +---------+ +---------+
0: jdbc:hive2://emr-master-1:10005> insert into tb_demo select 1,'name1'; +---------+ | Result | +---------+ +---------+
0: jdbc:hive2://emr-master-1:10005> select * from tb_demo; +-----+-------+ | id | name | +-----+-------+ + 1 + name1 + +-----+-------+ 1 rows selected (0.116 seconds)