首页 > 学院 > 开发设计 > 正文

Hadoop多文件(目录)输出

2019-11-06 08:37:29
字体:
来源:转载
供稿:网友

需求:在maPReduce中输出两张表,每张表对应一个目录, 格式要求snappy.parquet

        通过代码来分析:

MultipleInputs.addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass)        输入的文件可以添加多个,从Hadoop提供的API就可以看出,但是,看如下的代码

FileOutputFormat.setOutputPath(job, outputPath);job.setOutputFormatClass(ExampleOutputFormat.class);ExampleOutputFormat.setSchema(job, MessageTypeParser.parseMessageType(OUTPUT_SCHEMA));ExampleOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);

        输出的时候,是采用的set方法,从表面来看也就是说只支持一个目录的输出,想要满足我的需求,需要怎么办呢?

方法一:通过FileSystem进行写文件

FSDataOutputStream output = fileSystem.create(path, true);...output.writeBytes(String);        但是发现这样写文本文件好像很方便,写压缩格式的文件好像行不通,当然,这个方案也没有深入调研,只是因为没饭看到它提供的有设置文件格式的参数或方法,顾作此判断。

        如果判断错误,希望能够指正。

方法二:直接写压缩文件

ParquetWriter<Group> writer = new ParquetWriter<Group>(new Path(path + ".parquet"), new GroupWriteSupport(), CompressionCodecName.SNAPPY, 1024, 1024, 512, true, false, WriterVersion.PARQUET_1_0, conf);

        这种方案来源于方案一,既然写纯文本文件可以,那么应该也可以直接写压缩格式的文件,所以可以看到这里设置了很多参数,目的要和我的需求中的目标保持一致,事实上这个方案成功了,写出来了压缩文件,但是当文件数很多时,hiveDB或impalaDB做查询的时候特别的慢,总是超时,而context.write()方法写出来的文件就能正常查询,并且两者文件的数量级在一个级别。暂时没有找出原因,猜测一下也学是我写的压缩格式的某些参数设置的不是很好,导致查询的时候特别慢。

        于是就去找新的解决方案,功夫不负有心人,我有又找到了一种方案。

方法三:MultipleOutputs设置新的输出路径

FileOutputFormat.setOutputPath(job, outputPath);MultipleOutputs.addNamedOutput(job, "output", ExampleOutputFormat.class, NullWritable.class, Group.class);job.setOutputFormatClass(ExampleOutputFormat.class);ExampleOutputFormat.setSchema(job, MessageTypeParser.parseMessageType(OUTPUT_SCHEMA));ExampleOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);

        可以看到,我在之前的基础上又加了一句,添加一个namedOutput,然后我在reducer中

MultipleOutputs<NullWritable, Group> writer = new MultipleOutputs<NullWritable, Group>(context);        初始化了write对象,然后使用write去把group对象写进目标路径:

writer.write("output", null, group, outputBasePath + "/");        这样就也实现了压缩文件的输出,经测试,查询起来和使用context.write()方法写出的文件几乎差不多。

        接下来总结一下方法三中的技术点:

MultiPleOutputs原理

        MapReduce job中,可以使用FileInputFormat和FileOutputFormat来对输入路径和输出路径来进行设置。在输出目录中,框架自己会自动对输出文件进行命名和组织,如part-(m|r)-00000之类,但有时为了后续流程的方便,我们常需要对输出结果进行一定的分类和组织。以前常用的方法是在MR job运行之后,用脚本对目录下的数据进行一次重新组织,变成我们需要的格式。

        研究了一下MR框架中的MultipleOutputs(是2.0之后的新API,是对老版本中MultipleOutputs与MultipleOutputFormat的一个整合)。

        在一般情况下,Hadoop每一个Reducer产生一个输出文件,文件以part-r-00000,part-r-00001的方式进行命名,如果需要认为的控制输出文件的命名或者每一个Reducer需要写出多个输出文件时,可以采用MultipleOutputs类来完成,MultipleOutputs采用输出记录的键值对(output Key和output Value)或者任意字符串来生成输出文件的名字,文件一般以name-r-nnnnn的格式进行命名,其中name是程序设计的任意名字;nnnnn表示分区号。

使用方法

1.驱动中不需要额外改变,只需要在MapClass或Reduce类中加入以下代码

private MultipleOutputs<Text,IntWritable> mos;public void setup(Context context) throws Exception{    mos=new MultipleOutputs(context);}public void cleanup(Context context) throws Exeception{    mos.close();}

2.然后就可以用mos.write(Key key,Value value,String baSEOutputPath)代替context.write(key,value);

        在MapClass输出时也会有默认的文件part-m-00*或part-r-00*,不过这些文件是无内容的,大小为0。而且只有part-m-00*会传给reduce。

注意:

        multipleOutputs.write(key,value,baseOutputPath)方法的第三个参数表明了该输出所在的目录(相当于用户指定的输出目录)。如果baseOutputPath不包含文件分隔符"/",那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn).

        如果包含文件分隔符"/".例如baseOutputPath="029070-99999/1901/part",那么输出文件则为027070-99999/1901/part-r-nnnnn.

3.最后在job的类中设置输出文件类型

MultipleOutputs.addNamedOutput(job, namedOutput, outputFormatClass, keyClass, valueClass);

参考:

MapReduce处理输出多文件格式(MultipleOutputs)

MultipleOutputFormat和MultipleOutputs

API文档


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表