首页 > 学院 > 开发设计 > 正文

MapReduce并行编程模型和框架

2019-11-11 04:55:34
字体:
来源:转载
供稿:网友

传统的串行处理方式

有四组文本数据:

“the weather is good”, “today is good”, “good weather is good”, “today has good weather”

对这些文本数据进行词频统计:

import java.util.Hashtable;import java.util.Iterator;import java.util.StringTokenizer;/** * 传统的串行计算方式词频统计 * * @version 2017年1月12日 下午4:05:33 */public class WordCount { public static void main(String[] args) { String[] text = new String[]{ "the weather is good","today is good", "good weather is good","today has good weather" }; //同步、线程安全 Hashtable ht = new Hashtable(); //HashMap ht = new HashMap(); for(int i=0;i<=3;i++){ //字符串根据分隔符解析 StringTokenizer st = new StringTokenizer(text[i]); while (st.hasMoreTokens()) { String world = st.nextToken(); if(!ht.containsKey(world)){ ht.put(world, new Integer(1)); }else{ int wc = ((Integer)ht.get(world)).intValue()+1; ht.put(world, new Integer(wc)); } }//end of while }//end of for //输出统计结果 for(Iterator itr = ht.keySet().iterator();itr.hasNext();){ String world = (String) itr.next(); System.out.PRintln(world+": " +(Integer)ht.get(world)+ "; "); } }}

一个MR分布式程序

求出每个年份的最高气温:

MaxTemperatureMapper.Java:import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class MaxTemperatureMapper extends Mapper<LongWritable, Text,Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //解析字段 String line =value.toString(); try{ String year = line.substring(0,4); int airTemperature =Integer.parseInt(line.substring(5)); context.write(new Text(year),new IntWritable(airTemperature)); }catch(Exception e){ System.out.println("error in line:" + line); } }} MaxTemperatureReducer.java:import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;/** * reducer 比较每年度温度最高值 * */public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int MaxValue = Integer.MIN_VALUE; for(IntWritable value:values){ MaxValue = Math.max(MaxValue, value.get()); } context.write(key , new IntWritable(MaxValue)); }}MaxTemperatureDriver.java:import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class MaxTemperatureDriver extends Configured implements Tool{ @Override public int run(String[] args) throws Exception { // 对 参数进行判断:参数个数不为2,打印错误信息 if (args.length != 2){ System.err.printf("Usage: %s <input><output>",getClass().getSimpleName()); ToolRunner.printGenericCommandUsage(System.err); return -1; } Configuration conf =getConf(); @SuppressWarnings("deprecation") //不检测过期的方法 Job job = new Job(conf); job.setJobName("Max Temperature"); job.setJarByClass(getClass()); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); return job.waitForCompletion(true)?0:1; } public static void main(String[] args)throws Exception{ int exitcode = ToolRunner.run(new MaxTemperatureDriver(), args); System.exit(exitcode); }}

上传数据至hadoop集群:

这里写图片描述

原始数据: Temperature1:

1990 211990 181991 211992 301990 21

Temperature2:

1991 211990 181991 241992 301993 21

将程序打包上传至主节点某个目录下,执行

hadoop jar /data/jar/maxtemperature.jar hdfs://192.168.75.128:9000/input hdfs://192.168.75.128:9000/output/temperature

执行结果:

结果数据:

1990 211991 241992 301993 21

完整的MapReduce编程模型

Combiner:进行中间结果数据网络传输优化的工作。Combiner程序的执行是在Map节点完成计算之后、输出结果之前。

Partitioner:将所有主键相同的键值对传输给同一个Reduce节点。分区的过程在Map节点输出后、传入Reduce节点之前完成的。

下面是针对四组数据的MapReduce完整的并行编程模型:

“the weather is good”, “today is good”, “good weather is good”, “today has good weather”

这里写图片描述

完整的MapReduce编程模型

(1)用户程序会分成三个部分:Mapper,Reducer,Driver (2)Mapper的输入数据是KV对的形式,KV的类型可以设置 (3)Mapper的输出数据是KV对的形式,KV的类型可以设置 (4)Mapper中的业务逻辑写在map方法中 (5)map方法是每进来一个KV对调用一次 (6)Reducer的输入数据应该对应Mapper的输出数据,也是KV (7)Reducer的业务逻辑写在reduce方法中 (8)reduce方法是对每一个< key,valueList> 调用一次 (9)用户的Mapper和Reducer都要继承各自的父类 (10)整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象。

Hadoop系统架构和MapReduce执行流程

为了实现Hadoop系统设计中本地化计算的原则,数据存储节点DataNode与计算节点TaskTracker将合并设置,让每个从节点同时运行作为DataNode和TaskTracker,以此让每个Tasktracker尽量处理存储在本地DataNode上的数据。

而数据存储主控节点NameNode与作业执行主控节点JobTracker既可以设置在同一个主控节点上,在集群规模较大或者这两个主控节点负载都很高以至于互相影响时,也可以分开设置在两个不同的节点上。

这里写图片描述

Hadoop系统的基本组成构架

MapReduce程序的执行流程:

MapReduce执行一个用户提交的MapReduce程序的基本过程。

这里写图片描述

Hadoop MapReduce 程序执行流程

1) 首先,用户程序客户端通过作业客户端接口程序JobClient提交一个用户程序。 2) 然后JobClient向JobTracker提交作业执行请求并获得一个Job ID。 3) JobClient同时也会将用户程序作业和待处理的数据文件信息准备好并存储在HDFS中。 4) JobClient正式向JobTracker提交和执行该作业。 5) JobTracker接受并调度该作业,进行作业的初始化准备工作,根据待处理数据的实际分片情况,调度和分配一定的Map节点来完成作业。 6) JobTracker 查询作业中的数据分片信息,构建并准备相应的任务。 7) JobTracker 启动TaskTracker节点开始执行具体的任务。 8) TaskTracker根据所分配的具体任务,获取相应的作业数据。 9) TaskTracker节点创建所需要的Java虚拟机,并启动相应的Map任务(或Reduce任务)的执行。 10) TaskTracker执行完所分配的任务之后,若是Map任务,则把中间结果数据输出到HDFS中;若是Reduce任务,则输出最终结果。 11) TaskTracker向JobTracker报告所分配的任务的完成。若是Map任务完成并且后续还有Reduce任务,则JobTracker会分配和启动Reduce节点继续处理中间结果并输出最终结果。

参考学习资料:

1.HashMap和Hashtable的区别: http://www.importnew.com/7010.html 2.StringTokenizer类的使用方法: http://yacole.iteye.com/blog/41512


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表