A First Try at Spark: Implementing the K-Means Clustering Algorithm
I have been teaching myself Spark for almost a month and had been looking for a slightly more involved example to practice on. Implementing the K-means clustering algorithm turned out to be a good one, hence this post.
The principle behind K-means is simple. The rough idea is: pick initial centroids, assign each sample point to its nearest centroid, recompute the centroids from the resulting clusters, then reassign the points, and keep repeating this "recompute centroids, reassign points" loop until the clustering no longer changes noticeably. Hadoop's MapReduce framework can also implement this algorithm, but writing that code is genuinely painful; once I saw how concise Spark is, I threw myself into it without looking back.
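Before looking at the Spark version, here is a rough single-machine sketch of that loop on one-dimensional points, just to make the idea concrete. The names and sample values are my own illustration and are separate from the Spark program below:

object KMeansSketch {
  // Assign each point to the index of its nearest center.
  def assign(points: Seq[Double], centers: Seq[Double]): Map[Int, Seq[Double]] =
    points.groupBy(p => centers.indices.minBy(i => math.abs(p - centers(i))))

  // One iteration: recompute each center as the mean of its cluster.
  def step(points: Seq[Double], centers: Seq[Double]): Seq[Double] = {
    val clusters = assign(points, centers)
    centers.indices.map(i => clusters.get(i).map(c => c.sum / c.size).getOrElse(centers(i)))
  }

  def main(args: Array[String]): Unit = {
    val points  = Seq(0.3, 0.5, 0.7, 1.9, 2.1, 2.3, 3.5, 3.8)
    var centers = Seq(0.0, 2.0, 4.0)
    var moved   = Double.MaxValue
    while (moved > 1e-6) {                       // stop once the centers stabilize
      val next = step(points, centers)
      moved    = centers.zip(next).map { case (a, b) => math.abs(a - b) }.max
      centers  = next
    }
    println(centers.mkString(", "))
  }
}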
I did not put much thought into the test data: the sample points are all one-dimensional, with 32 points spread over three intervals, 0.2 to 0.8, 1.8 to 2.4, and 3.4 to 4, as shown in the figure below.
Here is the code:
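The Spark program below expects one point per line, with the coordinates separated by tabs (for one-dimensional points, a single value per line). Something like the following could generate a test file of that shape; the interval boundaries come from the description above, while the file name and the rest are just my own illustration:

import java.io.PrintWriter
import scala.util.Random

// Write 32 one-dimensional points drawn from the three intervals above, one per line.
object MakeTestData {
  def main(args: Array[String]): Unit = {
    val rnd       = new Random()
    val intervals = Seq((0.2, 0.8), (1.8, 2.4), (3.4, 4.0))
    val points    = (0 until 32).map { i =>
      val (lo, hi) = intervals(i % intervals.size)
      lo + rnd.nextDouble() * (hi - lo)
    }
    val out = new PrintWriter("kmeans_input.txt")  // hypothetical file name
    points.foreach(p => out.println(p))
    out.close()
  }
}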
package kmeans_spark

import java.util.Random
import java.lang.Math._
import org.apache.spark.rdd.RDD
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vector

object KMeans {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("kmeans in Spark")
    val sc = new SparkContext(conf)
    val input = args(0)   // input data path
    val output = args(1)  // output path
    val k = args(2).toInt // number of clusters
    var s = 0d            // clustering quality measure
    val shold = 0.1       // convergence threshold
    var s1 = Double.MaxValue
    var times = 0
    var readyForIteration = true
    // Combine two partial cluster summaries: (sum of points, point count, sum of distances)
    val func1 = (x: (newVector, Int, Double), y: (newVector, Int, Double)) => {
      (x._1 + y._1, x._2 + y._2, x._3 + y._3)
    }
    val points = sc.textFile(input).map(line => {
      val temp = line.split("\t").map(ele => ele.toDouble)
      Vectors.dense(temp)
    }).cache() // parse the input into an RDD of vectors
    var centers = points.takeSample(false, k, new Random().nextLong()) // random initial centers
    print("----------------------------------------\n")
    print("Print the centers for the next iteration: \n")
    printCenters(centers)
    print("Ready for the next iteration ? " + readyForIteration + "\n")
    while (readyForIteration) {
      times += 1
      print("Print the result of the clustering in iteration " + times + "\n")
      // Assign every point to its closest center
      val reClusteringResult = points.map(v => {
        val (centerId, minDistance) = getClosestCenter(centers, v)
        print("Cluster id: " + centerId + ", ")
        print("The point in the cluster " + centerId + ": ")
        v.toArray.foreach(x => print(x + ",")); print("\n")
        (centerId, (newVector(v), 1, minDistance))
      })
      // Compute the new center of each cluster and the summed distance to the old centers
      val NewCentersRdd = reClusteringResult.reduceByKey(func1)
        .map(ele => {
          val centerId = ele._1
          val newCenter = (ele._2)._1 * (1d / ele._2._2)
          val sumOfDistance = (ele._2)._3
          (newCenter.point, sumOfDistance)
        })
      val s2 = getNewCenters(NewCentersRdd, centers)
      s = abs(s2 - s1)
      print("s = " + s + "\n")
      print("----------------------------------------\n")
      print("Print the centers for the next iteration: \n")
      printCenters(centers)
      if (s <= shold) {
        readyForIteration = false
        reClusteringResult.map(ele => {
          var centerId = ele._1.toString() + "\t"
          val array = ele._2._1.point.toArray
          for (i <- 0 until array.length) {
            if (i == array.length - 1) { centerId = centerId + array(i).toString() }
            else { centerId = centerId + array(i).toString() + "\t" }
          }
          centerId
        }).saveAsTextFile(output) // once the algorithm has converged, save the result
      }
      print("to the next iteration ? " + readyForIteration + "\n")
      s1 = s2
    }
    sc.stop()
  }

  // Thin wrapper around MLlib's Vector that adds element-wise arithmetic
  case class newVector(point: Vector) {
    def *(a: Double): newVector = {
      var res = new Array[Double](point.size)
      for (i <- 0 until point.size) {
        res(i) = a * point.toArray.apply(i)
      }
      newVector(Vectors.dense(res))
    }
    def +(that: newVector): newVector = {
      var res = new Array[Double](point.size)
      for (i <- 0 until point.size) {
        res(i) = point.toArray.apply(i) + that.point.toArray.apply(i)
      }
      newVector(Vectors.dense(res))
    }
    def -(that: newVector): newVector = {
      this + (that * -1)
    }
    // Sum of squared components (so distanceTo returns the squared Euclidean distance)
    def pointLength(): Double = {
      var res = 0d
      for (i <- 0 until point.size) {
        res = res + pow(point.toArray.apply(i), 2)
      }
      res
    }
    def distanceTo(that: newVector): Double = {
      (this - that).pointLength()
    }
  }

  implicit def toNewVector(point: Vector) = newVector(point)

  // Find the index of the closest center and the distance to it
  def getClosestCenter(centers: Array[Vector], point: Vector): (Int, Double) = {
    var minDistance = Double.MaxValue
    var centerId = 0
    for (i <- 0 until centers.length) {
      if (point.distanceTo(centers(i)) < minDistance) {
        minDistance = point.distanceTo(centers(i))
        centerId = i
      }
    }
    (centerId, minDistance)
  }

  // Overwrite the centers array with the new centers and return the total distance
  def getNewCenters(rdd: RDD[(Vector, Double)], centers: Array[Vector]): Double = {
    val res = rdd.take(centers.length)
    var sumOfDistance = 0d
    for (i <- 0 until centers.length) {
      centers(i) = res.apply(i)._1
      sumOfDistance += res.apply(i)._2
    }
    sumOfDistance
  }

  def printCenters(centers: Array[Vector]) {
    for (v <- centers) {
      v.toArray.foreach(x => print(x + ",")); print("\n")
    }
  }
}
Compile the code and package it into a jar file; after starting Spark, run a command like the one shown below from the command line:
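The exact command depends on the cluster setup; with Spark 1.x's spark-submit it would look roughly like this, where the jar name, master URL, and HDFS paths are placeholders of my own, and the three trailing arguments are the input path, output path, and number of clusters that the program reads from args:

spark-submit --class kmeans_spark.KMeans \
  --master spark://master:7077 \
  kmeans_spark.jar \
  hdfs:///user/hadoop/kmeans_input.txt \
  hdfs:///user/hadoop/kmeans_output \
  3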
As you can see, on this sample data the algorithm converges quickly, stopping after four iterations. Here is the clustering result:
The clustering on the test data looks good: the first field is the cluster id and the second field is the point's coordinate, and the interval each point falls in matches its assigned cluster. In terms of code size, this is indeed much shorter than a MapReduce implementation, mainly thanks to the expressive power of the rich set of operators on RDDs.