提示: 分布式处理系统会把计算逻辑分发到数据侧,极大提高系统的水平扩展性。WordCount运行机制讲了一堆理论知识,为了让各位看官透彻理解,也为Spark程序算法优化打下坚实的基础,我们拿WordCount来举例说明,顺便说说负载均衡。额。。。还没看“动手写WordCount”的兄弟姐妹们,建议先去看看。1)数据位置感知下面是WordCount的业务逻辑代码:val file = "hdfs://127.0.0.1:9000/file.txt"val lines = sc.textFile(file)val words = lines.flatMap(line => line.split("//s+"))val wordCount = words.countByValue()lines是Spark的RDD,它包含了在哪些机器上有file文件的块,信息是从HDFS来的。每文件块映射到RDD上就是一个分区,对的,没看错。如果一个文件块128MB,那么HDFS上一个1GB大小的文件就有8个文件块,由这个文件创建的RDD就会有8个分区。之前说了,在HDFS上每个文件块默认会有3份,那RDD的分区选择了那一份呢?对滴,根据负载选择服务器负载最低的那一份。负载自动均衡了吧。2)计算逻辑分发有了这些信息,我们就知道把后续的计算逻辑该分发到哪儿去。首先,我们得说清楚什么是计算逻辑,各位看官们想一下,类方法里面的代码是如何运行的。充分必要条件:方法代码 + 类实例(对象)的状态。似成相识吧,程序 = 算法 + 数据。算法在代码中,数据在对象的状态中。Spark要分发计算逻辑,也是分了两部分。第一部分是代码。为什么spark-submit执行一开始,总是一堆jar包被分发,原因就在这儿。第二部分是类实例。类在哪儿?作为RDD各API参数的闭包。val words = lines.flatMap(line => line.split("//s+"))flatMap的参数 **_.split("/s+")** 是闭包,闭包是引用了外部自由变量的函数,在Scala中是由匿名类实现的。更多信息,请小伙伴们GfsoSO哈。上面的一行代码中,Spark要分发的实例就是 **_.split("/s+")** 的实例。val wordCount = words.countByValue()实际上RDD的API countByValue 也有需要分发的闭包实例,只是都在Spark的源码中,让一码给大家整理到明面上来哈。val wordCount = words .mapPartitions(convertWordsInPartitionToWordCountMap) .reduce(mergeMaps)前面我们提到了RDD的分区,mapPartitions会方法中的逻辑放到RDD的每个分区上执行,注意是远程在Slave上执行的哈。而reduce是在把每个分区的结果拿到Driver后,对结果进行两两合并,最终得到结果。3)WordCount分布式运行原理先仔细看图,相信不用下面的解释,各位看官也能看懂了。(上面的图是张巨高清的图,手机上看不清,建议转发文章到邮箱,然后到电脑上看,看懂这张图,就真的把WordCount分布式运行的机制搞懂了。)对于WordCount而言,分布式在每个Slave的每个分区上,统计本分区内的单词计数,生成一个Map,然后将它传回给Driver,再由Driver两两合并来自各个分区的所有Map,形成最终的单词计数。今天我们不仅说清楚了WordCount背后的分布式运行机制,而且解释了Spark的水平扩展能力,以及负载均衡。
新闻热点
疑难解答