pygmalios / reactiveinflux   0.10.0.5

Apache License 2.0 GitHub

Non-blocking, reactive InfluxDB driver for Scala and Java.

Scala versions: 2.11 2.10

reactiveinflux 0.4

Non-blocking InfluxDB driver for Scala and Java (blocking only) with support for Apache Spark.

Immutability, testability and extensibility are key features of ReactiveInflux. It internally uses Play Framework WS API which is a rich asynchronous HTTP client built on top of Async Http Client.

Get it from Maven Central repository

Maven:

<dependency>
  <groupId>com.pygmalios</groupId>
  <artifactId>reactiveinflux_2.10</artifactId>
  <version>0.10.0.4</version>
</dependency>

SBT:

libraryDependencies += "com.pygmalios" % "reactiveinflux" %% "0.10.0.4"

Compatibility

  • InfluxDB 0.11, 0.10 and 0.9 (maybe even older too)
  • Scala 2.11 and 2.10
  • Java 7 and above

Scala example

package com.pygmalios.reactiveinflux.examples

import java.net.URI

import com.pygmalios.reactiveinflux._
import org.joda.time.DateTime

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

/**
  * Example of asynchronous usage of SyncReactiveInflux.
  *
  * It assumes that you have InfluxDB running locally on port 8086. How to install InfluxDB:
  * https://docs.influxdata.com/influxdb/v0.11/introduction/installation/
  */
object Example extends App {
  // Use Influx at the provided URL and database "example1"
  val result = withInfluxDb(new URI("http://localhost:8086/"), "example1") { db =>

    // Asynchronously create the "example1" database ...
    db.create().flatMap { _ =>

      // ... and then asynchronously write a single point to "measurement1" ...
      val point = Point(
        time = DateTime.now(),
        measurement = "measurement1",
        tags = Map("t1" -> "A", "t2" -> "B"),
        fields = Map(
          "f1" -> 10.3, // BigDecimal field
          "f2" -> "x", // String field
          "f3" -> -1, // Long field
          "f4" -> true) // Boolean field
      )
      db.write(point).flatMap { _ =>

        // ... and then asynchronously read the written point ...
        db.query("SELECT * FROM measurement1").flatMap { queryResult =>

          // Print the single point to the console
          println(queryResult.row.mkString)

          // ... and then asynchronously drop the "example1" database.
          db.drop()
        }
      }
    }
  }

  // Wait at most 30 seconds for the future to complete
  Await.ready(result, 30.seconds)
}

Scala blocking example

package com.pygmalios.reactiveinflux.examples

import java.net.URI

import com.pygmalios.reactiveinflux._
import org.joda.time.DateTime

import scala.concurrent.duration._

/**
  * Example of blocking, synchronous usage of SyncReactiveInflux.
  *
  * It assumes that you have InfluxDB running locally on port 8086. How to install InfluxDB:
  * https://docs.influxdata.com/influxdb/v0.11/introduction/installation/
  */
object SyncExample extends App {
  // You have to specify how much are you willing to wait results of individual blocking calls
  implicit val awaitAtMost = 10.seconds

  // Use Influx at the provided URL and database "example1"
  syncInfluxDb(new URI("http://localhost:8086/"), "example1") { db =>

    // Synchronously create the "example1" database
    db.create()

    // Synchronously write a single point to "measurement1"
    val point = Point(
      time = DateTime.now(),
      measurement = "measurement1",
      tags = Map("t1" -> "A", "t2" -> "B"),
      fields = Map(
        "f1" -> 10.3, // BigDecimal field
        "f2" -> "x",  // String field
        "f3" -> -1,   // Long field
        "f4" -> true) // Boolean field
    )
    db.write(point)

    // Synchronously read the written point
    val queryResult = db.query("SELECT * FROM measurement1")

    // Print the single point to the console
    println(queryResult.row.mkString)

    // Synchronously drop the "example1" database.
    db.drop()
  }
}

Java blocking example

package com.pygmalios.reactiveinflux.examples;

import com.pygmalios.reactiveinflux.jawa.*;
import com.pygmalios.reactiveinflux.jawa.sync.JavaSyncReactiveInflux;
import com.pygmalios.reactiveinflux.jawa.sync.SyncReactiveInflux;
import com.pygmalios.reactiveinflux.jawa.sync.SyncReactiveInfluxDb;
import org.joda.time.DateTime;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

public class JavaSyncExample {
    private static final long awaitAtMostMillis = 30000;

    public static void main(String[] args) throws IOException, URISyntaxException {
        // Use Influx at the provided URL
        ReactiveInfluxConfig config = new JavaReactiveInfluxConfig(new URI("http://localhost:8086/"));
        try (SyncReactiveInflux reactiveInflux = new JavaSyncReactiveInflux(config, awaitAtMostMillis)) {
            // Use database "example1"
            SyncReactiveInfluxDb db = reactiveInflux.database("example1");

            // Create the "example1" database
            db.create();

            // Define tags for the point
            Map<String, String> tags = new HashMap<>();
            tags.put("t1", "A");
            tags.put("t2", "B");

            // Define fields for the point
            Map<String, Object> fields = new HashMap<>();
            fields.put("f1", 10.3);
            fields.put("f2", "x");
            fields.put("f3", -1);
            fields.put("f4", true);

            // Write a single point to "measurement1"
            Point point = new JavaPoint(
                DateTime.now(),
                "measurement1",
                tags,
                fields
            );
            db.write(point);

            // Synchronously read the written point
            QueryResult queryResult = db.query("SELECT * FROM measurement1");

            // Print the single point to the console
            System.out.println(queryResult.getRow().mkString());

            // Synchronously drop the "example1" database
            db.drop();
        }
    }
}

Versioning explained

Version number (0.10.0.4) of reactiveinflux consists of two parts:

  1. InfluxDB major and minor versions. (0.10)
  2. Reactiveinflux major and minor versions. (0.4)

Additionally Scala 2.11 and 2.10 versions are supported by adding "_2.11" or "_2.10" suffix to the artifact name.