reservoir

A reservoir sampling implementation for Scala, with Akka Streams support.

Add to Your sbt Build

Scala 2.13

libraryDependencies += "lgbt.princess" %% "reservoir-core"        % "0.4.0"  // the core library supporting synchronous reservoir sampling
libraryDependencies += "lgbt.princess" %% "reservoir-akka-stream" % "0.4.0"  // the library for akka-stream operators
libraryDependencies += "lgbt.princess" %% "reservoir"             % "0.4.0"  // all parts of the library

Usage

Reservoir Sampler

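import lgbt.princess.reservoir.Sampler

final case class User(id: String, displayName: String)

// onlineUsers() is not defined in this snippet; it is a placeholder for your own source of User values.

// Sample up to 100 users, storing each sampled user's id rather than the User itself.
val sampler = Sampler[User, String](maxSampleSize = 100)(_.id)
sampler.sampleAll(onlineUsers())
val sampleIds = sampler.result()

// Same as above, but using the distinct variant of the sampler.
val distinctSampler = Sampler.distinct[User, String](maxSampleSize = 100)(_.id)
distinctSampler.sampleAll(onlineUsers())
val distinctSampleIds = distinctSampler.result()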
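
For readers new to the technique, the following is a minimal sketch of classic reservoir sampling (Algorithm R). It is not this library's implementation: `ReservoirSketch` and its members are hypothetical names chosen only to mirror the shape of the `Sampler` calls above, and the library may use a different algorithm internally. The sketch keeps the first `maxSampleSize` elements, then replaces a random slot with steadily decreasing probability, so that every element seen so far is equally likely to be in the sample.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical illustration of Algorithm R; NOT the library's Sampler.
final class ReservoirSketch[A, B](maxSampleSize: Int, rng: Random = new Random)(map: A => B) {
  require(maxSampleSize > 0, "maxSampleSize must be positive")

  private val reservoir = new ArrayBuffer[B](maxSampleSize)
  // An Int counter keeps the sketch short; it overflows past Int.MaxValue elements.
  private var seen = 0

  def sample(elem: A): Unit = {
    seen += 1
    if (reservoir.length < maxSampleSize) {
      // The reservoir is not full yet, so every element is kept.
      reservoir += map(elem)
    } else {
      // Pick a slot uniformly from [0, seen); the element is kept only if the
      // slot falls inside the reservoir, i.e. with probability maxSampleSize / seen.
      val slot = rng.nextInt(seen)
      if (slot < maxSampleSize) reservoir(slot) = map(elem)
    }
  }

  def sampleAll(elems: IterableOnce[A]): Unit = elems.iterator.foreach(sample)

  // Returns an immutable copy of the current sample.
  def result(): IndexedSeq[B] = reservoir.toIndexedSeq
}

// Usage: sample 10 values out of 1000, storing each as a String.
val sketch = new ReservoirSketch[Int, String](maxSampleSize = 10)(_.toString)
sketch.sampleAll(1 to 1000)
val sketchSample = sketch.result()
```

Note that the mapping function is applied only to elements that actually enter the reservoir, which matters when the mapping is expensive.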

Akka Stream Operator

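import akka.stream.scaladsl.{Keep, Sink}
import lgbt.princess.reservoir.akkasupport.Sample

final case class User(id: String, displayName: String)

// Here onlineUsers() is a placeholder that must return an Akka Streams Source of User,
// and an implicit materializer (for example an implicit ActorSystem) must be in scope for run().
// Keep.right keeps the Sample operator's materialized value; Keep.both then pairs it (left)
// with the materialized value of Sink.seq (right).
val (sampleIds, users1) = onlineUsers()
  .viaMat(Sample[User, String](maxSampleSize = 100)(_.id))(Keep.right)
  .toMat(Sink.seq)(Keep.both)
  .run()

val (distinctSampleIds, users2) = onlineUsers()
  .viaMat(Sample.distinct[User, String](maxSampleSize = 100)(_.id))(Keep.right)
  .toMat(Sink.seq)(Keep.both)
  .run()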