Timeflux is a Scala 3 library that provides a functional, type-safe client for InfluxDB 2.x. It offers comprehensive access to time-series data operations including writes, queries, and bucket/organization management.
Built on http4s and Cats Effect, Timeflux provides a pure functional interface to the InfluxDB v2 API with streaming support, token authentication, and typed request/response models.
- Type-Safe API - Strongly typed request and response models for all endpoints
- Functional Design - Built on Cats Effect with pure functional semantics
- Streaming Support - Write and read data using fs2 streams
- Token Authentication - Handles InfluxDB token-based authentication
- Organization & Bucket Management - Create and manage InfluxDB resources
- Flux Query Support - Execute Flux queries and receive typed results
- Line Protocol Writer - Write time-series data using InfluxDB line protocol
- Time Precision Control - Support for seconds, milliseconds, microseconds, and nanoseconds
- SSL Configuration - Option to ignore SSL validation for development environments
- Fluent DSL - Simple extension methods for cleaner API calls
| Category | Operations |
|---|---|
| Organizations | List, create, create if missing, resolve ID |
| Buckets | List, create, delete, create if missing |
| Write | Write measures using line protocol |
| Query | Execute Flux queries with streaming results |
Create a client and write time-series data:
import cats.effect.*
import com.colofabrix.scala.timeflux.*
import com.colofabrix.scala.timeflux.api.*
import com.colofabrix.scala.timeflux.config.*
import com.colofabrix.scala.timeflux.measures.*
import com.colofabrix.scala.timeflux.model.*
import org.http4s.Uri
object MyApp extends IOApp.Simple with TimefluxDSL {
def run: IO[Unit] =
val clientConfig =
TimefluxClientConfig(
serverUrl = Uri.unsafeFromString("http://localhost:8086"),
authToken = AuthToken("your-influxdb-token"),
)
for
client <- TimefluxClient[IO](clientConfig)
orgId <- client.resolveOrgId(OrgName("my-org"))
_ <- client.createBucketIfMissing("my-bucket", orgId.value)
_ <- IO.println("Bucket ready!")
yield ()
}Write time-series measures to InfluxDB:
import com.colofabrix.scala.timeflux.measures.*
import com.colofabrix.scala.timeflux.api.TimePrecision
import java.time.OffsetDateTime
object WriteExample extends TimefluxDSL {
val measures: fs2.Stream[IO, Measure] =
fs2.Stream.emit(
Measure(
name = "temperature",
tags = Vector(
MeasureTag("room", "living"),
MeasureTag("sensor", "dht22"),
),
fields = Vector(
MeasureField("celsius", FieldValue(21.5)),
MeasureField("humidity", FieldValue(45)),
),
time = OffsetDateTime.now(),
),
)
val writeData =
for
client <- TimefluxClient[IO](clientConfig)
orgId <- client.resolveOrgId(OrgName("my-org"))
_ <- client.writeMeasures(
bucket = "my-bucket",
orgID = orgId.value,
values = measures,
precision = Some(TimePrecision.Nanoseconds),
batchWrites = Some(1000),
)
yield ()
}Execute Flux queries and stream results:
val query =
"""
|from(bucket: "my-bucket")
| |> range(start: -1h)
| |> filter(fn: (r) => r["_measurement"] == "temperature")
""".stripMargin
val queryData =
for
client <- TimefluxClient[IO](clientConfig)
orgId <- client.resolveOrgId(OrgName("my-org"))
stream <- client.query(QueryRequest(query, orgId.value))
rows <- stream.compile.toList
yield rowsobject BucketExample extends TimefluxDSL {
// Create bucket if it doesn't exist
val ensureBucket =
for
client <- TimefluxClient[IO](clientConfig)
orgId <- client.resolveOrgId(OrgName("my-org"))
bucket <- client.createBucketIfMissing(
name = "my-bucket",
orgID = orgId.value,
description = Some("My bucket description"),
retentionRules = List(
RetentionRules(
everySeconds = 86400 * 30, // 30 days retention
shardGroupDurationSeconds = None,
`type` = Some("expire"),
),
),
)
yield bucket
// List all buckets
val listAllBuckets =
for
client <- TimefluxClient[IO](clientConfig)
buckets <- client.listBuckets()
yield buckets.buckets
// List buckets by name
val findBucket =
for
client <- TimefluxClient[IO](clientConfig)
buckets <- client.listBuckets(name = Some("my-bucket"))
yield buckets.buckets
}object OrgExample extends TimefluxDSL {
// Create organization if it doesn't exist
val ensureOrg =
for
client <- TimefluxClient[IO](clientConfig)
org <- client.createOrgIfMissing("my-org")
yield org
// Create organization with description
val createOrgWithDesc =
for
client <- TimefluxClient[IO](clientConfig)
org <- client.createOrg("my-org", Some("My organization description"))
yield org
// Resolve organization name to ID
val getOrgId =
for
client <- TimefluxClient[IO](clientConfig)
orgId <- client.resolveOrgId(OrgName("my-org"))
yield orgId
// List all organizations
val listAllOrgs =
for
client <- TimefluxClient[IO](clientConfig)
orgs <- client.listOrgs()
yield orgs.orgs
}The DSL provides extension methods on TimefluxClient for cleaner API calls. Mix in TimefluxDSL to use them:
object MyApp extends TimefluxDSL {
// Now client.createBucket(...) uses simple parameters
}| Method | Description |
|---|---|
listOrgs(name?) |
List organizations |
createOrg(name, description?) |
Create an organization |
createOrgIfMissing(name, description?) |
Create organization if it doesn't exist |
resolveOrgId(orgName) |
Resolve organization name to ID |
listBuckets(name?, orgID?) |
List buckets |
createBucket(name, orgID, description?, retentionRules?) |
Create a bucket |
createBucketIfMissing(name, orgID, description?, ...) |
Create bucket if it doesn't exist |
writeData(bucket, orgID, values, precision?, batchWrites?) |
Write stream of serializable values |
writeMeasures(bucket, orgID, values, precision?, ...) |
Write stream of measures |
query(request) |
Execute Flux query |
| Type | Description |
|---|---|
OrgName |
Organization name (opaque type) |
OrgId |
Organization ID (opaque type) |
AuthToken |
InfluxDB authentication token (opaque type) |
Measure |
Time-series data point |
MeasureField |
Measure field with name and typed value |
MeasureTag |
Measure tag with name and value |
FieldValue |
Typed field value (String, Float, Integer, UInteger, Boolean) |
ResultRow |
Query result row as ListMap[String, String] |
Control the precision of timestamps when writing data:
enum TimePrecision {
case Seconds // "s"
case Milliseconds // "ms"
case Microseconds // "us"
case Nanoseconds // "ns"
}Configure the client programmatically:
// Client configuration (required)
val clientConfig =
TimefluxClientConfig(
serverUrl = Uri.unsafeFromString("http://localhost:8086"),
authToken = AuthToken("your-token"),
)
// Optional library configuration via application.conftimeflux {
api-base = "api/v2"
concurrent-writes = 5
http-timeout = 30 seconds
max-retries = 5
max-retry-time = 1 minute
ignore-ssl = false
}| Option | Default | Description |
|---|---|---|
api-base |
api/v2 |
Base path for API calls |
concurrent-writes |
5 |
Number of concurrent batched writes |
http-timeout |
30 seconds |
HTTP request timeout |
max-retries |
5 |
Maximum number of retries for failed requests |
max-retry-time |
1 minute |
Maximum total retry time |
ignore-ssl |
false |
Ignore SSL certificate validation |
Timeflux uses typed errors for API failures:
import com.colofabrix.scala.timeflux.api.*
client
.query(request)
.handleErrorWith {
case e: TimefluxRequestError =>
IO.println(s"API error: ${e.code} - ${e.message}")
case e: TimefluxException =>
IO.println(s"Client error: ${e.getMessage}")
}Timeflux writes data using InfluxDB line protocol format:
measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp
The Measure case class handles this formatting automatically:
val measure =
Measure(
name = "temperature",
tags = Vector(
MeasureTag("room", "kitchen"),
MeasureTag("sensor", "dht22"),
),
fields = Vector(
MeasureField("celsius", FieldValue(23.5)),
MeasureField("humidity", FieldValue(45)),
),
time = OffsetDateTime.now(),
)Fields support multiple types through FieldValue:
FieldValue("text") // StringValue
FieldValue(23.5) // FloatValue (BigDecimal/Double)
FieldValue(42) // IntegerValue
FieldValue(42L) // UIntegerValue
FieldValue(true) // BooleanValueImplement TimefluxSerializable to write custom types:
final case class TemperatureReading(
room: String,
celsius: Double,
humidity: Int,
timestamp: OffsetDateTime,
)
given TimefluxSerializable[TemperatureReading] with
def toMeasure(r: TemperatureReading): Measure =
Measure(
name = "temperature",
tags = Vector(MeasureTag("room", r.room)),
fields = Vector(
MeasureField("celsius", FieldValue(r.celsius)),
MeasureField("humidity", FieldValue(r.humidity)),
),
time = r.timestamp,
)
// Use writeData instead of writeMeasures
object CustomTypeExample extends TimefluxDSL {
val write =
for
client <- TimefluxClient[IO](clientConfig)
orgId <- client.resolveOrgId(OrgName("my-org"))
_ <- client.writeData(
bucket = "my-bucket",
orgID = orgId.value,
values = fs2.Stream.emit(temperatureReading),
)
yield ()
}Query results are returned as a stream of ResultRow, which is an opaque type
wrapping ListMap[String, String] to preserve column order:
val printResults =
for
client <- TimefluxClient[IO](clientConfig)
stream <- client.query(QueryRequest(fluxQuery, orgId.value))
_ <- stream.evalMap { row =>
IO.println(s"Time: ${row("_time")}, Value: ${row("_value")}")
}.compile.drain
yield ()Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add some amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
Timeflux is released under the MIT license. See LICENSE for details.
- InfluxDB v2 API Documentation
- Flux Query Language
- http4s - Typeful, functional HTTP for Scala
- Cats Effect - The pure asynchronous runtime for Scala
- fs2 - Functional streams for Scala