rubanm / ignite-scala   0.0.1

MIT License GitHub

Scala API for distributed closures on Apache Ignite

Scala versions: 2.11 2.10

ignite scala

Scala API for distributed closures on Apache Ignite. Inspired by Scalding.

http://apacheignite.readme.io/v1.0/docs/distributed-closures

example 0 - cluster setup

import org.apache.ignite._
import org.apache.ignite.configuration._
import ignite.scala._

val cfg = new IgniteConfiguration // configure as appropriate
val ignite = Ignition.start(cfg)
val compute = ignite.compute(ignite.cluster)
implicit val cr = ComputeRunner(compute)

example 1 - character count

import com.twitter.algebird.Semigroup

implicit val sg = Semigroup.intSemigroup

val chain = IgnitePipe.from("The quick brown fox jumps over the lazy dog.".split(" "))
  .map(_.length) // fork
  .reduce // join

chain.execute // Option[Int]

example 2 - working with distributed cache

val cache = {
  val cfg = new CacheConfiguration[K, V]
  cfg.setName(name)
  ignite.getOrCreateCache(cfg)
}

val process: V => R // computation
val isValid: R => Boolean

IgnitePipe.from(keys)
  .map { k =>
    val v = cache.get(k)
    process(v)
  }
  .filter(isValid)
  .execute // Iterable[R]

example 3 - collocating compute with cache

Ignite allows routing computations to the nodes where data is cached.

IgnitePipe.collocated(cache, keys) { (c, k) =>
  val v = c.localPeek(k)
  process(v)
}
.filter(isValid)
.execute

example 4 - more chaining

val cache: IgniteCache[K, V]
val db: CacheJdbcBlobStore[K, V]

val cacheResults = IgnitePipe.from(keys)
  .map { k => cacheGetAndCompute(cache, k) }

val dbResults = IgnitePipe.from(keys)
  .map { k => dbGetAndCompute(db, k) }
  
val combined = (cacheResults ++ dbResults)
  .reduce // reduction could be to consolidate cache and db for instance
  .toPipe // continuation
  
combined.map { exportResults(_) }.execute

installing

Add the following to your build.sbt (fetches from sonatype)

resolvers += Resolver.sonatypeRepo("releases")
libraryDependencies += "com.github.rubanm" %% "ignite-scala" % "0.0.1"

core api

/* Provides composable distributed closures that can run on Apache Ignite. */
trait IgnitePipe[T] {
  
  def map[U](f: T => U): IgnitePipe[U]
  
  def flatMap[U](f: T => TraversableOnce[U]): IgnitePipe[U]
  
  def ++(p: IgnitePipe[T]): IgnitePipe[T]
  
  def reduce(implicit sg: Semigroup[T]): Reduction[T]
  
  def execute: Iterable[T]
}

/* Represents a reduction of the distributed closure results.*/
trait Reduction[T] {

  def execute: Option[T]

  def toPipe: IgnitePipe[T]
}