首页 > 编程 > Java > 正文

MapTask阶段shuffle源码分析

2019-11-26 09:21:22
字体:
来源:转载
供稿:网友

1. 收集阶段

Mapper中,调用context.write(key,value)实际是调用代理NewOutPutCollectorwirte方法

public void write(KEYOUT key, VALUEOUT value          ) throws IOException, InterruptedException {  output.write(key, value); }

实际调用的是MapOutPutBuffercollect(),在进行收集前,调用partitioner来计算每个key-value的分区号

@Override  public void write(K key, V value) throws IOException, InterruptedException {   collector.collect(key, value,            partitioner.getPartition(key, value, partitions));  }

2. NewOutPutCollector对象的创建

@SuppressWarnings("unchecked")  NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,            JobConf job,            TaskUmbilicalProtocol umbilical,            TaskReporter reporter            ) throws IOException, ClassNotFoundException {  // 创建实际用来收集key-value的缓存区对象   collector = createSortingCollector(job, reporter);  // 获取总的分区个数   partitions = jobContext.getNumReduceTasks();   if (partitions > 1) {    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)     ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);   } else {    // 默认情况,直接创建一个匿名内部类,所有的key-value都分配到0号分区    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {     @Override     public int getPartition(K key, V value, int numPartitions) {      return partitions - 1;     }    };   }  }

3. 创建环形缓冲区对象

@SuppressWarnings("unchecked") private <KEY, VALUE> MapOutputCollector<KEY, VALUE>     createSortingCollector(JobConf job, TaskReporter reporter)  throws IOException, ClassNotFoundException {  MapOutputCollector.Context context =   new MapOutputCollector.Context(this, job, reporter);  // 从当前Job的配置中,获取mapreduce.job.map.output.collector.class,如果没有设置,使用MapOutputBuffer.class  Class<?>[] collectorClasses = job.getClasses(   JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);  int remainingCollectors = collectorClasses.length;  Exception lastException = null;  for (Class clazz : collectorClasses) {   try {    if (!MapOutputCollector.class.isAssignableFrom(clazz)) {     throw new IOException("Invalid output collector class: " + clazz.getName() +      " (does not implement MapOutputCollector)");    }    Class<? extends MapOutputCollector> subclazz =     clazz.asSubclass(MapOutputCollector.class);    LOG.debug("Trying map output collector class: " + subclazz.getName());   // 创建缓冲区对象    MapOutputCollector<KEY, VALUE> collector =     ReflectionUtils.newInstance(subclazz, job);   // 创建完缓冲区对象后,执行初始化    collector.init(context);    LOG.info("Map output collector class = " + collector.getClass().getName());    return collector;   } catch (Exception e) {    String msg = "Unable to initialize MapOutputCollector " + clazz.getName();    if (--remainingCollectors > 0) {     msg += " (" + remainingCollectors + " more collector(s) to try)";    }    lastException = e;    LOG.warn(msg, e);   }  }  throw new IOException("Initialization of all the collectors failed. " +   "Error in last collector was :" + lastException.getMessage(), lastException); }

3. MapOutPutBuffer的初始化   环形缓冲区对象

@SuppressWarnings("unchecked")  public void init(MapOutputCollector.Context context          ) throws IOException, ClassNotFoundException {   job = context.getJobConf();   reporter = context.getReporter();   mapTask = context.getMapTask();   mapOutputFile = mapTask.getMapOutputFile();   sortPhase = mapTask.getSortPhase();   spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);   // 获取分区总个数,取决于ReduceTask的数量   partitions = job.getNumReduceTasks();   rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();   //sanity checks   // 从当前配置中,获取mapreduce.map.sort.spill.percent,如果没有设置,就是0.8   final float spillper =    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);   // 获取mapreduce.task.io.sort.mb,如果没设置,就是100MB   final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);   indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);   if (spillper > (float)1.0 || spillper <= (float)0.0) {    throw new IOException("Invalid /"" + JobContext.MAP_SORT_SPILL_PERCENT +      "/": " + spillper);   }   if ((sortmb & 0x7FF) != sortmb) {    throw new IOException(      "Invalid /"" + JobContext.IO_SORT_MB + "/": " + sortmb);   }// 在溢写前,对key-value排序,采用的排序器,使用快速排序,只排索引   sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",      QuickSort.class, IndexedSorter.class), job);   // buffers and accounting   int maxMemUsage = sortmb << 20;   maxMemUsage -= maxMemUsage % METASIZE;   // 存放key-value   kvbuffer = new byte[maxMemUsage];   bufvoid = kvbuffer.length;  // 存储key-value的属性信息,分区号,索引等   kvmeta = ByteBuffer.wrap(kvbuffer)     .order(ByteOrder.nativeOrder())     .asIntBuffer();   setEquator(0);   bufstart = bufend = bufindex = equator;   kvstart = kvend = kvindex;   maxRec = kvmeta.capacity() / NMETA;   softLimit = (int)(kvbuffer.length * spillper);   bufferRemaining = softLimit;   LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);   LOG.info("soft limit at " + softLimit);   LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);   LOG.info("kvstart = " + kvstart + "; length = " + maxRec);   // k/v serialization    // 获取快速排序的Key的比较器,排序只按照key进行排序!   comparator = job.getOutputKeyComparator();  // 获取key-value的序列化器   keyClass = (Class<K>)job.getMapOutputKeyClass();   valClass = (Class<V>)job.getMapOutputValueClass();   serializationFactory = new SerializationFactory(job);   keySerializer = serializationFactory.getSerializer(keyClass);   keySerializer.open(bb);   valSerializer = serializationFactory.getSerializer(valClass);   valSerializer.open(bb);   // output counters   mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);   mapOutputRecordCounter =    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);   fileOutputByteCounter = reporter     .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);   // 溢写到磁盘,可以使用一个压缩格式! 获取指定的压缩编解码器   // compression   if (job.getCompressMapOutput()) {    Class<? extends CompressionCodec> codecClass =     job.getMapOutputCompressorClass(DefaultCodec.class);    codec = ReflectionUtils.newInstance(codecClass, job);   } else {    codec = null;   }   // 获取Combiner组件   // combiner   final Counters.Counter combineInputCounter =    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);   combinerRunner = CombinerRunner.create(job, getTaskID(),                       combineInputCounter,                       reporter, null);   if (combinerRunner != null) {    final Counters.Counter combineOutputCounter =     reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);    combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);   } else {    combineCollector = null;   }   spillInProgress = false;   minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);   // 设置溢写线程在后台运行,溢写是在后台运行另外一个溢写线程!和收集是两个线程!   spillThread.setDaemon(true);   spillThread.setName("SpillThread");   spillLock.lock();   try {   // 启动线程    spillThread.start();    while (!spillThreadRunning) {     spillDone.await();    }   } catch (InterruptedException e) {    throw new IOException("Spill thread failed to initialize", e);   } finally {    spillLock.unlock();   }   if (sortSpillException != null) {    throw new IOException("Spill thread failed to initialize",      sortSpillException);   }  }

4. Paritionner的获取

从配置中读取mapreduce.job.partitioner.class,如果没有指定,采用HashPartitioner.class

如果reduceTask > 1, 还没有设置分区组件,使用HashPartitioner

@SuppressWarnings("unchecked") public Class<? extends Partitioner<?,?>> getPartitionerClass()   throws ClassNotFoundException {  return (Class<? extends Partitioner<?,?>>)   conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class); }
public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. **/ public int getPartition(K key, V value,             int numReduceTasks) {  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }}

分区号的限制:0 <= 分区号 < 总的分区数(reduceTask的个数)

if (partition < 0 || partition >= partitions) {    throw new IOException("Illegal partition for " + key + " (" +      partition + ")");   }

5.MapTask shuffle的流程

              ①在map()调用context.write()

              ②调用MapoutPutBuffer的collect()

  •                             调用分区组件Partitionner计算当前这组key-value的分区号

              ③将当前key-value收集到MapOutPutBuffer中

  •                             如果超过溢写的阀值,在后台启动溢写线程,来进行溢写!

              ④溢写前,先根据分区号,将相同分区号的key-value,采用快速排序算法,进行排序!

  •                             排序并不在内存中移动key-value,而是记录排序后key-value的有序索引!

              ⑤ 开始溢写,按照排序后有序的索引,将文件写入到一个临时的溢写文件中

  •                             如果没有定义Combiner,直接溢写!
  •                             如果定义了Combiner,使用CombinerRunner.conbine()对key-value处理后再次溢写!

              ⑥多次溢写后,每次溢写都会产生一个临时文件

              ⑦最后,执行一次flush(),将剩余的key-value进行溢写

              ⑧MergeParts: 将多次溢写的结果,保存为一个总的文件!

  •                      在合并为一个总的文件前,会执行归并排序,保证合并后的文件,各个分区也是有序的!
  •                      如果定义了Conbiner,Conbiner会再次运行(前提是溢写的文件个数大于3)!
  •                      否则,就直接溢写!

              ⑨最终保证生成一个最终的文件,这个文件根据总区号,分为若干部分,每个部分的key-value都已经排好序,等待ReduceTask来拷贝相应分区的数据

6. Combiner

combiner其实就是Reducer类型:

Class<? extends Reducer<K,V,K,V>> cls =    (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();

Combiner的运行时机:

MapTask:

  •               ①每次溢写前,如果指定了Combiner,会运行
  •               ②将多个溢写片段,进行合并为一个最终的文件时,也会运行Combiner,前提是片段数>=3

ReduceTask:

              ③reduceTask在运行时,需要启动shuffle进程拷贝MapTask产生的数据!

  •                      数据在copy后,进入shuffle工作的内存,在内存中进行merge和sort!
  •                      数据过多,内部不够,将部分数据溢写在磁盘!
  •                      如果有溢写的过程,那么combiner会再次运行!

①一定会运行,②,③需要条件!

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对武林网的支持。如果你想了解更多相关内容请查看下面相关链接

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表