Hadoop基础-知识必备

conjuring-ai
144
2023-10-06

1、Hadoop之MapReduce

1.1 Map阶段

  1. 对文件进行逻辑切片 split,默认大小为 hdfs 块大小,每一块对应一个 mapTask;

  2. 对切片中的数据按行读取,解析返回 <K,V> 形式,key 为每一行的偏移量,value 为每一行的数据;

  3. 调用 Map 方法处理数据,读取一行调用一次;

  4. 对 Map 方法计算的数据进行分区 partition,排序 sort; 默认不分区,因为只有一个 reduceTask 处理数据,分区数 =reduceTask 数,计算规则:key 的 hash 值对 reduce 取模,保证相同 key 一定在同一分区

  5. Map 输出数据一同写入到数据缓冲区,达到一定的条件溢写到磁盘

  6. spill 溢出的文件进行 combiner 规约,combiner 为 map 阶段的 reduce,并不是每个 mapTask 都有该流程,对于 combine 需要慎用,例如:求平均数,如果提前 combine 则会导致最终的计算结果不一致

  7. 对所有溢出的文件(分区且有序)进行最终 merge 合并,成为一个大文件

1.2 Reduce阶段

  1. 从 MapTask 复制拉取其对应的分区文件

  2. 将 copy 的数据进行 merge 合并,再对合并后的数据排序,默认按照 key 字典序排序

  3. 对排序后的数据调用 reduce 方法

1.3 Shuffle原理

shuffle 被称作 MapReduce 的心脏,是 MapReduce 的核心。

Map 端的 shuffle:

1.collect 阶段:在 map 端首先接触的是 InputSplit,在 InputSplit 中含有 DataNode 中的数据,每一个 InputSplit 都会分配一个 Mapper 任务,Mapper 任务结束后产生 <K2,V2> 的输出,这些输出先存放在缓存中,每个 map 有一个环形内存缓冲区,用于存储任务的输出。默认大小 100MB(io.sort.mb 属性),一旦达到阀值 0.8(io.sort.spil l.percent),一个后台线程就把内容写到 (spill)Linux 本地磁盘中的指定目录(mapred.local.dir)下的新建的一个溢出写文件。

2.Spill 阶段:写磁盘前,要进行 partition、sort 和 combine 等操作。通过分区,将不同类型的数据分开处理,之后对不同分区的数据进行排序,如果有 Combiner,还要对排序后的数据进行 combine。等最后记录写完,将全部溢出文件合并为一个分区且排序的文件。

3.Merge 阶段:最后将磁盘中的数据送到 Reduce 中,从图中可以看出 Map 输出有三个分区,有一个分区数据被送到图示的 Reduce 任务中,剩下的两个分区被送到其他 Reducer 任务中。而图示的 Reducer 任务的其他的三个输入则来自其他节点的 Map 输出。

Reduce 端的 shuffle:

Reduce 端的 shuffle 主要包括三个阶段,copy、sort(merge) 和 reduce。

1.copy 阶段:Reducer 通过 Http 方式得到输出文件的分区。

reduce 端可能从 n 个 map 的结果中获取数据,而这些 map 的执行速度不尽相同,当其中一个 map 运行结束时,reduce 就会从 JobTracker 中获取该信息。map 运行结束后 TaskTracker 会得到消息,进而将消息汇报给 JobTracker,reduce 定时从 JobTracker 获取该信息,reduce 端默认有 5 个数据复制线程从 map 端复制数据。

2.merge 阶段:如果形成多个磁盘文件会进行合并。

从 map 端复制来的数据首先写到 reduce 端的缓存中,同样缓存占用到达一定阈值后会将数据写到磁盘中,同样会进行 partition、combine、排序等过程。如果形成了多个磁盘文件还会进行合并,最后一次合并的结果作为 reduce 的输入而不是写入到磁盘中。

3.reducer 的参数:最后将合并后的结果作为输入传入 Reduce 任务中。

最后就是 Reduce 过程了,在这个过程中产生了最终的输出结果,并将其写到 HDFS 上。

1.4 MapReduce中的几种排序

排序发生阶段:

一个是在 MapSide 发生在 Spill 后 Partition 前

一个是在 ReduceSide 发生在 copy 后 Reduce 前

排序分类:

1. 部分排序:MapReduce 根据输入记录的键对数据集排序。保证输出的每个文件内部排序。

2. 全排序:如何用 Hadoop 产生一个全局排序的文件?

最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了 MapReduce 所提供的并行架构。

替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。

例如:可以为待分析文件创建 3 个分区,在第一分区中,记录的单词首字母 a-g,第二分区记录单词首字母 h-n, 第三分区记录单词首字母 o-z。

3. 辅助排序(GroupingComparator 分组):Mapreduce 框架在记录到达 reducer 之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的 map 任务且这些 map 任务在不同轮次中完成时间各不相同。一般来说,大多数 MapReduce 程序会避免让 reduce 函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。

4. 二次排序:在自定义排序过程中,如果 compareTo 中的判断条件为两个即为二次排序。

自定义排序 WritableComparable。

bean 对象实现 WritableComparable 接口重写 compareTo 方法。

2、Hadoop数据一致性

要想了解 Hadoop 的数据一致性,需要先了解 HDFS 中的 3 个关键字Sync(),Fush(),Hsync()。换句话说 Hadoop 数据一致性其实就是 HDFS 数据一致性。

三个关键字中 Hsync()底层其实调用的是 Sync(),这个方法会将 client 端 buffer 中的数据更新到 datanode 端,直到全部的 datanode 的 ack 响应时结束调用,这就保证在 hflush 结束调用时,client 端也能够获取到一致性数据。

Hsync() 除会执行 Hflush 外,还会确保 datanode 数据更新到磁盘,这样就保证数据不会丢失,保证数据一致性。

HDFS 元数据一致性

1.Fsimage 和 Edits Logs

客户端在进行文件操纵时,NameNode 会先往 Edits Logs 文件中记录元数据的操作日志。在 Edits Logs 写满之前对内存和 Fsimage 做数据同步,即合并 Edits Logs 和 Fsimage 的数据,然后 Edits Logs 上的数据即可清除。

然而当 Edits Logs 满之后,已经再文件的上传不能中断,所以将会往一个新的文件 edits.new 上写数据, 老的 Edits Logs 的合并操作将由 SecondNameNode 来完成,即所谓的 CheckPoint 操作。

2.CheckPoint

CheckPoint 在什么情况下执行:

1.edits.log 文件大小限制,由fs.checkpoint.size配置,当 edits.log 文件达到上限,则会触发 checkpoint 操作。

2. 指定时间内进行 checkpoint 操作,由fs.checkpoint.period配置。

3.SecondaryNameNode

从 NameNode 上下载元数据信息(Fsimage、Edits),然后把二者合并,生成新的 Fsimage,在本地保存,并将其推送到 NameNode,替换旧的 Fsimage。

Checkpoint 在 SecondaryNameNode 上执行过程如下:

1.Secondary 通知 NameNode 切换 Edits 文件;

2.Secondary 从 namenode 获得 Fsimage 和 Edits(通过 http);

3.Secondary 将 Fsimage 载入内存,然后开始合并 Edits;

4.Secondary 将新的 Fsimage 发回给 NameNode

5.NameNode 用新的 Fsimage 替换旧的 Fsimage;

HDFS 文件数据一致性

1.CheckSum

HDFS 会对写入的所有数据计算校验和,并在读取数据时验证检验和,针对每个由 dfs.bytes-per-checksum 指定字节的数据计算校验和,默认为 512 个字节。

2. 租约机制

防止多个进程向同一个文件写数据的情况,采用了文件加锁的机制。而在 HDFS 中,同样需要一个机制来防止同一个文件被多个人写入数据。这种机制就是租约 (Lease),每当写入数据之前,一个客户端必须获得 namenode 发放的一个租约。Namenode 保证同一个文件只发放一个允许写的租约。

HDFS 还使用租约机制保证多个复本变更顺序的一致性,master 节点为数据块的一个复本建立一个租约,把该复本称为主复本,主复本对复本的所有更改操作进行序列化,所有的复本都遵循这个顺序进行修改操作,因此,修改操作全局顺序由 master 节点选择的租约顺序决定。

3. 一致性模型

文件系统的一致性模型 (coherency model) 描述了文件读 / 写的数据可见性。

3、Hadoop读写流程

3.1 写流程

1. 首先向 namenode 通信,请求上传文件,namenode 检查目标文件是否已存在,父目录是否存在 ,还得看看是否有上传的权限,说白了,就是判断是否可以上传

2.namenode 返回是否可以上传 ,如果可以,client 会先对文件进行切分(逻辑切分)

3. 客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。

4.NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3。

5. 客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。

6.dn1、dn2、dn3 逐级应答客户端。

7. 客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet 会放入一个应答队列等待应答。

8. 当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务器。(重复执行 3-7 步)。

9. 传输完毕之后,客户端关闭流资源,并且会告诉 hdfs 数据传输完毕,然后 hdfs 收到传输完毕就恢复元数据

3.2 读流程

1. 客户端通过 Distributed FileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址。

2. 挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。

3.DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位来做校验)。

4. 客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。

5. 下载完第一块,在重复上面 2.3 步下载

4、Hadoop的Map和Reduce个数怎么确定?

深入了解该问题前,请先思考如下问题:

1.Map 和 Reduce 数量过多会导致什么问题?

2.Reduce 可以通过什么设置来增加任务数?

3. 一个 Task 的 Map 数由谁决定?

4. 一个 Task 的 Reduce 数由谁决定?

Map 数量设置:

默认:如果不设置其他参数,默认按照输入的文件数决定 Map 的数量。

通过合并文件数:在不确定 HDFS 内文件情况下 (小文件),可以通过设置小文件合并的方式来达到控制 Map 数。

输入合并小文件设置:

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; #执行Map前进行小文件合并
set mapred.max.split.size=128000000; #每个Map最大输入大小,单位为KB
set mapred.min.split.size.per.node=100000000; #一个节点上split的至少的大小,单位为KB
set mapred.min.split.size.per.rack=100000000; #一个交换机下split的至少的大小,单位为KB

输出合并小文件设置:

set hive.merge.mapfiles = true #在Map-only的任务结束时合并小文件
set hive.merge.mapredfiles = true #在Map-Reduce的任务结束时合并小文件
set hive.merge.sparkfiles = true #在hive on spark任务后开启合并小文件
set hive.merge.size.per.task = 256*1000*1000 #合并文件的大小
set hive.merge.smallfiles.avgsize=16000000 #当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge

通过参数设置 Map 数:

1. 通过设置文件切分以控制 Map 数量:mapred.min.split.size 在客户端进行设置

2. 参数:mapred.map.tasks,当然这个参数需要达到一定条件才能生效,只有当 InputFormat 决定了 map 任务的个数比mapred.map.tasks值小时才起作用

3. 还可以通过 JobConf 的conf.setNumMapTasks(int num)设置 map 数量

Reduce 数量设置:

在不指定 reduce 个数情况下,hive 会基于参数hive.exec.reducers.bytes.per.reducerhive.exec.reducers.max来控制 reduce 数量,总数不会超过参数hive.exec.reducers.max设置的值,当然也可以通过参数mapreduce.job.reduces硬性规定 reduce 数量。 也可以通过 conf.setNumReduceTasks(int num) 设置

5、Hadoop环形缓冲区

1. 为什么要环形缓冲区?

使用环形缓冲区,便于写入缓冲区和写出缓冲区同时进行

2. 为什么不等缓冲区满了再 spill?

会出现阻塞

3. 数据的分区和排序是在哪完成的?

分区是根据元数据 meta 中的分区号 partition 来分区的,排序是在 spill 的时候排序。

4. 环形缓冲区数据结构

环形缓冲区是一个数组结构,数组中存放着 key 和 value 的数据及元数据信息,元数据存储的格式是 int 类型,每个 key/value 对应一个元数据,元数据由 4 个 int 组成,第一个 int 存放 value 的起始位置,第二个 int 存放 key 的起始位置,第三个 int 存放 partition,第四个 int 存放 value 的长度。

key/value 数据和元数据在环形缓冲区中的存储是由 equator 分隔的,key/value 按照索引递增的方向存储,元数据则按照索引递减的方向存储,将数组抽象为一个环形结构之后,以 equator 为界,key/value 顺时针存储,元数据逆时针存储。

6、Hadoop小文件优化

6.1 HDFS小文件弊端

因为 HDFS 上每个文件都需要在 NameNode 上建立索引,这个索引大小约 150byte,这样当小文件比较多的时候,就会产生很多索引文件,一方面会占用大量 NameNode 的空间,另一方面索引过大会使索引速度变慢。

6.2 解决方式

(1)Hadoop 本身提供了一些文件压缩的方案。

(2)从系统层面改变现有 HDFS 存在的问题,其实主要还是小文件的合并,然后建立比较快速的索引。

6.3 Hadoop内置小文件处理方法

(1)Hadoop Archive [ˈɑːkaɪv]: 是一个高效地将小文件放入 HDFS 块中的文件存档工具,它能够将多个小文件打包成一个 HAR 文件,这样在减少 namenode 内存使用的同时。

(2)Sequence File: Sequence File 由一系列的二进制 key/value 组成,如果为 key 小文件名,value 为文件内容,则可以将大批小文件合并成一个大文件。

(3)CombineFileInputFormat: CombineFileInputFormat 是一种新的 inputformat,用于将多个文件合并成一个单独的 split,另外,它会考虑数据的存储位置

7、MapReduce实现两个表的Join

Reduce Side Join:

在 map 阶段,map 函数同时读取两个文件 File1 和 File2,为了区分两种来源的 key/value 数据对,对每条数据打一个标签(tag), 比如:tag=0 表示来自文件 File1,tag=2 表示来自文件 File2。

Map Side Join:

针对两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个 map task 内存中存在一份(比如存放到 hash table 中),然后只扫描大表:对于大表中的每一条记录 key/value,在 hash table 中查找是否有相同的 key 的记录,如果有,则连接后输出即可。

8、如果没有定义Partitioner,数据在Reduce是被如何分区的

如果没有自定义的 partitioning,则默认的 partition 算法,即根据每一条数据的 key 的 hashcode 值模运算(%)Reduce 的数量,得到的数字就是分区号

9、MapTask和ReduceTask的工作机制

1.MapTask 工作机制

Read 阶段:Map Task 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。

Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map() 函数处理,并产生一系列新的 key/value。

Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用 OutputCollector.collect() 输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。

Spill 阶段:溢写,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

Combine 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

2.ReduceTask 工作机制

Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

Sort 阶段:按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。

Reduce 阶段:reduce() 函数将计算结果写到 HDFS 上。

10、Hadoop序列化和反序列化及自定义Bean对象实现序列化

序列化:就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。

反序列化:就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。

Hadoop 序列化特点:

紧凑:高效使用存储空间

快速:读写数据的额外开销小

可扩展:随着通讯协议的升级而可升级

互操作:支持多语言的交互

自定义 bean 对象要想序列化传输步骤及注意事项:

必须实现 Writable 接口;

反序列化时,需要反射调用空参构造函数,所以必须有空参构造;

重写序列化方法;

重写反序列化方法;

注意反序列化的顺序和序列化的顺序完全一致;

要想把结果显示在文件中,需要重写 toString(),且用"\t"分开,方便后续用;

如果需要将自定义的 bean 放在 key 中传输,则还需要实现 comparable 接口,因为 mapreduce 框中的 shuffle 过程一定会对 key 进行排序;

11、Hadoop和Spark区别

Hadoop 的数据处理单位是 block,Spark 提供了可供并行处理的数据抽象 RDD

Hadoop 对数据处理只提供了 Map 和 Reduce 两种操作。Spark 提供了两大类算子 transformation 和 action,支持的操作更多。

Hadoop 只支持 Map->Reduce 的流程。Spark 则依赖 DAG 有向无环图的方式来执行 Job。速度更快。

Spark 提供了 Hadoop 所不支持的 cache 和 运行时 checkpoint 机制。大大的提高了计算速度和程序可靠性。

Spark 会对 Job 划分 Stage。同一个 Stage 内的 task 可以用流水线机制执行,大大提高了速度。

Shuffle 机制:Hadoop 的 MapReduce 不支持在线聚合。Spark 采用了类 HashMap 的结构(三种数据结构)实现了自动聚合功能。Spark 在对 Record 进行排序的时候可以通过 Partition Id 和 key 进行排序的方式,Hadoop 只能通过 key 进行排序

12、Hadoop数据倾斜

hadoop 据倾斜的表现

有一个多几个 Reduce 卡住,卡在 99.99%,一直不能结束。

各种 container 报错 OOM

异常的 Reducer 读写的数据量极大,至少远远超过其它正常的 Reducer

伴随着数据倾斜,会出现任务被 kill 等各种诡异的表现。

Hadoop 中数据倾斜的处理

抽样和范围分区:对原始数据抽样得到的结果集来预设分区边界值。

自定义分区:对于热点 key 发送到一部分 Reduce 实例,其他的发送给剩余的 Reduce 实例。

Combiner:可以聚合精简数据,大量减少数据倾斜。适合于 Sum()求和,并不适合 Average() 求平均数。

局部聚合加全局聚合:第一次在 map 阶段对那些导致了数据倾斜的 key 加上 1 到 n 的随机前缀,这样本来相同的 key 也会被分到多个 Reducer 中进行局部聚合,数量就会大大降低。第二次 MapReduce,去掉 key 的随机前缀,进行全局聚合。

13、Hadoop的MapReduce中的几种Join

在 mapreduce 中分为 3 大 join 逻辑, 分别的是 Map 端的 Join,Reduce 端的 Join,semi join

map 端的 join:实现逻辑: 首先他会有 2 个 map 任务,第一个 Map 任务会将小表的数据完全加载到内存中,并且将数据映射成 hashmap 的数据结构,在该结构中 key 就是对应的我们连接的那个 key, 第 2 个 map 任务会去扫描大表的数据, 与小表中的数据的 key 去匹配,如果相等,就进行一个连接操作

reduce 端的 join:map 任务会将将每个表映射成 k,v 的数据结构,并对表的数据进行打入来源标记, 在 reduce 任务中,获取 2 张表享同的 key 就进行一个连接操作

semi join:该 join 的是实现其实就是 reduce 端 join 的一个优化 ,会在 map 端过滤掉不能 join 的数据, 这样就可以减少数据的传输,减少磁盘 io.

14、Hadoop的角色和作用

14.1 NameNode

1)接受客户端读写请求

2)管理元数据信息

3)接受 DataNode 的心跳报告

4)负载均衡

5)负载数据块的副本的存储节点分配

14.2 DataNode

1)处理客户端的读写请求

2)真正的进行数据块的存储

3)向 namenode 发送心跳报告 4)进行副本的复制

14.3.secondarynamenode

合并 fsimage 和 edits 文件

1)帮助 namenode 备份元数据信息,冷备份,备份的元数据信息是否是最新的 ,有一定的数据延迟的,可能造成数据丢失

2)帮助 namenode 进行元数据合并 减轻 namenode 的压力

14.4.客户端

1)发送读写请求

2)对上传的文件进行逻辑切块 和物理切块

3)向 namenode 反馈数据上传 下载的响应

14.5.YARN

ResourceManager(一个集群只有一个)

ApplicationMaster(每个应用都有一个)

NodeManagers (每个节点都有一个)

集群的主节点 ResourceManager 的职责:

1)处理客户端请求

2)启动或监控 MRAppMaster

3)监控 NodeManager 的健康状况 nodemanager 定期的向 resourcemanager 进行发送心跳报告

4)资源的分配与调度

nodemanager 的职责:

1)管理单个节点上的资源

2)处理来自 ResourceManager 的命令 (启动 mrappmaster 的时候)

3)处理来自 MRAppMaster 的命令(启动 maptask reducetask 任务的时候)

Scheduler(调度器)

调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序;在资源紧张的情况下,可以 kill 掉优先级低的,来运行优先级高的任务。

Applications Manager(应用程序管理器)

负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动 ApplicationMaster、监控 ApplicationMaster 运行状态并在失败时重新启动它等。

ApplicationMaster

ApplicationMaster 管理在 YARN 内运行的每个应用程序实例。每个应用程序对应一个 ApplicationMaster。ApplicationMaster 负责协调来自 ResourceManager 的资源,并通过 NodeManager 监视容器的执行和资源使用(CPU、内存等的资源分配),通俗讲是管理发起的任务,随着任务创建而创建,任务的完成而结束。

Container

Container 是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当 AM 向 RM 申请资源时,RM 为 AM 返回的资源便是用 Container 表示的。YARN 会为每个任务分配一个 Container,且该任务只能使用该 Container 中描述的资源。

15、Yarn任务提交流程

(1)作业提交

第 1 步:Client 调用job.waitForCompletion方法,向整个集群提交 MapReduce 作业。

第 2 步:Client 向 RM 申请一个作业 id。

第 3 步:RM 给 Client 返回该 job 资源的提交路径和作业 id。

第 4 步:Client 提交 jar 包、切片信息和配置文件到指定的资源提交路径。

第 5 步:Client 提交完资源后,向 RM 申请运行 MrAppMaster。

(2)作业初始化

第 6 步:当 RM 收到 Client 的请求后,将该 job 添加到容量调度器中。

第 7 步:某一个空闲的 NM 领取到该 Job。

第 8 步:该 NM 创建 Container,并产生 MrAppMaster。

第 9 步:下载 Client 提交的资源到本地。

(3)任务分配

第 10 步:MrAppMaster 向 RM 申请运行多个 MapTask 任务资源。

第 11 步:RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分别领取任务并创建容器。

(4)任务运行

第 12 步:MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager 分别启动 MapTask,MapTask 对数据分区排序。

第 13 步:MrAppMaster 等待所有 MapTask 运行完毕后,向 RM 申请容器,运行 ReduceTask。

第 14 步:ReduceTask 向 MapTask 获取相应分区的数据。

第 15 步:程序运行完毕后,MR 会向 RM 申请注销自己。

(5)进度和状态更新

YARN 中的任务将其进度和状态 (包括 counter) 返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置 ) 向应用管理器请求进度更新, 展示给用户。

(6)作业完成

除了向应用管理器请求作业进度外, 客户端每 5 秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和 Container 会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。

动物装饰