首页 > 学院 > 开发设计 > 正文

初试Spark之K-Means聚类算法实现

2019-11-08 02:55:45
字体:
来源:转载
供稿:网友

0.2 0.28 0.32 0.39 0.42 0.5 0.61 0.68 0.72 0.76 1.8 1.88 1.98 2 2.02 2.1 2.24 2.32 2.38 2.4 3.4 3.52 3.58 3.6 3.65 3.72 3.77 3.88 3.91 3.94 3.98 4

 

package kmeans_spark

import java.util.Random
import java.lang.Math._

import scala.language.implicitConversions

import org.apache.spark.rdd.RDD
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vector

/**
 * A hand-written K-Means clustering job for Spark.
 *
 * Usage: `KMeans <input> <output> <k>`
 *
 *  - `input`  : text file with one point per line, coordinates separated by tabs
 *  - `output` : directory that receives the final assignment, one line per point
 *               formatted as `clusterId<TAB>coord1<TAB>coord2...`
 *  - `k`      : number of clusters
 *
 * The job iterates until the change of the summed point-to-center distance between
 * two consecutive iterations is not larger than a fixed threshold.
 */
object KMeans {

  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println("Usage: KMeans <input> <output> <k>")
      sys.exit(1)
    }

    val conf = new SparkConf().setAppName("kmeans in Spark")
    val sc = new SparkContext(conf)
    try {
      val input = args(0) // input data
      val output = args(1) // output path
      val k = args(2).toInt // number of clusters
      require(k > 0, s"k must be positive, got $k")

      val shold = 0.1 // convergence threshold
      var s1 = Double.MaxValue // summed distance of the previous iteration
      var times = 0
      var readyForIteration = true

      // Merges two partial (vector sum, point count, distance sum) aggregates.
      val func1 = (x: (newVector, Int, Double), y: (newVector, Int, Double)) =>
        (x._1 + y._1, x._2 + y._2, x._3 + y._3)

      // Turn the input text into an RDD of dense vectors; blank lines are skipped
      // because they cannot be parsed as numbers.
      val points = sc.textFile(input)
        .filter(line => line.trim.nonEmpty)
        .map(line => Vectors.dense(line.split("\t").map(ele => ele.toDouble)))
        .cache()

      // Random initial centers; the array is updated in place on every iteration.
      val centers = points.takeSample(false, k, new Random().nextLong())
      require(centers.nonEmpty, s"no data points could be read from $input")

      print("------------------------------------------------\n")
      print("Print the centers for the next iteration: \n")
      printCenters(centers)
      print("Ready for the next iteration ? " + readyForIteration + "\n")

      while (readyForIteration) {
        times += 1
        print("Print the result of the clustering in iteration " + times + "\n")

        // Assign every point to its closest center.
        val reClusteringResult = points.map { v =>
          val (centerId, minDistance) = getClosestCenter(centers, v)
          print("Cluster id: " + centerId + ", ")
          print("The point in the cluster " + centerId + ": ")
          v.toArray.foreach(x => print(x + ","))
          print("\n")
          (centerId, (newVector(v), 1, minDistance))
        }

        // Per cluster: mean of the assigned points and the summed distance.
        // The cluster id is kept so every new center lands in its own slot even
        // when the reduce output is unordered or a cluster received no points.
        val updates = reClusteringResult
          .reduceByKey(func1)
          .map { case (centerId, (vectorSum, count, sumOfDistance)) =>
            (centerId, ((vectorSum * (1d / count)).point, sumOfDistance))
          }
          .collect()

        val s2 = updateCenters(updates, centers)
        val s = abs(s2 - s1) // clustering quality criterion: change of the summed distance
        print("s = " + s + "\n")
        print("------------------------------------------------\n")
        print("Print the centers for the next iteration: \n")
        printCenters(centers)

        if (s <= shold) {
          readyForIteration = false
          // The algorithm has converged: write out the result.
          reClusteringResult.map { case (centerId, (assigned, _, _)) =>
            (centerId.toString +: assigned.point.toArray.map(_.toString)).mkString("\t")
          }.saveAsTextFile(output)
        }
        print("to the next iteration ? " + readyForIteration + "\n")
        s1 = s2
      }
    } finally {
      // Always release the context, also when an iteration fails.
      sc.stop()
    }
  }

  /**
   * Thin wrapper around an MLlib [[Vector]] that adds element-wise arithmetic.
   *
   * @param point the wrapped vector
   */
  case class newVector(point: Vector) {

    /** Scales every component by `a`. */
    def *(a: Double): newVector =
      newVector(Vectors.dense(point.toArray.map(x => a * x)))

    /** Component-wise sum; `that` must have at least as many components as this vector. */
    def +(that: newVector): newVector = {
      val xs = point.toArray
      val ys = that.point.toArray
      newVector(Vectors.dense(Array.tabulate(point.size)(i => xs(i) + ys(i))))
    }

    /** Component-wise difference. */
    def -(that: newVector): newVector = this + (that * -1)

    /** Sum of the squared components (no square root is taken). */
    def pointLength(): Double =
      point.toArray.foldLeft(0d)((acc, x) => acc + pow(x, 2))

    /** Squared Euclidean distance to `that` (see [[pointLength]]). */
    def distanceTo(that: newVector): Double = (this - that).pointLength()
  }

  /** Lets a plain [[Vector]] be used wherever a [[newVector]] is expected. */
  implicit def toNewVector(point: Vector): newVector = newVector(point)

  /**
   * Finds the center closest to `point`.
   *
   * @param centers current cluster centers
   * @param point   the point to assign
   * @return the index of the closest center and the distance to it as computed by
   *         `newVector.distanceTo`; `(0, Double.MaxValue)` when `centers` is empty.
   *         On ties the center with the lowest index wins.
   */
  def getClosestCenter(centers: Array[Vector], point: Vector): (Int, Double) = {
    val p = newVector(point)
    centers.indices.foldLeft((0, Double.MaxValue)) { (best, i) =>
      // Each distance is computed once and reused for comparison and result.
      val distance = p.distanceTo(newVector(centers(i)))
      if (distance < best._2) (i, distance) else best
    }
  }

  /**
   * Writes each recomputed center into the slot of its own cluster id.
   *
   * Centers whose cluster received no points are absent from `updates` and keep
   * their previous position.
   *
   * @param updates (cluster id, (new center, summed distance of that cluster))
   * @param centers current centers, updated in place
   * @return the summed distance over all clusters present in `updates`
   */
  private def updateCenters(updates: Array[(Int, (Vector, Double))],
                            centers: Array[Vector]): Double = {
    var sumOfDistance = 0d
    for ((centerId, (newCenter, distance)) <- updates) {
      centers(centerId) = newCenter
      sumOfDistance += distance
    }
    sumOfDistance
  }

  /**
   * Positional center update, kept for existing callers; `main` uses the
   * id-keyed `updateCenters` instead.
   *
   * Takes at most `centers.length` rows from `rdd` and copies the i-th row into
   * `centers(i)`. When the RDD yields fewer rows than there are centers, the
   * remaining centers are left unchanged.
   *
   * @param rdd     (new center, summed distance) rows
   * @param centers current centers, updated in place
   * @return the summed distance over the rows that were taken
   */
  def getNewCenters(rdd: RDD[(Vector, Double)], centers: Array[Vector]): Double = {
    val res = rdd.take(centers.length)
    var sumOfDistance = 0d
    for (i <- res.indices) {
      centers(i) = res(i)._1
      sumOfDistance += res(i)._2
    }
    sumOfDistance
  }

  /** Prints every center on its own line, each coordinate followed by a comma. */
  def printCenters(centers: Array[Vector]): Unit = {
    for (v <- centers) {
      v.toArray.foreach(x => print(x + ","))
      print("\n")
    }
  }
}

将代码编译并打包成jar文件,启动Spark之后,在命令行环境下运行下图所示命令:

Print the result of the clustering initeration 1Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:2:0.61,Clister id:2:0.68,Clister id:2:0.72,Clister id:2:0.76,Clister id:0:1.8,Clister id:0:1.88,Clister id:0:1.98,Clister id:0:2.0,Clister id:0:2.02,Clister id:0:2.1,Clister id:0:2.24,Clister id:0:2.32,Clister id:0:2.38,Clister id:0:2.4,Clister id:0:3.4,Clister id:0:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 1.7976931348623157E308Print the centers for the next iteration: 2.9577272727272725,0.3516666666666666,0.6924999999999999,to the next iteration?truePrint the result of the clustering initeration 2Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:2:0.61,Clister id:2:0.68,Clister id:2:0.72,Clister id:2:0.76,Clister id:2:1.8,Clister id:0:1.88,Clister id:0:1.98,Clister id:0:2.0,Clister id:0:2.02,Clister id:0:2.1,Clister id:0:2.24,Clister id:0:2.32,Clister id:0:2.38,Clister id:0:2.4,Clister id:0:3.4,Clister id:0:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 19.486131491046827Print the centers for the next iteration: 3.012857142857142,0.3516666666666666,0.9139999999999999,to the next iteration?truePrint the result of the clustering initeration 3Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:2:0.68,Clister id:2:0.72,Clister id:2:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:0:1.98,Clister id:0:2.0,Clister id:0:2.02,Clister id:0:2.1,Clister id:0:2.24,Clister id:0:2.32,Clister id:0:2.38,Clister id:0:2.4,Clister id:0:3.4,Clister id:0:2.52,Clister id:0:3.58,Clister 
id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 0.6850257992025988Print the centers for the next iteration: 3.0694999999999997,0.3885714285714285,1.168,to the next iteration?truePrint the result of the clustering initeration 4Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:1:0.68,Clister id:1:0.72,Clister id:1:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:2:1.98,Clister id:2:2.0,Clister id:2:2.02,Clister id:2:2.1,Clister id:0:2.24,Clister id:0:2.32,Clister id:0:2.38,Clister id:0:2.4,Clister id:0:3.4,Clister id:0:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 2.0949843015873046Print the centers for the next iteration: 3.330625,0.488,1.9633333333333332,to the next iteration?truePrint the result of the clustering initeration 5Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:1:0.68,Clister id:1:0.72,Clister id:1:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:2:1.98,Clister id:2:2.0,Clister id:2:2.02,Clister id:2:2.1,Clister id:2:2.24,Clister id:2:2.32,Clister id:2:2.38,Clister id:2:2.4,Clister id:0:3.4,Clister id:2:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 8.74493855573271Print the centers for the next iteration: 3.7663636363636357,0.488,2.149090909090909,to the next iteration?truePrint the result of the clustering initeration 6Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:1:0.68,Clister id:1:0.72,Clister 
id:1:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:2:1.98,Clister id:2:2.0,Clister id:2:2.02,Clister id:2:2.1,Clister id:2:2.24,Clister id:2:2.32,Clister id:2:2.38,Clister id:2:2.4,Clister id:0:3.4,Clister id:2:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 2.4681143978851026Print the centers for the next iteration: 3.7663636363636357,0.488,2.149090909090909,to the next iteration?truePrint the result of the clustering initeration 7Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:1:0.68,Clister id:1:0.72,Clister id:1:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:2:1.98,Clister id:2:2.0,Clister id:2:2.02,Clister id:2:2.1,Clister id:2:2.24,Clister id:2:2.32,Clister id:2:2.38,Clister id:2:2.4,Clister id:0:3.4,Clister id:2:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,s = 0.0Print the centers for the next iteration: 3.7663636363636357,0.488,2.149090909090909,Clister id:1:0.2,Clister id:1:0.28,Clister id:1:0.32,Clister id:1:0.39,Clister id:1:0.42,Clister id:1:0.5,Clister id:1:0.61,Clister id:1:0.68,Clister id:1:0.72,Clister id:1:0.76,Clister id:2:1.8,Clister id:2:1.88,Clister id:2:1.98,Clister id:2:2.0,Clister id:2:2.02,Clister id:2:2.1,Clister id:2:2.24,Clister id:2:2.32,Clister id:2:2.38,Clister id:2:2.4,Clister id:0:3.4,Clister id:2:2.52,Clister id:0:3.58,Clister id:0:3.6,Clister id:0:3.65,Clister id:0:3.72,Clister id:0:3.77,Clister id:0:3.88,Clister id:0:3.91,Clister id:0:3.94,Clister id:0:3.98,Clister id:0:4.0,to the next iteration?falsescala>


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表