
Hadoop那些事儿 (Part 4): MapReduce Programming Examples (Basics)


Preface

The previous article used WordCount as an example to walk through the code structure and execution mechanism of MapReduce. This article goes through a few simple examples to get further acquainted with MapReduce.

1. Data Retrieval

Problem

Given a large body of text, find the sentences that contain a given string.

Solution

This one is fairly simple. In the Map phase, take the name of the file currently being read as the key, split the text on the full stop (。), and examine each sentence; any sentence containing the target string is emitted as the value. In the Reduce phase, the sentences belonging to the same file are concatenated and written out.

Test data

Input: in1.txt:

浔阳江头夜送客,枫叶荻花秋瑟瑟。主人下马客在船,举酒欲饮无管弦。醉不成欢惨将别,别时茫茫江浸月。忽闻水上琵琶声,主人忘归客不发。寻声暗问弹者谁?琵琶声停欲语迟。移船相近邀相见,添酒回灯重开宴。千呼万唤始出来,犹抱琵琶半遮面。转轴拨弦三两声,未成曲调先有情。弦弦掩抑声声思,似诉平生不得志。低眉信手续续弹,说尽心中无限事。轻拢慢捻抹复挑,初为霓裳后六幺。大弦嘈嘈如急雨,小弦切切如私语。嘈嘈切切错杂弹,大珠小珠落玉盘。间关莺语花底滑,幽咽泉流冰下难。冰泉冷涩弦凝绝,凝绝不通声暂歇。别有幽愁暗恨生,此时无声胜有声。银瓶乍破水浆迸,铁骑突出刀枪鸣。曲终收拨当心画,四弦一声如裂帛。东船西舫悄无言,唯见江心秋月白。沉吟放拨插弦中,整顿衣裳起敛容。自言本是京城女,家在虾蟆陵下住。十三学得琵琶成,名属教坊第一部。曲罢曾教善才服,妆成每被秋娘妒。五陵年少争缠头,一曲红绡不知数。钿头银篦击节碎,血色罗裙翻酒污。今年欢笑复明年,秋月春风等闲度。弟走从军阿姨死,暮去朝来颜色故。门前冷落鞍马稀,老大嫁作商人妇。商人重利轻别离,前月浮梁买茶去。去来江口守空船,绕船月明江水寒。夜深忽梦少年事,梦啼妆泪红阑干。我闻琵琶已叹息,又闻此语重唧唧。同是天涯沦落人,相逢何必曾相识!我从去年辞帝京,谪居卧病浔阳城。浔阳地僻无音乐,终岁不闻丝竹声。住近湓江地低湿,黄芦苦竹绕宅生。其间旦暮闻何物?杜鹃啼血猿哀鸣。春江花朝秋月夜,往往取酒还独倾。岂无山歌与村笛?呕哑嘲哳难为听。今夜闻君琵琶语,如听仙乐耳暂明。莫辞更坐弹一曲,为君翻作《琵琶行》。感我此言良久立,却坐促弦弦转急。凄凄不似向前声,满座重闻皆掩泣。座中泣下谁最多?江州司马青衫湿。

in2.txt:

汉皇重色思倾国,御宇多年求不得。杨家有女初长成,养在深闺人未识。天生丽质难自弃,一朝选在君王侧。回眸一笑百媚生,六宫粉黛无颜色。春寒赐浴华清池,温泉水滑洗凝脂。侍儿扶起娇无力,始是新承恩泽时。云鬓花颜金步摇,芙蓉帐暖度春宵。春宵苦短日高起,从此君王不早朝。承欢侍宴无闲暇,春从春游夜专夜。后宫佳丽三千人,三千宠爱在一身。金屋妆成娇侍夜,玉楼宴罢醉和春。姊妹弟兄皆列土,可怜光彩生门户。遂令天下父母心,不重生男重生女。骊宫高处入青云,仙乐风飘处处闻。缓歌谩舞凝丝竹,尽日君王看不足。渔阳鼙鼓动地来,惊破霓裳羽衣曲。九重城阙烟尘生,千乘万骑西南行。翠华摇摇行复止,西出都门百余里。六军不发无奈何,宛转蛾眉马前死。花钿委地无人收,翠翘金雀玉搔头。君王掩面救不得,回看血泪相和流。黄埃散漫风萧索,云栈萦纡登剑阁。峨嵋山下少人行,旌旗无光日色薄。蜀江水碧蜀山青,圣主朝朝暮暮情。行宫见月伤心色,夜雨闻铃肠断声。天旋地转回龙驭,到此踌躇不能去。马嵬坡下泥土中,不见玉颜空死处。君臣相顾尽沾衣,东望都门信马归。归来池苑皆依旧,太液芙蓉未央柳。芙蓉如面柳如眉,对此如何不泪垂。春风桃李花开日,秋雨梧桐叶落时。西宫南内多秋草,落叶满阶红不扫。梨园弟子白发新,椒房阿监青娥老。夕殿萤飞思悄然,孤灯挑尽未成眠。迟迟钟鼓初长夜,耿耿星河欲曙天。鸳鸯瓦冷霜华重,翡翠衾寒谁与共。悠悠生死别经年,魂魄不曾来入梦。临邛道士鸿都客,能以精诚致魂魄。为感君王辗转思,遂教方士殷勤觅。排空驭气奔如电,升天入地求之遍。上穷碧落下黄泉,两处茫茫皆不见。忽闻海上有仙山,山在虚无缥渺间。楼阁玲珑五云起,其中绰约多仙子。中有一人字太真,雪肤花貌参差是。金阙西厢叩玉扃,转教小玉报双成。闻道汉家天子使,九华帐里梦魂惊。揽衣推枕起徘徊,珠箔银屏迤逦开。云鬓半偏新睡觉,花冠不整下堂来。风吹仙袂飘飘举,犹似霓裳羽衣舞。玉容寂寞泪阑干,梨花一枝春带雨。含情凝睇谢君王,一别音容两渺茫。昭阳殿里恩爱绝,蓬莱宫中日月长。回头下望人寰处,不见长安见尘雾。惟将旧物表深情,钿合金钗寄将去。钗留一股合一扇,钗擘黄金合分钿。但教心似金钿坚,天上人间会相见。临别殷勤重寄词,词中有誓两心知。七月七日长生殿,夜半无人私语时。在天愿作比翼鸟,在地愿为连理枝。天长地久有时尽,此恨绵绵无绝期。

in3.txt:

春江潮水连海平,海上明月共潮生。滟滟随波千万里,何处春江无月明!江流宛转绕芳甸,月照花林皆似霰;空里流霜不觉飞,汀上白沙看不见。江天一色无纤尘,皎皎空中孤月轮。江畔何人初见月?江月何年初照人?人生代代无穷已,江月年年只相似。不知江月待何人,但见长江送流水。白云一片去悠悠,青枫浦上不胜愁。谁家今夜扁舟子?何处相思明月楼?可怜楼上月徘徊,应照离人妆镜台。玉户帘中卷不去,捣衣砧上拂还来。此时相望不相闻,愿逐月华流照君。鸿雁长飞光不度,鱼龙潜跃水成文。昨夜闲潭梦落花,可怜春半不还家。江水流春去欲尽,江潭落月复西斜。斜月沉沉藏海雾,碣石潇湘无限路。不知乘月几人归,落月摇情满江树。

Expected result:

in1.txt 春江花朝秋月夜,往往取酒还独倾---|---去来江口守空船,绕船月明江水寒---|---商人重利轻别离,前月浮梁买茶去---|---今年欢笑复明年,秋月春风等闲度---|---东船西舫悄无言,唯见江心秋月白---|---醉不成欢惨将别,别时茫茫江浸月---|---
in2.txt 七月七日长生殿,夜半无人私语时---|---昭阳殿里恩爱绝,蓬莱宫中日月长---|---行宫见月伤心色,夜雨闻铃肠断声---|---
in3.txt 不知乘月几人归,落月摇情满江树---|---斜月沉沉藏海雾,碣石潇湘无限路---|---江水流春去欲尽,江潭落月复西斜---|---此时相望不相闻,愿逐月华流照君---|---可怜楼上月徘徊,应照离人妆镜台---|---谁家今夜扁舟子?何处相思明月楼?---|---不知江月待何人,但见长江送流水---|---人生代代无穷已,江月年年只相似---|---江畔何人初见月?江月何年初照人?---|---江天一色无纤尘,皎皎空中孤月轮---|---江流宛转绕芳甸,月照花林皆似霰;---|---滟滟随波千万里,何处春江无月明!---|---春江潮水连海平,海上明月共潮生---|---

The example above retrieves the verses that contain the character "月" (the search word is hard-coded in the Map class).


Code

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Find the sentences that contain a given string
 * @author hadoop
 */
public class Search {

    public static class Map extends Mapper<Object, Text, Text, Text> {
        private static final String word = "月";
        private FileSplit fileSplit;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // which file the current split belongs to
            fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            // split on the full stop
            StringTokenizer st = new StringTokenizer(value.toString(), "。");
            while (st.hasMoreTokens()) {
                String line = st.nextToken();
                if (line.indexOf(word) >= 0) {
                    // key: file name, value: the matching sentence
                    context.write(new Text(fileName), new Text(line));
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // concatenate all matching sentences of one file
            String lines = "";
            for (Text value : values) {
                lines += value.toString() + "---|---";
            }
            context.write(key, new Text(lines));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "localhost:9001");
        args = new String[]{"hdfs://localhost:9000/user/hadoop/input/search_in",
                            "hdfs://localhost:9000/user/hadoop/output/search_out"};
        // check the command-line arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: search <in> <out>");
            System.exit(2);
        }
        // job name
        Job job = new Job(conf, "search");
        // job classes
        job.setJarByClass(Search.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

In map, context.getInputSplit() gives the split the current record comes from, and the file name is taken from it. The input is then split on the full stop and iterated over; every sentence containing the character "月" is written out with the file name as the key and the sentence as the value.

reduce is just a simple concatenation step.
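For reference, launching the job once the classes are packaged into a jar might look like the following (the jar name is just a placeholder, and since main() hard-codes the input and output paths no further arguments are needed):

hadoop jar mr-examples.jar train.Search

Note that the HDFS output directory must not already exist; FileOutputFormat rejects the job at submission time if it does.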

2. Maximum, Minimum and Average

Problem

Given a batch of numbers, find the maximum and minimum values and compute the average.

Solution

This one is also simple. In map, read and tokenize the input and emit every number under one fixed key (the code below uses the constant 1), so that all values reach the same reduce call; the numbers themselves are the values. In reduce, iterate over the values, counting and summing them while tracking the maximum and minimum, and finally compute the average.
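One detail worth keeping in mind is that the reducer uses integer division, so the average is truncated. As a made-up illustration (not part of the test data below): if the values reaching reduce were 3, 4 and 10, then sum = 17, count = 3, and 17 / 3 evaluates to 5 in Java, dropping the fractional part.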

Test data

Input

in1.txt

1 1 1 1 1 1 1 1 1 1 5 5 5 5 5 5 5 5 5 5

in2.txt

5 8 10 17 328 9 13 32 21

Expected result

平均数 11
最大值 32
最小值 1


Code

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Maximum, minimum and average of a batch of numbers
 * @author hadoop
 */
public class Average1 {

    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        // a single fixed key, so every number ends up in the same reduce call
        private static IntWritable no = new IntWritable(1);
        // holds each number cut from the input line
        private Text number = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                number.set(st.nextToken());
                context.write(no, new IntWritable(Integer.parseInt(number.toString())));
            }
        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable, Text, IntWritable> {
        // state shared across the single reduce group
        int count = 0;                  // how many numbers were seen
        int sum = 0;                    // their total
        int max = Integer.MIN_VALUE;
        int min = Integer.MAX_VALUE;

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                if (val.get() > max) {
                    max = val.get();
                }
                if (val.get() < min) {
                    min = val.get();
                }
                count++;
                sum += val.get();
            }
            int average = sum / count;  // integer division, the fraction is dropped
            context.write(new Text("平均数"), new IntWritable(average));
            context.write(new Text("最大值"), new IntWritable(max));
            context.write(new Text("最小值"), new IntWritable(min));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        //conf.set("mapred.job.tracker", "localhost:9001");
        conf.addResource("config.xml");
        args = new String[]{"hdfs://localhost:9000/user/hadoop/input/average1_in",
                            "hdfs://localhost:9000/user/hadoop/output/average1_out"};
        // check the command-line arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: average1 <in> <out>");
            System.exit(2);
        }
        // job name
        Job job = new Job(conf, "average1");
        // job classes
        job.setJarByClass(Average1.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // output types of the mapper
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3. Average Grade

Problem

Given three input files that hold, respectively, the math, English and Chinese scores of a group of students, compute each student's average score across the three subjects.

Solution

Equally simple: in map, parse each line and emit the student's name as the key and the score as the value; in reduce, sum and count the scores that arrive for each name and output the average.

Test data

Input:

in1.txt

张三 80
李四 83
王五 91
赵六 88

in2.txt

张三 92
李四 100
王五 94
赵六 88

in3.txt

张三 89
李四 98
王五 84
赵六 93

Expected result

张三 87
李四 93
王五 89
赵六 89
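These values follow from the integer division in the reducer: 张三's total is 80 + 92 + 89 = 261, which divides evenly to 87, while 李四's total is 83 + 100 + 98 = 281, and 281 / 3 = 93.67 truncates to 93 (王五 and 赵六 both truncate from 269 / 3 = 89.67 to 89).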


Code

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Average score of each student
 * @author hadoop
 */
public class Average2 {

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // split the input into lines
            StringTokenizer st = new StringTokenizer(value.toString(), "\n");
            while (st.hasMoreTokens()) {
                // split each line on whitespace: name score
                StringTokenizer stl = new StringTokenizer(st.nextToken());
                String name = stl.nextToken();
                String score = stl.nextToken();
                context.write(new Text(name), new IntWritable(Integer.parseInt(score)));
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;   // number of scores for this student
            int sum = 0;     // their total
            for (IntWritable val : values) {
                count++;
                sum += val.get();
            }
            int average = sum / count;   // integer division
            context.write(key, new IntWritable(average));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        //conf.set("mapred.job.tracker", "localhost:9001");
        conf.addResource("config.xml");
        args = new String[]{"hdfs://localhost:9000/user/hadoop/input/average2_in",
                            "hdfs://localhost:9000/user/hadoop/output/average2_out"};
        // check the command-line arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: average2 <in> <out>");
            System.exit(2);
        }
        // job name
        Job job = new Job(conf, "average2");
        // job classes
        job.setJarByClass(Average2.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4. Deduplication

Problem

Given several sets of records, remove the duplicates and output each distinct record once.

Solution

During the shuffle phase, records are grouped by key, so by the time they reach the reduce method every key is unique. It is therefore enough to emit each record read from the input as the key with an empty value; reduce then writes each key out once.

Test data

Input: in1.txt

Etoak-001
Etoak-002
Etoak-003
Etoak-002
Etoak-004
Etoak-005
Etoak-006
Etoak-001
Etoak-007
Etoak-008

in2.txt

Etoak-009
Etoak-010
Etoak-011
Etoak-012
Etoak-013
Etoak-009
Etoak-014
Etoak-015
Etoak-011
Etoak-016

Expected result:

Etoak-001 Etoak-002 Etoak-003 Etoak-004 Etoak-005 Etoak-006 Etoak-007 Etoak-008 Etoak-009 Etoak-010 Etoak-011 Etoak-012 Etoak-013 Etoak-014 Etoak-015 Etoak-016


Code

package train;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Deduplication
 * @author hadoop
 */
public class Duplicate {

    // output key: Text, output value: Text
    public static class Map extends Mapper<Object, Text, Text, Text> {
        // write each record straight out as the key; the value stays empty
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, new Text(""));
        }
    }

    // after the shuffle, identical records are grouped under one key,
    // so the reducer writes each key once with an empty value
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "localhost:9001");
        args = new String[]{"hdfs://localhost:9000/user/hadoop/input/duplicate_in",
                            "hdfs://localhost:9000/user/hadoop/output/duplicate_out"};
        // check the command-line arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: duplicate <in> <out>");
            System.exit(2);
        }
        // job name
        Job job = new Job(conf, "duplicate");
        // job classes; the reducer can double as the combiner here
        job.setJarByClass(Duplicate.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
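The code above uses an empty Text as the value. A common alternative (not in the original post) is NullWritable, which serializes to nothing and so slightly reduces the amount of data shuffled. A minimal sketch of that variant, assuming org.apache.hadoop.io.NullWritable is imported and the job sets job.setOutputValueClass(NullWritable.class):

    // Variant sketch: NullWritable instead of an empty Text value
    public static class Map extends Mapper<Object, Text, Text, NullWritable> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // the whole record becomes the key; the value carries no information
            context.write(value, NullWritable.get());
        }
    }

    public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // each distinct record is written exactly once
            context.write(key, NullWritable.get());
        }
    }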

5. Sorting

Problem

Sort a given set of numbers in ascending order and output each number's rank.

Solution

Rely on MapReduce's default sort rule: keys of type IntWritable are ordered by their numeric value, so emitting each number as the key is enough.
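One caveat the original post leaves implicit: the sorted output is only globally ordered if all keys reach a single reducer; with several reduce tasks, each output file would be sorted only within itself and the rank counter would restart per reducer. A job normally defaults to one reduce task, but to make the assumption explicit one could add:

    job.setNumReduceTasks(1);   // a single reducer keeps the output and the rank counter globally ordered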

Test data

Input: in1.txt:

9
0
14
99
9
15
88
9

in2.txt:

65
54
32
21
10

in3.txt:

1
0
9
21
8

Expected result:

1 0
1 0
2 1
3 8
4 9
4 9
4 9
4 9
5 10
6 14
7 15
8 21
8 21
9 32
10 54
11 65
12 88
13 99


Code

package train;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Ascending sort, relying on MapReduce's default sort rule:
 * IntWritable keys are ordered by their numeric value.
 * @author hadoop
 */
public class Sort {

    // convert each input line to an int and emit it as the key
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable number = new IntWritable();
        private static final IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            number.set(Integer.parseInt(line));
            context.write(number, one);
        }
    }

    // the field num tracks the rank of each distinct number;
    // iterating over values writes the number once per occurrence
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable num = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable value : values) {
                context.write(num, key);
            }
            num = new IntWritable(num.get() + 1);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "localhost:9001");
        args = new String[]{"hdfs://localhost:9000/user/hadoop/input/sort_in",
                            "hdfs://localhost:9000/user/hadoop/output/sort_out"};
        // check the command-line arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: sort <in> <out>");
            System.exit(2);
        }
        // job name
        Job job = new Job(conf, "sort");
        // job classes
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note that this job must not set a combiner, otherwise the result changes: the Reduce class rewrites each number into a rank/number pair, so running it as a combiner would add an extra local merge whose output the reducer would then misinterpret.

6. Inverted Index

Problem

Given a number of records, group them by an attribute they contain. For example, given several sentences, index them by the words they contain, recording for each word which files it appears in and how often.

Test data

Input: in1.txt

Life is brief , and then you die, you know ?

in2.txt:

Innovation distinguishes between a leader and a follower

in3.txt

We're here to put a dent in the universe . Otherwise why else even be here ?

Expected result:

, in1.txt:1;
. in3.txt:1;
? in3.txt:1;
Innovation in2.txt:1;
Life in1.txt:1;
Otherwise in3.txt:1;
We're in3.txt:1;
a in3.txt:1;in2.txt:2;
and in2.txt:1;in1.txt:1;
be in3.txt:1;
between in2.txt:1;
brief in1.txt:1;
dent in3.txt:1;
die, in1.txt:1;
distinguishes in2.txt:1;
else in3.txt:1;
even in3.txt:1;
follower in2.txt:1;
here in3.txt:2;
in in3.txt:1;
is in1.txt:1;
know in1.txt:1;
leader in2.txt:1;
put in3.txt:1;
the in3.txt:1;
then in1.txt:1;
to in3.txt:1;
universe in3.txt:1;
why in3.txt:1;
you in1.txt:2;
? in1.txt:1;
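To make the three stages concrete, here is the flow for the word "a" from the test data above (map emits word:file with a count of 1, the combiner collapses the counts per file, and the reducer concatenates the per-file counts):

map output:     ("a:in2.txt", "1"), ("a:in2.txt", "1"), ("a:in3.txt", "1")
after combine:  ("a", "in2.txt:2"), ("a", "in3.txt:1")
after reduce:   ("a", "in3.txt:1;in2.txt:2;")

The order of the per-file entries in the final value depends on the run; the counts themselves match the expected result above.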


Code

package train;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Inverted index
 * @author hadoop
 */
public class InvertedIndex {

    // output key: word + file name, output value: the count 1
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private Text keyStr = new Text();
        private Text valueStr = new Text();
        private FileSplit fileSplit;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // which file the current split belongs to
            fileSplit = (FileSplit) context.getInputSplit();
            // split on whitespace
            StringTokenizer st = new StringTokenizer(value.toString().trim());
            while (st.hasMoreTokens()) {
                String filePath = fileSplit.getPath().getName();
                keyStr.set(st.nextToken() + ":" + filePath);
                valueStr.set("1");
                context.write(keyStr, valueStr);
            }
        }
    }

    // merge the counts per (word, file)
    // output key: the word, output value: file name + count
    public static class Combine extends Reducer<Text, Text, Text, Text> {
        private Text newValue = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // sum the counts
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            // split the old key: the word becomes the new key,
            // file name + count become the new value
            int index = key.toString().indexOf(":");
            String word = key.toString().substring(0, index);
            String filePath = key.toString().substring(index + 1);
            key.set(word);
            newValue.set(filePath + ":" + sum);
            context.write(key, newValue);
        }
    }

    // gather all file/count pairs of one word onto a single line
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        Text newValue = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String files = "";
            for (Text value : values) {
                files += value + ";";
            }
            newValue.set(files);
            context.write(key, newValue);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "localhost:9001");
        args = new String[]{"hdfs://localhost:9000/user/hadoop/input/invertedIndex_in",
                            "hdfs://localhost:9000/user/hadoop/output/invertedIndex_out"};
        // check the command-line arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: invertedIndex <in> <out>");
            System.exit(2);
        }
        // job name
        Job job = new Job(conf, "invertedIndex");
        // job classes
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
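A caveat worth adding (not stated in the original post): Hadoop treats the combiner as an optional optimization and may run it zero, one, or several times on map output, so a job should remain correct even if the combiner never runs. Here the Combine step is what rewrites the key from word:file to word; if it were skipped, the reducer would receive word:file keys and the output format would change. For a teaching example this is fine, but a more robust version would do the key rewriting in the mapper or the reducer and use the combiner only to sum counts.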