The Elasticsearch connector provides Akka Stream sources and sinks for Elasticsearch.
For more information about Elasticsearch please visit the official documentation.
This repository hasn't been maintained no longer because this connector is availavle as a part of Alpakka (akka/alpakka#221).
- 
sbt: libraryDependencies += "com.github.takezoe" %% "akka-stream-elasticsearch" % "1.1.0" 
- 
Maven: <dependency> <groupId>com.github.takezoe</groupId> <artifactId>akka-stream-elasticsearch_2.12</artifactId> <version>1.1.0</version> </dependency> 
- 
Gradle: dependencies { compile group: "com.github.takezoe", name: "akka-stream-elasticsearch_2.12", version: "1.1.0" } 
Sources, Flows and Sinks provided by this connector need a prepared RestClient to access to Elasticsearch.
- 
Scala: implicit val client = RestClient.builder(new HttpHost("localhost", 9201)).build() 
- 
Java: client = RestClient.builder(new HttpHost("localhost", 9211)).build(); 
We will also need an ActorSystem and an ActorMaterializer.
- 
Scala: implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() 
- 
Java: system = ActorSystem.create(); materializer = ActorMaterializer.create(system); 
This is all preparation that we are going to need.
Now we can stream messages which contains spray-json's JsObject (in Scala) or java.util.Map<String, Object> (in Java)
from or to Elasticsearch where we have access to by providing the RestClient to the ElasticsearchSource or the ElasticsearchSink.
- 
Scala: val f1 = ElasticsearchSource( "source", "book", """{"match_all": {}}""", ElasticsearchSourceSettings(5) ) .map { message: OutgoingMessage[JsObject] => IncomingMessage(Some(message.id), message.source) } .runWith( ElasticsearchSink( "sink1", "book", ElasticsearchSinkSettings(5) ) ) 
- 
Java: CompletionStage<Done> f1 = ElasticsearchSource.create( "source", "book", "{\"match_all\": {}}", new ElasticsearchSourceSettings().withBufferSize(5), client) .map(m -> new IncomingMessage<>(new Some<String>(m.id()), m.source())) .runWith( ElasticsearchSink.create( "sink1", "book", new ElasticsearchSinkSettings().withBufferSize(5), client), materializer); 
Also, it's possible to stream messages which contains any classes. In Scala, spray-json is used for JSON conversion,
so defining the mapped class and JsonFormat for it is necessary. In Java, Jackson is used, so just define the mapped class.
- 
Scala: case class Book(title: String) implicit val format = jsonFormat1(Book) 
- 
Java: public static class Book { public String title; } 
Use ElasticsearchSource.typed and ElasticsearchSink.typed to create source and sink instead.
- 
Scala: val f1 = ElasticsearchSource .typed[Book]( "source", "book", """{"match_all": {}}""", ElasticsearchSourceSettings(5) ) .map { message: OutgoingMessage[Book] => IncomingMessage(Some(message.id), message.source) } .runWith( ElasticsearchSink.typed[Book]( "sink2", "book", ElasticsearchSinkSettings(5) ) ) 
- 
Java: CompletionStage<Done> f1 = ElasticsearchSource.typed( "source", "book", "{\"match_all\": {}}", new ElasticsearchSourceSettings().withBufferSize(5), client, Book.class) .map(m -> new IncomingMessage<>(new Some<String>(m.id()), m.source())) .runWith( ElasticsearchSink.typed( "sink2", "book", new ElasticsearchSinkSettings().withBufferSize(5), client), materializer); 
We can configure the source by ElasticsearchSourceSettings.
Scala (source)
final case class ElasticsearchSourceSettings(bufferSize: Int = 10)| Parameter | Default | Description | 
|---|---|---|
| bufferSize | 10 | ElasticsearchSourceretrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size. | 
Also, we can configure the sink by ElasticsearchSinkSettings.
Scala (sink)
final case class ElasticsearchSinkSettings(bufferSize: Int = 10,
                                           retryInterval: Int = 5000,
                                           maxRetry: Int = 100,
                                           retryPartialFailure: Boolean = true)| Parameter | Default | Description | 
|---|---|---|
| bufferSize | 10 | ElasticsearchSinkputs messages by one bulk request per messages of this buffer size. | 
| retryInterval | 5000 | When a request is failed, ElasticsearchSinkretries that request after this interval (milliseconds). | 
| maxRetry | 100 | ElasticsearchSinkgive up and fails the stage if it gets this number of consective failures. | 
| retryPartialFailure | true | A bulk request might fails partially for some reason. If this parameter is true, then ElasticsearchSinkretries to request these failed messages. Otherwise, failed messages are discarded (or pushed to downstream if you useElasticsearchFlowinstead of the sink). | 
You can also build flow stages. The API is similar to creating Sinks.
- 
Scala (flow): val f1 = ElasticsearchSource .typed[Book]( "source", "book", """{"match_all": {}}""", ElasticsearchSourceSettings(5) ) .map { message: OutgoingMessage[Book] => IncomingMessage(Some(message.id), message.source) } .via( ElasticsearchFlow.typed[Book]( "sink3", "book", ElasticsearchSinkSettings(5) ) ) .runWith(Sink.seq) 
- 
Java (flow): CompletionStage<List<List<IncomingMessage<Book>>>> f1 = ElasticsearchSource.typed( "source", "book", "{\"match_all\": {}}", new ElasticsearchSourceSettings().withBufferSize(5), client, Book.class) .map(m -> new IncomingMessage<>(new Some<String>(m.id()), m.source())) .via(ElasticsearchFlow.typed( "sink3", "book", new ElasticsearchSinkSettings().withBufferSize(5), client)) .runWith(Sink.seq(), materializer); 
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
- 
Scala sbt > elasticsearch/testOnly *.ElasticsearchSpec
- 
Java sbt > elasticsearch/testOnly *.ElasticsearchTest