`

MapReduce设计模式:Numerical Computation

 
阅读更多

 

MapReduce设计模式:Numerical Computation

 

我们已经学习了Local Aggregation和Filtering这两种设计模式,本节我们继续学习另外一种:Numerical Computation。这种模式实际上是进行数学运算,即对于一系列输入(v1,v2,v3,...vn),f定义为某种运算法则,使得x=f(v1,v2,v3,...,vn),f运算法则包括max、min、avg、median、stddev等,本节内容比较简单,所以进展比较快。

 

目前使用MapReduce进行Numerical Computation的实例主要有:

Word Count:这个是作为Hadoop的示例发布的,看起来很是简单啊。No,其实这个例子并不简单,最后会再讲到这个问题的一个扩展,你会发现其中的不简单之处。

Record Count:记录数目统计,实际上就是统计记录数,如单位时间(weekly/daily/hourly)内Log数目、用户交易笔数、用户访问次数等。

min/max/count:最大值、最小值、满足条件值统计,这个实际上是极值统计,这个统计功能我们在生活中也是比价关注的,如微博上的粉丝数、最受关注话题等。

avg/median/stddev:这个在直接统计中可能用处不大,但是在机器学习中都是常用的统计量;如tf-idf中就统计了词频和文档信息

 

Word Count的例子在http://isilic.iteye.com/blog/1770340中已经有伪码,略过不在细提;我们看下最大最小值得统计,其实也很简单:

class Mapper
  method map(tern t, Value val)
    emit(t,val)
    
class Reducer
  method reduce(term t, Value[val1,val2,...])
		min =  INF;
		max = -INF;
		for val in [val1,val2,val3,...] do:
		  if(val < min)
		    min = val
		  if(val > max)
		    max = val
		emit(t,pair(min,max))

 

这个有没有感觉眼熟呢,这个和TOP-K算法的思路是一致的,其实TOP-K算法更为通用。

 

看下avg和median还有stddev的计算,由于这个计算类似,我就不再分开写,直接在一个Reducer里面实现,可能会有不太协调的地方:

class Mapper
  method map(tern t, Value val)
    emit(t,val)
    
class Reducer
  method reduce(term t, Value[val1,val2,...])
    H = sort([val1,val2,val3,...])
    size = len(H)
    if(size % 2 == 0)
      median = (H[size/2 - 1] + H(size/2))/2
    else 
      median = H[int(size/2)]
    emit(t,median)
    sum = 0;
    count = 0;
    for val in [val1,val2,val3,...] do:
	      sum = sum + val
	      count = count + 1
    mean =  sum / count
    emit(t,mean)	
    squares = 0.0
    for val in [val1,val2,val3,...] do:
         squares = squares + (val - mean) * (val - mean)
    stddev = sqrt(squares/(count - 1))
    emit(t,stddev)

 

其中标准偏差可能需要一点数学知识,其它的都比较简单。

值得注意的是,我们在计算过程中计算出了count,实际上如果想直接计算count的值,或者是符合要求的值,还可以使用另外一种counter的方法。Counter方法是使用一种Hadoop提供的全局计数器,能够在各个Mapper中更改某个预定义好的Counter,使其递增,这个增量会在最后MapReduce程序执行完毕之后,进行合并。

Counter的使用是全局性的,在MapReduce执行完成输出的信息中,就有预定义好的Counter,如输入记录、输出记录、输入byte数、输出byte数、Map task数目、reduce task数目等。这个也算是应该知道的方法。使用该Counter计数时,速度还是不错的,没有输出、也没有各种IO,只是个数据传递;所以执行速度依赖于Map Task数目和Mapper内处理记录的速度。

 

下面我们通过计算文档词汇相对频率来说明计算过程,这个过程和上面提到的word count有关,但是在计算过程中还有个非常巧妙的地方,大家可以仔细体会下。

文档词汇相对频率是指,当wi出现时,wj同时出现的频率,用公式可以表示为:

 f(wj|wi) = N(wi,wj) / sum(N(wi,w))

 其中w为文档中出现的词汇,N(wi,wj)表示wi和wj同时出现的总次数。

 

这个最初的实现是应该是怎么想的呢?大家根据前面的MapReduce框架的学习,这个实现应该是这样:

class Mapper
  method map(docid a)
    for all term w in a do:
      for all term u in w.neighbour:
        emit(pair(w,u),count 1)
    
class Reducer
  method setup:
    key = null
    keycnt = 0;
    pairmap = new map
  method reduce(pair (w,u), Value[val1,val2,...])
    if(key == null || key != w)
      if(len(pairset) > 0)
        for pair,paircnt in pairmap do:
          ratio = paircnt / keycnt
          emit(pair,ratio)
      key = null
      keycnt = 0
    sum = 0
    for val in [val1,val2,val3,...] do:
      sum = sum + val
    pairmap.put(pair,sum)
    keycnt = keycnt + sum

 

内容还是比较容易简单的,不过这么绕口的逻辑可不符合MapReduce程序的初衷,这种精巧的设计总是给人一种特殊的感觉,如果想更加一般化的话,难度就有点大了。

逻辑复杂性先不论,紧接着的疑问就是:这个能得到结果吗?很遗憾的回答不能。

我们回忆一下http://isilic.iteye.com/blog/1770340里面提到的MapReduce流程,可以看到在shuffle和sort阶段是根据key进行Reducer选择的,注意我们的key是个pair(w,u),我们希望在shuffle过程中得到类似下面的这种排列:

......
(dog,Greyhound)
(dog,IbizanHound)
(dog,Weimaraner)
(dog,schnauzer)
......

 

也就是说,我们不希望按照pair的整体值来进行分环选择shuffle,我们希望按照pair中值进行shuffle,这个要求还是比较常见的。那么种种要求能实现吗?当然可以,其实这种实现被称为是secondary-sorting。

如果想实现secondary-sorting,需要自己编写一个额外的Partitioner类,提供shuffle过程的选择是按照单个key进行而不是按照pair进行;同时提供PartitionerComparator,提供升序或者降序排列,这个PartitionerComparator能够提供整个整个pair的排列顺序。在Hadoop教程中有个SecondarySort实例,大家仔细学习下。Hadoop提供的实例非常不错,每个实例能够讲明白一个很实用的功能点,理解了这个功能点,才能更好的使用。如果将所有的特性都集中在一个实例中,估计学习曲线会陡峭很多。

Secondary-Sorting的细节大家仔细学习,设置好启动程序就能确保对于每个pair都是按照w来进行排序的,同时能够将输出结果。不过这里面保存了所有的pairmap,实际含义是对于单个w,保存了所有的w的相邻词,这个有可能词汇量非常大,存在OOM的风险。并且必须处理完整个w之后,才能输出。

 

我们需要改进这种模式,主要问题在于pairmap保存,如果w的值还没有显示完全,相对频度就无法计算。我们通过另外一种形式来改进上面提到的这种方案:

 

......
(dog,*)
(dog,Greyhound)
(dog,IbizanHound)
(dog,Weimaraner)
(dog,schnauzer)
......

 

 

这种改进的效果还是很明显的,通过这种形式,我们能最先算出来关于w的所有词频数目,然后在后面的处理过程中再跟别计算w和u的单词数,计算完成就可以直接得到w和u的相对频度。

 

 

 

在上面的实例上修改,可以直接得到改进后的MapReduce程序,伪码如下:

 

 

class Mapper
  method map(docid a)
    for all term w in a do:
      for all term u in w.neighbour:
        emit(pair(w,u),count 1)
        emit(pair(u,*),count 1)
    
class Reducer
  method setup:
    key = null
    keycnt = 0;
  method reduce(pair (w,u), Value[val1,val2,...])
    if(key != w && u == *)
      for val in [val1,val2,val3,...] do:
        keycnt = keycnt + val
    else 
       sum = 0
       for val in [val1,val2,val3,...] do:
         sum = sum + val
       ratio = sum / keycnt
       emit(pair(w,u),ratio)

 

Ok,仔细理解下这两个问题的解决方法,各有巧妙之处,后者能够很好的解决前者的瓶颈问题,所以还是推荐使用后者;当然后者也依赖了Partitioner的选择和PartitionerComparator的使用。确保Reducer处理过程的顺序和准确性。

 

Numerical Computation的设计模式还是比较简单的,如果只是考虑算数运算的话。实际上,特别是这些Numerical Computation,我们在使用时使用在大规模文档处理中,速度、效率、时间都是要考虑的因素,如果能通过这些方法尽可能的提升其中某一方面的性能,所做的努力都是值得的,大家可以仔细体会下。

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics