spark笔记
Spark 内核概述
运行架构
Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。
如下图所示,它展示了一个 Spark 执行时的基本结构。图形中的 Driver 表示 master, 负责管理整个集群中的作业任务调度。图形中的 Executor 则是 slave,负责实际执行任务。
核心组件
Driver
Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。 Driver 在 Spark 作业执行时主要负责:
- 将用户程序转化为作业(job)
- 在 Executor 之间调度任务(task)
- 跟踪 Executor 的执行情况
- 通过 UI 展示查询运行情况
实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关 Driver 的字眼。所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为 Driver 类。
Executor
Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业 中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了 故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点 上继续运行。’’
Executor 有两个核心功能:
- 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
- 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存 式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存 数据加速运算。
核心概念
Executor 与 Core
Spark Executor 是集群中运行在工作节点(Worker)中的一个 JVM 进程,是整个集群中 的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资 源。这里的资源一般指的是工作节点 Executor 的内存大小和使用的虚拟 CPU 核(Core)数 量。
应用程序相关启动参数如下:
名称 | 说明 |
---|---|
–num-executors | 配置 Executor 的数量 |
–executor-memory | 配置每个 Executor 的内存大小 |
–executor-cores | 配置每个 Executor 的虚拟 CPU core 数量 |
并行度(Parallelism)
在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行 计算,所以能够真正地实现多任务并行执行,记住,这里是并行,而不是并发。这里我们将 整个集群并行执行任务的数量称之为并行度。那么一个作业到底并行度是多少呢?这个取决 于框架的默认配置。应用程序也可以在运行过程中动态修改。
提交流程
所谓的提交流程,其实就是我们开发人员根据需求写的应用程序通过 Spark 客户端提交 给 Spark 运行环境执行计算的流程。在不同的部署环境中,这个提交过程基本相同,但是又 有细微的区别,我们这里不进行详细的比较,但是因为国内工作中,将 Spark 引用部署到 Yarn 环境中会更多一些,所以本课程中的提交流程是基于 Yarn 环境的。
上图为 Spark 通用运行流程图,体现了基本的 Spark 应用程序在部署中的基本提交流程。
这个流程是按照如下的核心步骤进行工作的:
任务提交后,都会先启动 Driver 程序;
随后 Driver 向集群管理器注册应用程序;
之后集群管理器根据此任务的配置文件分配 Executor 并启动;
Driver 开始执行 main 函数,Spark 查询为懒执行,当执行到 Action 算子时开始反向推 算,根据宽依赖进行 Stage 的划分,随后每一个 Stage 对应一个 Taskset,Taskset 中有多 个 Task,查找可用资源 Executor 进行调度;
根据本地化原则,Task 会被分发到指定的 Executor 去执行,在任务执行的过程中, Executor 也会不断与 Driver 进行通信,报告任务运行情况。
YARN 模式运行机制
Yarn Client 模式
执行脚本提交任务,实际是启动一个 SparkSubmit 的 JVM 进程;
SparkSubmit 类中的 main 方法反射调用用户代码的 main 方法;
启动 Driver 线程,执行用户的作业,并创建 ScheduleBackend;
YarnClientSchedulerBackend 向 RM 发送指令:bin/java ExecutorLauncher;
Yarn 框架收到指令后会在指定的 NM 中启动 ExecutorLauncher(
实际上还是调用 ApplicationMaster 的 main 方法);
1 | object ExecutorLauncher { |
AM 向 RM 注册,申请资源;
获取资源后 AM 向 NM 发送指令:bin/java CoarseGrainedExecutorBackend;
CoarseGrainedExecutorBackend 进程会接收消息,跟 Driver 通信,注册已经启动的 Executor;然后启动计算对象 Executor 等待接收任务
Driver 分配任务并监控任务的执行。
注意:SparkSubmit、ApplicationMaster 和 YarnCoarseGrainedExecutorBackend 是独立的进程;Executor 和 Driver 是对象。
Yarn Cluster 模式
执行脚本提交任务,实际是启动一个 SparkSubmit 的 JVM 进程;
SparkSubmit 类中的 main 方法反射调用 YarnClusterApplication 的 main 方法;
YarnClusterApplication 创建 Yarn 客户端,然后向 Yarn 服务器发送执行指令:bin/java ApplicationMaster;
Yarn 框架收到指令后会在指定的 NM 中启动 ApplicationMaster;
ApplicationMaster 启动 Driver 线程,执行用户的作业;
AM 向 RM 注册,申请资源;
获取资源后 AM 向 NM 发送指令:bin/java YarnCoarseGrainedExecutorBackend;
CoarseGrainedExecutorBackend 进程会接收消息,跟 Driver 通信,注册已经启动的 Executor;然后启动计算对象 Executor 等待接收任务
Driver 线程继续执行完成作业的调度和任务的执行。
Driver 分配任务并监控任务的执行。
注意:SparkSubmit、ApplicationMaster 和 CoarseGrainedExecutorBackend 是独立的进程;Driver 是独立的线程;Executor 和 YarnClusterApplication 是对象
Spark 通讯架构解析
通讯组件Driver => Executor
通讯流程
Spark 通信架构如下图所示:
➢ RpcEndpoint:RPC 通信终端。Spark 针对每个节点(Client/Master/Worker)都称之为一
个 RPC 终端,且都实现 RpcEndpoint 接口,内部根据不同端点的需求,设计不同的消
息和不同的业务处理,如果需要发送(询问)则调用 Dispatcher。在 Spark 中,所有的
终端都存在生命周期:
1 | ⚫ Constructor |
➢ RpcEnv:RPC 上下文环境,每个 RPC 终端运行时依赖的上下文环境称为 RpcEnv;在 把当前 Spark 版本中使用的 NettyRpcEnv
➢ Dispatcher:消息调度(分发)器,针对于 RPC 终端需要发送远程消息或者从远程 RPC 接收到的消息,分发至对应的指令收件箱(发件箱)。如果指令接收方是自己则存入收 件箱,如果指令接收方不是自己,则放入发件箱;
➢ Inbox:指令消息收件箱。一个本地 RpcEndpoint 对应一个收件箱,Dispatcher 在每次向 Inbox 存入消息时,都将对应 EndpointData 加入内部 ReceiverQueue 中,另外 Dispatcher 创建时会启动一个单独线程进行轮询 ReceiverQueue,进行收件箱消息消费;
➢ RpcEndpointRef:RpcEndpointRef 是对远程 RpcEndpoint 的一个引用。当我们需要向一 个具体的 RpcEndpoint 发送消息时,一般我们需要获取到该 RpcEndpoint 的引用,然后 通过该应用发送消息。
➢ OutBox:指令消息发件箱。对于当前 RpcEndpoint 来说,一个目标 RpcEndpoint 对应一 个发件箱,如果向多个目标RpcEndpoint发送信息,则有多个OutBox。当消息放入Outbox 后,紧接着通过 TransportClient 将消息发送出去。消息放入发件箱以及发送过程是在同
一个线程中进行;
➢ RpcAddress:表示远程的 RpcEndpointRef 的地址,Host + Port。
➢ TransportClient:Netty 通信客户端,一个 OutBox 对应一个 TransportClient,TransportClient 不断轮询 OutBox,根据 OutBox 消息的 receiver 信息,请求对应的远程 TransportServer;
➢ TransportServer:Netty 通信服务端,一个 RpcEndpoint 对应一个 TransportServer,接受 远程消息后调用 Dispatcher 分发消息至对应收发件箱;
Spark 任务调度
Spark 任务调度概述
当 Driver 起来后,Driver 则会根据用户程序逻辑准备任务,并根据 Executor 资源情况 逐步分发任务。在详细阐述任务调度前,首先说明下 Spark 里的几个概念。一个 Spark 应用 程序包括 Job、Stage 以及 Task 三个概念:
Job 是以 Action 方法为界,遇到一个 Action 方法则触发一个 Job;
Stage 是 Job 的子集,以 RDD 宽依赖(即 Shuffle)为界,遇到 Shuffle 做一次划分;
Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个 task。
Spark 的任务调度总体来说分两路进行,一路是 Stage 级的调度,一路是 Task 级的调度,总体调度流程如下图所示:
Spark RDD 通过其 Transactions 操作,形成了 RDD 血缘(依赖)关系图,即 DAG,最 后通过 Action 的调用,触发 Job 并调度执行,执行过程中会创建两个调度器:DAGScheduler 和 TaskScheduler。
➢ DAGScheduler 负责 Stage 级的调度,主要是将 job 切分成若干 Stages,并将每个 Stage 打包成 TaskSet 交给 TaskScheduler 调度。
➢ TaskScheduler 负责 Task 级的调度,将 DAGScheduler 给过来的 TaskSet 按照指定的调度 策略分发到 Executor 上执行,调度过程中 SchedulerBackend 负责提供可用资源,其中 SchedulerBackend 有多种实现,分别对接不同的资源管理系统。
Driver 初始化 SparkContext 过程中,会分别初始化 DAGScheduler、TaskScheduler、 SchedulerBackend 以及 HeartbeatReceiver,并启动 SchedulerBackend 以及 HeartbeatReceiver。 SchedulerBackend 通过 ApplicationMaster 申请资源,并不断从 TaskScheduler 中拿到合适的 Task 分发到 Executor 执行。HeartbeatReceiver 负责接收 Executor 的心跳信息,监控 Executor 的存活状况,并通知到 TaskScheduler。
Spark Stage 级调度
Spark 的任务调度是从 DAG 切割开始,主要是由 DAGScheduler 来完成。当遇到一个 Action 操作后就会触发一个 Job 的计算,并交给 DAGScheduler 来提交,下图是涉及到 Job 提交的相关方法调用流程图。
Job 由最终的 RDD 和 Action 方法封装而成;
SparkContext 将 Job 交给 DAGScheduler 提交,它会根据 RDD 的血缘关系构成的 DAG 进行切分,将一个 Job 划分为若干 Stages,具体划分策略是,由最终的 RDD 不断通过 依赖回溯判断父依赖是否是宽依赖,即以 Shuffle 为界,划分 Stage,窄依赖的 RDD 之 间被划分到同一个 Stage 中,可以进行 pipeline 式的计算。划分的 Stages 分两类,一类 叫做 ResultStage,为 DAG 最下游的 Stage,由 Action 方法决定,另一类叫做 ShuffleMapStage,为下游 Stage 准备数据,下面看一个简单的例子 WordCount。
Job 由 saveAsTextFile 触发,该 Job 由 RDD-3 和 saveAsTextFile 方法组成,根据 RDD 之 间的依赖关系从 RDD-3 开始回溯搜索,直到没有依赖的 RDD-0,在回溯搜索过程中,RDD- 3 依赖 RDD-2,并且是宽依赖,所以在 RDD-2 和 RDD-3 之间划分 Stage,RDD-3 被划到最 后一个 Stage,即 ResultStage 中,RDD-2 依赖 RDD-1,RDD-1 依赖 RDD-0,这些依赖都是 窄依赖,所以将 RDD-0、RDD-1 和 RDD-2 划分到同一个 Stage,形成 pipeline 操作,。即 ShuffleMapStage 中,实际执行的时候,数据记录会一气呵成地执行 RDD-0 到 RDD-2 的转 化。不难看出,其本质上是一个深度优先搜索(Depth First Search)算法。
一个 Stage 是否被提交,需要判断它的父 Stage 是否执行,只有在父 Stage 执行完毕才 能提交当前 Stage,如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。Stage 提交时会 将 Task 信息(分区信息以及方法等)序列化并被打包成 TaskSet 交给 TaskScheduler,一个Partition 对应一个 Task,另一方面 TaskScheduler 会监控 Stage 的运行状态,只有 Executor 丢 失或者 Task 由于 Fetch 失败才需要重新提交失败的 Stage 以调度运行失败的任务,其他类型 的 Task 失败会在 TaskScheduler 的调度过程中重试。
相对来说 DAGScheduler 做的事情较为简单,仅仅是在 Stage 层面上划分 DAG,提交 Stage 并监控相关状态信息。TaskScheduler 则相对较为复杂,下面详细阐述其细节。
Spark Task 级调度
Spark Task 的调度是由 TaskScheduler 来完成,由前文可知,DAGScheduler 将 Stage 打 包到交给 TaskScheTaskSetduler,TaskScheduler 会将 TaskSet 封装为 TaskSetManager 加入到 调度队列中,TaskSetManager 结构如下图所示。
TaskSetManager 负 责监控 管理 同一 个 Stage 中的 Tasks, TaskScheduler 就是以 TaskSetManager 为单元来调度任务。
前面也提到,TaskScheduler 初始化后会启动 SchedulerBackend,它负责跟外界打交道, 接收 Executor 的注册信息,并维护 Executor 的状态,所以说 SchedulerBackend 是管“粮食” 的,同时它在启动后会定期地去“询问”TaskScheduler 有没有任务要运行,也就是说,它会定 期地“问”TaskScheduler“我有这么余粮,你要不要啊”,TaskScheduler 在 SchedulerBackend“问” 它的时候,会从调度队列中按照指定的调度策略选择 TaskSetManager 去调度运行,大致方 法调用流程如下图所示:
上图中,将 TaskSetManager 加入 rootPool 调度池中之后,调用 SchedulerBackend 的 riviveOffers 方法给 driverEndpoint 发送 ReviveOffer 消息;driverEndpoint 收到 ReviveOffer 消息后调用 makeOffers 方法,过滤出活跃状态的 Executor(这些 Executor 都是任务启动时反 向注册到 Driver 的 Executor),然后将 Executor 封装成 WorkerOffer 对象;准备好计算资源 (WorkerOffer)后,taskScheduler 基于这些资源调用 resourceOffer 在 Executor 上分配 task。
调度策略
TaskScheduler 支持两种调度策略,一种是 FIFO,也是默认的调度策略,另一种是 FAIR。 在 TaskScheduler 初始化过程中会实例化 rootPool,表示树的根节点,是 Pool 类型。
FIFO 调度策略
如果是采用 FIFO 调度策略,则直接简单地将 TaskSetManager 按照先来先到的方式入 队,出队时直接拿出最先进队的 TaskSetManager,其树结构如下图所示,TaskSetManager 保 存在一个 FIFO 队列中。
FAIR 调度策略
FAIR 调度策略的树结构如下图所示:
FAIR 模式中有一个 rootPool 和多个子 Pool,各个子 Pool 中存储着所有待分配的 TaskSetMagager。
在 FAIR 模式中,需要先对子 Pool 进行排序,再对子 Pool 里面的 TaskSetMagager 进行排序,因为 Pool 和 TaskSetMagager 都继承了 Schedulable 特质,因此使用相同的排序算法。
排序过程的比较是基于 Fair-share 来比较的,每个要排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks 值,minShare 值以及 weight 值。
注意,minShare、weight 的值均在公平调度配置文件 fairscheduler.xml 中被指定,调度 池在构建阶段会读取此文件的相关配置。
如果A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare, 那么 B 排在 A 前面;(runningTasks 比 minShare 小的先执行)
如果 A、B 对象的 runningTasks 都小于它们的 minShare,那么就比较 runningTasks 与 minShare 的比值(minShare 使用率),谁小谁排前面;(minShare 使用率低的先执行)
如果 A、B 对象的 runningTasks 都大于它们的 minShare,那么就比较 runningTasks 与 weight 的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行)
如果上述比较均相等,则比较名字
整体上来说就是通过minShare和weight这两个参数控制比较过程,可以做到让minShare 使用率和权重使用率少(实际运行 task 比例较少)的先运行。
FAIR 模式排序完成后,所有的 TaskSetManager 被放入一个 ArrayBuffer 里,之后依次 被取出并发送给 Executor 执行。
从调度队列中拿到 TaskSetManager 后,由于 TaskSetManager 封装了一个 Stage 的所有 Task,并负责管理调度这些 Task,那么接下来的工作就是 TaskSetManager 按照一定的规则 一个个取出 Task 给 TaskScheduler,TaskScheduler 再交给 SchedulerBackend 去发到 Executor 上执行。
本地化调度
DAGScheduler 切割 Job,划分 Stage, 通过调用 submitStage 来提交一个 Stage 对应的 tasks,submitStage 会调用 submitMissingTasks,submitMissingTasks 确定每个需要计算的 task 的 preferredLocations,通过调用 getPreferrdeLocations()得到 partition 的优先位置,由于一个 partition 对应一个 Task,此 partition 的优先位置就是 task 的优先位置,对于要提交到 TaskScheduler 的 TaskSet 中的每一个 Task,该 task 优先位置与其对应的 partition 对应的优先 位置一致。
从调度队列中拿到 TaskSetManager 后,那么接下来的工作就是 TaskSetManager 按照一定的规则一个个取出 task 给 TaskScheduler,TaskScheduler 再交给 SchedulerBackend 去发到 Executor 上执行。前面也提到,TaskSetManager 封装了一个 Stage 的所有 Task,并负责管理 调度这些 Task。
根据每个 Task 的优先位置,确定 Task 的 Locality 级别,Locality 一共有五种,优先级 由高到低顺序:
在调度执行时,Spark 调度总是会尽量让每个 task 以最高的本地性级别来启动,当一个 task 以 X 本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败, 此时并不会马上降低本地性级别启动而是在某个时间长度内再次以 X 本地性级别来启动该 task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。
可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的 Executor 可能就会有 相应的资源去执行此 task,这就在在一定程度上提到了运行性能。
失败重试与黑名单机制
除了选择合适的 Task 调度运行外,还需要监控 Task 的执行状态,前面也提到,与外部 打交道的是 SchedulerBackend,Task 被提交到 Executor 启动执行后,Executor 会将执行状态 上报给 SchedulerBackend,SchedulerBackend 则告诉 TaskScheduler,TaskScheduler 找到该 Task 对应的 TaskSetManager,并通知到该 TaskSetManager,这样 TaskSetManager 就知道 Task 的失败与成功状态,对于失败的 Task,会记录它失败的次数,如果失败次数还没有超过最大 重试次数,那么就把它放回待调度的 Task 池子中,否则整个 Application 失败。
在记录 Task 失败次数过程中,会记录它上一次失败所在的 Executor Id 和 Host,这样下 次再调度这个 Task 时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一 定的容错作用。黑名单记录 Task 上一次失败所在的 Executor Id 和 Host,以及其对应的“拉 黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个 Task 了。
Spark Shuffle
Shuffle 核心
ShuffleMapStage 与 ResultStage
在划分 stage 时,最后一个 stage 称为 finalStage,它本质上是一个 ResultStage 对象,前 面的所有 stage 被称为 ShuffleMapStage。
ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘。
ResultStage 基本上对应代码中的 action 算子,即将一个函数应用在 RDD 的各个 partition 的数据集上,意味着一个 job 的运行结束。
HashShuffle 解析
未优化的 HashShuffle
这里我们先明确一个假设前提:每个 Executor 只有 1 个 CPU core,也就是说,无论这 个 Executor 上分配多少个 task 线程,同一时间都只能执行一个 task 线程。
如下图中有 3 个 Reducer,从 Task 开始那边各自把自己进行 Hash 计算(分区器: hash/numreduce 取模),分类出 3 个不同的类别,每个 Task 都分成 3 种类别的数据,想把不 同的数据汇聚然后计算出最终的结果,所以 Reducer 会在每个 Task 中把属于自己类别的数 据收集过来,汇聚成一个同类别的大集合,每 1 个 Task 输出 3 份本地文件,这里有 4 个 Mapper Tasks,所以总共输出了 4 个 Tasks x 3 个分类文件 = 12 个本地小文件。
优化后的 HashShuffle
优化的 HashShuffle 过程就是启用合并机制,合并机制就是复用 buffer,开启合并机制 的配置是 spark.shuffle.consolidateFiles。该参数默认值为 false,将其设置为 true 即可开启优 化机制。通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项。
这里还是有 4 个 Tasks,数据类别还是分成 3 种类型,因为 Hash 算法会根据你的 Key 进行分类,在同一个进程中,无论是有多少过 Task,都会把同样的 Key 放在同一个 Buffer 里,然后把 Buffer 中的数据写入以 Core 数量为单位的本地文件中,(一个 Core 只有一种类 型的 Key 的数据),每 1 个 Task 所在的进程中,分别写入共同进程中的 3 份本地文件,这里 有 4 个 Mapper Tasks,所以总共输出是 2 个 Cores x 3 个分类文件 = 6 个本地小文件。
SortShuffle 解析
普通 SortShuffle
在该模式下,数据会先写入一个数据结构,reduceByKey 写入 Map,一边通过 Map 局 部聚合,一遍写入内存。Join 算子写入 ArrayList 直接写入内存中。然后需要判断是否达到 阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构
在溢写磁盘前,先根据 key 进行排序,排序过后的数据,会分批写入到磁盘文件中。默 认批次为 10000 条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的 方式,每次溢写都会产生一个磁盘文件,也就是说一个 Task 过程会产生多个临时文件。
最后在每个 Task 中,将所有的临时文件合并,这就是 merge 过程,此过程将所有临时 文件读取出来,一次写入到最终文件。意味着一个 Task 的所有数据都在这一个文件中。同 时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset
bypass SortShuffle
bypass 运行机制的触发条件如下:
shuffle reduce task 数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值,默认 为 200。
不是聚合类的 shuffle 算子(比如 reduceByKey)。
此时 task 会为每个 reduce 端的 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件 都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要 创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘 文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。
而该机制与普通 SortShuffleManager 运行机制的不同在于:不会进行排序。也就是说, 启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省 掉了这部分的性能开销。
Spark 内存管理
堆内和堆外内存规划
作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外 (Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的 使用。堆内内存受到 JVM 统一管理,堆外内存是直接向操作系统进行内存的申请和释放。
堆内内存
在Spark程序启动时,堆内内存的大小由spark-submit中的–executor-memory 或 spark.executor.memory参数配置。Spark对于堆内内存的管理是一种逻辑上的“规划式”管理,因为对象实例占用内存的申请和释放都由JVM完成,Spark只能在申请后和释放前记录这些内存。
申请内存流程如下:
- Spark 在代码中 new 一个对象实例;
- JVM 从堆内内存分配空间,创建对象并返回对象引用;
- Spark 保存该对象的引用,记录该对象占用的内存。
释放内存流程如下:
- Spark 记录该对象释放的内存,删除该对象的引用;
- 等待 JVM 的垃圾回收机制释放该对象占用的堆内内存。
对于Spark的序列化对象,由于是字节流的形式,其占用的内存大小可以直接计算,而对于非序列化对象,其占用的内存是通过周期性的采样估算而得,即并不是新增数据项都会计算一次占用内存的大小,这种方法降低了时间开销,但是有可能误差较大,导致某一时刻的实际内存远远超出预期。所以Spark并不能准确记录实际可用的堆内内存,从而也无法避免内存溢出—OOM。
堆外内存
Spark引入堆外内存(Off-Heap),使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据;
堆外内存意味着把内存对象分配到Java虚拟以外的内存,这些内存直接受操作系统(而不是虚拟机)管理。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。Spark可以直接操作系统堆外内存,减少了不必要的系统开销,以及频繁的GC扫描和回收,提高了处理性能。堆外内存可以被精确地申请和释放(JVM对于内存的清理是无法精确指定时间点的,因此无法实现精确的释放),而且序列化的数据占用的空间可以被精确地计算,所以相比堆内内存来说降低了难度,也降低了误差;
在默认情况下堆外内存并不启用,可以通过配置spark.memory.offheap.enabled参数启用,并由spark.memory.offheap.size设定堆外空间的大小。除了没有Other空间外,堆内内存与堆外内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。
内存空间动态分配
在Spark1.6之前使用静态内存管理,即存储内存、执行内存和其他内存的大小在Spark应用程序运行期间是固定的。缺点很多,Spark1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行 内存共享同一块空间,可以动态占用对方的空闲区域,统一内存管理的堆内内存结构如图所 示:
其中最重要的优化在于动态占用机制,其规则如下:
设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定 了双方各自拥有的空间的范围;
双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空 间;(存储空间不足是指不足以放下一个完整的 Block)
执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的 空间;
存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很 多因素,实现起来较为复杂。
统一内存管理的动态占用机制如图所示:
凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了 开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。如果存储内存的空间太 大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为 缓存的 RDD 数据通常都是长期驻留内存的。所以要想充分发挥 Spark 的性能,需要开发者 进一步了解存储内存和执行内存各自的管理方式和实现原理。
存储内存管理
RDD 的持久化机制
弹性分布式数据集(RDD)作为 Spark 最根本的数据抽象,是只读的分区记录(Partition) 的集合,只能基于在稳定物理存储中的数据集上创建,或者在其他已有的 RDD 上执行转换 (Transformation)操作产生一个新的 RDD。转换后的 RDD 与原始的 RDD 之间产生的依赖 关系,构成了血统(Lineage)。凭借血统,Spark 保证了每一个 RDD 都可以被重新恢复。但 RDD 的所有转换都是惰性的,即只有当一个返回结果给 Driver 的行动(Action)发生时, Spark 才会创建任务读取 RDD,然后真正触发转换的执行。
Task 在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需 要检查 Checkpoint 或按照血统重新计算。所以如果一个 RDD 上要执行多次行动,可以在 第一次行动中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在 后面的行动时提升计算速度。
事实上,cache 方法是使用默认的 MEMORY_ONLY 的存储级别将 RDD 持久化到内 存,故缓存是一种特殊的持久化。 堆内和堆外存储内存的设计,便可以对缓存 RDD 时使用 的内存做统一的规划和管理。
RDD 的持久化由 Spark 的 Storage 模块负责,实现了 RDD 与物理存储的解耦合。Storage 模块负责管理 Spark 在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数 据的功能封装了起来。在具体实现时 Driver 端和 Executor 端的 Storage 模块构成了主从式 的架构,即 Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave。 Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每个 Partition 经过处理后唯一对应 一个 Block(BlockId 的格式为 rdd_RDD-ID_PARTITION-ID )。Driver 端的 Master 负责整 个 Spark 应用程序的 Block 的元数据信息的管理和维护,而 Executor 端的 Slave 需要将 Block 的更新等状态上报到 Master,同时接收 Master 的命令,例如新增或删除一个 RDD。
在对 RDD 持久化时,Spark 规定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 种 不同的存储级别,而存储级别是以下 5 个变量的组合:
1 | class StorageLevel private( |
Spark 中 7 种存储级别如下:
持久化级别 | 含义 |
---|---|
MEMORY_ONLY | 以非序列化的 Java 对象的方式持久化在 JVM 内存 中。如果内存无法完全存储 RDD 所有的 partition,那么那些没有持久化的 partition 就会在下一次需要使用它们的时候,重新被计算 |
MEMORY_AND_DISK | 同上,但是当某些 partition 无法存储在内存中时,会 持久化到磁盘中。下次需要使用这些 partition 时,需 要从磁盘上读取 |
MEMORY_ONLY_SER | 同 MEMORY_ONLY,但是会使用 Java 序列化方式, 将 Java 对象序列化后进行持久化。可以减少内存开 销,但是需要进行反序列化,因此会加大 CPU 开销 |
MEMORY_AND_DISK_SER | 同 MEMORY_AND_DISK,但是使用序列化方式持久 化 Java 对象 |
DISK_ONLY | 使用非序列化 Java 对象的方式持久化,完全存储到 磁盘上 |
MEMORY_ONLY_2 MEMORY_AND_DISK_2 | 如果是尾部加了 2 的持久化级别,表示将持久化数据 复用一份,保存到其他节点,从而在数据丢失时,不 需要再次计算,只需要使用备份数据即可 |
通过对数据结构的分析,可以看出存储级别从三个维度定义了 RDD 的 Partition(同时也就 是 Block)的存储方式:
➢ 存储位置:磁盘/堆内内存/堆外内存。如 MEMORY_AND_DISK 是同时在磁盘和堆 内内存上存储,实现了冗余备份。OFF_HEAP 则是只在堆外内存存储,目前选择堆外 内存时不能同时存储到其他位置。
➢ 存储形式:Block 缓存到存储内存后,是否为非序列化的形式。如 MEMORY_ONLY 是 非序列化方式存储,OFF_HEAP 是序列化方式存储。
➢ 副本数量:大于 1 时需要远程冗余备份到其他节点。如 DISK_ONLY_2 需要远程备份 1 个副本
RDD 的缓存过程
RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器(Iterator)的数据结构来访问,这是 Scala 语言中一种遍历数据集合的方法。通过 Iterator 可以获取分区中每一条序 列化或者非序列化的数据项(Record),这些 Record 的对象实例在逻辑上占用了 JVM 堆内内 存的 other 部分的空间,同一 Partition 的不同 Record 的存储空间并不连续。
RDD 在缓存到存储内存之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中 占用一块连续的空间。将 Partition 由不连续的存储空间转换为连续存储空间的过程,Spark 称之为”展开”(Unroll)。
Block 有序列化和非序列化两种存储格式,具体以哪种方式取决于该 RDD 的存储级别。非 序列化的 Block 以一种 DeserializedMemoryEntry 的数据结构定义,用一个数组存储所有的 对象实例,序列化的 Block 则以 SerializedMemoryEntry 的数据结构定义,用字节缓冲区 (ByteBuffer)来存储二进制数据。每个 Executor 的 Storage 模块用一个链式 Map 结构 (LinkedHashMap)来管理堆内和堆外存储内存中所有的 Block 对象的实例,对这个 LinkedHashMap 新增和删除间接记录了内存的申请和释放。
因为不能保证存储空间可以一次容纳 Iterator 中的所有数据,当前的计算任务在 Unroll 时 要向 MemoryManager 申请足够的 Unroll 空间来临时占位,空间不足则 Unroll 失败,空间 足够时可以继续进行。
对于序列化的 Partition,其所需的 Unroll 空间可以直接累加计算,一次申请。
对于非序列化的 Partition 则要在遍历 Record 的过程中依次申请,即每读取一条 Record, 采样估算其所需的 Unroll 空间并进行申请,空间不足时可以中断,释放已占用的 Unroll 空 间。 如果最终 Unroll 成功,当前 Partition 所占用的 Unroll 空间被转换为正常的缓存 RDD 的存 储空间,如下图所示
在静态内存管理时,Spark 在存储内存中专门划分了一块 Unroll 空间,其大小是固定的,统一内存管理时则没有对 Unroll 空间进行特别区分,当存储空间不足时会根据动态占用机 制进行处理。
淘汰与落盘
由于同一个 Executor 的所有的计算任务共享有限的存储内存空间,当有新的 Block 需 要缓存但是剩余空间不足且无法动态占用时,就要对 LinkedHashMap 中的旧 Block 进行淘 汰(Eviction),而被淘汰的 Block 如果其存储级别中同时包含存储到磁盘的要求,则要对其 进行落盘(Drop),否则直接删除该 Block。
存储内存的淘汰规则为:
➢ 被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存;
➢ 新旧 Block 不能属于同一个 RDD,避免循环淘汰;
➢ 旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题;
➢ 遍历 LinkedHashMap 中 Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新 Block 所需的空间。其中 LRU 是 LinkedHashMap 的特性
落盘的流程则比较简单,如果其存储级别符合_useDisk 为 true 的条件,再根据其_deserialized 判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在 Storage 模块中更新其信息。
执行内存管理
执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程,我们来看 Shuffle 的 Write 和 Read 两阶段对执行内存的使用:
Shuffle Write
若在 map 端选择普通的排序方式,会采用 ExternalSorter 进行外排,在内存中存储数据时主 要占用堆内执行空间。
若在 map 端选择 Tungsten 的排序方式,则采用 ShuffleExternalSorter 直接对以序列化形式 存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启 了堆外内存以及堆外执行内存是否足够。
Shuffle Read
在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占 用堆内执行空间。
如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间。
在 ExternalSorter 和 Aggregator 中,Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执 行内存中存储数据,但在 Shuffle 过程中所有数据并不能都保存到该哈希表中,当这个哈希 表占用的内存会进行周期性地采样估算,当其大到一定程度,无法再从 MemoryManager 申 请到新的执行内存时,Spark 就会将其全部内容存储到磁盘文件中,这个过程被称为溢存 (Spill),溢存到磁盘的文件最后会被归并(Merge)。
Shuffle Write 阶段中用到的 Tungsten 是 Databricks 公司提出的对 Spark 优化内存和 CPU 使 用的计划(钨丝计划),解决了一些 JVM 在性能上的限制和弊端。Spark 会根据 Shuffle 的情 况来自动选择是否采用 Tungsten 排序。
Tungsten 采用的页式内存管理机制建立在 MemoryManager 之上,即 Tungsten 对执行内存 的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。 每个内存页用一个 MemoryBlock 来定义,并用 Object obj 和 long offset 这两个变量统一标 识一个内存页在系统内存中的地址。
堆内的 MemoryBlock 是以 long 型数组的形式分配的内存,其 obj 的值为是这个数组的对象 引用,offset 是 long 型数组的在 JVM 中的初始偏移地址,两者配合使用可以定位这个数组 在堆内的绝对地址;堆外的 MemoryBlock 是直接申请到的内存块,其 obj 为 null,offset 是 这个内存块在系统内存中的 64 位绝对地址。Spark 用 MemoryBlock 巧妙地将堆内和堆外内
存页统一抽象封装,并用页表(pageTable)管理每个 Task 申请到的内存页。 Tungsten 页式管理下的所有内存用 64 位的逻辑地址表示,由页号和页内偏移量组成: 页号:占 13 位,唯一标识一个内存页,Spark 在申请内存页之前要先申请空闲页号。 页内偏移量:占 51 位,是在使用内存页存储数据时,数据在页内的偏移地址。 有了统一的寻址方式,Spark 可以用 64 位逻辑地址的指针定位到堆内或堆外的内存,整个 Shuffle Write 排序的过程只需要对指针进行排序,并且无需反序列化,整个过程非常高效, 对于内存访问效率和 CPU 使用效率带来了明显的提升。
Spark 的存储内存和执行内存有着截然不同的管理方式:对于存储内存来说,Spark 用一个 LinkedHashMap 来集中管理所有的 Block,Block 由需要缓存的 RDD 的 Partition 转化而成; 而对于执行内存,Spark 用 AppendOnlyMap 来存储 Shuffle 过程中的数据,在 Tungsten 排序 中甚至抽象成为页式内存管理,开辟了全新的 JVM 内存管理机制。