1、Spark宽窄依赖

窄依赖(Narrow Dependency): 指父RDD的每个分区只被子RDD的一个分区所使用。例如map、filter等这些算子的一个RDD,对它的父RDD只有简单的一对一的关系,RDD的每个partition仅仅依赖于父RDD 中的一个partition,父RDD和子RDD的partition之间的对应关系,是一对一的。

宽依赖(Shuffle Dependency): 父RDD的每个分区都可能被子RDD的多个分区使用,例如groupByKey、 reduceByKey,sortBykey等算子,这些算子其实都会产生shuffle操作,每一个父RDD的partition中的数据都可能会传输一部分到下一个RDD的每个partition中。此时就会出现,父RDD和子RDD的partition之间,具有错综复杂的关系,这种情况就叫做两个RDD 之间是宽依赖,同时,他们之间会发生shuffle操作。

2、Map、FlatMap、MapPartation的区别

1.Map:map方法返回的是一个object,map将流中的当前元素替换为此返回值,map是对rdd中的每一个元素进行操作; ​

2.FlatMap:flatMap方法返回的是一个Stream,FlatMap将流中的当前元素替换为此返回流拆解的流元素; ​

3.MapPartitions则是对rdd中的每个分区的迭代器进行操作;

MapPartitions的优点:

如果是普通的Map,比如一个Partition中有1万条数据。ok,那么你的Function要执行和计算1万次。

使用MapPartitions操作之后,一个Task仅仅会执行一次Function,Function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。如果在Map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,Map需要为每个元素创建一个链接而MapPartition为每个Partition创建一个链接),则MapPartitions效率比Map高的多。

SparkSql或DataFrame默认会对程序进行mapPartition的优化。

MapPartitions的缺点:

如果是普通的Map操作,一次Function的执行就处理一条数据;那么如果内存不够用的情况下, 比如处理了1千条数据了,那么这个时候内存不够了,那么就可以将已经处理完的1千条数据从内存里面垃圾回收掉,或者用其他方法,腾出空间来吧。所以说普通的map操作通常不会导致内存的OOM异常。

但是MapPartitions操作,对于大量数据来说,比如甚至一个Partition,100万数据,一次传入一个Function以后,那么可能一下子内存不够,但是又没有办法去腾出内存空间来,可能就OOM,内存溢出。

3、Rdd、DataFrame、DataSet的区别

明细概述

1.细说DataFrame

DataFrame的前身是SchemaRDD。Spark1.3更名为DataFrame。不继承RDD,自己实现了RDD的大部分功能。与RDD类似,DataFrame也是一个分布式数据集:

1)DataFrame可以看做分布式 Row 对象的集合,提供了由列组成的详细模式信息,使其可以得到优化。DataFrame 不仅有比RDD更多的算子,还可以进行执行计划的优化。

2)DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。

3)DataFrame也支持嵌套数据类型(struct、array和map)。

4)DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。

5)Dataframe的劣势在于在编译期缺少类型安全检查,导致运行时出错。

2.细说DataSet

1)DataSet是在Spark1.6中添加的新的接口。

2)与RDD相比,保存了更多的描述信息,概念上等同于关系型数据库中的二维表。

3)与DataFrame相比,保存了类型信息,是强类型的,提供了编译时类型检查。

4)调用Dataset的方法先会生成逻辑计划,然后Spark的优化器进行优化,最终生成物理计划,然后提交到集群中运行。

5)DataSet包含了DataFrame的功能,在Spark2.0中两者得到了统一:DataFrame表示为DataSet[Row],即DataSet的子集。

区别概述

1)RDD:以Person 为类型参数,但不了解其内部结构。

2)DataFrame:提供了详细的结构信息schema 列的名称和类型。这样看起来就像一张表了。

3)DataSet:不光有schema 信息,还有类型信息。

4、Spark Stage的划分

在Spark中,Stage的划分是基于RDD之间的依赖关系,这些依赖关系分为窄依赖和宽依赖两种类型。

窄依赖指的是父RDD的每个分区最多被一个子RDD的分区所用,表现为一对一或一对多的关系,这种情况下不会有shuffle的产生。而宽依赖则指父RDD的一个分区的数据去到子RDD的不同分区里面,这种情况下会有shuffle的产生。Stage的划分原则是从后往前,遇到宽依赖就断开,形成新的Stage。每个Stage的Task数量由最后一个RDD的Partition数决定。ShuffleMapTask用于提供shuffle数据,而ResultTask则用于生成最终结果。

具体来说,窄依赖允许父RDD的分区对应一个或多个子RDD分区,而宽依赖则需要等待所有父分区处理完毕。Stage的划分是根据RDD之间的依赖关系形成的DAG(有向无环图),DAG提交给DAGScheduler,后者将DAG划分为相互依赖的多个Stage。每个Stage包含一个或多个task,这些task有相同的shuffle dependencies。Stage的切割规则是从后往前,遇到宽依赖就切割Stage,形成一组并行的task运行。这种划分方式使得Spark能够有效地处理大数据集,并通过并行计算加速处理速度‌

5、Spark Rdd机制

1).基础意义:

​ RDD在spark中也叫做弹性分布式数据集,是spark最基本的数据处理模型。RDD在spark中(源码)是一个抽象类,代表一个弹性,不可变,可分区,可并行计算的集合。

2).RDD特点:

RDD表示只读分布区数据集,对RDD的改动只能通过转换操作,由一个RDD转换得到另一个新的RDD,新的RDD包含了从其他RDD衍生所必要的信息。RDD之间存在依赖,之间按照血缘关系延时计算,如果关系较长可以持久化RDD来切断血缘关系。 ​

1.分区,RDD逻辑上是分区的,数据抽象存在,计算时通过compute获得每个分区的数据。如果数据时从文件系统而来,则compute通过读取指定文件系统获取数据,如果依赖其他RDD,则compute通过转换获得数据 ​。

2.只读

3.依赖(宽窄依赖)

4.缓存 ​ e.checkpoint

6、Spark Shuffle理解

逻辑层面:主要从RDD血缘出发,从DAG角度讲解shuffle,再说明spark容错机制

物理层面:从执行角度剖析Shuffle是如何发生

以下是从逻辑层面和物理层面进行讲述:

逻辑层面:​RDD血缘和容错机制

血缘:RDD在spark执行后会被解析为DAG(被称之为无向循环图),在DAG中最初的RDD为基础RDD,之后的RDD产生依赖基础RDD,子RDD与基础RDD存在依赖关系,程序运行过程中无论那个RDD出现问题,都会依赖这种关系在某个点重新计算生成。血缘的表现方式为宽依赖和窄依赖。

容错

spark程序执行过程中,可能遇到的几种失败有

Driver出错:标记整个任务失败,需重启Driver

Executor出错:通常是所在的worker失败,会重新调度到其他excutor执行,重新执行的task会根具RDD血缘重新执行

Task出错:spark会对失败的task进行重试,3次后如果任然是失败,整个任务也就失败。

数据恢复和重试都是依赖RDD血缘机制

宽窄依赖

窄依赖的标准定义是:子RDD中的分区与父RDD中的分区只存在一对一的映射关系

宽依赖的标准定义是:子RDD中分区与父RDD中分区存在一对多的映射关系

宽依赖还有个名字,叫shuffle依赖,也就是说宽依赖必然会发生在shuffle操作,shuffle也是划分stage的重要依据。而窄依赖由于不需要发生shuffle,所有计算都是在分区所在节点完成,类似于MR中的ChainMapper。所以说,在如果在程序中选取的算子形成了宽依赖,那么就必然会触发shuffle

所以当RDD在shuffle过程中某个分区出现了故障,只需要找到当前对应的Stage,而这个Stage必然是某个shuffle算子所进行划分的,找到了这个算子,就离定位错误原因越来越近了。

物理层面:Spark物理层面对文件的shuffle方式有hash shuffle 、 sort_base shuffle

Hash Shuffle:

Hash Shuffle分为两个阶段,分别是shuffle write 和 shuffle fetch,前者是map任务划分分区,输出中间结果,后者是reduce获取中间结果。

Hash Shuffle整个过程中,所产生的文件数据和reduce数相同,每个repartition都有一个缓冲区用于接收结果,写满后就会输出到一个文件,用于reduce拉去数据结果。

Hash Shuffle实现缺陷也很明显:产生太多文件数,缓冲区占用太多空间。对于文件数太多问题,spark引入File consolidation机制,指共同输出文件以减低文件数(相同分区写同一个文件),会产生C(cpu)*R个文件。

Sort-Base Shuffle:

每个map任务最后都会只输出两个文件,一个是data文件,一个是index文件,过程采用类似mapreduce一样的归并排序,index会记录每个partition的偏移量,write完成后,reduce会更具index得到属于自己的分区,这种情况下,shuffle产生的结果文件问2*M。

在基于排序的shuffle中,spark还提供了一种折中方案,bypass sort-base shuffle,即当spark的reduce的任务小于spark.shuffle.sort.bypassMergeThreshold(默认200),任务会按照hash shuffle处理数据,不进行归并排序。只在shuffle wite后将文件合并为一个,并生成index文件。(其中中间会产生大量文件,只是最后被合并为一个)。

7、Spark Map和MapValue

Map会改变源RDD,且将函数作用于Key和Value,并返回一个新的RDD

MapValue则是将函数作用于源RDD的Value且不产生新的RDD,既:源RDD的分区不受到影响

8、Spark中的Order by、Sort by、Distribute by、Cluster by

Order by :order by会对输入做全局排序,因此只有一个Reducer(多个Reducer无法保证全局有序),然而只有一个Reducer,会导致当输入规模较大时,消耗较长的计算时间

Sort by :不是全局排序,其在数据进入reducer前完成排序,因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1,则sort by只会保证每个reducer的输出有序,并不保证全局有序

Distribute by :控制在map端如何拆分数据给reduce端的。hive会根据Distribute by后面的列和对应Reduce的个数进行数据分发,默认是采用Hash算法。Sort by为每个Reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个Reducer,这通常是为了进行后续的聚集操作。Distribute by刚好可以做这件事。因此,Distribute by经常和sort by配合使用。

1.Map输出的文件大小不均;

2.Reduce输出文件大小不均;

3.小文件过多;

4.文件超大;

Cluster by :除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是倒叙排序,不能指定排序规则为ASC或者DESC。

9、Spark作业提交流程

  1. spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGSchedulerTaskScheduler

  2. TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。

  3. Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个 Application 启动多个 Executer。

  4. Executor 启动后,会自己反向注册到 TaskScheduler 中。

  5. 所有 Executor 都注册到 Driver 上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。

  6. 每执行到一个 Action,就会创建一个 Job。

  7. Job 会提交给 DAGScheduler。

  8. DAGScheduler 会将 Job划分为多个 stage,然后每个 stage 创建一个 TaskSet。

  9. TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。

  10. Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。

  11. (TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task 执行 RDD 里的一个 partition)

10、Coalesce和Repartition

1)关系: 两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法: coalesce(numPartitions, shuffle = true)

2)区别: repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle 一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce

11、Spark 持久化机制和checkpoint 机制区别

目的不同:cache 是为了加速计算,也就是加速后续的job。checkpoint 则是为了在job 运行失败的时候能够快速恢复!

存储位置不同:cache 主要使用内存,偶尔使用磁盘存储。checkpoint 为了可靠读写主要采用HDFS 作为存储空间

对Lineage 影响不同:cache 对lineage无影响。缓存的RDD 丢失后可以通过lineage 重新计算。如果对RDD 进行 checkpoint,HDFS 因为是可靠存储的,所以不需要再保存lineage了

应用场景不同:cache 机制适用于会被多次读取,占用空间不是特别大的RDD。checkpoint 机制则是适用于数据依赖关系特别复杂,重新计算代价高的RDD,比如某RDD关联的数据过多、计算链过长、被多次重复使用。

12、Spark由哪几个部分构成?

Master节点:常驻Master 进程,该进程负责管理所有的Worker 节点。(分配任务、收集运行信息、监控worker的存活状

Worker节点:常驻Worker进程,该进程与Master 节点通信,还管理Spark 任务的执行。(启动Executor,监控任务运行状态)

Executor执行器:Executor 是一个JVM 进程,是Spark 计算资源的单位。可以运行多个计算任务。

Task: Spark 应用会被拆分为多个计算任务,分配给Executor 执行。Task 以线程的方式运行在Executor 中。

13、Map 是类似于桶数组的形式,类比说一下RDD 的内部结构

RDD 就像一个分布式数组,每个子part 含有相同类型的元素,但是元素可以分布在不同的机器上。

14、Spark 中 DAG 是如何形成的

DAG详解

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了DAG,根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段)。

对于窄依赖,partition的转换处理在一个Stage中完成计算。

对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

DAG的边界:

开始:通过SparkContext创建的RDD

触发Action:一旦触发Action就形成了一个完整的DAG

DAG划分Stage:

1)一个Application有一个或者多个job,一个job对应一个DAG

2)一个job分为不同的stage

3)一个stage下面有一个或者多个TaskSet

4)一个TaskSet有很多Task(一个Task就是所需的cpucores)

5)一个TaskSet就对应一个RDD,很多RDD称为TaskSets

理解:

1.一个Spark的Application应用中一个或者多个DAG(也就是一个Job),取决于触发了多少次Action

2.一个DAG中会有不同的阶段/stage,划分阶段/stage的依据就是宽依赖

3.一个阶段/stage中可以有多个Task,一个分区对应一个Task

4.spark会根据shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的stage/阶段中。

15、Spark内存管理机制

在进行分析Spark内存管理前,我们先看下Spark架构图:

从上图可以看出,Spark Application中包括两个JVM进程:Driver和Executor

Driver是主控进程,负责创建SparkSession/SparkContext,提交Job,将Job转化为Task,协调执行器之间的Task执行。

Executor主要负责执行具体的计算任务,并将结果返回给Driver。

Spark的内存管理在这两个部分中也是有所不同的,其中Driver内存管理比较简单,这里不做讲解,我们主要分析的是Executor的内存管理。

Executor内存分为两种(如下图所示):In-Heap memoryExternal memory

On-Heap 内存(In-Heap memory):对象在 JVM Heap 上分配并由 GC 绑定;

Off-Heap内存(External memory):对象通过序列化的方式分配在JVM之外的内存中,由应用程序管理,不受GC约束;

On-Heap 内存

On-Heap 内存即为堆内内存,它的大小由提交作业时指定 --excutor-memory 参数配置。Spark对内存的管理是一种逻辑上的规划管理,具体内存的申请和释放由JVM完成,Spark只是在JVM内存申请后和释放前记录这些内存(这是因为Spark的内存管理是基于JVM内存管理的),对于序列化对象可以精确计算实际占用内存,对于非序列化对象,占用内存采用周期采样估算得出,这样就导致实际可用内存小于spark记录的可用内存,容易导致OOM异常,但通过划分不同区域进行管理,可一定程度上减少了异常的出现。下图为Spark堆内内存的分配图(图例为统一内存管理):

从上图我们可以看出,Spark堆内内存分为四个部分:

Storage:用于缓存分布式数据集,比如 RDD Cache、广播变量等等

Execution:用来执行分布式任务。分布式任务的计算,主要包括数据的转换、过滤、映射、排序、聚合、归并等环节

Other:提供Spark内部对象、用户自定义对象的内存空间

System Reserved:不受开发者控制,它是 Spark 预留的、用来存储各种 Spark 内部对象的内存区域;

Off-Heap内存

为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API(从 Spark 2.0 开始,在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。

下图为Spark堆外内存的分配图(图例为统一内存管理):

在默认情况下堆外内存并不启用,可通过配置spark.memory.offHeap.enabled参数启用,并由spark.memory.offHeap.size参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式基本相同,所有运行中的并发任务共享存储内存和执行内存。

内存管理机制

了解了Spark的内存分类后,我们就来看下Spark内存的管理机制。Spark的内存管理机制分为两种(如下图所示):静态内存管理和统一内存管理

静态内存管理(Static Memory Management):将内存静态地分成两个固定的分区,用户在启动前可对其进行配置

统一内存管理(Unified Memory Manager):将内存进行统一管理,支持Storage和Execution内存动态占用

下面对这两种内存管理机制进行详细的介绍:

静态内存管理

Spark的静态内存管理将内存静态地分成两个固定的分区,Storage Memory和Execution Memory等内存的大小在应用程序处理过程中是固定的,但用户可以在应用程序启动前对其进行配置。设置参数如下可参考:

静态内存管理图示-堆内

静态内存管理图示-堆外

作为传统的内存管理模型,它的优缺点还是很明显的:

优点:实现比较简单

缺点:

1.尽管存储内存有可用空间,但无法使用,并且由于执行程序内存已满而会导致磁盘溢出(反之亦然)。

2.静态内存管理不支持使用堆外内存进行存储,所以全部分配给执行空间。

注:从Spark 1.6.0开始,采用新的内存管理器替代静态内存管理器,静态管理方式任然被保留,可通过spark.memory.useLegacyMode参数启用。但静态内存分配方法在 Spark 3.0 中被淘汰了,不能使用了

动态内存管理

Spark从1.6.0版本开始,采用新的内存管理器代替了静态内存管理器,为Spark提供动态内存分配。它分配一个内存区域作为存储和执行共享的统一内存容器。当不使用执行内存时,存储内存可以获得所有可用内存,反之亦然。如果任何存储或执行内存需要更多空间,一个名为acquireMemory() 的函数将扩展其中一个内存池并缩小另一个内存池。借用的存储内存可以在任何给定时间被逐出。然而,由于实现的复杂性,借用的执行内存不会在第一个设计中被逐出。

动态内存管理机制的动态占用规则如下:

只有在Execution memory中没有使用blocks时,Storage memory才能从Execution memory中借用空间。

如果Storage memory中没有使用blocks,执行内存也可以从Storage memory中借用空间。

如果Execution memory中的blocks被Storage memory占用,而Execution需要更多的内存,可以强制驱逐Storage Memory占用的多余blocks

如果Storage Memory中的blocks被Execution memory使用,Storage需要更多的内存,则不能强行驱逐Execution Memory占用的多余blocks;它最终将拥有更少的内存区域。它会等到Spark释放掉Execution memory存储的多余block,然后占用它们。

动态内存占用图示如下:

动态内存管理图示-堆内

动态内存管理图示-堆外

动态内存管理机制是用来解决静态内存管理机制不够灵活的问题的,它的优点如下:

1.存储内存和执行内存之间的边界不是静态的,在内存压力的情况下,边界会移动,即一个区域会通过从另一个区域借用空间来增长。

2.当应用程序没有缓存和传播时,执行会使用所有内存以避免不必要的磁盘溢出。

3.当应用程序有缓存时,它会保留最小的存储内存,这样数据块就不会受到影响。

4.这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户了解内存如何在内部划分方面的专业知识。

深入底层源码

Spark内存管理相关类都在 spark core 模块的 org.apache.spark.memory 包下。

在Spark3.0之前,Spark有两种内存管理模式,静态内存管理(Static MemoryManager)和动态(统一)内存管理(Unified MemoryManager)

这个包实现了Spark的内存管理系统。该系统由两个主要组件的组成,即JVM范围内的内存管理和单个任务的内存管理:

MemoryManager:管理Spark在JVM中的总体内存使用情况。此组件实现了在任务之间划分可用内存以及在存储(缓存和数据传输使用的内存)和执行(计算使用的内存,如混洗、联接、排序和聚合)之间分配内存的策略。

TaskMemoryManager:管理各个任务分配的内存。任务与TaskMemoryManager交互,从不直接与JVM范围的MemoryManager进行交互。

在内部,这些组件中的每一个都有额外的内存记账抽象:

MemoryConsumer:是TaskMemoryManager的客户端,对应于任务中的单个运算符和数据结构。TaskMemoryManager从MemoryConsumers接收内存分配请求,并向使用者发出回调,以便在内存不足时触发溢出。

MemoryPool:是MemoryManager用来跟踪存储和执行之间的内存分配的记账抽象。

示意图:

MemoryManager有两种实现,它们处理内存池大小的方式各不相同:

UnifiedMemoryManager:Spark 1.6+中的默认设置强制执行存储和执行内存之间的软边界,允许通过从另一个区域借用内存来满足一个区域中的内存请求。

StaticMemoryManager:通过静态划分Spark的内存并防止存储和执行相互借用内存,在存储和执行内存之间实施硬边界。仅出于传统兼容性目的保留此模式。

在Spark3.0之后,静态内存管理被淘汰了,因此只剩下动态(统一)内存管理(Unified MemoryManager),因此内存管理源码有了较大改动。

MemoryConsumer:是TaskMemoryManager的客户端,对应于任务中的单个运算符和数据结构。TaskMemoryManager从MemoryConsumers接收内存分配请求,并向使用者发出回调,以便在内存不足时触发溢出。

MemoryMode:Spark的内存类型有两种:堆内和堆外

SparkOutOfMemoryError:当任务无法从内存管理器获取内存时,会引发此异常。我们应该使用throw this异常,而不是抛出OutOfMemoryError,这会杀死执行器,这只会杀死当前任务。

TaskMemoryManager:管理单个任务分配的内存

TooLargePageException:页面过大异常

16、Spark数据倾斜处理

数据倾斜的表现:

1.Executor lost,OOM,Shuffle 过程出错;

2.Driver OOM;

3.单个 Executor 执行时间特别久,整体任务卡在某个阶段不能结束;

4.正常运行的任务突然失败;

数据倾斜的处理方案:

1.使用 Hive 预处理:适用于导致数据倾斜的是Hive表,Hive 表中的数据本身不均匀;

2.过滤导致倾斜的 key:适用于只有少数几个热点 key,并且过滤之后对结果没影响;

3.提高 shuffle 操作的并行度;

4.两阶段聚合:局部 + 全局;

5.将reduce join转为map join:适用于一个 RDD 或表的数据量比较小,即一个大表和一个小表的情况;

6.采样倾斜 key 并分拆 join 操作。适用于两个表都很大,但其中某一个 RDD/Hive 表中的少数几个 key 的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀。采用计算出最多的几个 key,将这几个 key 对应的数据从原来的 RDD 中拆分出来,形成一个单独的 - RDD,并给每个 key 都打上 n 以内的随机数作为前缀;

7.使用随机前缀和扩容 RDD 进行 join:适用于 RDD 中有大量的key导致数据倾斜;

17、Spark 任务提交流程

1.Spark-Shell提交任务,向RM申请资源;

2.RM分配container,在对应NodeMANAGER启动AM,然后AM启动driver;

3.Driver 向 ResourceManager 申请资源 Executor;

4.RM返回 container 给driver;

5.Driver在相应NodeMANAGER启动executor;

6.Executor向driver反向注册;

7.Executor 全部注册完,Driver 开始执行 main 函数;

8.Driver 执行函数时,遇到 action 算子就会触发一个 job,根据宽依赖划分 stage,每个 stage 生成 taskSet,将 task 分发到 Executor 上执行;

18、ReduceByKey和GroupByKey的区别

从shuffle角度:ReduceByKey和GroupByKey都存在shuffle的操作,但是ReduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,这样会减少落磁盘的数据量(io);而GroupByKey只是进行分组,不存在数据量减少的问题,ReduceByKey的性能高

从功能角度:ReduceByKey其实包含分组和聚合的功能。GroupByKey只能分组,不能聚合。所以在分组聚合的场合下,推荐使用reduceByKey,而仅仅只是分组而不需要聚合的,那么只能使用GroupByKey。

19、算子解释

ReduceByKey、FoldByKey、AggregateByKey、CombineByKey 区别

ReduceByKey 没有初始值 分区内和分区间逻辑相同

foldByKey 有初始值 分区内和分区间逻辑相同

aggregateByKey 有初始值 分区内和分区间逻辑可以不同

combineByKey 初始值可以变化结构 分区内和分区间逻辑不同

算子集合:

combineByKey:最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。 combineByKey:方法需要三个参数 第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作 第二个参数表示:分区内的计算规则 第三个参数表示:分区间的计算规则。

foldByKey:当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

aggregateByKey:aggregateByKey是将数据根据不同的规则分别进行不同的分区内计算和分区间计算,规则分别设立

aggregateByKey的返回值和初始化值必须相同

aggregateByKey 算子是函数柯里化,存在两个参数列表:

第一个参数列表中的参数表示初始值

第二个参数列表中含有两个参数

1. 第一个参数表示分区内的计算规则

2. 第二个参数表示分区间的计算规则

groupByKey:将数据源的数据根据 key 对 value 进行分组,形成一个对偶元组,元组的第二个元素是value,groupBy分组后的元组第二个元素是KV。

reduceByKey:将数据按照相同的 Key 对 Value 进行聚合

partitionBy:将数据按照指定 Partitioner 重新进行分区。隐式转换为PairRDDFunctions,二次编译。 区别于coaleas,repartition是数量上的改变,二partitionBy是功能上的改变。

subtract:前一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来

union:对原 RDD 和参数 RDD 求并集后返回一个新的 RDD,要求类型相同

intersection:对源 RDD 和参数 RDD 求交集后返回一个新的 RDD,要求类型相同

repartition:该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。对于分区数多少的RDD都可以转换,因为无论如何都会经 shuffle 过程。

coalesce:根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本 coalesce方法默认不会打乱分区重新组合,容易导致数据倾斜 但coalease还有第二个参数,默认为false,不shuffle,设置为true即可进行shuffle让数据变的均衡。

glom:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

flatMap:将处理的数据进行扁平化后再进行映射处理

mapPartitionsWithIndex:将待处理的数据以分区为单位发送到计算节点进行处理

mapPartitions: map和mapPartition的区别:map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。是串行,效率低 。mapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据。并行,效率高,但消耗内存,容易造成内存溢出。

20、spark小文件处理

1. 合并小文件

  • 在读取时合并:使用 Spark 的 coalescerepartition 方法在读取数据时合并小文件。`coalesce` 减少分区数量,而 repartition 可以根据需要增加或减少分区数量。

  • 使用 Hadoop 文件格式:如果数据存储在 HDFS 上,可以考虑使用像 Parquet 或 ORC 这样的列式存储格式,这些格式天然支持合并小文件。

2. 避免产生小文件

  • 在写入数据时,合理设置分区数,以减少输出的小文件数量。

  • 在数据转换过程中,注意数据倾斜和分区策略,避免某些分区数据量过小。

3. 使用高效的数据存储格式

  • 使用像 Parquet 或 ORC 这样的高效文件格式,它们可以更好地处理小文件问题,同时提供更好的压缩和编码特性。

4. 调整 Spark 配置

  • 调整 Spark 作业的配置,比如 spark.sql.files.maxPartitionBytesspark.sql.files.openCostInBytes,以控制读取文件时的分区大小和行为。

5. 合并文件后再处理

  • 在数据处理之前,使用外部工具(如 Hadoop 的 FileCrush 或自定义脚本)合并小文件,然后再用 Spark 进行处理。

6. 使用 Spark Structured Streaming

  • 如果是流处理场景,可以考虑使用 Spark Structured Streaming 的文件源选项,它可以有效地处理文件夹中不断累积的小文件。