MapReduce设计模式:Numerical Computation
我们已经学习了Local Aggregation和Filtering这两种设计模式,本节我们继续学习另外一种:Numerical Computation。这种模式实际上是进行数学运算,即对于一系列输入(v1,v2,v3,...vn),f定义为某种运算法则,使得x=f(v1,v2,v3,...,vn),f运算法则包括max、min、avg、median、stddev等,本节内容比较简单,所以进展比较快。
目前使用MapReduce进行Numerical Computation的实例主要有:
Word Count:这个是作为Hadoop的示例发布的,看起来很是简单啊。No,其实这个例子并不简单,最后会再讲到这个问题的一个扩展,你会发现其中的不简单之处。
Record Count:记录数目统计,实际上就是统计记录数,如单位时间(weekly/daily/hourly)内Log数目、用户交易笔数、用户访问次数等。
min/max/count:最大值、最小值、满足条件值统计,这个实际上是极值统计,这个统计功能我们在生活中也是比价关注的,如微博上的粉丝数、最受关注话题等。
avg/median/stddev:这个在直接统计中可能用处不大,但是在机器学习中都是常用的统计量;如tf-idf中就统计了词频和文档信息
Word Count的例子在http://isilic.iteye.com/blog/1770340中已经有伪码,略过不在细提;我们看下最大最小值得统计,其实也很简单:
class Mapper method map(tern t, Value val) emit(t,val) class Reducer method reduce(term t, Value[val1,val2,...]) min = INF; max = -INF; for val in [val1,val2,val3,...] do: if(val < min) min = val if(val > max) max = val emit(t,pair(min,max))
这个有没有感觉眼熟呢,这个和TOP-K算法的思路是一致的,其实TOP-K算法更为通用。
看下avg和median还有stddev的计算,由于这个计算类似,我就不再分开写,直接在一个Reducer里面实现,可能会有不太协调的地方:
class Mapper method map(tern t, Value val) emit(t,val) class Reducer method reduce(term t, Value[val1,val2,...]) H = sort([val1,val2,val3,...]) size = len(H) if(size % 2 == 0) median = (H[size/2 - 1] + H(size/2))/2 else median = H[int(size/2)] emit(t,median) sum = 0; count = 0; for val in [val1,val2,val3,...] do: sum = sum + val count = count + 1 mean = sum / count emit(t,mean) squares = 0.0 for val in [val1,val2,val3,...] do: squares = squares + (val - mean) * (val - mean) stddev = sqrt(squares/(count - 1)) emit(t,stddev)
其中标准偏差可能需要一点数学知识,其它的都比较简单。
值得注意的是,我们在计算过程中计算出了count,实际上如果想直接计算count的值,或者是符合要求的值,还可以使用另外一种counter的方法。Counter方法是使用一种Hadoop提供的全局计数器,能够在各个Mapper中更改某个预定义好的Counter,使其递增,这个增量会在最后MapReduce程序执行完毕之后,进行合并。
Counter的使用是全局性的,在MapReduce执行完成输出的信息中,就有预定义好的Counter,如输入记录、输出记录、输入byte数、输出byte数、Map task数目、reduce task数目等。这个也算是应该知道的方法。使用该Counter计数时,速度还是不错的,没有输出、也没有各种IO,只是个数据传递;所以执行速度依赖于Map Task数目和Mapper内处理记录的速度。
下面我们通过计算文档词汇相对频率来说明计算过程,这个过程和上面提到的word count有关,但是在计算过程中还有个非常巧妙的地方,大家可以仔细体会下。
文档词汇相对频率是指,当wi出现时,wj同时出现的频率,用公式可以表示为:
f(wj|wi) = N(wi,wj) / sum(N(wi,w))
其中w为文档中出现的词汇,N(wi,wj)表示wi和wj同时出现的总次数。
这个最初的实现是应该是怎么想的呢?大家根据前面的MapReduce框架的学习,这个实现应该是这样:
class Mapper method map(docid a) for all term w in a do: for all term u in w.neighbour: emit(pair(w,u),count 1) class Reducer method setup: key = null keycnt = 0; pairmap = new map method reduce(pair (w,u), Value[val1,val2,...]) if(key == null || key != w) if(len(pairset) > 0) for pair,paircnt in pairmap do: ratio = paircnt / keycnt emit(pair,ratio) key = null keycnt = 0 sum = 0 for val in [val1,val2,val3,...] do: sum = sum + val pairmap.put(pair,sum) keycnt = keycnt + sum
内容还是比较容易简单的,不过这么绕口的逻辑可不符合MapReduce程序的初衷,这种精巧的设计总是给人一种特殊的感觉,如果想更加一般化的话,难度就有点大了。
逻辑复杂性先不论,紧接着的疑问就是:这个能得到结果吗?很遗憾的回答不能。
我们回忆一下http://isilic.iteye.com/blog/1770340里面提到的MapReduce流程,可以看到在shuffle和sort阶段是根据key进行Reducer选择的,注意我们的key是个pair(w,u),我们希望在shuffle过程中得到类似下面的这种排列:
...... (dog,Greyhound) (dog,IbizanHound) (dog,Weimaraner) (dog,schnauzer) ......
也就是说,我们不希望按照pair的整体值来进行分环选择shuffle,我们希望按照pair中值进行shuffle,这个要求还是比较常见的。那么种种要求能实现吗?当然可以,其实这种实现被称为是secondary-sorting。
如果想实现secondary-sorting,需要自己编写一个额外的Partitioner类,提供shuffle过程的选择是按照单个key进行而不是按照pair进行;同时提供PartitionerComparator,提供升序或者降序排列,这个PartitionerComparator能够提供整个整个pair的排列顺序。在Hadoop教程中有个SecondarySort实例,大家仔细学习下。Hadoop提供的实例非常不错,每个实例能够讲明白一个很实用的功能点,理解了这个功能点,才能更好的使用。如果将所有的特性都集中在一个实例中,估计学习曲线会陡峭很多。
Secondary-Sorting的细节大家仔细学习,设置好启动程序就能确保对于每个pair都是按照w来进行排序的,同时能够将输出结果。不过这里面保存了所有的pairmap,实际含义是对于单个w,保存了所有的w的相邻词,这个有可能词汇量非常大,存在OOM的风险。并且必须处理完整个w之后,才能输出。
我们需要改进这种模式,主要问题在于pairmap保存,如果w的值还没有显示完全,相对频度就无法计算。我们通过另外一种形式来改进上面提到的这种方案:
...... (dog,*) (dog,Greyhound) (dog,IbizanHound) (dog,Weimaraner) (dog,schnauzer) ......
这种改进的效果还是很明显的,通过这种形式,我们能最先算出来关于w的所有词频数目,然后在后面的处理过程中再跟别计算w和u的单词数,计算完成就可以直接得到w和u的相对频度。
在上面的实例上修改,可以直接得到改进后的MapReduce程序,伪码如下:
class Mapper method map(docid a) for all term w in a do: for all term u in w.neighbour: emit(pair(w,u),count 1) emit(pair(u,*),count 1) class Reducer method setup: key = null keycnt = 0; method reduce(pair (w,u), Value[val1,val2,...]) if(key != w && u == *) for val in [val1,val2,val3,...] do: keycnt = keycnt + val else sum = 0 for val in [val1,val2,val3,...] do: sum = sum + val ratio = sum / keycnt emit(pair(w,u),ratio)
Ok,仔细理解下这两个问题的解决方法,各有巧妙之处,后者能够很好的解决前者的瓶颈问题,所以还是推荐使用后者;当然后者也依赖了Partitioner的选择和PartitionerComparator的使用。确保Reducer处理过程的顺序和准确性。
Numerical Computation的设计模式还是比较简单的,如果只是考虑算数运算的话。实际上,特别是这些Numerical Computation,我们在使用时使用在大规模文档处理中,速度、效率、时间都是要考虑的因素,如果能通过这些方法尽可能的提升其中某一方面的性能,所做的努力都是值得的,大家可以仔细体会下。
相关推荐
本节介绍如何编写基本的 MapReduce 程序实现数据分析。本节代码是基于 Hadoop 2.7.3 开发的。 任务准备 单词计数(WordCount)的任务是对一组输入文档中的单词进行分别计数。假设文件的量比较大,每个文档又包含...
MapReduce 设计模式,深入理解MapReduce编程模式,更好的利用MapReduce模型
MapReduce设计模式.pdf
, 由于本书不会过多涉及底层框架及MapReduce API,所以希望读者阅读《MapReduce设计模式》之前,能够对Hadoop系统有所了解,知道如何编写MapReduce程序,并了解MapReduce程序框架的工作原理。《MapReduce设计模式》...
MapReduce设计模式.pdf 个人收集电子书,仅用学习使用,不可用于商业用途,如有版权问题,请联系删除!
MapReduce设计模式介绍.ppt
<MapReduce设计模式>英文版,概述性的介绍了MapReduce的常见设计模式和应用场景.详细的源码可以帮助理解.
[奥莱理] MapReduce 设计模式 (英文版) [奥莱理] MapReduce Design Patterns Building Effective Algorithms and Analytics for Hadoop and Other Systems (E-Book) ☆ 出版信息:☆ [作者信息] Donald Miner, ...
单词计数是最简单也是最能体现 MapReduce 思想的程序之一,可以称为 MapReduce 版“Hello World”。单词计数的主要功能是统计一系列文本文件中每个单词出现的次数...其次,确定 MapReduce 程序的设计思路。把文件内容分
MapReduce设计模式 ,值得一看
This book will be unique in some ways and familiar in others. First and foremost, this book is obviously about design patterns, which are templates or general guides to solving problems....
书中主要介绍编程模式,即如何利用MapReduce框架解决一类问题,重在提供解决问题的方法和思路。作者花大量篇幅介绍各种模式的原理及实现机制,并给出相应的应用实例,让读者对每种模式能有更直观的理解。
这是谷歌三大论文之一的 MapReduce: Simplified Data Processing on Large Clusters 英文原文。我的翻译可以见https://blog.csdn.net/m0_37809890/article/details/87830686
hadoop mapreduce 设计模式,mapreduce 程序编写,英文数据,但是代码结构清晰,容易看懂,适合实战
Mapreduce-1:python中的MapReduce的孙子/祖父母对
本文介绍了用Java编写并运行第一个mapreduce作业的步骤及遇到的问题和解决方案。
。。。
。。。
深入理解MapReduce架构设计与实现原理 高清 完整书签 阿里专家