本站分享:AI、大数据、数据分析师培训认证考试,包括:Python培训Excel培训Matlab培训SPSS培训SAS培训R语言培训Hadoop培训Amos培训Stata培训Eviews培训

Spark之K-Means聚类算法实现_Spark K-Means聚类算法

hadoop培训 cdadata 5671℃

初试Spark之K-Means聚类算法实现

关键词:kmeans文本聚类sparkspark kmeans 聚类spark实现kmeans算法kmeans聚类算法原理k means聚类算法

自学Spark有将近一个月了,一直想找一个稍微复杂点的例子练练手,K均值聚类算法实现是个不错的例子,于是有了这篇博客。

K均值聚类算法的原理本身很简单,大概思想就是:选取初始质心,根据这些质心将样本点聚类,聚类之后计算新的质心,然后重新将样本点聚类,不断循环重复“产生质心,重新聚类”这一过程,直至聚类效果不再发生明显变换。Hadoop的MapReduce计算框架虽然也能够实现这一算法,但是代码的实现过程实在是太恶心了,认识到Spark的简洁之后,义无反顾地投入到Spark的怀抱。

写代码时没想太多,测试数据的样本点都是一维的,32个样本点分散在三个区间中,分别是0.2至0.8,1.8至2.4,3.4至4,如下图所示初试Spark之K-Means聚类算法实现

下面是代码:

 

[java] view plain copy
  1. package kmeans_spark
  2. import java.util.Random
  3. import java.lang.Math._
  4. import org.apache.spark.rdd.RDD
  5. import org.apache.spark._
  6. import org.apache.spark.SparkContext._
  7. import org.apache.spark.mllib.linalg.Vectors
  8. import org.apache.spark.mllib.linalg.Vector
  9. object KMeans {
  10.   def main(args: Array[String]) {
  11.     val conf = new SparkConf().setAppName(“kmeans in Spark”)
  12.     val sc = new SparkContext(conf)
  13.     val input = args(0//输入数据
  14.     val output = args(1//输出路径
  15.     val k = args(2).toInt //聚类个数
  16.     var s = 0d //聚类效果评价标准
  17.     val shold = 0.1 //收敛阀值
  18.     var s1 = Double.MaxValue
  19.     var times = 0
  20.     var readyForIteration = true
  21.     val func1 = (x: (newVector, Int, Double), y: (newVector, Int, Double)) => {
  22.       (x._1 + y._1, x._2 + y._2, x._3 + y._3)}
  23.     val points = sc.textFile(input).map(line => {
  24.           val temp = line.split(“\t”).map(ele => ele.toDouble)
  25.           Vectors.dense(temp)}).cache() //将输入数据转换成RDD
  26.     var centers = points.takeSample(false, k, new Random().nextLong()) //生成随机初始质心
  27.     print(“————————————————\n”)
  28.     print(“Print the centers for the next iteration: \n”)
  29.     printCenters(centers)
  30.     print(“Ready for the next iteration ? “+readyForIteration+“\n”)
  31.     while (readyForIteration) {
  32.       times += 1
  33.       print(“Print the result of the clustering in iteration “+times+“\n”)
  34.       val reClusteringResult = points.map(v => {
  35.         val (centerId, minDistance) = getClosestCenter(centers, v)
  36.         print(“Cluster id: “+centerId+“, “)
  37.         print(“The point in the cluster “+centerId+“: “)
  38.         v.toArray.foreach(x => print(x+“,”));print(“\n”)
  39.         (centerId, (newVector(v), 1, minDistance))})
  40.       val NewCentersRdd = reClusteringResult.reduceByKey(func1(_,_))
  41.         .map(ele => {
  42.         val centerId = ele._1
  43.         val newCenter = (ele._2)._1 * (1d / ele._2._2)
  44.         val sumOfDistance = (ele._2)._3
  45.         (newCenter.point, sumOfDistance)})
  46.       var s2 = getNewCenters(NewCentersRdd, centers)
  47.       s = abs(s2 – s1)
  48.       print(“s = “+s+“\n”)
  49.       print(“————————————————\n”)
  50.       print(“Print the centers for the next iteration: \n”)
  51.       printCenters(centers)
  52.       if (s <= shold) {
  53.         readyForIteration = false
  54.         reClusteringResult.map(ele => {
  55.           var centerId = ele._1.toString()+“\t”
  56.           val array = ele._2._1.point.toArray
  57.           for (i <- 0 until array.length) {
  58.             if (i == array.length – 1) {centerId = centerId + array(i).toString()}
  59.             else {centerId = centerId + array(i).toString() + “\t”}
  60.           }
  61.           centerId
  62.         }).saveAsTextFile(output) //如果算法收敛,输出结果
  63.       }
  64.       print(“to the next iteration ? “+readyForIteration+“\n”)
  65.       s1 = s2
  66.     }
  67.     sc.stop()
  68.   }
  69.   case class newVector(point: Vector) {
  70.     def *(a: Double): newVector = {
  71.       var res = new Array[Double](point.size)
  72.       for (i <- 0 until point.size) {
  73.         res(i) = a*point.toArray.apply(i)
  74.       }
  75.       newVector(Vectors.dense(res))
  76.     }
  77.     def +(that: newVector): newVector = {
  78.       var res = new Array[Double](point.size)
  79.       for (i <- 0 until point.size) {
  80.         res(i) = point.toArray.apply(i) + that.point.toArray.apply(i)
  81.       }
  82.       newVector(Vectors.dense(res))
  83.     }
  84.     def -(that: newVector): newVector = {
  85.       this + (that * –1)
  86.     }
  87.     def pointLength(): Double = {
  88.       var res = 0d
  89.       for (i <- 0 until point.size) {
  90.         res = res + pow(point.toArray.apply(i), 2)
  91.       }
  92.       res
  93.     }
  94.     def distanceTo(that: newVector): Double = {
  95.       (this – that).pointLength()
  96.     }
  97.   }
  98.   implicit def toNewVector(point: Vector) = newVector(point)
  99.   def getClosestCenter(centers: Array[Vector], point: Vector): (Int, Double) = {
  100.     var minDistance = Double.MaxValue
  101.     var centerId = 0
  102.     for (i <- 0 until centers.length) {
  103.       if (point.distanceTo(centers(i)) < minDistance) {
  104.         minDistance = point.distanceTo(centers(i))
  105.         centerId = i
  106.       }
  107.     }
  108.     (centerId, minDistance)
  109.   }
  110.   def getNewCenters(rdd: RDD[(Vector, Double)], centers: Array[Vector]): Double ={
  111.     val res = rdd.take(centers.length)
  112.     var sumOfDistance = 0d
  113.     for (i <- 0 until centers.length) {
  114.       centers(i) = res.apply(i)._1
  115.       sumOfDistance += res.apply(i)._2
  116.     }
  117.     sumOfDistance
  118.   }
  119.   def printCenters(centers: Array[Vector]) {
  120.     for (v <- centers) {
  121.       v.toArray.foreach(x => print(x+“,”));print(“\n”)
  122.     }
  123.   }
  124. }

将代码编译并打包成jar文件,启动Spark之后,在命令行环境下运行下图所示命令:

初试Spark之K-Means聚类算法实现
以下是运行期间的部分截图:

初试Spark之K-Means聚类算法实现

 

初试Spark之K-Means聚类算法实现

 

初试Spark之K-Means聚类算法实现

 

初试Spark之K-Means聚类算法实现

 

初试Spark之K-Means聚类算法实现

可以看出在示例代码上,算法收敛得很快,经过4次迭代之后就停止了,以下是聚类结果:

初试Spark之K-Means聚类算法实现

可以看出在测试数据上,聚类结果很好,第一个字段是类别编号,第二个字段是点的坐标,可以看出点所处的区间和相应的类别是一致的,从代码量上看,确实要比用MapReduce框架实现要节省很多,主要还是得益于RDD上丰富的算子带来的强大的表达能力。

转载请注明:数据分析 » Spark之K-Means聚类算法实现_Spark K-Means聚类算法

喜欢 (1)or分享 (0)