colofabrix / timeflux   1.0.0

A functional, type-safe client for InfluxDB

Scala versions: 3.x

License: MIT

Timeflux - InfluxDB Client for Scala

Timeflux is a Scala 3 library that provides a functional, type-safe client for InfluxDB 2.x. It covers line-protocol writes, Flux queries, and bucket and organization management.

Built on http4s and Cats Effect, Timeflux provides a pure functional interface to the InfluxDB v2 API with streaming support, token authentication, and typed request/response models.

Features

  • Type-Safe API - Strongly typed request and response models for all endpoints
  • Functional Design - Built on Cats Effect with pure functional semantics
  • Streaming Support - Write and read data using fs2 streams
  • Token Authentication - Handles InfluxDB token-based authentication
  • Organization & Bucket Management - Create and manage InfluxDB resources
  • Flux Query Support - Execute Flux queries and receive typed results
  • Line Protocol Writer - Write time-series data using InfluxDB line protocol
  • Time Precision Control - Support for seconds, milliseconds, microseconds, and nanoseconds
  • SSL Configuration - Option to ignore SSL validation for development environments
  • Fluent DSL - Simple extension methods for cleaner API calls

Supported Operations

| Category      | Operations                                  |
|---------------|---------------------------------------------|
| Organizations | List, create, create if missing, resolve ID |
| Buckets       | List, create, delete, create if missing     |
| Write         | Write measures using line protocol          |
| Query         | Execute Flux queries with streaming results |

Quick Start

Basic Usage

Create a client and write time-series data:

import cats.effect.*
import com.colofabrix.scala.timeflux.*
import com.colofabrix.scala.timeflux.api.*
import com.colofabrix.scala.timeflux.config.*
import com.colofabrix.scala.timeflux.measures.*
import com.colofabrix.scala.timeflux.model.*
import org.http4s.Uri

object MyApp extends IOApp.Simple with TimefluxDSL {

  def run: IO[Unit] =
    val clientConfig =
      TimefluxClientConfig(
        serverUrl = Uri.unsafeFromString("http://localhost:8086"),
        authToken = AuthToken("your-influxdb-token"),
      )

    for
      client <- TimefluxClient[IO](clientConfig)
      orgId  <- client.resolveOrgId(OrgName("my-org"))
      _      <- client.createBucketIfMissing("my-bucket", orgId.value)
      _      <- IO.println("Bucket ready!")
    yield ()

}

Write Data

Write time-series measures to InfluxDB:

import cats.effect.IO
import com.colofabrix.scala.timeflux.api.TimePrecision
import com.colofabrix.scala.timeflux.measures.*
import java.time.OffsetDateTime
// The remaining imports and clientConfig are the same as in Basic Usage

object WriteExample extends TimefluxDSL {

  val measures: fs2.Stream[IO, Measure] =
    fs2.Stream.emit(
      Measure(
        name = "temperature",
        tags = Vector(
          MeasureTag("room", "living"),
          MeasureTag("sensor", "dht22"),
        ),
        fields = Vector(
          MeasureField("celsius", FieldValue(21.5)),
          MeasureField("humidity", FieldValue(45)),
        ),
        time = OffsetDateTime.now(),
      ),
    )

  val writeData =
    for
      client <- TimefluxClient[IO](clientConfig)
      orgId  <- client.resolveOrgId(OrgName("my-org"))
      _      <- client.writeMeasures(
        bucket = "my-bucket",
        orgID = orgId.value,
        values = measures,
        precision = Some(TimePrecision.Nanoseconds),
        batchWrites = Some(1000),
      )
    yield ()

}
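
The batchWrites parameter suggests that measures are grouped before they are sent. As a rough illustration only, and not the library's implementation, grouping line-protocol lines into newline-separated request bodies could look like this with the standard library. The helper name batchBodies is hypothetical.

```scala
// Hypothetical helper: groups line-protocol lines into request bodies of at most
// batchSize lines each. InfluxDB accepts several points per write, one per line.
def batchBodies(lines: Iterator[String], batchSize: Int): Iterator[String] =
  lines.grouped(batchSize).map(_.mkString("\n"))

// Three lines with a batch size of 2 yield two bodies.
val bodies: List[String] =
  batchBodies(Iterator("m v=1i 1", "m v=2i 2", "m v=3i 3"), 2).toList
```

Larger batches mean fewer HTTP requests at the cost of bigger request bodies.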

Query Data

Execute Flux queries and stream results:

val query =
  """
    |from(bucket: "my-bucket")
    |  |> range(start: -1h)
    |  |> filter(fn: (r) => r["_measurement"] == "temperature")
  """.stripMargin

val queryData =
  for
    client <- TimefluxClient[IO](clientConfig)
    orgId  <- client.resolveOrgId(OrgName("my-org"))
    stream <- client.query(QueryRequest(query, orgId.value))
    rows   <- stream.compile.toList
  yield rows

Manage Buckets

object BucketExample extends TimefluxDSL {

  // Create bucket if it doesn't exist
  val ensureBucket =
    for
      client <- TimefluxClient[IO](clientConfig)
      orgId  <- client.resolveOrgId(OrgName("my-org"))
      bucket <- client.createBucketIfMissing(
        name = "my-bucket",
        orgID = orgId.value,
        description = Some("My bucket description"),
        retentionRules = List(
          RetentionRules(
            everySeconds = 86400 * 30,  // 30 days retention
            shardGroupDurationSeconds = None,
            `type` = Some("expire"),
          ),
        ),
      )
    yield bucket

  // List all buckets
  val listAllBuckets =
    for
      client  <- TimefluxClient[IO](clientConfig)
      buckets <- client.listBuckets()
    yield buckets.buckets

  // List buckets by name
  val findBucket =
    for
      client  <- TimefluxClient[IO](clientConfig)
      buckets <- client.listBuckets(name = Some("my-bucket"))
    yield buckets.buckets

}
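
The retention rule above spells out 30 days as 86400 * 30 seconds. A small sketch of deriving that number from a duration instead, assuming the field takes whole seconds. The helper retentionSeconds is hypothetical, and the field's exact numeric type in the library is not confirmed here.

```scala
import scala.concurrent.duration.*

// Hypothetical helper: converts a duration into the whole seconds expected by
// a retention rule.
def retentionSeconds(retention: FiniteDuration): Long =
  retention.toSeconds

// Same value as the hand-written 86400 * 30.
val thirtyDays: Long = retentionSeconds(30.days)
```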

Manage Organizations

object OrgExample extends TimefluxDSL {

  // Create organization if it doesn't exist
  val ensureOrg =
    for
      client <- TimefluxClient[IO](clientConfig)
      org    <- client.createOrgIfMissing("my-org")
    yield org

  // Create organization with description
  val createOrgWithDesc =
    for
      client <- TimefluxClient[IO](clientConfig)
      org    <- client.createOrg("my-org", Some("My organization description"))
    yield org

  // Resolve organization name to ID
  val getOrgId =
    for
      client <- TimefluxClient[IO](clientConfig)
      orgId  <- client.resolveOrgId(OrgName("my-org"))
    yield orgId

  // List all organizations
  val listAllOrgs =
    for
      client <- TimefluxClient[IO](clientConfig)
      orgs   <- client.listOrgs()
    yield orgs.orgs

}

API Reference

TimefluxDSL

The DSL provides extension methods on TimefluxClient for cleaner API calls. Mix in TimefluxDSL to use them:

object MyApp extends TimefluxDSL {
  // Now client.createBucket(...) uses simple parameters
}

| Method                                                     | Description                             |
|------------------------------------------------------------|-----------------------------------------|
| listOrgs(name?)                                            | List organizations                      |
| createOrg(name, description?)                              | Create an organization                  |
| createOrgIfMissing(name, description?)                     | Create organization if it doesn't exist |
| resolveOrgId(orgName)                                      | Resolve organization name to ID         |
| listBuckets(name?, orgID?)                                 | List buckets                            |
| createBucket(name, orgID, description?, retentionRules?)   | Create a bucket                         |
| createBucketIfMissing(name, orgID, description?, ...)      | Create bucket if it doesn't exist       |
| writeData(bucket, orgID, values, precision?, batchWrites?) | Write stream of serializable values     |
| writeMeasures(bucket, orgID, values, precision?, ...)      | Write stream of measures                |
| query(request)                                             | Execute Flux query                      |

Domain Types

| Type         | Description                                                     |
|--------------|-----------------------------------------------------------------|
| OrgName      | Organization name (opaque type)                                 |
| OrgId        | Organization ID (opaque type)                                   |
| AuthToken    | InfluxDB authentication token (opaque type)                     |
| Measure      | Time-series data point                                          |
| MeasureField | Measure field with name and typed value                         |
| MeasureTag   | Measure tag with name and value                                 |
| FieldValue   | Typed field value (String, Float, Integer, UInteger, Boolean)   |
| ResultRow    | Query result row as ListMap[String, String]                     |
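
Several of these types are listed as opaque types. The following is a minimal sketch of that Scala 3 pattern, to show why an organization name cannot be passed where an ID is expected. The definitions below are hypothetical stand-ins written for illustration (including the domain object and the bucketsUrl helper); the library's own definitions may differ.

```scala
// Hypothetical sketch of the opaque-type pattern; not the library's source.
object domain:
  opaque type OrgName = String
  object OrgName:
    def apply(raw: String): OrgName = raw
    extension (n: OrgName) def value: String = n

  opaque type OrgId = String
  object OrgId:
    def apply(raw: String): OrgId = raw
    extension (id: OrgId) def value: String = id

import domain.*

// Both wrap a String at runtime, but outside `domain` the compiler treats them
// as distinct types, so bucketsUrl(OrgName("my-org")) does not compile.
def bucketsUrl(org: OrgId): String = s"/api/v2/buckets?orgID=${org.value}"

val url: String = bucketsUrl(OrgId("0123456789abcdef"))
```

The wrappers add no runtime allocation, which is the usual reason to prefer opaque types over single-field case classes for identifiers.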

Time Precision

Control the precision of timestamps when writing data:

enum TimePrecision {
  case Seconds      // "s"
  case Milliseconds // "ms"
  case Microseconds // "us"
  case Nanoseconds  // "ns"
}
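
To make the effect of the precision concrete, here is a sketch of how a timestamp could be reduced to an epoch number in each unit. Precision and epochAt are hypothetical names used for illustration; this is not how the library is known to implement the conversion, and it assumes timestamps at or after the Unix epoch.

```scala
import java.time.Instant

// Hypothetical stand-in for TimePrecision, carrying the unit string sent to InfluxDB.
enum Precision(val unit: String):
  case Seconds      extends Precision("s")
  case Milliseconds extends Precision("ms")
  case Microseconds extends Precision("us")
  case Nanoseconds  extends Precision("ns")

// Truncates the instant to the requested unit (no rounding).
def epochAt(t: Instant, p: Precision): BigInt =
  val nanos = BigInt(t.getEpochSecond) * 1000000000L + t.getNano
  p match
    case Precision.Seconds      => nanos / 1000000000L
    case Precision.Milliseconds => nanos / 1000000L
    case Precision.Microseconds => nanos / 1000L
    case Precision.Nanoseconds  => nanos

val sample: Instant = Instant.parse("2024-01-01T00:00:00.123456789Z")
val sampleMillis: BigInt = epochAt(sample, Precision.Milliseconds)
```

A coarser precision shortens the timestamps on the wire, but points that fall into the same unit and share a series overwrite each other in InfluxDB.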

Configuration

Configure the client programmatically:

// Client configuration (required)
val clientConfig =
  TimefluxClientConfig(
    serverUrl = Uri.unsafeFromString("http://localhost:8086"),
    authToken = AuthToken("your-token"),
  )

Optional library settings can be supplied in application.conf:

timeflux {
  api-base          = "api/v2"
  concurrent-writes = 5
  http-timeout      = 30 seconds
  max-retries       = 5
  max-retry-time    = 1 minute
  ignore-ssl        = false
}

| Option            | Default    | Description                                   |
|-------------------|------------|-----------------------------------------------|
| api-base          | api/v2     | Base path for API calls                       |
| concurrent-writes | 5          | Number of concurrent batched writes           |
| http-timeout      | 30 seconds | HTTP request timeout                          |
| max-retries       | 5          | Maximum number of retries for failed requests |
| max-retry-time    | 1 minute   | Maximum total retry time                      |
| ignore-ssl        | false      | Ignore SSL certificate validation             |
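
For reference, the defaults can be pictured as a typed value. The case class below is a hypothetical mirror of the documented options, not the library's own settings class, and mayRetry encodes an assumption about how the two retry limits interact (a retry is allowed only while both still hold).

```scala
import scala.concurrent.duration.*

// Hypothetical typed mirror of the documented defaults.
final case class LibrarySettings(
  apiBase: String = "api/v2",
  concurrentWrites: Int = 5,
  httpTimeout: FiniteDuration = 30.seconds,
  maxRetries: Int = 5,
  maxRetryTime: FiniteDuration = 1.minute,
  ignoreSsl: Boolean = false,
)

// Assumed semantics: stop retrying once either limit is reached.
def mayRetry(s: LibrarySettings, attemptsSoFar: Int, elapsed: FiniteDuration): Boolean =
  attemptsSoFar < s.maxRetries && elapsed < s.maxRetryTime
```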

Error Handling

Timeflux uses typed errors for API failures:

import com.colofabrix.scala.timeflux.api.*

client
  .query(request)
  .handleErrorWith {
    case e: TimefluxRequestError =>
      IO.println(s"API error: ${e.code} - ${e.message}")
    case e: TimefluxException =>
      IO.println(s"Client error: ${e.getMessage}")
  }
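
When matching on error types it is usually worth ordering cases from most to least specific and ending with a fallback, so that unrelated failures are not left unmatched. The sketch below uses hypothetical stand-in classes (ClientException and RequestError); whether the library's TimefluxRequestError extends TimefluxException is an assumption made here for illustration.

```scala
// Hypothetical stand-ins for the library's exception types.
class ClientException(msg: String) extends Exception(msg)

final class RequestError(val code: String, val message: String)
    extends ClientException(s"$code: $message")

// Most specific case first, then the broader type, then a catch-all.
def describe(t: Throwable): String = t match
  case e: RequestError    => s"API error: ${e.code} - ${e.message}"
  case e: ClientException => s"Client error: ${e.getMessage}"
  case other              => s"Unexpected error: ${other.getMessage}"
```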

Measure and Line Protocol

Timeflux writes data using InfluxDB line protocol format:

measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp

The Measure case class handles this formatting automatically:

val measure =
  Measure(
    name = "temperature",
    tags = Vector(
      MeasureTag("room", "kitchen"),
      MeasureTag("sensor", "dht22"),
    ),
    fields = Vector(
      MeasureField("celsius", FieldValue(23.5)),
      MeasureField("humidity", FieldValue(45)),
    ),
    time = OffsetDateTime.now(),
  )
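
To show what the formatting involves, here is a simplified, standalone sketch of rendering a point as line protocol, including the escaping rules and the i suffix for integers. Point, Field, escape, render and toLine are hypothetical names; this is not the library's Measure implementation, and it omits unsigned integers.

```scala
// Hypothetical, simplified model; the library's Measure and FieldValue may differ.
enum Field:
  case Str(v: String)
  case Flt(v: Double)
  case Int64(v: Long)
  case Bool(v: Boolean)

final case class Point(
  name: String,
  tags: Vector[(String, String)],
  fields: Vector[(String, Field)],
  epochNanos: Long,
)

// Prefixes every character listed in `chars` with a backslash.
def escape(s: String, chars: String): String =
  s.flatMap(c => if chars.indexOf(c) >= 0 then "\\" + c else c.toString)

// Strings are double-quoted, integers carry an `i` suffix, floats and booleans are bare.
def render(f: Field): String = f match
  case Field.Str(v)   => "\"" + escape(v, "\"\\") + "\""
  case Field.Flt(v)   => v.toString
  case Field.Int64(v) => s"${v}i"
  case Field.Bool(v)  => v.toString

// measurement[,tag=value...] field=value[,field=value...] timestamp
def toLine(p: Point): String =
  val tags   = p.tags.map((k, v) => s",${escape(k, ",= ")}=${escape(v, ",= ")}").mkString
  val fields = p.fields.map((k, v) => s"${escape(k, ",= ")}=${render(v)}").mkString(",")
  s"${escape(p.name, ", ")}$tags $fields ${p.epochNanos}"

val line: String = toLine(
  Point(
    name = "temperature",
    tags = Vector("room" -> "living room"),
    fields = Vector("celsius" -> Field.Flt(21.5), "humidity" -> Field.Int64(45)),
    epochNanos = 1700000000000000000L,
  ),
)
```

The space in the tag value is escaped with a backslash, since an unescaped space would end the tag set.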

Typed Field Values

Fields support multiple types through FieldValue:

FieldValue("text")       // StringValue
FieldValue(23.5)         // FloatValue (BigDecimal/Double)
FieldValue(42)           // IntegerValue
FieldValue(42L)          // UIntegerValue
FieldValue(true)         // BooleanValue

Custom Serializable Types

Implement TimefluxSerializable to write custom types:

final case class TemperatureReading(
  room: String,
  celsius: Double,
  humidity: Int,
  timestamp: OffsetDateTime,
)

given TimefluxSerializable[TemperatureReading] with
  def toMeasure(r: TemperatureReading): Measure =
    Measure(
      name = "temperature",
      tags = Vector(MeasureTag("room", r.room)),
      fields = Vector(
        MeasureField("celsius", FieldValue(r.celsius)),
        MeasureField("humidity", FieldValue(r.humidity)),
      ),
      time = r.timestamp,
    )

// Use writeData instead of writeMeasures; temperatureReading is any TemperatureReading value
object CustomTypeExample extends TimefluxDSL {
  val write =
    for
      client <- TimefluxClient[IO](clientConfig)
      orgId  <- client.resolveOrgId(OrgName("my-org"))
      _      <- client.writeData(
        bucket = "my-bucket",
        orgID = orgId.value,
        values = fs2.Stream.emit(temperatureReading),
      )
    yield ()
}

Query Results

Query results are returned as a stream of ResultRow, which is an opaque type wrapping ListMap[String, String] to preserve column order:

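val printResults =
  for
    client <- TimefluxClient[IO](clientConfig)
    orgId  <- client.resolveOrgId(OrgName("my-org"))
    // fluxQuery is a Flux query string, such as `query` in the Query Data section
    stream <- client.query(QueryRequest(fluxQuery, orgId.value))
    _      <- stream.evalMap { row =>
      IO.println(s"Time: ${row("_time")}, Value: ${row("_value")}")
    }.compile.drain
  yield ()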
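
The reason a ListMap matters here is that it iterates in insertion order, so columns come back in the order of the response header. A small standalone sketch, assuming a naive comma split of a header line and a data line; toRow is a hypothetical helper, real Flux responses are annotated CSV with quoting that this ignores, and the library's own parser is not shown.

```scala
import scala.collection.immutable.ListMap

// Hypothetical helper: pairs header columns with row values, keeping column order.
def toRow(header: String, line: String): ListMap[String, String] =
  val keys   = header.split(",", -1).toList
  val values = line.split(",", -1).toList
  ListMap.from(keys.zip(values))

val row: ListMap[String, String] =
  toRow("_time,_value,room", "2024-01-01T00:00:00Z,21.5,living")
```

Iterating over row.keys yields _time, _value, room in that order, which a plain Map does not guarantee for larger rows.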

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

Timeflux is released under the MIT license. See LICENSE for details.

Author

Fabrizio Colonna

See Also