首页 > 学院 > 开发设计 > 正文

hadoop-mongo map/reduce java

2019-11-15 00:09:33
字体:
来源:转载
供稿:网友
hadoop-mongo map/reduce java

官方 http://docs.mongodb.org/ecosystem/tutorial/getting-started-with-hadoop/

mongo-hadoop项目地址 https://github.com/mongodb/mongo-hadoop

该代码托管 https://github.com/cclient/mongo_hadoop_map-reduce

原分析由nodejs+async编写

用游标迭代查询mongo数据库,分析数据

因数据量较大,目前执行分析任务耗时4个小时,这只是极限数据量的1%

为优化,采用hadoop-mongo 方案

优点:mongo只能单机单线程(不作shard的情况),hadoop-mongo可以集群处理。

完成代码

近期一直写的脚本语言,再回头写点JAVA,好悲催,感觉很受限制。

初步代码 很粗糙

MAIN 入口

 1 package group.artifactid; 2  3 //cc MaxTemperature application to find the maximum temperature in the weather dataset 4 //vv MaxTemperature 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.MapWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.maPReduce.Job;10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;12 13 import com.mongodb.hadoop.MongoConfig;14 import com.mongodb.hadoop.io.BSONWritable;15 import com.mongodb.hadoop.util.MongoTool;16 17 import com.mongodb.hadoop.MongoConfig;18 import com.mongodb.hadoop.MongoInputFormat;19 import com.mongodb.hadoop.MongoOutputFormat;20 import com.mongodb.hadoop.util.MapredMongoConfigUtil;21 import com.mongodb.hadoop.util.MongoConfigUtil;22 import com.mongodb.hadoop.util.MongoTool;23 import org.apache.hadoop.conf.Configuration;24 import org.apache.hadoop.io.IntWritable;25 import org.apache.hadoop.util.ToolRunner;26 27 public class MongoMaxTemperature extends MongoTool {28     public MongoMaxTemperature() {29         Configuration conf = new Configuration();30         MongoConfig config = new MongoConfig(conf);31         setConf(conf);32         MongoConfigUtil.setInputFormat(getConf(), MongoInputFormat.class);33         MongoConfigUtil.setOutputFormat(getConf(), MongoOutputFormat.class);34         config.setInputURI("mongodb://localhost:27017/db1.collection1");35         config.setMapper(MongoMaxTemperatureMapper.class);36         // Combiner37         config.setCombiner(MongoMaxTemperatureCombine.class);38         // config.setReducer(MongoMaxTemperatureReducer.class);39         config.setReducer(MongoMaxTemperatureReducerCombine.class);40         config.setMapperOutputKey(Text.class);41         config.setMapperOutputValue(Text.class);42         config.setOutputKey(Text.class);43         config.setOutputValue(BSONWritable.class);44         
config.setOutputURI("mongodb://localhost:27017/db2.collection2");45     }46 47     public static void main(String[] args) throws Exception {48         System.exit(ToolRunner.run(new MongoMaxTemperature(), args));49     }50 }

MAPPER代码

package group.artifactid;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import org.bson.BSONObject;

/**
 * Emits one (apMac, clientMac + host) pair for every HTTP record with a
 * non-empty URL. The host is extracted by stripping the "http://" scheme and
 * any path component; '.' is then replaced by '}' because MongoDB field names
 * must not contain dots (the reducer uses the host as a document key).
 */
public class MongoMaxTemperatureMapper extends
        Mapper<Object, BSONObject, Text, Text> {

    @Override
    public void map(final Object key, BSONObject val, Context context)
            throws IOException, InterruptedException {
        String apmac = (String) val.get("apMac");
        String clientmac = (String) val.get("clientMac");
        String url = (String) val.get("url");
        String proto = (String) val.get("proto");
        // Documents with missing fields are skipped; the original code
        // dereferenced proto/url unconditionally and could throw NPE.
        if (apmac == null || clientmac == null || url == null) {
            return;
        }
        if ("http".equals(proto) && !url.isEmpty()) {
            // Strip the scheme prefix if present.
            if (url.startsWith("http://")) {
                url = url.substring(7);
            }
            // Keep only the host: everything before the first '/'.
            int firstSlash = url.indexOf('/');
            if (firstSlash > -1) {
                url = url.substring(0, firstSlash);
            }
            // MongoDB keys must not contain '.'; encode it as '}'.
            url = url.replace('.', '}');
            context.write(new Text(apmac), new Text(clientmac + url));
        }
    }
}

COMBINE代码

package group.artifactid;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import com.mongodb.hadoop.io.BSONWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;

/**
 * Reducer used together with the combiner. Each value is either raw mapper
 * output "&lt;clientMac&gt;&lt;host&gt;" or combiner output
 * "&lt;clientMac&gt;&lt;host&gt;|&lt;count&gt;" — the MAC always occupies the
 * first 17 characters. Produces, per apMac, a nested document
 * {clientMac: {host: count, ...}} keeping only the 100 most frequent hosts
 * per client.
 */
public class MongoMaxTemperatureReducerCombine extends
        Reducer<Text, Text, Text, BSONWritable> {

    /** Simple (host, count) pair used for top-N selection. */
    public static class UrlCount {
        String Url;
        int Count;

        public UrlCount(String url, int count) {
            this.Url = url;
            this.Count = count;
        }
    }

    /**
     * Returns at most {@code topnum} entries of {@code topobj} with the
     * highest counts, sorted descending by count.
     */
    public List<UrlCount> compresstopobj(BasicBSONObject topobj, int topnum) {
        List<UrlCount> entries = new ArrayList<UrlCount>();
        for (Map.Entry<String, Object> entry : topobj.entrySet()) {
            entries.add(new UrlCount(entry.getKey(),
                    Integer.parseInt(entry.getValue().toString())));
        }
        // Descending by count so the most frequent hosts survive the cut.
        Collections.sort(entries, new Comparator<UrlCount>() {
            @Override
            public int compare(UrlCount o1, UrlCount o2) {
                return Integer.compare(o2.Count, o1.Count);
            }
        });
        if (entries.size() > topnum) {
            entries = entries.subList(0, topnum);
        }
        return entries;
    }

    /** Replaces the map stored under {@code clientmac} with its top-100 hosts. */
    private void compressInto(BasicBSONObject clientmacmap, String clientmac,
            BasicBSONObject urlmap) {
        BasicBSONObject compressed = new BasicBSONObject();
        for (UrlCount uc : compresstopobj(urlmap, 100)) {
            compressed.put(uc.Url, uc.Count);
        }
        // Write the compressed map back; the original assigned it to a local
        // variable only, so the compression was silently discarded.
        clientmacmap.put(clientmac, compressed);
    }

    @Override
    public void reduce(Text apmac, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        BasicBSONObject clientmacmap = new BasicBSONObject();
        int count = 0;
        for (Text value : values) {
            String subline = value.toString();
            String clientmac = subline.substring(0, 17); // MAC is 17 chars
            int sep = subline.indexOf('|');
            int maplastcount = 1;
            String url;
            if (sep > -1) {
                // Combiner output "<mac><host>|<count>". The original kept the
                // '|' inside the key, so combiner-path keys never merged with
                // raw-path keys for the same host.
                url = subline.substring(17, sep);
                maplastcount = Integer.parseInt(subline.substring(sep + 1));
            } else {
                // Raw mapper output "<mac><host>": implicit count of 1.
                url = subline.substring(17);
            }
            // Skip blank hosts up front; the original only skipped " " when
            // the key was unseen and NPE'd on eveurl.toString() otherwise.
            if (url.trim().isEmpty()) {
                continue;
            }
            BasicBSONObject urlmap = (BasicBSONObject) clientmacmap
                    .get(clientmac);
            if (urlmap == null) {
                urlmap = new BasicBSONObject();
                clientmacmap.put(clientmac, urlmap);
            }
            Object prev = urlmap.get(url);
            if (prev == null) {
                urlmap.put(url, maplastcount);
            } else {
                urlmap.put(url,
                        Integer.parseInt(prev.toString()) + maplastcount);
            }
            count++;
            // Periodically compress to cap memory. Reset the counter so this
            // can fire more than once (the original never reset it).
            if (count == 10000) {
                count = 0;
                compressInto(clientmacmap, clientmac, urlmap);
            }
        }
        // Final pass: keep only the top 100 hosts per client.
        for (Map.Entry<String, Object> entry : clientmacmap.entrySet()) {
            BasicBSONObject urlmap = (BasicBSONObject) entry.getValue();
            BasicBSONObject compressed = new BasicBSONObject();
            for (UrlCount uc : compresstopobj(urlmap, 100)) {
                compressed.put(uc.Url, uc.Count);
            }
            entry.setValue(compressed);
        }
        context.write(apmac, new BSONWritable(clientmacmap));
    }
}

REDUCER代码

package group.artifactid;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import com.mongodb.hadoop.io.BSONWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;

/**
 * Stand-alone reducer (no combiner): each value is
 * "&lt;clientMac&gt;&lt;host&gt;" where the MAC occupies the first 17
 * characters. Builds, per apMac, a nested document
 * {clientMac: {host: count, ...}} keeping the 100 most frequent hosts per
 * client.
 */
public class MongoMaxTemperatureReducer extends
        Reducer<Text, Text, Text, BSONWritable> {

    /** Simple (host, count) pair used for top-N selection. */
    public static class UrlCount {
        String Url;
        int Count;

        public UrlCount(String url, int count) {
            this.Url = url;
            this.Count = count;
        }
    }

    /**
     * Returns at most {@code topnum} entries of {@code topobj} with the
     * highest counts, sorted descending by count.
     *
     * The original SortByCount comparator returned only 1 or 0 — violating
     * the Comparator contract (risking IllegalArgumentException from
     * Collections.sort) and effectively sorting ascending, so the LEAST
     * frequent hosts were kept. Fixed to a contract-conforming descending
     * sort, consistent with MongoMaxTemperatureReducerCombine.
     */
    public List<UrlCount> compresstopobj(BasicBSONObject topobj, int topnum) {
        List<UrlCount> entries = new ArrayList<UrlCount>();
        for (Map.Entry<String, Object> entry : topobj.entrySet()) {
            entries.add(new UrlCount(entry.getKey(),
                    Integer.parseInt(entry.getValue().toString())));
        }
        Collections.sort(entries, new Comparator<UrlCount>() {
            @Override
            public int compare(UrlCount o1, UrlCount o2) {
                return Integer.compare(o2.Count, o1.Count);
            }
        });
        if (entries.size() > topnum) {
            entries = entries.subList(0, topnum);
        }
        return entries;
    }

    @Override
    public void reduce(Text apmac, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        BasicBSONObject clientmacmap = new BasicBSONObject();
        int count = 0;
        for (Text value : values) {
            String subline = value.toString();
            String clientmac = subline.substring(0, 17); // MAC is 17 chars
            String url = subline.substring(17);
            // Skip blank hosts up front; the original only skipped " " when
            // the key was unseen and NPE'd on eveurl.toString() otherwise.
            if (url.trim().isEmpty()) {
                continue;
            }
            BasicBSONObject urlmap = (BasicBSONObject) clientmacmap
                    .get(clientmac);
            if (urlmap == null) {
                urlmap = new BasicBSONObject();
                clientmacmap.put(clientmac, urlmap);
            }
            Object prev = urlmap.get(url);
            if (prev == null) {
                urlmap.put(url, 1);
            } else {
                urlmap.put(url, Integer.parseInt(prev.toString()) + 1);
            }
            count++;
            // Periodically compress to cap memory. Reset the counter so this
            // can fire more than once, and write the compressed map back —
            // the original assigned it to a local variable only, silently
            // discarding the compression.
            if (count == 1000) {
                count = 0;
                BasicBSONObject compressed = new BasicBSONObject();
                for (UrlCount uc : compresstopobj(urlmap, 100)) {
                    compressed.put(uc.Url, uc.Count);
                }
                clientmacmap.put(clientmac, compressed);
            }
        }
        // Final pass so the output honors the documented "top 100 per
        // client" contract (the original wrote uncompressed maps).
        for (Map.Entry<String, Object> entry : clientmacmap.entrySet()) {
            BasicBSONObject urlmap = (BasicBSONObject) entry.getValue();
            BasicBSONObject compressed = new BasicBSONObject();
            for (UrlCount uc : compresstopobj(urlmap, 100)) {
                compressed.put(uc.Url, uc.Count);
            }
            entry.setValue(compressed);
        }
        context.write(apmac, new BSONWritable(clientmacmap));
    }
}

Mongo collection 数据格式

{    "_id" : ObjectId("54d83f3548c9bc218e056ce6"),"apMac" : "aa:bb:cc:dd:ee:ff","proto" : "http",    "url" : "extshort.weixin.QQ.comhttp",    "clientMac" : "ff:ee:dd:cc:bb:aa"}

clientMac和url 先拼在一起,再按mac长度分割

数据流程

orgin->map

map:[{"aa:bb:cc:dd:ee:ff":[ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp]}]

假如是多条数据则

map:[{"aa:bb:cc:dd:ee:ff":["ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp","ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp1","ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp2"]}]

map->combine

如果有相同的client+url 则统计个数,以|分隔

combine:[{"aa:bb:cc:dd:ee:ff":[ff:ee:dd:cc:bb:aaextshort.weixin.qq.comhttp|100]}]

combine->reducer

reducer中 按mac长度分割出 clientMac url 再按“|”分割出 个数

统计前每个clientMac的前100条

reduce:

{    "_id": "00:21:26:00:0A:FF",    "aa:bb:cc:1c:b9:8f": {        "c}tieba}baidu}com|": 1,        "short}weixin}qq}comhttp:|": 1,        "get}sogou}com|": 1,        "md}openapi}360}cn|": 1,        "74}125}235}224|": 1,        "mmbiz}qpic}cn|": 1,        "tb}himg}baidu}com|": 1    },    "cc:bb:aa:d5:30:8a": {        "captive}apple}com|": 2,        "www}airport}us|": 1,        "www}itools}info|": 2,        "www}thinkdifferent}us|": 1,        "www}ibook}info|": 1    },    "ee:ee:bb:78:31:74": {        "www}itools}info|": 1,        "www}ibook}info|": 1    }    }


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表