1、Hadoop之MapReduce

1.1 Map阶段

  1. 对文件进行逻辑切片split,默认大小为hdfs块大小,每一块对应一个mapTask; ​

  2. 对切片中的数据按行读取,解析返回<K,V>形式,key为每一行的偏移量,value为每一行的数据; ​

  3. 调用Map方法处理数据,读取一行调用一次; ​

  4. 对Map方法计算的数据进行分区partition,排序sort; ​ 默认不分区,因为只有一个reduceTask处理数据,分区数=reduceTask数,计算规则:key的hash值对reduce取模,保证相同key一定在同一分区 ​

  5. Map输出数据一同写入到数据缓冲区,达到一定的条件溢写到磁盘 ​

  6. spill溢出的文件进行combiner规约,combiner为map阶段的reduce,并不是每个mapTask都有该流程,对于combine需要慎用,例如:求平均数,如果提前combine则会导致最终的计算结果不一致 ​

  7. 对所有溢出的文件(分区且有序)进行最终merge合并,成为一个大文件

1.2 Reduce阶段

  1. 从MapTask复制拉取其对应的分区文件 ​

  2. 将copy的数据进行merge合并,再对合并后的数据排序,默认按照key字典序排序 ​

  3. 对排序后的数据调用reduce方法

1.3 Shuffle原理

shuffle被称作MapReduce的心脏,是MapReduce的核心。

Map端的shuffle:

1.collect阶段:在map端首先接触的是InputSplit,在InputSplit中含有DataNode中的数据,每一个InputSplit都会分配一个Mapper任务,Mapper任务结束后产生<K2,V2>的输出,这些输出先存放在缓存中,每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spil l.percent),一个后台线程就把内容写到(spill)Linux本地磁盘中的指定目录(mapred.local.dir)下的新建的一个溢出写文件。

2.Spill阶段:写磁盘前,要进行partition、sort和combine等操作。通过分区,将不同类型的数据分开处理,之后对不同分区的数据进行排序,如果有Combiner,还要对排序后的数据进行combine。等最后记录写完,将全部溢出文件合并为一个分区且排序的文件。

3.Merge阶段:最后将磁盘中的数据送到Reduce中,从图中可以看出Map输出有三个分区,有一个分区数据被送到图示的Reduce任务中,剩下的两个分区被送到其他Reducer任务中。而图示的Reducer任务的其他的三个输入则来自其他节点的Map输出。

Reduce端的shuffle:

Reduce端的shuffle主要包括三个阶段,copy、sort(merge)和reduce。

1.copy阶段:Reducer通过Http方式得到输出文件的分区。

reduce端可能从n个map的结果中获取数据,而这些map的执行速度不尽相同,当其中一个map运行结束时,reduce就会从JobTracker中获取该信息。map运行结束后TaskTracker会得到消息,进而将消息汇报给JobTracker,reduce定时从JobTracker获取该信息,reduce端默认有5个数据复制线程从map端复制数据。

2.merge阶段:如果形成多个磁盘文件会进行合并。

从map端复制来的数据首先写到reduce端的缓存中,同样缓存占用到达一定阈值后会将数据写到磁盘中,同样会进行partition、combine、排序等过程。如果形成了多个磁盘文件还会进行合并,最后一次合并的结果作为reduce的输入而不是写入到磁盘中。

3.reducer的参数:最后将合并后的结果作为输入传入Reduce任务中。

最后就是Reduce过程了,在这个过程中产生了最终的输出结果,并将其写到HDFS上。

1.4 MapReduce中的几种排序

排序发生阶段:

一个是在MapSide发生在Spill后Partition前

一个是在ReduceSide发生在copy后Reduce前

排序分类:

1.部分排序:MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。

2.全排序:如何用Hadoop产生一个全局排序的文件?

最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。

替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。

例如:可以为待分析文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。

3.辅助排序(GroupingComparator分组):Mapreduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。

4.二次排序:在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

自定义排序WritableComparable。

bean对象实现WritableComparable接口重写compareTo方法。

2、Hadoop数据一致性

要想了解Hadoop的数据一致性,需要先了解HDFS中的3个关键字Sync(),Fush(),Hsync()。换句话说Hadoop数据一致性其实就是HDFS数据一致性。

三个关键字中Hsync()底层其实调用的是Sync(),这个方法会将client端buffer中的数据更新到datanode端,直到全部的datanode的ack响应时结束调用,这就保证在hflush结束调用时,client端也能够获取到一致性数据。

Hsync()除会执行Hflush外,还会确保datanode数据更新到磁盘,这样就保证数据不会丢失,保证数据一致性。

HDFS元数据一致性

1.Fsimage和Edits Logs

客户端在进行文件操纵时,NameNode会先往Edits Logs文件中记录元数据的操作日志。在Edits Logs写满之前对内存和Fsimage做数据同步,即合并Edits Logs和Fsimage的数据,然后Edits Logs上的数据即可清除。

然而当Edits Logs满之后,已经再文件的上传不能中断,所以将会往一个新的文件edits.new上写数据, 老的Edits Logs的合并操作将由SecondNameNode来完成,即所谓的CheckPoint操作。

2.CheckPoint

CheckPoint在什么情况下执行:

1.edits.log文件大小限制,由fs.checkpoint.size配置,当edits.log文件达到上限,则会触发checkpoint操作。

2.指定时间内进行checkpoint操作,由fs.checkpoint.period配置。

3.SecondaryNameNode

从NameNode上下载元数据信息(Fsimage、Edits),然后把二者合并,生成新的Fsimage,在本地保存,并将其推送到NameNode,替换旧的Fsimage。

Checkpoint在SecondaryNameNode上执行过程如下:

1.Secondary通知NameNode切换Edits文件;

2.Secondary从namenode获得Fsimage和Edits(通过http);

3.Secondary将Fsimage载入内存,然后开始合并Edits;

4.Secondary将新的Fsimage发回给NameNode

5.NameNode用新的Fsimage替换旧的Fsimage;

HDFS文件数据一致性

1.CheckSum

HDFS会对写入的所有数据计算校验和,并在读取数据时验证检验和,针对每个由dfs.bytes-per-checksum指定字节的数据计算校验和,默认为512个字节。

2.租约机制

防止多个进程向同一个文件写数据的情况,采用了文件加锁的机制。而在HDFS中,同样需要一个机制来防止同一个文件被多个人写入数据。这种机制就是租约(Lease),每当写入数据之前,一个客户端必须获得namenode发放的一个租约。Namenode保证同一个文件只发放一个允许写的租约。

HDFS还使用租约机制保证多个复本变更顺序的一致性,master节点为数据块的一个复本建立一个租约,把该复本称为主复本,主复本对复本的所有更改操作进行序列化,所有的复本都遵循这个顺序进行修改操作,因此,修改操作全局顺序由master节点选择的租约顺序决定。

3.一致性模型

文件系统的一致性模型(coherency model)描述了文件读/写的数据可见性。

3、Hadoop读写流程

3.1 写流程

1.首先向namenode通信,请求上传文件,namenode检查目标文件是否已存在,父目录是否存在 ,还得看看是否有上传的权限,说白了,就是判断是否可以上传

2.namenode返回是否可以上传 ,如果可以,client会先对文件进行切分(逻辑切分)

3.客户端请求第一个 Block上传到哪几个DataNode服务器上。

4.NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。

5.客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。

6.dn1、dn2、dn3逐级应答客户端。

7.客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。

8.当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。

9.传输完毕之后,客户端关闭流资源,并且会告诉hdfs数据传输完毕,然后hdfs收到传输完毕就恢复元数据

3.2 读流程

1.客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。

2.挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。

3.DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。

4.客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。

5.下载完第一块,在重复上面2.3步下载

4、Hadoop的Map和Reduce个数怎么确定?

深入了解该问题前,请先思考如下问题:

1.Map和Reduce数量过多会导致什么问题?

2.Reduce可以通过什么设置来增加任务数?

3.一个Task的Map数由谁决定?

4.一个Task的Reduce数由谁决定?

Map数量设置:

默认:如果不设置其他参数,默认按照输入的文件数决定Map的数量。

通过合并文件数:在不确定HDFS内文件情况下(小文件),可以通过设置小文件合并的方式来达到控制Map数。

输入合并小文件设置:

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; #执行Map前进行小文件合并
set mapred.max.split.size=128000000; #每个Map最大输入大小,单位为KB
set mapred.min.split.size.per.node=100000000; #一个节点上split的至少的大小,单位为KB
set mapred.min.split.size.per.rack=100000000; #一个交换机下split的至少的大小,单位为KB

输出合并小文件设置:

set hive.merge.mapfiles = true #在Map-only的任务结束时合并小文件
set hive.merge.mapredfiles = true #在Map-Reduce的任务结束时合并小文件
set hive.merge.sparkfiles = true #在hive on spark任务后开启合并小文件
set hive.merge.size.per.task = 256*1000*1000 #合并文件的大小
set hive.merge.smallfiles.avgsize=16000000 #当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge

通过参数设置Map数:

1.通过设置文件切分以控制Map数量:mapred.min.split.size 在客户端进行设置

2.参数:mapred.map.tasks,当然这个参数需要达到一定条件才能生效,只有当InputFormat决定了map任务的个数比mapred.map.tasks值小时才起作用

3.还可以通过JobConf 的conf.setNumMapTasks(int num)设置map数量

Reduce数量设置:

在不指定reduce个数情况下,hive会基于参数hive.exec.reducers.bytes.per.reducerhive.exec.reducers.max来控制reduce数量,总数不会超过参数hive.exec.reducers.max设置的值,当然也可以通过参数mapreduce.job.reduces硬性规定reduce数量。 也可以通过 conf.setNumReduceTasks(int num) 设置

5、Hadoop环形缓冲区

1.为什么要环形缓冲区?

使用环形缓冲区,便于写入缓冲区和写出缓冲区同时进行

2.为什么不等缓冲区满了再spill?

会出现阻塞

3.数据的分区和排序是在哪完成的?

分区是根据元数据meta中的分区号partition来分区的,排序是在spill的时候排序。

4.环形缓冲区数据结构

环形缓冲区是一个数组结构,数组中存放着key和value的数据及元数据信息,元数据存储的格式是int类型,每个key/value对应一个元数据,元数据由4个int组成,第一个int存放value的起始位置,第二个int存放key的起始位置,第三个int存放partition,第四个int存放value的长度。

key/value数据和元数据在环形缓冲区中的存储是由equator分隔的,key/value按照索引递增的方向存储,元数据则按照索引递减的方向存储,将数组抽象为一个环形结构之后,以equator为界,key/value顺时针存储,元数据逆时针存储。

6、Hadoop小文件优化

6.1 HDFS小文件弊端

因为HDFS上每个文件都需要在NameNode上建立索引,这个索引大小约150byte,这样当小文件比较多的时候,就会产生很多索引文件,一方面会占用大量NameNode的空间,另一方面索引过大会使索引速度变慢。

6.2 解决方式

(1)Hadoop本身提供了一些文件压缩的方案。

(2)从系统层面改变现有HDFS存在的问题,其实主要还是小文件的合并,然后建立比较快速的索引。

6.3 Hadoop内置小文件处理方法

(1)Hadoop Archive [ˈɑːkaɪv]: 是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时。

(2)Sequence File: Sequence File由一系列的二进制key/value组成,如果为key小文件名,value为文件内容,则可以将大批小文件合并成一个大文件。

(3)CombineFileInputFormat: CombineFileInputFormat是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置

7、MapReduce实现两个表的Join

Reduce Side Join:

在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0 表示来自文件File1,tag=2 表示来自文件File2。

Map Side Join:

针对两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task 内存中存在一份(比如存放到hash table 中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table 中查找是否有相同的key 的记录,如果有,则连接后输出即可。

8、如果没有定义Partitioner,数据在Reduce是被如何分区的

如果没有自定义的partitioning,则默认的partition算法,即根据每一条数据的key的hashcode值模运算(%)Reduce的数量,得到的数字就是分区号

9、MapTask和ReduceTask的工作机制

1.MapTask工作机制

Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。

Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

Spill阶段:溢写,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

2.ReduceTask工作机制

Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

Reduce阶段:reduce()函数将计算结果写到HDFS上。

10、Hadoop序列化和反序列化及自定义Bean对象实现序列化

序列化:就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。

反序列化:就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。

Hadoop序列化特点:

紧凑:高效使用存储空间

快速:读写数据的额外开销小

可扩展:随着通讯协议的升级而可升级

互操作:支持多语言的交互

自定义bean对象要想序列化传输步骤及注意事项:

必须实现Writable接口;

反序列化时,需要反射调用空参构造函数,所以必须有空参构造;

重写序列化方法;

重写反序列化方法;

注意反序列化的顺序和序列化的顺序完全一致;

要想把结果显示在文件中,需要重写toString(),且用"\t"分开,方便后续用;

如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序;

11、Hadoop和Spark区别

Hadoop的数据处理单位是block,Spark 提供了可供并行处理的数据抽象RDD

Hadoop 对数据处理只提供了Map和Reduce 两种操作。Spark 提供了两大类算子transformation 和 action,支持的操作更多。

Hadoop 只支持Map->Reduce 的流程。Spark 则依赖DAG 有向无环图的方式来执行Job。速度更快。

Spark 提供了Hadoop 所不支持的cache 和 运行时checkpoint 机制。大大的提高了计算速度和程序可靠性。

Spark 会对Job 划分Stage。同一个Stage 内的task 可以用流水线机制执行,大大提高了速度。

Shuffle 机制:Hadoop 的MapReduce 不支持在线聚合。Spark 采用了类HashMap的结构(三种数据结构)实现了自动聚合功能。Spark 在对Record进行排序的时候可以通过Partition Id 和 key进行排序的方式,Hadoop 只能通过key进行排序

12、Hadoop数据倾斜

hadoop据倾斜的表现

有一个多几个Reduce卡住,卡在99.99%,一直不能结束。

各种container报错OOM

异常的Reducer读写的数据量极大,至少远远超过其它正常的Reducer

伴随着数据倾斜,会出现任务被kill等各种诡异的表现。

Hadoop 中数据倾斜的处理

抽样和范围分区:对原始数据抽样得到的结果集来预设分区边界值。

自定义分区:对于热点 key 发送到一部分 Reduce 实例,其他的发送给剩余的 Reduce 实例。

Combiner:可以聚合精简数据,大量减少数据倾斜。适合于Sum()求和,并不适合Average()求平均数。

局部聚合加全局聚合:第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个 Reducer 中进行局部聚合,数量就会大大降低。第二次 MapReduce,去掉 key 的随机前缀,进行全局聚合。

13、Hadoop的MapReduce中的几种Join

在mapreduce中分为3大join逻辑,分别的是Map端的Join,Reduce端的Join,semi join

map端的join:实现逻辑:首先他会有2个map任务,第一个Map任务会将小表的数据完全加载到内存中,并且将数据映射成hashmap的数据结构,在该结构中key就是对应的我们连接的那个key,第2个map任务会去扫描大表的数据,与小表中的数据的key去匹配,如果相等,就进行一个连接操作

reduce端的join:map任务会将将每个表映射成k,v的数据结构,并对表的数据进行打入来源标记,在reduce任务中,获取2张表享同的key就进行一个连接操作

semi join:该join的是实现其实就是reduce端join 的一个优化 ,会在map端过滤掉不能join的数据,这样就可以减少数据的传输,减少磁盘io.

14、Hadoop的角色和作用

14.1 NameNode

1)接受客户端读写请求

2)管理元数据信息

3)接受DataNode的心跳报告

4)负载均衡

5)负载数据块的副本的存储节点分配

14.2 DataNode

1)处理客户端的读写请求

2)真正的进行数据块的存储

3)向namenode发送心跳报告 4)进行副本的复制

14.3.secondarynamenode

合并fsimage和edits文件

1)帮助namenode备份元数据信息,冷备份,备份的元数据信息是否是最新的 ,有一定的数据延迟的,可能造成数据丢失

2)帮助namenode进行元数据合并 减轻namenode的压力

14.4.客户端

1)发送读写请求

2)对上传的文件进行逻辑切块 和物理切块

3)向namenode反馈数据上传 下载的响应

14.5.YARN

ResourceManager(一个集群只有一个)

ApplicationMaster(每个应用都有一个)

NodeManagers (每个节点都有一个)

集群的主节点 ResourceManager 的职责:

1)处理客户端请求

2)启动或监控 MRAppMaster

3)监控 NodeManager的健康状况 nodemanager定期的向resourcemanager进行发送心跳报告

4)资源的分配与调度

nodemanager的职责:

1)管理单个节点上的资源

2)处理来自 ResourceManager 的命令 (启动mrappmaster的时候)

3)处理来自 MRAppMaster 的命令(启动maptask reducetask任务的时候)

Scheduler(调度器)

调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序;在资源紧张的情况下,可以kill掉优先级低的,来运行优先级高的任务。

Applications Manager(应用程序管理器)

负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。

ApplicationMaster

ApplicationMaster 管理在YARN内运行的每个应用程序实例。每个应用程序对应一个ApplicationMaster。ApplicationMaster 负责协调来自 ResourceManager 的资源,并通过 NodeManager 监视容器的执行和资源使用(CPU、内存等的资源分配),通俗讲是管理发起的任务,随着任务创建而创建,任务的完成而结束。

Container

Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。

15、Yarn任务提交流程

(1)作业提交

第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。

第2步:Client向RM申请一个作业id。

第3步:RM给Client返回该job资源的提交路径和作业id。

第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。

第5步:Client提交完资源后,向RM申请运行MrAppMaster。

(2)作业初始化

第6步:当RM收到Client的请求后,将该job添加到容量调度器中。

第7步:某一个空闲的NM领取到该Job。

第8步:该NM创建Container,并产生MrAppMaster。

第9步:下载Client提交的资源到本地。

(3)任务分配

第10步:MrAppMaster向RM申请运行多个MapTask任务资源。

第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。

(4)任务运行

第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。

第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。

第14步:ReduceTask向MapTask获取相应分区的数据。

第15步:程序运行完毕后,MR会向RM申请注销自己。

(5)进度和状态更新

YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。

(6)作业完成

除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。