K-means is a very common data mining algorithm: its logic is fairly simple and it has a wide range of applications. A Baidu search for Java implementations of K-means turns up plenty of versions worth consulting, for example http://blog.csdn.net/jdplus/article/details/23960127 and http://www.cnblogs.com/chaoku/p/3748456.html
Although both authors say they have tested their code and that it works without any problems, in practice everyone's environment is different, especially the Hadoop version, so one issue or another always comes up. Still, their implementations are a very good reference, and adapting them by following their logic is entirely feasible. My Hadoop version is rather old, and the most prominent problem is that this old release does not provide
org.apache.hadoop.mapreduce.Job
at all. This version gap means the code from those posts cannot simply be copied over and reused. I therefore reimplemented the K-means algorithm with reference to the examples on the official Hadoop site; the code borrows heavily from "潇洒子弦" but also incorporates my own thinking, and it differs in three main respects:
1. The Hadoop version differs: this implementation targets an older, lower-level release of Hadoop.
2. The distance calculation has changed: Euclidean distance is used, because the original scheme did not reliably cluster the data into the required number of groups (a minimal standalone sketch of the distance appears right after this list).
3. The statements that write the centers out to a new file have also been reworked; written the original way, the centers appeared to get overwritten.
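To make item 2 concrete: the Mapper below measures the similarity between a record and a center with plain Euclidean distance. A minimal standalone sketch of that calculation (the class and method names here are made up purely for illustration) looks like this:

package mykmeans;

// Illustrative only: the same Euclidean distance that the Mapper computes inline.
public class EuclideanDistanceSketch {

    // Distance between two points of equal dimension
    public static double distance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    public static void main(String[] args) {
        // The distance between (1,2) and (4,6) is 5.0
        System.out.println(distance(new double[]{1, 2}, new double[]{4, 6}));
    }
}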
The main code follows. The CopyOfUtils helper class reads the centers from HDFS, compares the old and new centers, and rewrites the center file between iterations:

package mykmeans;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

public class CopyOfUtils {

    // Read the cluster centers from HDFS (a single file, or every file in a directory)
    public static ArrayList<ArrayList<Double>> getCentersFromHDFS(String centersPath, boolean isDirectory) throws IOException {
        ArrayList<ArrayList<Double>> result = new ArrayList<ArrayList<Double>>();
        Path path = new Path(centersPath);
        Configuration conf = new Configuration();
        FileSystem fileSystem = path.getFileSystem(conf);

        if (isDirectory) {
            FileStatus[] listFile = fileSystem.listStatus(path);
            for (int i = 0; i < listFile.length; i++) {
                result.addAll(getCentersFromHDFS(listFile[i].getPath().toString(), false));
            }
            return result;
        }

        FSDataInputStream fsis = fileSystem.open(path);
        LineReader lineReader = new LineReader(fsis, conf);
        Text line = new Text();
        while (lineReader.readLine(line) > 0) {
            ArrayList<Double> tempList = new ArrayList<Double>();
            String[] fields = line.toString().replaceAll("\t", "").split(",");
            for (int i = 0; i < fields.length; i++) {
                tempList.add(Double.parseDouble(fields[i]));
            }
            result.add(tempList);
        }
        lineReader.close();
        return result;
    }

    // Delete a file or directory on HDFS
    public static void deletePath(String pathStr) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(pathStr);
        FileSystem hdfs = path.getFileSystem(conf);
        hdfs.delete(path, true);
    }

    public static ArrayList<Double> textToArray(Text text) {
        ArrayList<Double> list = new ArrayList<Double>();
        String[] fields = text.toString().replaceAll("\t", "").split(",");
        for (int i = 0; i < fields.length; i++) {
            list.add(Double.parseDouble(fields[i]));
        }
        return list;
    }

    // Compare the old and new centers; return true once they are close enough
    public static boolean compareCenters(String centerPath, String newPath) throws IOException {
        System.out.println("Comparing the old and new centers");
        List<ArrayList<Double>> oldCenters = CopyOfUtils.getCentersFromHDFS(centerPath, false);
        List<ArrayList<Double>> newCenters = CopyOfUtils.getCentersFromHDFS(newPath, true);

        int size = oldCenters.size();
        int fieldSize = oldCenters.get(0).size();
        double distance = 0;
        for (int i = 0; i < size; i++) {
            for (int j = 0; j < fieldSize; j++) {
                double t1 = Math.abs(oldCenters.get(i).get(j));
                double t2 = Math.abs(newCenters.get(i).get(j));
                distance += Math.pow((t1 - t2) / (t1 + t2), 2);
            }
        }

        if (distance <= 0.00001) {
            // Converged: delete the new-center output so the final classification pass can write there
            CopyOfUtils.deletePath(newPath);
            return true;
        } else {
            // Not converged: clear the center file, then copy the new centers into it
            CopyOfUtils.deletePath(centerPath);
            Configuration conf = new Configuration();
            Path outPath = new Path(centerPath);
            FileSystem fileSystem = outPath.getFileSystem(conf);
            FSDataOutputStream out = fileSystem.create(outPath);

            // Write the contents of the new-center output into the center file
            Path inPath = new Path(newPath);
            FileStatus[] listFiles = fileSystem.listStatus(inPath);
            for (int i = 0; i < listFiles.length; i++) {
                FSDataInputStream in = fileSystem.open(listFiles[i].getPath());
                int byteRead = 0;
                byte[] buffer = new byte[256];
                while ((byteRead = in.read(buffer)) > 0) {
                    out.write(buffer, 0, byteRead);
                }
                in.close();
            }
            out.close();

            // Delete the new-center output so the next iteration can write to it again
            CopyOfUtils.deletePath(newPath);
        }
        return false;
    }
}
The CopyOfNewMapReduce class contains the Mapper, the Reducer that recomputes the centers, and the iteration driver:

package mykmeans;

import java.io.*;
import java.text.DecimalFormat;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class CopyOfNewMapReduce extends Configured implements Tool {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {

        public ArrayList<ArrayList<Double>> centers = null;
        public int k = 3;

        // Load the current centers from HDFS before the map tasks start
        public void configure(JobConf job) {
            String center = job.get("map.center.file");
            try {
                centers = CopyOfUtils.getCentersFromHDFS(center, false);
                k = centers.size();
                System.out.println("centers point is: " + centers.toString());
            } catch (IOException e) {
                System.err.println("cannot find the map center file!");
                e.printStackTrace();
            }
        }

        @Override
        public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output, Reporter report) throws IOException {
            // Parse one comma-separated record
            ArrayList<Double> fields = new ArrayList<Double>();
            String[] temp = value.toString().replaceAll("\t", "").split(",");
            for (int i = 0; i < temp.length; i++) {
                fields.add(Double.parseDouble(temp[i]));
            }
            int sizeOfFields = fields.size();

            double minDistance = 99999999;
            int centerIndex = 0;
            // Compute the Euclidean distance from this record to each of the k centers
            for (int i = 0; i < k; i++) {
                double currentDistance = 0;
                for (int j = 0; j < sizeOfFields; j++) {
                    double centerPoint = centers.get(i).get(j);
                    double field = fields.get(j);
                    currentDistance += (centerPoint - field) * (centerPoint - field);
                }
                currentDistance = Math.sqrt(currentDistance);
                // Keep the ID of the closest center
                if (currentDistance < minDistance) {
                    minDistance = currentDistance;
                    centerIndex = i;
                }
            }
            // Emit the record unchanged, keyed by its closest center
            output.collect(new IntWritable(centerIndex + 1), value);
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<IntWritable, Text, Text, Text> {

        @Override
        public void reduce(IntWritable key, Iterator<Text> value, OutputCollector<Text, Text> output, Reporter report) throws IOException {
            ArrayList<ArrayList<Double>> fieldsList = new ArrayList<ArrayList<Double>>();
            DecimalFormat df0 = new DecimalFormat("###.000000");

            // Collect every record assigned to this center, one ArrayList<Double> per line
            while (value.hasNext()) {
                ArrayList<Double> tempList = new ArrayList<Double>();
                String[] temp0 = value.next().toString().replaceAll("\t", "").split(",");
                for (int i = 0; i < temp0.length; i++) {
                    tempList.add(Double.parseDouble(df0.format(Double.parseDouble(temp0[i]))));
                }
                fieldsList.add(tempList);
            }

            // The new center is the column-wise average of the assigned records
            int fieldSize = fieldsList.get(0).size();
            double[] avg = new double[fieldSize];
            for (int i = 0; i < fieldSize; i++) {
                double sum = 0;
                int size = fieldsList.size();
                for (int j = 0; j < size; j++) {
                    sum += fieldsList.get(j).get(i);
                }
                avg[i] = sum / size;
                avg[i] = Double.parseDouble(df0.format(avg[i]));
            }
            output.collect(new Text(""), new Text(Arrays.toString(avg).replace("[", "").replace("]", "").replaceAll("\t", "")));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), CopyOfNewMapReduce.class);
        conf.setJobName("kmeans");
        conf.setMapperClass(Map.class);
        conf.setMapOutputKeyClass(IntWritable.class);
        conf.setMapOutputValueClass(Text.class);
        // args[3] is "false" only on the final pass, which skips the averaging Reducer
        // so that the records are written out grouped by their cluster ID
        if (!"false".equals(args[3]) || "true".equals(args[3])) {
            conf.setReducerClass(Reduce.class);
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(Text.class);
        }
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.set("map.center.file", args[2]);
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int count = 0;
        int res = 0;
        while (true) {
            res = ToolRunner.run(new Configuration(), new CopyOfNewMapReduce(), args);
            System.out.println("Iteration " + ++count + " done");
            if (CopyOfUtils.compareCenters(args[2], args[1])) {
                // Centers have converged: run once more with the flag set to "false"
                // to write out the final cluster assignments
                String lastarg[] = new String[args.length];
                for (int i = 0; i < args.length - 1; i++) {
                    lastarg[i] = args[i];
                }
                lastarg[args.length - 1] = "false";
                res = ToolRunner.run(new Configuration(), new CopyOfNewMapReduce(), lastarg);
                break;
            }
        }
        System.exit(res);
    }
}

After compiling, package the classes into a jar, taking care that the Java version used to compile matches the one on the cluster. Then run on the Hadoop client:
~hadoop> bin/hadoop jar MyKmeans.jar mykmeans.CopyOfNewMapReduce /xxxx/kmeans/input*.file /xxx/output /xxx/kmeans/cluster.file true

The four arguments are the input path, the output path, the initial center file, and the flag that controls whether the center-averaging Reducer runs (the driver switches it to "false" for the final assignment pass). The input data must be comma-separated, and the initial centers have to be set by hand rather than picked at random, again comma-separated. The final result ends up under the output directory, with each cluster written to its own file.
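Purely as an illustration of the expected format (these numbers are made up), the input records are one point per line, comma-separated, and cluster.file holds one hand-picked initial center per line in the same format, for example:

    input file:
    1.0,2.0
    1.5,1.8
    8.2,7.9
    0.9,1.7
    7.8,8.3

    cluster.file (three initial centers):
    1.0,2.0
    5.0,5.0
    8.0,8.0

Since the Mapper sets k to the number of centers it reads, the number of lines in cluster.file determines how many clusters you get.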
This completes the K-means implementation on an older version of Hadoop. Questions and discussion are welcome.
Reference: MapReduce K-means clustering algorithm (MapReduce Kmeans聚类算法). http://www.cnblogs.com/chaoku/p/3748456.html