`

MapReduce设计模式:Joins

 
阅读更多

 

MapReduce设计模式:Joins

 

Join这种模式在MapReduce中也是经常出现,首先我们借助于SQL中的概念来理解下Join的内容,对于熟悉SQL的读者可以快速略过该部分:

假如由集合A和B,各有数个记录,记录的内容可以参考SQL中的行数据;其中A中id字段和B中的id字段相同,可以参看这里:http://www.w3schools.com/sql/sql_join.asp。注意我们只是借用了SQL中的JOIN的概念,实际上介绍的要比SQL中的JOIN要丰富的多,可以参考下面的介绍:

Inner Join:对于A和B集合中的数据,其中Join的字段只有同时出现在A和B中,才会出现在最终的结果集合中。

Outer Join:和Inner Join类似,不过记录并没有限制在两者必须同时出现,单出现在一个集合中也有可能出现在结果集合中

  Left Outer Join:位于Left的集合会出现在最终结果集合里,如果右边的集合没有,最终出现的记录为null

  Right Outer Join:位于Right的集合会出现在最终结果集合里,如果左边的集合没有数据,最终出现的记录为null,和Left Join意义一致,只是内容相反。

  Full Join:这个是上面Left Join和Right Join的集合,A和B集合中的数据都会出现在最终结果中

Anti Join:这个集合是Full Join的结果减去Inner Join的结果,实际上是AUB-A^B

Cross Join:这个结果比较有意思,是Cartesian积的意思,对应于SQL中的cross join的结果

 

这几个Join的形式一定要明白,明白其中的差异和相同之处,还有相互之间的关系;先在头脑中建立一种整体概念最关键,其实按照这个思路学习将SQL的Join关系整理一下也不错。

 

假如说现在有集合S和T,其中假设S集合的数据格式如下所示:

 

(k1,s1,S1)
(k2,s2,S2)
(k3,s3,S3)
(k5,s2,S2)
......

 

其中k表示记录的唯一标识,s表示记录的unique id,S表示记录的其它属性

同样T集合的数据和S集合类似:

 

(k1,t1,T1)
(k2,t2,T2)
(k3,t3,T3)
(k7,t3,T3)
......

 

会不会有人以为上面的记录有问题呢?其实上面的集合S和T的记录是没有问题的,觉得有问题的同学可以仔细推敲下。

 

Reducer-Side Join

 

首先我们先看下Reducer-side Join,在Mapper阶段已经按照k参数汇总好数据,在Reducer阶段按照k值得到汇总结果,其中有MapReduce阶段保证同一个key值得value,按照顺序输入到Reducer中。根据数据集合的关系,有一下三种情况需要考虑:

首先是one-to-one的情况,在这种情况下,针对同一个k值,其中的value值分别来自于S和T,针对Reducer-side的算法都适合用,由于Mapper-Shuffle-Sort只能保证同一个key能够映射到同一个Reducer上,并没有保证同一个key的value值得迭代的顺序,那么有可能会出现下面这种情况:

 

k27:[(s22,S22),(t71,T71)]
k27:[(s33,S33)]
k27:[(t99,T99)]
k27:[(t65,T65),(s87,S87)]

 

上面这几种情况都会出现,有可能是S和T同时出现,但是顺序不定,也有可能只出现S或者T,针对不同的Join类型,我们要考虑不同的结果;如对于Inner Join,只出现S或者T肯定是直接略过不考虑,但是对于Outer Join、Cross Join就要处理(如果对于这句话不理解,请再看下前面的Join的内容);这种one-to-one的逻辑比较简单,不再多说,

 

其次是one-to-many的情况,我们假设S为one、T为many的情况,我们可以根据Partitioner保证S在前,T在后出现,这样我们就可以得到S和集合T的各种操作,但是我们无法保证one出现在前,many出现在后面。如果出现many在前,one在后的情况,那麻烦了,需要单独迭代两次才能在Reducer阶段完成,这个对于我们开发者明显不能接受。

解决方法是我们上节提到的Secondary-Sorting,我们可以将键值转换一下,将Mapper输出的结果转换为一下形式:

(k88,s67):[(S67)]
(k88,t22):[(T22)]
(k88,t101):[(T101)]
(k88,t167):[(T167)]
......

 

利用我们上节的Secondary-Sorting的内容,可以得到这个Mapper结果,准确的说是Reducer的输入。有读者可能会有疑问,这样没有办法保证one在前,many在后啊!确实这个也无法保证这一点,但是这种处理有个前面无与伦比的优势:这种方法将逐条处理每条记录,根据Join类型如果能够得到其中的one,就可以直接处理one和many的关系了,其实就是处理one和同一key的many之间的关系,这个时候内存里面最多会有两条记录进行处理,这种处理还能解决处理扩展性的瓶颈;不过出现最坏的情况也是和上面的内容类似,需要在内存里面维护同一key值得所以S和T的集合内容。当然了,具体的处理还和Join的内容有关系。

 

第三种是many-to-many的情况。对于集合S和T,可以出现下面这些数据:

(k82,s105):[(S105)]
(k82,s124):[(S124)]
(k82,s130):[(S130)]
......
(k82,t98):[(T98)]
(k82,t121):[(T121)]

 

我们能够确保集合S在前,T在后。按照Secondary-Sorting的处理过程依然可以使用,处理方法不变;但是在这种情况下Inner Join、Outer Join的处理都免不了将同一个key值的S或者T集合载入到内存中;同样这种处理过程需要根据具体的Join类型来相应的处理。

 

上面我们是将Join放在Reducer-side做,这个肯定不是高效的处理方法,那么Join操作能够放到Mapper-side做吗?答案是肯定的。我们可以得到Join操作的第二种方法:Mapper-Side的Join操作。

 

Mapper-side的输入能像Reducer-side阶段的输入的话,Reducer-side的操作可以直接在Mapper阶段做。这样的话就能省掉Shuffle和Sort的操作,这个提升是巨大的。不过Mapper阶段能保证这一点嘛?常规情况下确实不能。不过存在这种情况,如果该Mapper的输入时前一个Mapreduce程序的输出的话,这一点就能保证了,并且在大多数情况下也确实是这样。

 

还有这么一种情况,如果输入的文件集合S和T集合本身就是按照key排序好的记录,并且这些记录按照一定的顺序放在不同的文件中,那么我们可以同时分别处理相应的文件,同步对应处理集合S和T的输入。为什么会有这种形式的集合文件呢?

应该能够想到,如果这些个文件是其它MapReduce程序的输出的话,确实能够产生这样的文件,这样的文件我们就符合我们上面提到的要求。首先集合S和T是另外MapReduce程序的输出数据集合,其次输出S和T集合的Reducer阶段必须有相同的Reduce数目。这两个限制其实都并不难,所以分别对集合S和T集合的子文件进行并行处理时完全可能的。

 

还有以一种Join方法被称为是Memory-Based Join。顾名思义,就是将Join操作放在内存里面进行。这种情况按照我们上面的数据集合不现实,但是如果数据集合S和T的差异比较大,其中S的集合数据大小能够载入到内存中的话,MemoryBased Join就可以进行了。可以将集合S在每个Mapper阶段读入到内存中,然后对T进行各种Join操作,这个性能和速度提升是很明显的。其中将集合S载入到内存中这个阶段可以在每个Mapper的setup阶段执行,这个阶段会在每个Mapper阶段启动前执行一系列过程。

如果文件差异不大的话,还想使用这种Memory-Based Join方法,也是有变通之处的。我们可以将其中一个集合S表示为

S=S1#S2#S3#...#SN,其中#表示集合并操作

这样可以将Si载入到内存中,和集合T进行Join操作,数据集合需要进行N次,才能处理完S和T的Join关系。

其实如果两个集合S和T体积差距不大,并且两者的体积都比较大不足以载入到内存时,还有另外一种分布式数据中心的方法。将其中一个集合放到Distributed Key-Value Store中,在MapReduce计算中去处理另外一个集合。MapReduce处理的记录每次到Store中去查询,得到结果后再进行各种Join操作。这个Distributed Key-Value Store实现就比较多样了,可以使用Memcache、Redis、Hbase、Data Cluster等实现。

 

本节主要讨论了Join的类型,熟悉数据库的同学并不陌生;然后介绍了基于Reducer-Side Join、Mapper-Side Join、Memory-Based Join等设计模式。在实现这种设计模式时,还要根据数据集合的关系one-to-one、one-to-many、many-to-many等关系来选择合适的方法。在Join过程中,由于Cross Join和Cartesion Join的存在,使得数据处理时必须保存某个数据集合在内存中,这样就存在OOM的风险和瓶颈,在这种情况下,需要仔细选择Join的方法,或者通过预处理,或者通过Distributed Store等方法来解决。总之,对于大数据的处理,我们需要根据数据的特征,选择合适的处理方法;如果有某些特性能够提升我们处理数据的速度和性能,我们绝不能放过这些特性。

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics