
A Detailed Introduction to Multiple Outputs in Hadoop MapReduce

2019-11-02 16:44:53
Source: reposted
Contributed by: a site user

Hadoop MapReduce Multiple Outputs

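Files produced by FileOutputFormat and its subclasses are placed in the output directory. There is one file per reducer, named by partition number: part-r-00000, part-r-00001, and so on. Sometimes you need control over the output file names, or need each reducer to write more than one file. MapReduce provides the MultipleOutputs class for this purpose.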

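MultipleOutputs writes data to multiple files whose names are derived from the output keys and values, or from an arbitrary string. This allows each reducer (or each mapper in a map-only job) to create more than one file. File names take the form name-m-nnnnn for map output and name-r-nnnnn for reduce output, where name is an arbitrary name set by the program and nnnnn is an integer part number starting from 0. The part number ensures that outputs written by different parts (mappers or reducers) do not collide when they use the same name.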
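The naming scheme described above can be sketched in plain Java. This is only an illustration of the name-m-nnnnn / name-r-nnnnn pattern; the class OutputFileNames and its method fileName are hypothetical helpers invented for this example and are not part of Hadoop.

```java
import java.util.Locale;

// Illustrative only: mimics the naming convention described in the text.
// OutputFileNames is a hypothetical helper, not a Hadoop class.
final class OutputFileNames {

    /** Builds a name such as "male-r-00003" from a base name, task type and part number. */
    static String fileName(String baseName, boolean isMapTask, int partNumber) {
        if (partNumber < 0) {
            throw new IllegalArgumentException("part number must be >= 0");
        }
        String taskType = isMapTask ? "m" : "r";
        // Part numbers are zero-padded to five digits, as in part-r-00000.
        return String.format(Locale.ROOT, "%s-%s-%05d", baseName, taskType, partNumber);
    }

    public static void main(String[] args) {
        System.out.println(fileName("male", false, 0));
        System.out.println(fileName("male", false, 1));
        System.out.println(fileName("female", true, 0));
    }
}
```

Because the part number differs per task, two reducers that both write under the base name "male" produce distinct files rather than overwriting each other.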

1. Customizing output file names

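We can control the names of the output files. Consider this requirement: split vacation order data by sex. This calls for one job whose output is a file for male and a file for female, each containing all the records for that sex.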

This requirement can be met with MultipleOutputs:

package com.sjf.open.test;

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by xiaosi on 16-11-7.
 */
public class VacationOrderBySex extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    int status = ToolRunner.run(new VacationOrderBySex(), args);
    System.exit(status);
  }

  public static class VacationOrderBySexMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String fInputPath = "";

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);
      // Remember which input file this split belongs to.
      fInputPath = ((FileSplit) context.getInputSplit()).getPath().toString();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      if (fInputPath.contains("vacation_hot_country_order")) {
        // Fields are tab-separated; the third field holds the sex.
        String[] params = line.split("\t");
        if (params.length < 3) {
          return;
        }
        String sex = params[2];
        if (StringUtils.isBlank(sex)) {
          return;
        }
        // Key by sex so that all records of one sex reach the same reducer.
        context.write(new Text(sex.toLowerCase()), value);
      }
    }
  }

  public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text value : values) {
        // The third argument is the base name of the output file (here, the sex).
        multipleOutputs.write(NullWritable.get(), value, key.toString());
      }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      // Close MultipleOutputs so that all files are flushed.
      multipleOutputs.close();
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("./run <input> <output>");
      return 1;
    }
    String inputPath = args[0];
    String outputPath = args[1];
    int numReduceTasks = 16;

    Configuration conf = this.getConf();
    // Compress the job output with gzip (legacy property names).
    conf.setBoolean("mapred.output.compress", true);
    conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);

    Job job = Job.getInstance(conf);
    job.setJobName("vacation_order_by_jifeng.si");
    job.setJarByClass(VacationOrderBySex.class);
    job.setMapperClass(VacationOrderBySexMapper.class);
    job.setReducerClass(VacationOrderBySexReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, inputPath);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.setNumReduceTasks(numReduceTasks);

    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }
}
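To see how the job above assigns records to output files without running a cluster, the routing rule can be approximated in plain Java. This is a rough sketch, not the job itself: the class SexRoutingSketch is hypothetical, the sample rows and their first two columns are made up for illustration (the source only states that the third tab-separated field is the sex), and an in-memory map stands in for the files that MultipleOutputs would write.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the mapper/reducer pair: groups lines by the
// lower-cased third tab-separated field, which the job uses as the base file name.
final class SexRoutingSketch {

    /** Returns the output base name for a line, or null if the line would be skipped. */
    static String outputBaseName(String line) {
        String[] params = line.split("\t");
        if (params.length < 3 || params[2].trim().isEmpty()) {
            return null; // missing or blank sex field: the record is dropped
        }
        return params[2].toLowerCase(Locale.ROOT);
    }

    /** Groups lines by base name, preserving first-seen order of the names. */
    static Map<String, List<String>> route(List<String> lines) {
        Map<String, List<String>> byBaseName = new LinkedHashMap<>();
        for (String line : lines) {
            String baseName = outputBaseName(line);
            if (baseName != null) {
                byBaseName.computeIfAbsent(baseName, k -> new ArrayList<>()).add(line);
            }
        }
        return byBaseName;
    }

    public static void main(String[] args) {
        // Made-up sample rows: id, country, sex.
        List<String> sample = Arrays.asList(
            "1001\tJapan\tM",
            "1002\tFrance\tF",
            "1003\tItaly\t",
            "1004\tSpain\tm");
        for (Map.Entry<String, List<String>> entry : route(sample).entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " record(s)");
        }
    }
}
```

In the real job each distinct key is handled by exactly one reducer, so each base name ends up in a single name-r-nnnnn file even though 16 reduce tasks are configured.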