aparo / zio-elasticsearch Edit

ElasticSearch client for Scala based on ZIO and FP

Version Matrix


ElasticSearch client for Scala based on ZIO and FP. Only Elasticsearch 7.x is supported.

The project targets are:

  • simply API
  • completely functional approach on the library based on ZIO
  • full typesafe Query, Aggregation, Request and Response of Elasticsearch
  • http layer based on sttp (in future zio-http when it ill be released)
  • using circe for json management and circe-derivation for fast compiling time
  • full coverage of Elasticsearch call/responses (generated from API)

Quick usage tour

The follow overcommented example is taken from test directory:

package elasticsearch.client

import elasticsearch.orm.QueryBuilder
import elasticsearch.queries.TermQuery
import elasticsearch.requests.UpdateByQueryRequest
import elasticsearch.{ ESSystemUser, SpecHelper, AuthContext }
import io.circe.derivation.annotations.JsonCodec
import io.circe._
import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner
import org.scalatest._
import zio.blocking.Blocking
import org.scalatest.WordSpec
import zio.clock.Clock
import zio.console.Console
import zio.random.Random
import zio.{ DefaultRuntime, system }

class ElasticSearchSpec extends WordSpec with Matchers with BeforeAndAfterAll with SpecHelper {

  // we init a cluster for test
  private val runner = new ElasticsearchClusterRunner()

  // we init a n ElasticSearch Client
  implicit val elasticsearch = ZioClient("localhost", 9201)

  // we need a ZIO Enrvironment to "runUnsafe" out code
  lazy val environment: zio.Runtime[Clock with Console with system.System with Random with Blocking] =
    new DefaultRuntime {}

  // a context propagate user and other info for every call without need to pass the arguments to all functions
  implicit val context =
    new AuthContext(ESSystemUser, elasticsearch = elasticsearch)

  // we create a case class that contains our data
  // JsonCodec is a macro annotation that create encoder and decoder for circe
  case class Book(title: String, pages: Int)

  // we init the data 
  override def beforeAll() = {

    // we prepare he store statement with an ending refresh
    val load = for {
      _ <- register("source", "Akka in Action", 1)
      _ <- register("source", "Programming in Scala", 2)
      _ <- register("source", "Learning Scala", 3)
      _ <- register("source", "Scala for Spark in Production", 4)
      _ <- register("source", "Scala Puzzlers", 5)
      _ <- register("source", "Effective Akka", 6)
      _ <- register("source", "Akka Concurrency", 7)
      _ <- elasticsearch.refresh("source")

    } yield ()

    // we execute the statement with the ZIO environment

  // called on test completion
  override def afterAll() = {

  // helper function to flush ES to allow to search data
  def flush(indexName: String): Unit =

  // helper function to create an index request
  private def register(indexName: String, title: String, pages: Int) =
      body = JsonObject.fromMap(
        Map("title" -> Json.fromString(title), "pages" -> Json.fromInt(pages), "active" -> Json.fromBoolean(false))

  "Client" should {

    "count elements" in {
      // we call the countAll elements inside a index
      val count = environment.unsafeRun(elasticsearch.countAll("source"))
      count should be(7)

    "update pages" in {
      // we call the updateByQuery
      val multipleResultE = environment.unsafeRun(
          UpdateByQueryRequest.fromPartialDocument("source", JsonObject("active" -> Json.fromBoolean(true)))

      multipleResultE.updated should be(7)
      // we execute a query on updated data
      val searchResultE = environment.unsafeRun(
        elasticsearch.search(QueryBuilder(indices = List("source"), filters = List(TermQuery("active", true))))

      searchResultE.total.value should be(7)