0.2
0.28
0.32
0.39
0.42
0.5
0.61
0.68
0.72
0.76
1.8
1.88
1.98
2
2.02
2.1
2.24
2.32
2.38
2.4
3.4
3.52
3.58
3.6
3.65
3.72
3.77
3.88
3.91
3.94
3.98
4
package kmeans_spark

import java.util.Random
import java.lang.Math._
import org.apache.spark.rdd.RDD
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vector
import scala.language.implicitConversions

/**
 * Hand-written k-means clustering on Spark.
 *
 * Arguments: `<input> <output> <k>`
 *  - `input`:  text file, one point per line, coordinates separated by tab characters
 *  - `output`: directory that receives the final point-to-cluster assignment on convergence
 *  - `k`:      number of clusters
 *
 * Each iteration assigns every point to its nearest center, moves every center to the mean
 * of its members, and stops once the total cost (sum of squared distances from points to
 * their centers) changes by no more than a fixed threshold.
 */
object KMeans {

  /** Entry point; see the object documentation for the expected arguments. */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kmeans in Spark")
    val sc = new SparkContext(conf)
    try {
      val input = args(0)   // input data path
      val output = args(1)  // output path
      val k = args(2).toInt // number of clusters
      val threshold = 0.1   // convergence threshold on the change in total cost
      var previousCost = Double.MaxValue
      var times = 0
      var readyForIteration = true

      // Merges two partial (sum of points, point count, sum of squared distances) triples.
      val func1 = (x: (newVector, Int, Double), y: (newVector, Int, Double)) =>
        (x._1 + y._1, x._2 + y._2, x._3 + y._3)

      // Parse the input into vectors; cached because every iteration re-reads it.
      val points = sc.textFile(input).map { line =>
        Vectors.dense(line.split("\t").map(_.toDouble))
      }.cache()

      // Random initial centroids; this array is updated in place after every iteration.
      val centers = points.takeSample(false, k, new Random().nextLong())
      print("------------------------------------------------\n")
      print("Print the centers for the next iteration: \n")
      printCenters(centers)
      print("Ready for the next iteration ? " + readyForIteration + "\n")

      while (readyForIteration) {
        times += 1
        print("Print the result of the clustering in iteration " + times + "\n")

        // Freeze the centers used by this iteration. The closure must not observe the
        // in-place update made by updateCenters, otherwise a recomputation of
        // reClusteringResult (e.g. by saveAsTextFile) would assign points against
        // different centers than the ones that produced this iteration's cost.
        val currentCenters = centers.clone()
        val reClusteringResult = points.map { v =>
          val (centerId, minDistance) = getClosestCenter(currentCenters, v)
          // These prints run wherever the task runs (executor stdout on a cluster).
          print("Cluster id: " + centerId + ", ")
          print("The point in the cluster " + centerId + ": ")
          v.toArray.foreach(x => print(x + ","))
          print("\n")
          (centerId, (newVector(v), 1, minDistance))
        }.cache() // reused by the reduce below and, on convergence, by the final save

        // Per cluster id: new center = mean of its members, plus that cluster's cost.
        val newCentersRdd = reClusteringResult.reduceByKey(func1).map {
          case (centerId, (sum, count, sumOfDistance)) =>
            (centerId, ((sum * (1d / count)).point, sumOfDistance))
        }

        val cost = updateCenters(newCentersRdd, centers)
        val s = abs(cost - previousCost)
        print("s = " + s + "\n")
        print("------------------------------------------------\n")
        print("Print the centers for the next iteration: \n")
        printCenters(centers)
        if (s <= threshold) {
          readyForIteration = false
          // Converged: write "clusterId<TAB>x1<TAB>x2..." for every point.
          reClusteringResult.map { case (centerId, (p, _, _)) =>
            centerId.toString + "\t" + p.point.toArray.mkString("\t")
          }.saveAsTextFile(output)
        }
        print("to the next iteration ? " + readyForIteration + "\n")
        previousCost = cost
        reClusteringResult.unpersist()
      }
    } finally {
      sc.stop()
    }
  }

  /**
   * Wraps an MLlib [[Vector]] with the element-wise arithmetic k-means needs.
   * Binary operations assume both operands have the same dimension.
   */
  case class newVector(point: Vector) {

    /** Multiplies every coordinate by `a`. */
    def *(a: Double): newVector = newVector(Vectors.dense(point.toArray.map(x => a * x)))

    /** Coordinate-wise sum with `that`. */
    def +(that: newVector): newVector = {
      val xs = point.toArray
      val ys = that.point.toArray
      newVector(Vectors.dense(Array.tabulate(xs.length)(i => xs(i) + ys(i))))
    }

    /** Coordinate-wise difference `this - that`. */
    def -(that: newVector): newVector = this + (that * -1)

    /** Squared Euclidean norm (sum of squared coordinates); no square root is taken. */
    def pointLength(): Double = point.toArray.map(x => x * x).sum

    /** Squared Euclidean distance to `that`. */
    def distanceTo(that: newVector): Double = (this - that).pointLength()
  }

  /** Lets a plain MLlib [[Vector]] use the [[newVector]] operators (e.g. `v.distanceTo(c)`). */
  implicit def toNewVector(point: Vector): newVector = newVector(point)

  /**
   * Finds the center nearest to `point`.
   *
   * @return (index of the nearest center, squared distance to it). Ties go to the lowest
   *         index; an empty `centers` yields (0, Double.MaxValue).
   */
  def getClosestCenter(centers: Array[Vector], point: Vector): (Int, Double) = {
    var minDistance = Double.MaxValue
    var centerId = 0
    for (i <- centers.indices) {
      val d = point.distanceTo(centers(i)) // computed once per center
      if (d < minDistance) {
        minDistance = d
        centerId = i
      }
    }
    (centerId, minDistance)
  }

  /**
   * Copies each cluster's new center into its slot of `centers` (indexed by cluster id)
   * and returns the total cost, i.e. the summed squared distances over all clusters.
   * A cluster that received no points has no row in `rdd` and keeps its previous center.
   */
  private def updateCenters(rdd: RDD[(Int, (Vector, Double))], centers: Array[Vector]): Double = {
    val rows = rdd.collect() // at most one row per cluster
    rows.foreach { case (centerId, (center, _)) => centers(centerId) = center }
    rows.map { case (_, (_, cost)) => cost }.sum
  }

  /**
   * Legacy variant that receives (center, cost) rows without cluster ids: row i simply
   * overwrites `centers(i)`, in whatever order `take` returns them. Returns the sum of the
   * costs of the rows read. When there are fewer rows than centers (some cluster received
   * no points) the remaining centers are left unchanged instead of failing.
   */
  def getNewCenters(rdd: RDD[(Vector, Double)], centers: Array[Vector]): Double = {
    val res = rdd.take(centers.length)
    for (i <- res.indices) centers(i) = res(i)._1
    res.map(_._2).sum
  }

  /** Prints each center on its own line as comma-terminated coordinates, e.g. `1.0,2.0,`. */
  def printCenters(centers: Array[Vector]): Unit = {
    for (v <- centers) {
      v.toArray.foreach(x => print(x + ","))
      print("\n")
    }
  }
}

将代码编译并打包成jar文件,启动Spark之后,在命令行环境下运行下图所示命令:

Print the result of the clustering initeration 1Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:2:0.61,Clister id:2:0.68,Clister id:2:0.72,Clister id:2:0.76,Clister id:0:1.8,Clister id:0:1.88,Clister id:0:1.98,Clister id:0:2.0,Clister id:0:2.02,Clister id:0:2.1,Clister id:0:2.24,Clister id:0:2.32,Clister id:0:2.38,Clister id:0:2.4,Clister id:0:3.4,Clister id:0:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 1.7976931348623157E308Print the centers for the next iteration: 2.9577272727272725,0.3516666666666666,0.6924999999999999,to the next iteration?truePrint the result of the clustering initeration 2Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:2:0.61,Clister id:2:0.68,Clister id:2:0.72,Clister id:2:0.76,Clister id:2:1.8,Clister id:0:1.88,Clister id:0:1.98,Clister id:0:2.0,Clister id:0:2.02,Clister id:0:2.1,Clister id:0:2.24,Clister id:0:2.32,Clister id:0:2.38,Clister id:0:2.4,Clister id:0:3.4,Clister id:0:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 19.486131491046827Print the centers for the next iteration: 3.012857142857142,0.3516666666666666,0.9139999999999999,to the next iteration?truePrint the result of the clustering initeration 3Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:2:0.68,Clister id:2:0.72,Clister id:2:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:0:1.98,Clister id:0:2.0,Clister id:0:2.02,Clister id:0:2.1,Clister id:0:2.24,Clister id:0:2.32,Clister id:0:2.38,Clister id:0:2.4,Clister id:0:3.4,Clister id:0:2.52,Clister id:0:3.58,Clister 
id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 0.6850257992025988Print the centers for the next iteration: 3.0694999999999997,0.3885714285714285,1.168,to the next iteration?truePrint the result of the clustering initeration 4Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:1:0.68,Clister id:1:0.72,Clister id:1:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:2:1.98,Clister id:2:2.0,Clister id:2:2.02,Clister id:2:2.1,Clister id:0:2.24,Clister id:0:2.32,Clister id:0:2.38,Clister id:0:2.4,Clister id:0:3.4,Clister id:0:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 2.0949843015873046Print the centers for the next iteration: 3.330625,0.488,1.9633333333333332,to the next iteration?truePrint the result of the clustering initeration 5Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:1:0.68,Clister id:1:0.72,Clister id:1:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:2:1.98,Clister id:2:2.0,Clister id:2:2.02,Clister id:2:2.1,Clister id:2:2.24,Clister id:2:2.32,Clister id:2:2.38,Clister id:2:2.4,Clister id:0:3.4,Clister id:2:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 8.74493855573271Print the centers for the next iteration: 3.7663636363636357,0.488,2.149090909090909,to the next iteration?truePrint the result of the clustering initeration 6Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:1:0.68,Clister id:1:0.72,Clister 
id:1:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:2:1.98,Clister id:2:2.0,Clister id:2:2.02,Clister id:2:2.1,Clister id:2:2.24,Clister id:2:2.32,Clister id:2:2.38,Clister id:2:2.4,Clister id:0:3.4,Clister id:2:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 2.4681143978851026Print the centers for the next iteration: 3.7663636363636357,0.488,2.149090909090909,to the next iteration?truePrint the result of the clustering initeration 7Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:1:0.68,Clister id:1:0.72,Clister id:1:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:2:1.98,Clister id:2:2.0,Clister id:2:2.02,Clister id:2:2.1,Clister id:2:2.24,Clister id:2:2.32,Clister id:2:2.38,Clister id:2:2.4,Clister id:0:3.4,Clister id:2:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 0.0Print the centers for the next iteration: 3.7663636363636357,0.488,2.149090909090909,Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:1:0.68,Clister id:1:0.72,Clister id:1:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:2:1.98,Clister id:2:2.0,Clister id:2:2.02,Clister id:2:2.1,Clister id:2:2.24,Clister id:2:2.32,Clister id:2:2.38,Clister id:2:2.4,Clister id:0:3.4,Clister id:2:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,to the next iteration?falsescala>