你好,我是徐文浩。

过去几讲里,无论是Hive这样基于MapReduce的系统,还是Dremel这样抛开MapReduce的系统,其实都已经反映了MapReduce这个大数据处理的计算模型,在2010年这个时间节点已经有一些“落后”了。来自Facebook的Hive选择了在MapReduce上优化改良,仍然基于MapReduce的模型。而Google自家的Dremel则是另起炉灶,用一个新的底层架构来支持OLAP的数据分析。

不过,在工业界之外,学术界一样不会在整个大数据飞速发展的时代里缺席。2010年,来自Berkeley的博士生Matei Zaharia发表了一篇论文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》。伴随着这篇论文的,是一个开源系统,也就是Spark。在之后的几年里,Spark不仅逐步侵蚀了Hadoop的市场份额,成为大数据分析的第一选择,也在实时数据处理,以及大规模机器学习里占据了重要的地位。

Spark的计算模型,其实可以看作一个是支持多轮迭代的MapReduce模型。不过它在实现层面,又和稍显笨拙的MapReduce完全不同。通过引入RDD这样一个函数式对象的数据集的概念,Spark在多轮的数据迭代里,不需要像MapReduce一样反反复复地读写硬盘,大大提升了处理数据的性能。

最重要的是,Spark的这篇引入了RDD概念的论文,让我们看到了一个优秀的系统设计并不需要有多复杂,Spark论文发表的时候,整个系统也就1万多行代码,一个人花上一两个月就能写出来。整个系统也没有太多复杂难懂的学术概念,只是针对MapReduce明显的不足之处做了改造而已。

那么,接下来我们就一起来看看Spark是如何做到这一点的,它的系统设计又有什么精妙之处。通过这一讲的学习,我希望你能够掌握以下几点:

使用硬盘来“容错”的MapReduce

在深入到Spark的论文之前,我们先来简单回顾一下MapReduce的整个流程。我把我们在讲解MapReduce的论文中用到的数据流程图放到了这里,你可以再复习下。

图片

MapReduce的过程并不复杂,Map函数的输出结果会输出到所在节点的本地硬盘上。Reduce函数会从Map函数所在的节点里拉取它所需要的数据,然后再写入本地。接着通过一个外部排序的过程,把数据进行分组。最后将排序分完组的数据,通过Reduce函数进行处理。

你会发现,在这个过程里,任何一个中间环节,我们都需要去读写硬盘。Map函数的处理结果,并不会直接通过网络发送给Reduce所在的Worker节点,Reduce也不会直接在内存中做数据排序。

这还只是单个的MapReduce任务,如果我们要通过MapReduce来跑一些机器学习任务,比如通过L-BFGS这样的算法,来进行大规模的逻辑回归的模型训练,我们需要跑上百个MapReduce的任务。其中每一个MapReduce的任务都差不多,都是把所有日志读入,然后和一个梯度向量做计算,算出新的梯度。接着下一轮计算要再读入一遍原来的日志数据,再和新的梯度做计算。在这个过程中,我们原始的日志,会被重复读取上百遍。

图片

而我们知道,内存的读写速度是远远快于硬盘的,这里我放了《深入浅出计算机组成原理》专栏里的一张数据表。可以看到,无论你在用机械硬盘还是SSD硬盘,访问数据的延时和内存比都有1000倍以上的差距。

图片

之所以MapReduce把所有数据都往硬盘里一写,是因为它追求的是设计上的“简单”,以及在大规模集群下的“容错”能力。把Map节点的输出数据,直接通过网络发送给Reduce的确是一个很直观的想法,让Reduce节点在内存里就直接处理数据,也的确可以提升性能。

但是,如果在这个过程中,Map或者Reduce的节点出现故障了怎么办?

因为Reduce对于前面的Map函数有依赖关系,所以任何一个Map节点故障,意味着Reduce只收到了部分数据,而且它还不知道是哪一部分。那么Reduce任务只能失败掉,然后等Map节点重新来过。而且,Reduce的失败,还会导致其他的Map节点计算的数据也要重来一遍,引起连锁反应,最终等于是整个任务重来一遍。

图片

这只是我们尝试让数据不需要落地到硬盘处理中,会遇到的一种情况,我们还可能遇到网络拥塞、内存不足以处理传输的数据等种种情况。

事实上,你可以认为传统的MPI分布式计算系统就是这样,让一个节点直接往另外一个节点发送消息来传递数据的,但是这样的系统,容错能力很差,所以集群的规模往往也上不去。

而MapReduce针对这个问题的解决方案非常简单粗暴,那就是把整个数据处理的环节完全拆分开来,然后把一个个阶段的中间数据都落地到硬盘上。这样,针对单个节点的故障,我们只需要重新运行对应节点分配的任务就好了,其他已经完成的工作不会“半途而废”。

分布式系统里的“函数式”数据集

但是,这样的方式真的是效率最高的吗?

虽然因为整个GFS和MapReduce系统规模很大,节点很多,我们的各个节点必然会出现故障。但是,毕竟硬件也好、系统也好,出现故障是小概率事件。我们在第8讲里看过,一个数据中心里的18万块硬盘,90天坏了439块,算到年度的损坏率也就是1%左右。我们为了这1%的错误,却要把99%的数据都反复从硬盘里读出来写进去,感觉的确有点划不来。

图片

所以,很自然地,我们需要有一个更有效率的容错方式。一个很直观的想法自然也就冒出来了,那就是我们是否可以做这三件事情:

举一个具体的例子,如果前面提到的大规模逻辑回归的机器模型训练,需要进行100轮迭代。我们最好的解决方案,既不是每轮迭代都需要重新读写数据,那样太浪费硬盘的I/O了。也不是把几小时的计算过程都放在内存里,那样万一计算梯度结果的数据在第99轮丢失了,我们就要从头开始。

我们完全可以把读取的日志数据缓存在内存里,然后把每10轮计算完的梯度数据写入到硬盘上。这样,一旦出现故障,我们只需要重新读取一次日志数据,并最多计算10轮迭代的过程就好了。

val points = spark.textFile(...)
                  .map(parsePoint).persist()
var w = // random initial vector
for (i <- 1 to ITERATIONS) {
  val gradient = points.map{ p =>
    p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
  }.reduce((a,b) => a+b)
  w -= gradient
}

论文3.2.1 部分使用Spark实现分布式逻辑回归的代码。我们的输入数据会通过.persistent缓存在内存中,而不需要每个迭代都重新从硬盘读取。

其实这个思路,就是Spark整个系统设计的出发点。根据这个朴素的思路,Spark定了一个新的概念RDD,全称是Resilient Distributed Dataset,中文叫做弹性分布式数据集。整个系统设计的其实就是 “弹性”+“分布式”+“数据集” 这三点的组合。

分布式数据集这两个关键字,我们应该不需要再进行太多的解释了。我们在GFS、Bigtable乃至Dremel里,都采用了数据分区的方式,来确保数据是分布式的。

“函数式”的RDD

而RDD最核心的设计关键点,就在这个弹性上。论文里的2.1部分,给出了RDD明确的定义,这句话是这样说的:

RDD是只读的、已分区的记录集合,RDD只能通过明确的操作,以及通过两种数据创建:稳定存储系统中的数据;其他RDD。
 
这个明确的操作,是指map、filter和join这样的操作,以和其他的操作区分开来。

按照这个定义,我们可以看到这个是对于数据的一个抽象。我们的任何一个数据集,进行一次转换就是一个新的RDD,但是这个RDD并不需要实际输出到硬盘上。实际上,这个数据都不会作为一个完整的数据集缓存在内存中,而只是一个RDD的“抽象概念”。只有当我们对某一个RDD实际调用persistent函数的时候,这个RDD才会实际作为一个完整的数据集,缓存在内存中。

一旦被缓存到内存里,这个RDD就能够再次被下游的其他数据转换反复使用。一方面,这个数据不需要写入到硬盘,所以我们减少了一次数据写。另一方面,下游的其他转化也不需要再从硬盘读数据,于是,我们就节省了大量的硬盘I/O的开销。

图片

lines = spark.textFile("hdfs://...") 
errors = lines.filter(_.startsWith("ERROR")) 
errors.persist()

// Count errors mentioning MySQL: 
errors.filter(_.contains("MySQL")).count() 

// Return the time fields of errors mentioning 
// HDFS as an array (assuming time is field 
// number 3 in a tab-separated format): 
errors.filter(_.contains("HDFS")).map(_.split(’\t’)(3)).collect()

我们可以对照着论文2.2.1中的示例代码,来看这样一个过程。我们从HDFS上,读入原始数据,根据关键词ERROR进行了一次过滤,然后把它persistent下来。而接下来分别有两个分析任务,会用到这个缓存在内存里的ERROR数据,一个是找出所有带有MySQL关键词的错误日志,然后进行统计行数;另一个则是找到所有带有HDFS关键字的日志,然后按照Tab分割并收集第3列的数据。

这里面,一开始从HDFS里读入的line数据,因为没有persistent,所以不会缓存在内存中。而errors会缓存在内存里面,供后面两个任务作为输入使用。errors我们不需要写入到硬盘里,而后面分析MYSQL和HDFS关键字错误的两个任务,也不需要从硬盘读数据,数据都是直接在内存中读写,所以性能大大加快了。

从RDD的这个逻辑上,其实我们可以看到计算机工程上的其他系统中的影子。

第一个是惰性求值(Lazy-Evaluation),我们的一层层数据转化,只要没有调用persistent,都可以先不做计算,而只是记录这个计算过程中的函数。而当persistent一旦被调用,那么我们就需要把实际的数据结果计算出来,并存储到内存里,再供后面的数据转换程序调用。

第二个是数据库里的视图功能。为了查询方便,对于复杂的多表关联,很多时候我们会预先建好一张数据库的逻辑视图。那么我们在查询逻辑视图的时候,其实还是通过一个多表关联SQL去查询原始表的,这个就好像我们并没有调用persistent,把数据实际持久化下来。

当然,我们也可以把对应视图的查询结果,直接写入一张中间表,这样实际上就相当于把计算的结果持久化下来了,后续查询的SQL就会查询这个中间表。如果视图里的数据会被后续的SQL反复多次查询,并且对应的原始数据集也和RDD一样是不可变的话,一样会大大提升系统整体的效率。

宽依赖关系和检查点

那么,通过调用persistent来把数据缓存到内存里,就减少了大量的硬盘读写。但是我们仍然会面临节点失效,导致RDD需要重新计算的情况。所以Spark对这部分流程做了进一步的优化,这个优化说起来其实也不复杂。

那就是,如果一个节点失效了,导致的数据重新计算,需要影响的节点太多,那么我们就把计算结果输出到硬盘上。而如果影响的节点少,那么我们就只是单独重新计算被响应到的那些节点就好了。

所以,在Spark里,会对整个数据计算的拓扑图在分布式系统下的依赖关系做一个分类。如果一个RDD的一个分区,只会影响到下游的一个节点,那么我们就称这样的上下游依赖关系为窄依赖。而如果一个RDD的一个分区,会影响到下游的多个节点,那么我们就称这样的上下游关系为宽依赖

对于窄依赖,即使重算一遍,也只是影响一条线上的少数几个节点,所以对应的中间数据结果,并不需要输出到硬盘上。而对于宽依赖,一旦上游的一个节点失效了,需要重新计算。那么它对应的多个下游节点,都需要重新从这个节点拉取数据并重新计算,需要占用更多的网络带宽和计算资源。

换句话说,在宽依赖下,一个上游节点的失效,会以几倍的影响在下游得到放大。所以,在宽依赖的场景下,上游会像MapReduce里的Map一样,把输出结果序列化到硬盘上,以减少故障后的恢复成本。

图片

同样的,对于有多轮迭代,或者是整个拓扑图很长的数据处理任务,Spark在persistent的时候,支持你添加一个REPLICATE参数,把当前的计算结果作为一个检查点存储下来。一旦添加了这个参数,数据就不只是存储在内存中,而是会序列化到硬盘里。这样,同样可以减少你在出现故障时候的重新计算的时间。

可以看到,无论是persistent、宽依赖下的数据会被持久化存储,还是允许用户去自己通过检查点存储中间步骤的计算结果,都是为了在日常情况下的性能,和容错情况下的性能做一个平衡。

如果不做任何持久化存储,那么平时系统会跑得很快,但是一旦某个节点出错就要从头再来。而如果都做持久化存储,那么节点出错的时候,计算可以恢复得很快,但是没有问题的时候会有很多浪费。

本质上,Spark就是根据人的经验,在这两点上做好了平衡。

小结

其实,Spark里的RDD的设计思想并不复杂。和MapReduce一样,RDD的设计思路也是来自于函数式编程。相对于过程式地把每一个数据转换(Transformation)的结果存储下来,RDD相当于记录了输入数据,以及对应的计算输入数据的函数。

这个方式,和把一步步的计算结果存储下来的效果一样,都可以解决容错问题。当某一个RDD的某一个分区因为故障丢失的时候,我们可以通过输入数据和对应的函数,快速计算出当前RDD的实际内容。而这个输入数据+对应函数的组合,就是RDD中的Lineage图。

RDD和其他分布式系统最大的差异,就在代表弹性的R这个关键字上。这个弹性体现在两个方面。

第一个是数据存储上。数据不再是存放在硬盘上的,而是可以缓存在内存中。只有当内存不足的时候,才会把它们换出到硬盘上。同时,数据的持久化,也支持硬盘、序列化后的内存存储,以及反序列化后Java对象的内存存储三种形式。每一种都比另一种需要占用更多的内存,但是计算速度会更快。

第二个是选择把什么数据输出到硬盘上。Spark会根据数据计算的Lineage,来判断某一个RDD对于前置数据是宽依赖,还是窄依赖的。如果是宽依赖的,意味着一个节点的故障,可能会导致大量的数据要进行重新计算,乃至数据网络传输的需求。那么,它就会把数据计算的中间结果存储到硬盘上。

同时,Spark也支持你自己定义检查点,你可以把一些关键节点的数据通过检查点的方式,持久化到硬盘上,避免出现特定节点的故障,导致大量数据需要重新计算的问题。

可以看到,Spark的RDD,是在没有破坏MapReduce的易用性的前提下,支持了MapReduce可以支持的所有运算方式。并且,它通过尽可能利用内存,使得需要多个MapReduce的组合或者迭代的任务的执行速度大大加快了。从分布式的逻辑回归来看,Spark的性能会比使用原始的MapReduce一轮轮迭代快上20倍。这也是为什么,Spark一出现在市场上,就很快替代了大量的MapReduce的分析工作,并在迭代式的机器学习算法中成为了主流。

推荐阅读

其实在Spark出现之前,已经有很多人意识到MapReduce这个计算框架在性能上的不足了。也有各种各样的论文和系统,尝试去解决需要反复多轮迭代算法的效率问题。在RDD的论文中也有提到两篇,分别是来自于Google的Pregel,它的出发点是用来解决PageRank的多轮迭代。以及改造自Hadoop的Haloop,它的出发点是解决大规模数据集上迭代式的机器学习问题。

如果你想对比看看Spark RDD这个更加统一抽象的框架,相对于这些“专用系统的改进和优化,你可以回头去看看这两篇论文。我把链接放在了这里:Pregel论文Haloop论文

思考题

论文里提到,除了对RDD持久化之外,我们还可以自己定义RDD如何进行分区,并且提到了可以对存储优化有用,比如把两个需要Join操作的数据集进行相同的哈希分区。那么,为什么这么做会对存储优化有用呢?它在应用层面到底优化了什么?

欢迎你在留言区分享自己的思考和答案,和你的同学、朋友一起讨论,共同进步。