Spark
Spark
一、运行模式
1.1 Local 模式
- 进入Spark包解压缩后的路径,执行如下指令
bin/spark-shell
- 访问Web UI
http://localhost:4040/jobs/
- 数据可以放到
spark-3.1.2-bin-hadoop3.2/data
目录中 - 简单的word count
sc.textFile("data/word.txt").flatMap(_.split("
")).map((_,1)).reduceByKey(_+_).collect
- 退出本地模式
quit
- 提交应用
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.1.2.jar \
10
- --class 表示要执行程序的主类,此处可以更换为咱们自己写的应用程序
- --master local[2] 部署模式,默认为本地模式,数字表示分配的虚拟 CPU 核数量
- spark-examples_2.12-3.0.0.jar 运行的应用类所在的 jar 包,实际使用时,可以设定为咱们自己打的 jar 包
- 数字 10 表示程序的入口参数,用于设定当前应用的任务数量
1.2 Standalone 模式
Spark 自身节点运行的集群模式,也就是我们所谓的 独立部署(Standalone)模式。Spark 的 Standalone 模式体现了经典的 master-slave 模式。
1.2.1 修改配置文件
- 解压Spark包
- 进入解压缩后路径的 conf 目录,修改 slaves.template 文件名为 slaves
mv slaves.template slaves
- 修改 slaves 文件,添加 work 节点
linux1
linux2
linux3
- 修改 spark-env.sh.template 文件名为 spark-env.sh
mv spark-env.sh.template spark-env.sh
- 修改 spark-env.sh 文件,添加 JAVA_HOME 环境变量和集群对应的 master 节点
export JAVA_HOME=/opt/module/jdk1.8.0_144
SPARK_MASTER_HOST=linux1
SPARK_MASTER_PORT=7077
分发 spark-standalone 目录
启动群集
sbin/start-all.sh
- 提交应用
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://linux1:7077 \
./examples/jars/spark-examples_2.12-3.1.2.jar \
10
1) --class 表示要执行程序的主类
2) --master spark://linux1:7077 独立部署模式,连接到 Spark 集群
3) spark-examples_2.12-3.1.2.jar 运行类所在的 jar 包
4) 数字 10 表示程序的入口参数,用于设定当前应用的任务数量
- 查看 Master 资源监控 Web UI 界面: http://xxx:8080
1.2.2 配置历史服务
由于 spark-shell 停止掉后,集群监控 linux1:4040 页面就看不到历史任务的运行情况,所以 开发时都配置历史服务器记录任务运行情况。
- 修改 spark-defaults.conf.template 文件名为 spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
- 修改 spark-default.conf 文件,配置日志存储路径
spark.eventLog.enabled true
spark.eventLog.dir hdfs://xxx:8020/directory
- 修改 spark-env.sh 文件, 添加日志配置
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://xxx:8020/directory
-Dspark.history.retainedApplications=30"
参数1含义:WEBUI访问的端口号为18080
参数2含义:指定历史服务器日志存储路径
参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
- 分发配置文件
- 重新启动集群和历史服务
sbin/stop-all.sh
sbin/start-all.sh
sbin/start-history-server.sh
- 查看历史服务:http://xxx:18080
1.2.3 配置高可用(HA)
Linux1 | Linux2 | Linux3 |
---|---|---|
Master Zookeeper Worker | Master Zookeeper Worker | Zookeeper Worker |
- 停止集群
sbin/stop-all.sh
- 修改 spark-env.sh 文件添加如下配置
注释如下内容:
#SPARK_MASTER_HOST=linux1
#SPARK_MASTER_PORT=7077
添加如下内容:
#Master 监控页面默认访问端口为 8080,但是可能会和 Zookeeper 冲突,所以改成 8989,也可以自定义
SPARK_MASTER_WEBUI_PORT=8989
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=linux1,linux2,linux3
-Dspark.deploy.zookeeper.dir=/spark"
- 分发配置文件
- 启动群集
sbin/start-all.sh
- 启动 linux2 的单独 Master 节点,此时 linux2 节点 Master 状态处于备用状态
sbin/start-master.sh
- 提交应用到高可用集群
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://linux1:7077,linux2:7077 \
./examples/jars/spark-examples_2.12-3.1.2.jar \
10
停止 linux1 的 Master 资源监控进程
查看 linux2 的 Master 资源监控 Web UI,稍等一段时间后,linux2 节点的 Master 状态 提升为活动状态
1.3 Yarn 模式
独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。这 种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是你也要记住,Spark 主 要是计算框架,而不是资源调度框架,所以本身提供的资源调度并不是它的强项,所以还是 和其他专业的资源调度框架集成会更靠谱一些。
1.3.1 配置Yarn模式
- 修改 hadoop 配置文件/hadoop/etc/hadoop/yarn-site.xml, 并分发
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认 是 true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认 是 true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
- 修改 conf/spark-env.sh,添加 JAVA_HOME 和 YARN_CONF_DIR 配置
mv spark-env.sh.template spark-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
export YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
- 提交应用
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.1.2.jar \
10
1.3.2 配置历史服务器
- 修改 spark-defaults.conf.template 文件名为 spark-defaults.conf
mv spark-defaults.conf.template spark-defaults.conf
- 修改 spark-default.conf 文件,配置日志存储路径
spark.eventLog.enabled true
spark.eventLog.dir hdfs://xxx:8020/directory
- 修改 spark-env.sh 文件, 添加日志配置
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://xxx:8020/directory
-Dspark.history.retainedApplications=30"
参数1含义:WEBUI访问的端口号为18080
参数2含义:指定历史服务器日志存储路径
参数3含义:指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
- 修改 spark-defaults.conf
spark.yarn.historyServer.address=linux1:18080
spark.history.ui.port=18080
- 启动历史服务
sbin/start-history-server.sh
- 重新提交应用
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
二、Spark 运行架构
Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。 如下图所示,它展示了一个 Spark 执行时的基本结构。图形中的 Driver 表示 master, 负责管理整个集群中的作业任务调度。图形中的 Executor 则是 slave,负责实际执行任务。
2.1 核心组件
2.1.1 Driver
Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。 Driver 在 Spark 作业执行时主要负责:
- 将用户程序转化为作业(job)
- 在Executor之间调度任务(task)
- 跟踪Executor的执行情况
- 通过UI展示查询运行情况
2.1.2 Executor
Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了 故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点 上继续运行。
Executor 有两个核心功能:
- 负责运行组成Spark应用的任务,并将结果返回给驱动器进程
- 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存 数据加速运算。
2.1.3 Master & Worker
Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调 度的功能,所以环境中还有其他两个核心组件:Master 和 Worker,这里的 Master 是一个进 程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM, 而 Worker 呢,也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对 数据进行并行的处理和计算,类似于 Yarn 环境中 NM。
2.1.4 ApplicationMaster
Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整 个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。 说的简单点就是,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster。
2.2 核心概念
2.2.1 有向无环图(DAG)
大数据计算引擎框架我们根据使用方式的不同一般会分为四类,其中第一类就是 Hadoop 所承载的 MapReduce,它将计算分为两个阶段,分别为 Map 阶段 和 Reduce 阶段。 对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。 由于这样的弊端,催生了支持 DAG 框 架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来 说,大多还是批处理的任务。接下来就是以 Spark 为代表的第三代的计算引擎。第三代计 算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及实时计算。 这里所谓的有向无环图,并不是真正意义的图形,而是由 Spark 程序直接映射成的数据 流的高级抽象模型。简单理解就是将整个程序计算的执行过程用图形表示出来,这样更直观, 更便于理解,可以用于表示程序的拓扑结构。
2.3 提交流程
2.3.1 Yarn Client 模式
./bin/spark-shell --master yarn --deploy-mode client
Client 模式将用于监控和调度的 Driver 模块在客户端执行,而不是在 Yarn 中,所以一般用于测试。
- Driver在任务提交的本地机器上运行
- Driver启动后会和ResourceManager通讯申请启动ApplicationMaster
- ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,负责向 ResourceManager 申请 Executor 内存
- ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程
- Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行 main 函数
- 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生 成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。
2.3.2 Yarn Cluster 模式
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--jars my-other-jar.jar,my-other-other-jar.jar \
my-main-jar.jar \
app_arg1 app_arg2
Cluster 模式将用于监控和调度的 Driver 模块启动在 Yarn 集群资源中执行。一般应用于实际生产环境。
- 在YARNCluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,
- 随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的 ApplicationMaster 就是 Driver。
- Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动Executor 进程
- Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main 函数,
- 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分stage,每个stage生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行。
三、Spark 核心编程
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于 处理不同的应用场景。三大数据结构分别是:
- RDD: 弹性分布式数据集
- 累加器: 分布式共享只写变量
- 广播变量: 分布式共享只读变量
3.1 RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型
。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
- 弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片。
- 分布式:数据存储在大数据集群不同节点上
- 数据集:RDD封装了计算逻辑,并不保存数据
- 数据抽象:RDD是一个抽象类,需要子类具体实现
- 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的 RDD 里面封装计算逻辑
- 可分区、并行计算
3.1.1 RDD的五个重要属性
org/apache/spark/rdd/RDD.scala
Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on
- (e.g. block locations for an HDFS file)
- 分区列表
RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]
- 分区计算函数
Spark 在计算时,是使用分区函数对每一个分区进行计算
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
- RDD之间的依赖关系
RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps
- 分区器(可选)
当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
- 首选位置(可选)
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
3.2 RDD 算子
所谓RDD的算子就是RDD方法
3.2.1 转换算子
把旧的RDD包装成一个新的RDD就是转换
3.2.2 行动算子
触发任务的调度和作业的执行
四、SparkSQL
4.1 基础操作
## 支持的格式
spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
## 读取Json
spark.read.json("input/user.json")
val df = spark.read.json("input/user.json")
## 创建临时表
df.createTempView("user")
## 如果表存在直接替换掉
df.createOrReplaceTempView("user")
spark.sql("select * from user").show
## 创建全局表
df.createGlobalTempView("people")
df.createOrReplaceGlobalTempView("people")
## 查询全局表
spark.sql("select * from global_temp.people").show
## 换一个新的session连接
spark.newSession.sql("select age from user").show
4.2 DSL 语法
# 查看DataFrame的Schema
df.printSchema
# 只查看"username"列数据,
df.select("username").show()
# 应用数据,进行计算
df.select($"age"+1).show
# 单引号,引用数据计算
df.select('age +1).show
# 别名
df.select('username,'age+1 as "newage").show()
# 查看"age"大于"30"的数据
df.filter($"age">30).show
# 按照"age"分组,查看数据条数
df.groupBy("age").count.show
# RDD 转换为 DataFrame
# 在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入 import spark.implicits._
val idRDD = sc.textFile("data/id.txt")
idRDD.toDF("id").show
# 样例类将 RDD 转换为 DataFrame
case class User(name:String, age:Int)
sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF.show
# DataFrame 转换为 RDD,此时得到的 RDD 存储类型为 Row
val rdd = df.rdd
4.3 DataSet
DataSet 是具有强类型的数据集合,需要提供对应的类型信息。
# 创建 DataSet
case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("zhangsan",2)).toDS()
# 使用基本类型的序列创建 DataSet
val ds = Seq(1,2,3,4,5).toDS
#df 转ds
case class Emp