derrickburns / generalized-kmeans-clustering

This project generalizes the Spark MLLIB Batch and Streaming K-Means clusterers in every practical way.

GitHub

Stories in Ready Build Status Codacy Badge Download Coverage Status

Generalized K-Means Clustering

This project generalizes the Spark MLLIB Batch K-Means (v1.1.0) clusterer and the Spark MLLIB Streaming K-Means (v1.2.0) clusterer. Most practical variants of K-means clustering are implemented or can be implemented with this package, including:

If you find a novel variant of k-means clustering that is provably superior in some manner, implement it using the package and send a pull request along with the paper analyzing the variant!

This code has been tested on data sets of tens of millions of points in a 700+ dimensional space using a variety of distance functions. Thanks to the excellent core Spark implementation, it rocks!

Table of Contents

Getting Started

The massivedatascience-clusterer project is built for both Scala 2.10.x and 2.11.x against Spark v1.2.0.

SBT

If you are using SBT, simply add the following to your build.sbt file:

resolvers += Resolver.bintrayRepo("derrickburns", "maven")

libraryDependencies ++= Seq(
  "com.massivedatascience" %% "massivedatascience-clusterer" % "x.y.z"
)

Maven

<dependency>
  <groupId>com.massivedatascience</groupId>
  <artifactId>massivedatascience-clusterer_2.10</artifactId>
  <version>x.y.z</version>
</dependency>

<dependency>
  <groupId>com.massivedatascience</groupId>
  <artifactId>massivedatascience-clusterer_2.11</artifactId>
  <version>x.y.z</version>
</dependency>



<repositories>
    <repository>
        <id>bintray</id>
        <name>bintray</name>
        <url>http://dl.bintray.com/derrickburns/maven/</url>
    </repository>
</repositories>

Introduction

The goal of K-Means clustering is to produce a set of clusters of a set of points that satisfies certain optimality constraints. That model is called a K-Means model. It is fundamentally a set of points and a function that defines the distance from an arbitrary point to a cluster center.

The K-Means algorithm computes a K-Means model using an iterative algorithm known as Lloyd's algorithm. Each iteration of Lloyd's algorithm assigns a set of points to clusters, then updates the cluster centers to acknowledge the assignment of the points to the cluster.

The update of clusters is a form of averaging. Newly added points are averaged into the cluster while (optionally) reassigned points are removed from their prior clusters.

Bregman Divergences

While one can assign a point to a cluster using any distance function, Lloyd's algorithm only converges for a certain set of distance functions called Bregman divergences. Bregman divergences must define two methods, convex to evaluate a function on a point and gradientOfConvex to evaluate the gradient of the function on a point.

package com.massivedatascience.divergence

trait BregmanDivergence {
  def convex(v: Vector): Double

  def gradientOfConvex(v: Vector): Vector
}

For example, by defining convex to be the vector norm (i.e. the sum of the squares of the coordinates), one gets a distance function that equals the square of the well known Euclidean distance. We name it the SquaredEuclideanDistanceDivergence.

In addition to the squared Euclidean distance function, this implementation provides several other very useful distance functions. The provided BregmanDivergences may be accessed by supplying the name of the desired object to the apply method of the companion object.

Name Space Divergence Input
SquaredEuclideanDistanceDivergence R^d Squared Euclidean
RealKullbackLeiblerSimplexDivergence R+^d Kullback-Leibler Dense
NaturalKLSimplexDivergence N+^d Kullback-Leibler Dense
RealKLDivergence R^d Kullback-Leibler Dense
NaturalKLDivergence N^d Kullback-Leibler Dense
ItakuraSaitoDivergence R+^d Kullback-Leibler Sparse
LogisticLossDivergence R Logistic Loss
GeneralizedIDivergence R Generalized I

When selecting a distance function, consider the domain of the input data. For example, frequency data is integral. Similarity of frequencies or distributions are best performed using the Kullback-Leibler divergence.

Compute Bregman Distances Efficiently using BregmanPoints and BregmanCenters

For efficient repeated computation of distance between a fixed set of points and varying cluster centers, is it convenient to pre-compute certain information and associate that information with the point or the cluster center. The class that represent an enriched point is BregmanPoint. The class that represent the enriched cluster center is BregmanCenter. Users of this package do not construct instances of these objects directly.

package com.massivedatascience.divergence

trait BregmanPoint

trait BregmanCenter

We enrich a Bregman divergence with a set of commonly used operations, including factory methods toPoint and toCenter to contruct instances of the aforementioned BregmanPoint and BregmanCenter.

The enriched trait is the BregmanPointOps.

package com.massivedatascience.clusterer

trait BregmanPointOps  {
  type P = BregmanPoint
  type C = BregmanCenter

  val divergence: BregmanDivergence

  def toPoint(v: WeightedVector): P

  def toCenter(v: WeightedVector): C

  def centerMoved(v: P, w: C): Boolean

  def findClosest(centers: IndexedSeq[C], point: P): (Int, Double)

  def findClosestCluster(centers: IndexedSeq[C], point: P): Int

  def distortion(data: RDD[P], centers: IndexedSeq[C])

  def pointCost(centers: IndexedSeq[C], point: P): Double

  def distance(p: BregmanPoint, c: BregmanCenter): Double
}

object BregmanPointOps {

  def apply(distanceFunction: String): BregmanPointOps = ???

}

One may construct instances of BregmanPointOps using the companion object.

Name Divergence
EUCLIDEAN Squared Euclidean
RELATIVE_ENTROPY Kullback-Leibler
DISCRETE_KL Kullback-Leibler
DISCRETE_SMOOTHED_KL Kullback-Leibler
SPARSE_SMOOTHED_KL Kullback-Leibler
LOGISTIC_LOSS Logistic Loss
GENERALIZED_I Generalized I
ITAKURA_SAITO Itakura-Saito

Representing K-Means Models

With these definitions, we define our realization of a k-means model, KMeansModel, which we enrich with operations to find closest clusters to a point and to compute distances:

package com.massivedatascience.clusterer

trait KMeansModel {

  val pointOps: BregmanPointOps

  def centers: IndexedSeq[BregmanCenter]


  def predict(point: Vector): Int

  def predictClusterAndDistance(point: Vector): (Int, Double)

  def predict(points: RDD[Vector]): RDD[Int]

  def predict(points: JavaRDD[Vector]): JavaRDD[java.lang.Integer]

  def computeCost(data: RDD[Vector]): Double


  def predictWeighted(point: WeightedVector): Int

  def predictClusterAndDistanceWeighted(point: WeightedVector): (Int, Double)

  def predictWeighted(points: RDD[WeightedVector]): RDD[Int]

  def computeCostWeighted(data: RDD[WeightedVector]): Double


  def predictBregman(point: BregmanPoint): Int

  def predictClusterAndDistanceBregman(point: BregmanPoint): (Int, Double)

  def predictBregman(points: RDD[BregmanPoint]): RDD[Int]

  def computeCostBregman(data: RDD[BregmanPoint): Double
}

Constructing K-Means Models using Clusterers

One may construct K-Means models using one of the provided clusterers that implement Lloyd's algorithm.

trait MultiKMeansClusterer extends Serializable with Logging {
  def cluster(
    maxIterations: Int,
    pointOps: BregmanPointOps,
    data: RDD[BregmanPoint],
    centers: Seq[IndexedSeq[BregmanCenter]]): Seq[(Double, IndexedSeq[BregmanCenter])]

  def best(
    maxIterations: Int,
    pointOps: BregmanPointOps,
    data: RDD[BregmanPoint],
    centers: Seq[IndexedSeq[BregmanCenter]]): (Double, IndexedSeq[BregmanCenter]) = {
    cluster(maxIterations, pointOps, data, centers).minBy(_._1)
  }
}

object MultiKMeansClusterer {
  def apply(clustererName: String): MultiKMeansClusterer = ???
}

The COLUMN_TRACKING algorithm tracks the assignments of points to clusters and the distance of points to their assigned cluster. In later iterations of Lloyd's algorithm, this information can be used to reduce the number of distance calculations needed to accurately reassign points. This is a novel implementation.

The MINI_BATCH_10 algorithm implements the mini-batch algorithm. This implementation should be used when the number of points is much larger than the dimension of the data and the number of clusters desired.

The RESEED algorithm fills empty clusters with newly seeded cluster centers in an effort to reach the target number of desired clusters.

Objects implementing these algorithms may be constructed using the apply method of the companion object MultiKMeansClusterer.

Name Algorithm
COLUMN_TRACKING high performance implementation that performs less work on later rounds
MINI_BATCH_10 a mini-batch clusterer that samples 10% of the data each round to update centroids
RESEED a clusterer that re-seeds empty clusters

Constructing K-Means Models via Lloyd's Algorithm

A KMeansModel can be constructed from any set of cluster centers and distance function. However, the more interesting models satisfy an optimality constraint. If we sum the distances from the points in a given set to their closest cluster centers, we get a number called the "distortion" or "cost". A K-Means Model is locally optimal with respect to a set of points if each cluster center is determined by the mean of the points assigned to that cluster. Computing such a KMeansModel given a set of points is called "training" the model on those points.

The simplest way to train a KMeansModel on a fixed set of points is to use the KMeans.train method. This method is most similar in style to the one provided by the Spark 1.2.0 K-Means clusterer.

For dense data in a low dimension space using the squared Euclidean distance function, one may simply call KMeans.train with the data and the desired number of clusters:

import com.com.massivedatascience.clusterer
import org.apache.spark.mllib.linalg.Vector

val model : KMeansModel = KMeans.train(data: RDD[Vector], k: Int)

The full signature of the KMeans.train method is:

package com.massivedatascience.clusterer

object KMeans {
  /**
   *
   * Train a K-Means model using Lloyd's algorithm.
   *
   * @param data input data
   * @param k  number of clusters desired
   * @param maxIterations maximum number of iterations of Lloyd's algorithm
   * @param runs number of parallel clusterings to run
   * @param mode initialization algorithm to use
   * @param distanceFunctionNames the distance functions to use
   * @param clustererName which k-means implementation to use
   * @param embeddingNames sequence of embeddings to use, from lowest dimension to greatest
   * @return K-Means model
   */
  def train(
    data: RDD[Vector],
    k: Int,
    maxIterations: Int = KMeans.defaultMaxIterations,
    runs: Int = KMeans.defaultNumRuns,
    mode: String = KMeansSelector.K_MEANS_PARALLEL,
    distanceFunctionNames: Seq[String] = Seq(BregmanPointOps.EUCLIDEAN),
    clustererName: String = MultiKMeansClusterer.COLUMN_TRACKING,
    embeddingNames: List[String] = List(Embedding.IDENTITY_EMBEDDING)): KMeansModel = ???
}

Many of these parameters will be familiar to anyone who is familiar with the Spark 1.1 clusterer.

Similar to the Spark clusterer, we support data provided as Vectors, a request for a number k of clusters desired, a limit maxIterations on the number of iterations of Lloyd's algorithm, and the number of parallel runs of the clusterer.

We also offer different initialization modes. But unlike the Spark clusterer, we do not support setting the number of initialization steps for the mode at this level of the interface.

The K-Means.train helper methods allows one to name a sequence of embeddings. Several embeddings are provided that may be constructed using the apply method of the companion object Embedding.

Name Algorithm
IDENTITY_EMBEDDING Identity
HAAR_EMBEDDING Haar Transform
LOW_DIMENSIONAL_RI Random Indexing with dimension 64 and epsilon = 0.1
MEDIUM_DIMENSIONAL_RI Random Indexing with dimension 256 and epsilon = 0.1
HIGH_DIMENSIONAL_RI Random Indexing with dimension 1024 and epsilon = 0.1
SYMMETRIZING_KL_EMBEDDING Symmetrizing KL Embedding

Different distance functions may be used for each embedding. There must be exactly one distance function per embedding provided.

Constructing K-Means Models on WeightedVectors

Often, data points that are clustered have varying significance, i.e. they are weighted. This clusterer operates on weighted vectors. Use these WeightedVector companion object to construct weighted vectors.

package com.massivedatascience.linalg

trait WeightedVector extends Serializable {
  def weight: Double

  def inhomogeneous: Vector

  def homogeneous: Vector

  def size: Int = homogeneous.size
}

object WeightedVector {

  def apply(v: Vector): WeightedVector = ???

  def apply(v: Array[Double]): WeightedVector = ???

  def apply(v: Vector, weight: Double): WeightedVector = ???

  def apply(v: Array[Double], weight: Double): WeightedVector = ???

  def fromInhomogeneousWeighted(v: Array[Double], weight: Double): WeightedVector = ???

  def fromInhomogeneousWeighted(v: Vector, weight: Double): WeightedVector = ???
}

Indeed, the KMeans.train helper translates the parameters into a call to the underlying KMeans.trainWeighted method.

package com.massivedatascience.clusterer

object KMeans {
  /**
   *
   * Train a K-Means model using Lloyd's algorithm on WeightedVectors
   *
   * @param data input data
   * @param runConfig run configuration
   * @param pointOps the distance functions to use
   * @param initializer initialization algorithm to use
   * @param embeddings sequence of embeddings to use, from lowest dimension to greatest
   * @param clusterer which k-means implementation to use
   * @return K-Means model
   */

  def trainWeighted(
    runConfig: RunConfig,
    data: RDD[WeightedVector],
    initializer: KMeansSelector,
    pointOps: Seq[BregmanPointOps],
    embeddings: Seq[Embedding],
    clusterer: MultiKMeansClusterer): KMeansModel = ???
  }
}

The KMeans.trainWeighted method ultimately makes various calls to the underlying KMeans.simpleTrain method, which clusters the provided BregmanPoints using the provided BregmanPointOps and the provided KMeansSelector with the provided MultiKMeansClusterer.

package com.massivedatascience.clusterer

object KMeans {
  /**
   *
   * @param runConfig run configuration
   * @param data input data
   * @param pointOps the distance functions to use
   * @param initializer initialization algorithm to use
   * @param clusterer which k-means implementation to use
   * @return K-Means model
   */
  def simpleTrain(
    runConfig: RunConfig,
    data: RDD[BregmanPoint],
    pointOps: BregmanPointOps,
    initializer: KMeansSelector,
    clusterer: MultiKMeansClusterer): KMeansModel = ???
    }
}

Constructing K-Means Models Iteratively

If multiple embeddings are provided, the KMeans.train method actually performs the embeddings and trains on the embedded data sets iteratively.

For example, for high dimensional data, one way wish to embed the data into a lower dimension before clustering to reduce running time.

For time series data, the Haar Transform has been used successfully to reduce running time while maintaining or improving quality.

For high-dimensional sparse data, random indexing can be used to map the data into a low dimensional dense space.

One may also perform clustering recursively, using lower dimensional clustering to derive initial conditions for higher dimensional clustering.

Should you wish to train a model iteratively on data sets derived maps of a shared original data set, you may use KMeans.iterativelyTrain.

package com.massivedatascience.clusterer

object KMeans {
  /**
   * Train on a series of data sets, where the data sets were derived from the same
   * original data set via embeddings. Use the cluster assignments of one stage to
   * initialize the clusters of the next stage.
   *
   * @param runConfig run configuration
   * @param dataSets  input data sets to use
   * @param initializer  initialization algorithm to use
   * @param pointOps distance function
   * @param clusterer  clustering implementation to use
   * @return
   */
  def iterativelyTrain(
    runConfig: RunConfig,
    pointOps: Seq[BregmanPointOps],
    dataSets: Seq[RDD[BregmanPoint]],
    initializer: KMeansSelector,
    clusterer: MultiKMeansClusterer): KMeansModel = ???

Seeding the Set of Cluster Centers

Any K-Means model may be used as seed value to Lloyd's algorithm. In fact, our clusterers accept multiple seed sets. The K-Means.train helper methods allows one to name an initialization method.

Two algorithms are implemented that produce viable seed sets. They may be constructed by using the apply method of the companion objectKMeansSelector".

Name Algorithm
RANDOM Random selection of initial k centers
K_MEANS_PARALLEL a 5 step K-Means Parallel implementation

Under the covers, these initializers implement the KMeansSelector trait

package com.massivedatascience.clusterer

trait KMeansSelector extends Serializable {
  def init(
    ops: BregmanPointOps,
    d: RDD[BregmanPoint],
    numClusters: Int,
    initialInfo: Option[(Seq[IndexedSeq[BregmanCenter]], Seq[RDD[Double]])] = None,
    runs: Int,
    seed: Long): Seq[IndexedSeq[BregmanCenter]]
}

object KMeansSelector {
  def apply(name: String): KMeansSelector = ???
}

Iterative Clustering

K-means clustering can be performed iteratively using different embeddings of the data. For example, with high-dimensional time series data, it may be advantageous to:

  • Down-sample the data via the Haar transform (aka averaging)
  • Solve the K-means clustering problem on the down-sampled data
  • Assign the downsampled points to clusters.
  • Create a new KMeansModel from the using the assignments on the original data
  • Solve the K-Means clustering on the KMeansModel so constructed

This technique has been named the "Anytime" Algorithm.

The com.massivedatascience.clusterer.KMeans helper method provides a method, timeSeriesTrain that embeds the data iteratively.

package com.massivedatascience.clusterer

object KMeans {

  def timeSeriesTrain(
    runConfig: RunConfig,
    data: RDD[WeightedVector],
    initializer: KMeansSelector,
    pointOps: BregmanPointOps,
    clusterer: MultiKMeansClusterer,
    embedding: Embedding = Embedding(HAAR_EMBEDDING)): KMeansModel = ???
  }
}

High dimensional data can be clustered directly, but the cost is proportional to the dimension. If the divergence of interest is squared Euclidean distance, one can using Random Indexing to down-sample the data while preserving distances between clusters, with high probability.

The com.massivedatascience.clusterer.KMeans helper method provides a method, sparseTrain that embeds into various dimensions using randoming indexing.

package com.massivedatascience.clusterer

object KMeans {

  def sparseTrain(raw: RDD[Vector], k: Int): KMeansModel = {
    train(raw, k,
      embeddingNames = List(Embedding.LOW_DIMENSIONAL_RI, Embedding.MEDIUM_DIMENSIONAL_RI,
        Embedding.HIGH_DIMENSIONAL_RI))
  }
}

Creating a Custom K-means Clusterer

There are many ways to create your our custom K-means clusterer from these components.

Custom BregmanDivergence

You may create your own custom BregmanDivergence given a suitable continuously-differentiable real-valued and strictly convex function defined on a closed convex set in R^^N using the apply method of the companion object. Send a pull request to have it added the the package.

package com.massivedatascience.divergence

object BregmanDivergence {

  /**
   * Create a Bregman Divergence from
   * @param f any continuously-differentiable real-valued and strictly
   *          convex function defined on a closed convex set in R^^N
   * @param gradientF the gradient of f
   * @return a Bregman Divergence on that function
   */
  def apply(f: (Vector) => Double, gradientF: (Vector) => Vector): BregmanDivergence = ???
}

Custom BregmanPointOps

You may create your own custom BregmanPointsOps from your own implementation of the BregmanDivergence trait given a BregmanDivergence using the apply method of the companion object. Send a pull request to have it added the the package.

package com.massivedatascience.clusterer

object BregmanPointOps {

  def apply(d: BregmanDivergence): BregmanPointOps = ???

  def apply(d: BregmanDivergence, factor: Double): BregmanPointOps = ???
}

Custom Embedding

Perhaps you have a dimensionality reduction method that is not provided by one of the standard embeddings. You may create your own embedding.

For example, If the number of clusters desired is small, but the dimension is high, one may also use the method of Random Projections. At present, no embedding is provided for random projects, but, hey, I have to leave something for you to do! Send a pull request!!!

Creating K-Means Models using the KMeansModel Helper Object

Training a K-Means model from a set of points using KMeans.train is one way to create a KMeansModel. However, there are many others that are useful. The KMeansModel companion object provides a number of these constructors.

package com.massivedatascience.clusterer

object KMeansModel {

  /**
   * Create a K-means model from given cluster centers and weights
   *
   * @param ops distance function
   * @param centers initial cluster centers in homogeneous coordinates
   * @param weights initial cluster weights
   * @return  k-means model
   */
  def fromVectorsAndWeights(
    ops: BregmanPointOps,
    centers: IndexedSeq[Vector],
    weights: IndexedSeq[Double]) = ???

  /**
   * Create a K-means model from given weighted vectors
   *
   * @param ops distance function
   * @param centers initial cluster centers as weighted vectors
   * @return  k-means model
   */
  def fromWeightedVectors[T <: WeightedVector : ClassTag](
    ops: BregmanPointOps,
    centers: IndexedSeq[T]) = ???

  /**
   * Create a K-means model by selecting a set of k points at random
   *
   * @param ops distance function
   * @param k number of centers desired
   * @param dim dimension of space
   * @param weight initial weight of points
   * @param seed random number seed
   * @return  k-means model
   */
  def usingRandomGenerator(ops: BregmanPointOps,
    k: Int,
    dim: Int,
    weight: Double,
    seed: Long = XORShiftRandom.random.nextLong()) = ???

  /**
   * Create a K-Means model using the KMeans++ algorithm on an initial set of candidate centers
   *
   * @param ops distance function
   * @param data initial candidate centers
   * @param weights initial weights
   * @param k number of clusters desired
   * @param perRound number of candidates to add per round
   * @param numPreselected initial sub-sequence of candidates to always select
   * @param seed random number seed
   * @return  k-means model
   */
  def fromCenters[T <: WeightedVector : ClassTag](
    ops: BregmanPointOps,
    data: IndexedSeq[T],
    weights: IndexedSeq[Double],
    k: Int,
    perRound: Int,
    numPreselected: Int,
    seed: Long = XORShiftRandom.random.nextLong()): KMeansModel = ???

  /**
   * Create a K-Means Model from a streaming k-means model.
   *
   * @param streamingKMeansModel mutable streaming model
   * @return immutable k-means model
   */
  def fromStreamingModel(streamingKMeansModel: StreamingKMeansModel): KMeansModel = ???

  /**
   * Create a K-Means Model from a set of assignments of points to clusters
   *
   * @param ops distance function
   * @param points initial bregman points
   * @param assignments assignments of points to clusters
   * @return
   */
  def fromAssignments[T <: WeightedVector : ClassTag](
    ops: BregmanPointOps,
    points: RDD[T],
    assignments: RDD[Int]): KMeansModel = ???

  /**
   * Create a K-Means Model using K-Means || algorithm from an RDD of Bregman points.
   *
   * @param ops distance function
   * @param data initial points
   * @param k  number of cluster centers desired
   * @param numSteps number of iterations of k-Means ||
   * @param sampleRate fractions of points to use in weighting clusters
   * @param seed random number seed
   * @return  k-means model
   */
  def usingKMeansParallel[T <: WeightedVector : ClassTag](
    ops: BregmanPointOps,
    data: RDD[T],
    k: Int,
    numSteps: Int = 2,
    sampleRate: Double = 1.0,
    seed: Long = XORShiftRandom.random.nextLong()): KMeansModel = ???

  /**
   * Construct a K-Means model using the Lloyd's algorithm given a set of initial
   * K-Means models.
   *
   * @param ops distance function
   * @param data points to fit
   * @param initialModels  initial k-means models
   * @param clusterer k-means clusterer to use
   * @param seed random number seed
   * @return  the best K-means model found
   */
  def usingLloyds[T <: WeightedVector : ClassTag](
    ops: BregmanPointOps,
    data: RDD[T],
    initialModels: Seq[KMeansModel],
    clusterer: MultiKMeansClusterer = new ColumnTrackingKMeans(),
    seed: Long = XORShiftRandom.random.nextLong()): KMeansModel = ???
}