Asynchronous library for accessing InfluxDB up to version 1.8 from Scala.
InfluxDB 1.8+ now has an official Scala client, so there are no plans to support InfluxDB 2.0 here. Use the official client instead: https://github.com/influxdata/influxdb-client-java/tree/master/client-scala
Add the following to your build.sbt:
libraryDependencies += "io.razem" %% "scala-influxdb-client" % "0.6.3"
import io.razem.influxdbclient._
import scala.concurrent.ExecutionContext.Implicits.global
val influxdb = InfluxDB.connect("localhost", 8086)
When you are done, close the client:
influxdb.close()
All methods are non-blocking and return a Future; in most cases a Future[QueryResult], which might be empty if the action does not return a result. Failing Futures carry a subclass of InfluxDBException.
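Because every call returns immediately, operations that depend on each other have to be sequenced explicitly. The following is a minimal sketch using only the Scala standard library; `exists` and `create` are hypothetical stand-ins for Future-returning calls (they are not the client's API) and only illustrate how a for-comprehension starts the second step after the first has completed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequencingSketch {
  // Hypothetical stand-ins for non-blocking calls; not part of the client API.
  def exists(name: String): Future[Boolean] = Future(name == "known_db")
  def create(name: String): Future[String] = Future(s"created $name")

  // Create the database only if it is missing.
  // The second step is started only after the first Future has completed.
  def ensure(name: String): Future[String] =
    for {
      present <- exists(name)
      outcome <- if (present) Future.successful(s"$name already exists") else create(name)
    } yield outcome

  def main(args: Array[String]): Unit = {
    // Blocking with Await is only reasonable at the edge of a program,
    // for example in a short script or a test.
    println(Await.result(ensure("my_database"), 5.seconds))
  }
}
```

The same shape should apply to any pair of dependent calls, such as creating a database before writing to it.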
val database = influxdb.selectDatabase("my_database")
database.exists() // => Future[Boolean]
database.create()
database.drop()
val point = Point("cpu")
.addTag("host", "my.server")
.addField("1m", 0.3)
.addField("5m", 0.4)
.addField("15m", 0.5)
database.write(point)
Additionally, the timestamp precision, the consistency level, and a custom retention policy can be specified:
val point = Point("cpu", System.currentTimeMillis())
database.write(point,
precision = Precision.MILLISECONDS,
consistency = Consistency.ALL,
retentionPolicy = "custom_rp")
If no precision parameter is given, InfluxDB assumes timestamps to be in nanoseconds.
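Since a timestamp is interpreted according to the precision passed with the write, the unit of the value has to match it. As a small illustration that uses only the Java standard library (no client calls are involved), this sketch converts a millisecond timestamp into the nanosecond value that would be expected when no precision is given. The object and method names are chosen for this example only.

```scala
import java.util.concurrent.TimeUnit

object TimestampUnits {
  // Convert epoch milliseconds to epoch nanoseconds.
  def millisToNanos(millis: Long): Long = TimeUnit.MILLISECONDS.toNanos(millis)

  def main(args: Array[String]): Unit = {
    val millis = 1444847474744L // 2015-10-14T18:31:14.744Z
    println(millisToNanos(millis))
  }
}
```

Passing a millisecond value without a matching precision would be read as nanoseconds and therefore land very close to the Unix epoch.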
If a write fails, its Future will contain a subclass of WriteException. This can be handled through the usual methods of error handling in Futures, e.g.:
database.write(point)
// ...
.recover{ case e: WriteException => ...}
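To make the recovery pattern concrete, here is a self-contained sketch that runs without a server. `WriteFailed` and `failingWrite` are hypothetical stand-ins for the client's WriteException and for a failing write; the return type is also an assumption made for the example. Only the `recover` mechanics are the point.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverSketch {
  // Hypothetical stand-in for the client's WriteException.
  final class WriteFailed(message: String) extends Exception(message)

  // Hypothetical stand-in for a write whose Future fails.
  def failingWrite(): Future[Boolean] =
    Future.failed(new WriteFailed("server unreachable"))

  // Map a failed write to `false`; any other exception type still fails the Future.
  def writeOrFalse(): Future[Boolean] =
    failingWrite().recover { case _: WriteFailed => false }

  def main(args: Array[String]): Unit =
    println(Await.result(writeOrFalse(), 5.seconds))
}
```

Matching on the specific exception type keeps unrelated failures visible instead of silently swallowing them.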
Multiple points can be written in one operation using bulkWrite:
val points = List(
Point("test_measurement1").addField("value1", 123),
Point("test_measurement2").addField("value2", 123),
Point("test_measurement3").addField("value3", 123)
)
database.bulkWrite(points, precision = Precision.MILLISECONDS)
Given the following data:
name: cpu
---------
time                            host     region   value
2015-10-14T18:31:14.744203449Z  serverA  us_west  0.64
2015-10-14T18:31:19.242472211Z  serverA  us_west  0.85
2015-10-14T18:31:22.644254309Z  serverA  us_west  0.43
database.query("SELECT * FROM cpu")
This returns a Future[QueryResult]. Given the QueryResult the Future completes with (called result below), the list of records is available through
result.series.head.records
which can be iterated to access the individual fields:
result.series.head.records.foreach(record => record("host"))
For each record, all its values can be accessed at once through the allValues property:
result.series.head.records(0).allValues
If we are only interested in the "value" field of each record,
result.series.head.points("value")
returns a list of just the value field of each record.
The list of column names can be accessed through:
result.series.head.columns
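Note that the snippets above call head, which throws on an empty Scala collection, so they assume the query returned at least one series. The sketch below shows a more defensive access pattern with headOption. It deliberately uses a simplified, hypothetical model (`Row`, `SeriesModel`) rather than the client's own classes, so that it runs standalone; whether the same calls apply to the real result type depends on series being an ordinary Scala collection, which is an assumption here.

```scala
object ResultAccessSketch {
  // Simplified, hypothetical model of a result series; NOT the client's classes.
  final case class Row(values: Map[String, Any]) {
    def apply(column: String): Any = values(column)
  }
  final case class SeriesModel(columns: List[String], records: List[Row]) {
    // Collect a single column across all records.
    def points(column: String): List[Any] = records.map(_(column))
  }

  // Return the values of one column, or an empty list when no series came back.
  def columnOrEmpty(series: List[SeriesModel], column: String): List[Any] =
    series.headOption.map(_.points(column)).getOrElse(Nil)

  def main(args: Array[String]): Unit = {
    val cpu = SeriesModel(
      columns = List("time", "host", "region", "value"),
      records = List(
        Row(Map("host" -> "serverA", "region" -> "us_west", "value" -> 0.64)),
        Row(Map("host" -> "serverA", "region" -> "us_west", "value" -> 0.85))
      )
    )
    println(columnOrEmpty(List(cpu), "value")) // series present
    println(columnOrEmpty(Nil, "value"))       // no series: empty list, no exception
  }
}
```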
Multiple queries can be sent to the server at the same time using the multiQuery method:
database.multiQuery(List("SELECT * FROM cpu LIMIT 5", "SELECT * FROM cpu LIMIT 5 OFFSET 5"))
In this case, the result is a Future[List[QueryResult]].
If a query fails, its Future fails with a QueryException.
To execute any action that is not covered by the API, you can use the exec method:
influxdb.exec("CREATE CONTINUOUS QUERY ...")
influxdb.createUser(username, password, isClusterAdmin)
influxdb.dropUser(username)
influxdb.showUsers()
influxdb.setUserPassword(username, password)
influxdb.grantPrivileges(username, database, privilege)
influxdb.revokePrivileges(username, database, privilege)
influxdb.makeClusterAdmin(username)
influxdb.userIsClusterAdmin(username)
database.createRetentionPolicy(name, duration, replication, default)
database.showRetentionPolicies()
database.dropRetentionPolicy(name)
database.alterRetentionPolicy(name, duration, replication, default)
NOTE: User and retention policy management primitives return an empty QueryResult or fail with a QueryException in case of an error.
import io.razem.influxdbclient._
val udpClient = InfluxDB.udpConnect("localhost", 8086)
val point = Point("cpu", System.currentTimeMillis())
udpClient.write(point)
Points can also be written in bulk:
val points = List(
Point("test_measurement1").addField("value1", 123),
Point("test_measurement2").addField("value2", 123),
Point("test_measurement3").addField("value3", 123)
)
udpClient.bulkWrite(points)