首页 > 学院 > 开发设计 > 正文

通过mapreduce程序读取hdfs文件写入hbase

2019-11-08 20:47:06
字体:
来源:转载
供稿:网友
1.在eclipse上面创建maven工程pom文件如下:<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>hadoop-hbase-maven</groupId> <artifactId>hadoop-bahase-maven</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.7</version> <scope>system</scope> <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build></project>2.java程序如下:package com.lijie.hbase;import java.io.IOException;import java.text.SimpleDateFormat;import java.util.Calendar;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.client.Mutation;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;import 
org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;public class Hadoop2Hbase { @SuppressWarnings("deprecation") public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("hbase.zookeeper.quorum", "lijie"); conf.set(TableOutputFormat.OUTPUT_TABLE, "t1"); Job job = new Job(conf, "Hadoop2Hbase"); TableMapReduceUtil.addDependencyJars(job); job.setJarByClass(Hadoop2Hbase.class); job.setMapperClass(HbaseMapper.class); job.setReducerClass(HbaseReducer.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TableOutputFormat.class); FileInputFormat.setInputPaths(job, "hdfs://192.168.80.123:9000/mytest/*"); job.waitForCompletion(true); } static class HbaseMapper extends Mapper<LongWritable, Text, LongWritable, Text> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); String[] split = value.toString().split("/t"); context.write( key, new Text(split[0]+sdf.format(Calendar.getInstance().getTime()) + "/t" + value.toString())); } } static class HbaseReducer extends TableReducer<LongWritable, Text, NullWritable> { @Override protected void reduce( LongWritable key, Iterable<Text> values, Reducer<LongWritable, Text, NullWritable, Mutation>.Context context) throws IOException, InterruptedException { for (Text text : values) { String[] split = text.toString().split("/t"); Put put = new 
Put(split[0].getBytes()); put.addColumn("cf".getBytes(), "oneColumn".getBytes(), text .toString().getBytes()); put.addColumn("cf".getBytes(), "id".getBytes(), split[1].getBytes()); put.addColumn("cf".getBytes(), "name".getBytes(), split[2].getBytes()); put.addColumn("cf".getBytes(), "age".getBytes(), split[3].getBytes()); put.addColumn("cf".getBytes(), "addr".getBytes(), split[4].getBytes()); context.write(NullWritable.get(), put); } } }}3.在hbase上面创建表t1hbase(main):001:0> create 't1','cf'0 row(s) in 0.7700 seconds4.模拟一个文件上传到hdfs1001 lijie 24 shengzhen1002 zhangsan 25 chongqing1003 lisi 30 shanghai5.执行程序,并查看t1表信息hbase(main):002:0> scan 't1'ROW COLUMN+CELL 100120170209144933 column=cf:addr, timestamp=1486651412673, value=shengzhen 100120170209144933 column=cf:age, timestamp=1486651412673, value=24 100120170209144933 column=cf:id, timestamp=1486651412673, value=1001 100120170209144933 column=cf:name, timestamp=1486651412673, value=lijie 100120170209144933 column=cf:oneColumn, timestamp=1486651412673, value=100120170209144933/x091001/x09lijie/x0924/x09shengzhen 100220170209144933 column=cf:addr, timestamp=1486651412673, value=chongqing 100220170209144933 column=cf:age, timestamp=1486651412673, value=25 100220170209144933 column=cf:id, timestamp=1486651412673, value=1002 100220170209144933 column=cf:name, timestamp=1486651412673, value=zhangsan 100220170209144933 column=cf:oneColumn, timestamp=1486651412673, value=100220170209144933/x091002/x09zhangsan/x0925/x09chongqing 100320170209144933 column=cf:addr, timestamp=1486651412673, value=shanghai 100320170209144933 column=cf:age, timestamp=1486651412673, value=30 100320170209144933 column=cf:id, timestamp=1486651412673, value=1003 100320170209144933 column=cf:name, timestamp=1486651412673, value=lisi 100320170209144933 column=cf:oneColumn, timestamp=1486651412673, value=100320170209144933/x091003/x09lisi/x0930/x09shanghai 3 row(s) in 0.1400 seconds

一个简单的demo分享!


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表