aparo / zio-elasticsearch

ElasticSearch client for Scala based on ZIO and FP

GitHub

zio-elasticsearch

ElasticSearch client for Scala based on ZIO and FP. Only Elasticsearch 7.x is supported.

The project goals are:

  • a simple API
  • a completely functional approach, with the library built on ZIO
  • fully typesafe Query, Aggregation, Request and Response of Elasticsearch
  • http layer based on sttp (in the future zio-http, when it will be released)
  • using circe for json management and circe-derivation for fast compiling time
  • full coverage of Elasticsearch call/responses (generated from API)

Quick usage tour

The following heavily commented example is taken from the test directory:

package elasticsearch.client

import elasticsearch.orm.QueryBuilder
import elasticsearch.queries.TermQuery
import elasticsearch.requests.UpdateByQueryRequest
import elasticsearch.{ ESSystemUser, SpecHelper, AuthContext }
import io.circe.derivation.annotations.JsonCodec
import io.circe._
import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner
import org.scalatest._
import zio.blocking.Blocking
import org.scalatest.WordSpec
import zio.clock.Clock
import zio.console.Console
import zio.random.Random
import zio.{ DefaultRuntime, system }

/**
 * Example spec that starts an embedded Elasticsearch cluster, indexes seven documents into the
 * `source` index and then exercises `countAll`, `updateByQuery` and `search` through the client.
 *
 * All client calls are run through `environment.unsafeRun`, so each test sees the plain result
 * of the call.
 */
class ElasticSearchSpec extends WordSpec with Matchers with BeforeAndAfterAll with SpecHelper {

  // we init a cluster for the test
  private val runner = new ElasticsearchClusterRunner()

  // we init an ElasticSearch client
  // NOTE(review): the client targets port 9201 while the runner below is configured with
  // baseHttpPort(9200) — presumably the runner assigns "base port + node number" to the
  // single node; confirm against the cluster-runner documentation.
  implicit val elasticsearch = ZioClient("localhost", 9201)

  // we need a ZIO environment to "unsafeRun" our code
  lazy val environment: zio.Runtime[Clock with Console with system.System with Random with Blocking] =
    new DefaultRuntime {}

  // a context propagates the user and other info to every call, without the need to pass them
  // as arguments to all functions
  implicit val context: AuthContext =
    new AuthContext(ESSystemUser, elasticsearch = elasticsearch)

  // we create a case class that contains our data
  // JsonCodec is a macro annotation that creates the circe encoder and decoder
  @JsonCodec
  case class Book(title: String, pages: Int)

  /** Starts a single-node cluster, waits for yellow status and loads the seven test documents. */
  override def beforeAll(): Unit = {
    runner.build(ElasticsearchClusterRunner.newConfigs().baseHttpPort(9200).numOfNode(1))
    runner.ensureYellow()

    // we prepare the store statement with a final refresh, so the documents are searchable
    val load = for {
      _ <- register("source", "Akka in Action", 1)
      _ <- register("source", "Programming in Scala", 2)
      _ <- register("source", "Learning Scala", 3)
      _ <- register("source", "Scala for Spark in Production", 4)
      _ <- register("source", "Scala Puzzlers", 5)
      _ <- register("source", "Effective Akka", 6)
      _ <- register("source", "Akka Concurrency", 7)
      _ <- elasticsearch.refresh("source")
    } yield ()

    // we execute the statement with the ZIO environment
    environment.unsafeRun(load)
  }

  /**
   * Called on test completion: closes the client, then closes and cleans the embedded cluster.
   *
   * The nested try/finally makes sure the cluster is still closed and cleaned when closing the
   * client (or the cluster) throws; the first exception is still propagated.
   */
  override def afterAll(): Unit =
    try {
      // NOTE(review): if close() returns a ZIO effect instead of closing eagerly, it has to be
      // executed through `environment.unsafeRun` — confirm against the client API.
      elasticsearch.close()
    } finally {
      try {
        runner.close()
      } finally {
        runner.clean()
      }
    }

  /** Helper that refreshes `indexName` so that previously written data becomes searchable. */
  def flush(indexName: String): Unit =
    environment.unsafeRun(elasticsearch.refresh(indexName))

  /**
   * Helper that builds the index request for one document.
   *
   * The document body carries the given `title` and `pages` plus an `active` flag that is
   * initially `false`. The request is only described here; it is executed by the caller.
   */
  private def register(indexName: String, title: String, pages: Int) =
    elasticsearch.indexDocument(
      indexName,
      body = JsonObject.fromMap(
        Map("title" -> Json.fromString(title), "pages" -> Json.fromInt(pages), "active" -> Json.fromBoolean(false))
      )
    )

  "Client" should {

    "count elements" in {
      // we count all the elements inside the index
      val count = environment.unsafeRun(elasticsearch.countAll("source"))
      count should be(7)
    }

    "update pages" in {

      // we call updateByQuery with a partial document that sets `active` to true
      val multipleResultE = environment.unsafeRun(
        elasticsearch.updateByQuery(
          UpdateByQueryRequest.fromPartialDocument("source", JsonObject("active" -> Json.fromBoolean(true)))
        )
      )

      multipleResultE.updated should be(7)
      flush("source")

      // we execute a query on the updated data: all seven documents must now be active
      val searchResultE = environment.unsafeRun(
        elasticsearch.search(QueryBuilder(indices = List("source"), filters = List(TermQuery("active", true))))
      )

      searchResultE.total.value should be(7)
    }
  }

}