razem-io / scala-influxdb-client   0.6.3

MIT License GitHub

Asynchronous InfluxDB client for Scala

Scala versions: 2.13 2.12 2.11

scala-influxdb-client

Build Status codecov Maven Central

Asynchronous library for accessing InfluxDB up to version 1.8 from Scala.

Since version 1.8+ now features an official scala client, there won't be any effort to support 2.0 here. Use this instead: https://github.com/influxdata/influxdb-client-java/tree/master/client-scala

Installation

Add the following to your build.sbt

libraryDependencies += "io.razem" %% "scala-influxdb-client" % "0.6.3"

Connecting

import io.razem.influxdbclient._
import scala.concurrent.ExecutionContext.Implicits.global

val influxdb = InfluxDB.connect("localhost", 8086)

And when all done close the client:

influxdb.close()

Usage

All methods are non-blocking and return a Future; in most cases a Future[QueryResponse] which might be empty if the action does not return a result. Failing Futures carry a subclass of InfluxDBException

Working with databases

val database = influxdb.selectDatabase("my_database")
database.exists() // => Future[Boolean]
database.create()
database.drop()

Writing data

val point = Point("cpu")
  .addTag("host", "my.server")
  .addField("1m", 0.3)
  .addField("5m", 0.4)
  .addField("15m", 0.5)
database.write(point)

Additionally, timestamp precision, consistency and custom retention policies can be specified

val point = Point("cpu", System.currentTimeMillis())
database.write(point,
               precision = Precision.MILLISECONDS,
               consistency = Consistency.ALL, 
               retentionPolicy = "custom_rp")

If no precision parameter is given, InfluxDB assumes timestamps to be in nanoseconds.

If a write fails, it's future will contain a subclass of WriteException. This can be handled through the usual methods of error handling in Futures, i.e.

database.write(point)
  // ...
  .recover{ case e: WriteException => ...}

Multiple points can be written in one operation by using the bulkWrite operation

val points = List(
  Point("test_measurement1").addField("value1", 123),
  Point("test_measurement2").addField("value2", 123),
  Point("test_measurement3").addField("value3", 123)
)
database.bulkWrite(points, precision = Precision.MILLISECONDS)

Querying the database

Given the following data:

name: cpu
---------
time                            host     region   value
2015-10-14T18:31:14.744203449Z	serverA  us_west  0.64
2015-10-14T18:31:19.242472211Z	serverA  us_west  0.85
2015-10-14T18:31:22.644254309Z	serverA  us_west  0.43
database.query("SELECT * FROM cpu")

This returns a Future[QueryResult]. To access the list of records use

result.series.head.records

which we can iterate to access the different fields

result.series.head.records.foreach(record => record("host"))

For each record, we can access all it's values at once using the allValues property

result.series.head.records(0).allValues

If we are only interested in the "value" field of each record

result.series.head.points("value")

returns a list of just the value field of each record.

The list of column names can be accessed through

result.series.head.columns

Multiple queries can be sent to the server at the same time using the multiQuery method

database.multiQuery(List("SELECT * FROM cpu LIMIT 5", "SELECT * FROM cpu LIMIT 5 OFFSET 5"))

In this case, the result is a Future[List[QueryResult]].

Errors during queries return a QueryException.

Executing actions

To execute any action that is not covered by the API you can use the exec method

influxdb.exec("CREATE CONTINUOUS QUERY ...")

Managing users

influxdb.createUser(username, password, isClusterAdmin)
influxdb.dropUser(username)
influxdb.showUsers()
influxdb.setUserPassword(username, password)
influxdb.grantPrivileges(username, database, privilege)
influxdb.revokePrivileges(username, database, privilege)
influxdb.makeClusterAdmin(username)
influxdb.userIsClusterAdmin(username)

Managing retention policies

database.createRetentionPolicy(name, duration, replication, default)
database.showRetentionPolicies()
database.dropRetentionPolicy(name)
database.alterRetentionPolicy(name, duration, replication, default)

NOTE: User and retention policy management primitives return an empty QueryResult or fail with a QueryException in case of an error.

Writing over UDP

import io.razem.influxdbclient._

val udpClient = InfluxDB.udpConnect("localhost", 8086)
val point = Point("cpu", System.currentTimeMillis())
udpClient.write(point)

Points can also be written in bulk

val points = List(
  Point("test_measurement1").addField("value1", 123),
  Point("test_measurement2").addField("value2", 123),
  Point("test_measurement3").addField("value3", 123)
)
udpClient.bulkWrite(points)