EtlFlow
EtlFlow is an ecosystem of functional libraries in Scala, based on ZIO, for running complex, auditable workflows that can interact with Google Cloud Platform, AWS, Kubernetes, databases, and more.
Below are some salient features of this library.
- Functional. Rapidly compose complex workflows from simple tasks.
- Fibers. Built on non-blocking fibers that never waste or leak resources, which lets you build scalable, resilient, and reactive applications.
- Resource-safe. Build workflows that never leak resources (including threads!), even when they fail.
- Concurrent. Easily build concurrent workflows without deadlocks, race conditions, or complexity.
- Asynchronous. Write sequential code that looks the same whether it's asynchronous or synchronous.
- Type-safe. Use the full power of the Scala compiler to catch bugs at compile time.
- Testable. Inject test services into your workflow for fast, deterministic, and type-safe testing.
- Resilient. Build workflows that never lose errors, and which respond to failure locally and flexibly.
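As a quick illustration of the composition style, here is a minimal sketch. taskA and taskB are hypothetical placeholders for effects produced from etlflow tasks via toZIO (as in the examples further below); only the audit.noop layer shown later in this README is assumed.

import etlflow.audit
import etlflow.audit.Audit
import zio._

// Hypothetical placeholder effects; in practice these come from etlflow tasks via .toZIO
val taskA: RIO[Audit, Unit] = ZIO.logInfo("extract")
val taskB: RIO[Audit, Unit] = ZIO.logInfo("load")

// Compose sequentially or in parallel with ordinary ZIO operators
val sequential: RIO[Audit, Unit] = taskA *> taskB
val parallel: RIO[Audit, Unit]   = (taskA zipPar taskB).unit

// Provide the no-op audit backend to obtain a runnable effect
val runnable: Task[Unit] = sequential.provide(audit.noop)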
Examples
- Core Module: In this example project, you can explore core features of etlflow, the Task and Audit API.
- GCP Module (GCS, DataProc, BigQuery tasks): In this example project, you can explore GCP tasks.
- K8S Module (K8S tasks): In this example project, you can explore different Kubernetes tasks.
- Spark Module (Spark tasks): In this example project, you can explore different Apache Spark tasks.
Modules Dependency Graph
Module | Java Version |
---|---|
Core | Java 8+ |
GCP | Java 8+ |
JDBC | Java 8+ |
Http | Java 11+ |
K8S | Java 8+ |
Email | Java 8+ |
AWS | Java 8+ |
Redis | Java 8+ |
Spark | Java 8+ |
Requirements and Installation
This project is compiled with Scala versions 2.12.17, 2.13.10, and 3.2.1.
Available via Maven Central. Add the latest release below as a dependency to your project:
SBT
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.7.1"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.7.1"
Maven
<dependency>
<groupId>com.github.tharwaninitin</groupId>
<artifactId>etlflow-core_2.12</artifactId>
<version>1.7.1</version>
</dependency>
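The artifactId suffix encodes the Scala binary version; for Scala 2.13 or Scala 3 builds, use etlflow-core_2.13 or etlflow-core_3 respectively.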
Etlflow Modules
Core
// Todo
Task
// Todo
Audit
// Todo
Config
// Todo
Json
// Todo
GCP
# To run the GCP examples below, set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the location of the service account JSON key.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json
Dataproc
import etlflow.task._
import gcp4zio.dp._
import etlflow.audit
import etlflow.audit.Audit
import zio._
val gcpProject: String = "GCP_PROJECT"
val gcpRegion: String = "GCP_REGION"
val dpCluster: String = "DP_CLUSTER"
val dpEndpoint: String = "DP_ENDPOINT"
val dpBucket: String = "DP_BUCKET"
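
// Each task describes one step; toZIO converts it into a ZIO effect requiring the corresponding service and Audit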
val createCluster = DPCreateTask("DPCreateTask", dpCluster, ClusterProps(dpBucket)).toZIO
val deleteCluster = DPDeleteTask("DPDeleteTask", dpCluster).toZIO
val args = List("1000")
val mainClass = "org.apache.spark.examples.SparkPi"
val libs = List("file:///usr/lib/spark/examples/jars/spark-examples.jar")
val conf = Map("spark.executor.memory" -> "1g", "spark.driver.memory" -> "1g")
val sparkJob = DPSparkJobTask("DPSparkJobTask", args, mainClass, libs, conf).toZIO
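
// Sequence the tasks: create the cluster, run the Spark job, then tear the cluster down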
val programGCP: RIO[DPJob with DPCluster with Audit, Unit] = for {
_ <- createCluster
_ <- sparkJob
_ <- deleteCluster
} yield ()
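
// Layers providing the Dataproc job and cluster services; audit.noop is the no-op audit backend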
val dpJobLayer = DPJob.live(dpCluster, gcpProject, gcpRegion, dpEndpoint)
val dpClusterLayer = DPCluster.live(gcpProject, gcpRegion, dpEndpoint)
programGCP.provide(dpJobLayer ++ dpClusterLayer ++ audit.noop)
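To run this as a standalone application, the program can be wrapped in a ZIO entrypoint. A minimal sketch, assuming the definitions above are in scope (DataprocApp is an illustrative name):

// Sketch of a runnable entrypoint for the workflow above
object DataprocApp extends ZIOAppDefault {
  override def run = programGCP.provide(dpJobLayer ++ dpClusterLayer ++ audit.noop)
}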
Check this for a complete example.
K8S
import etlflow.task._
import etlflow.k8s._
import etlflow.audit
import etlflow.audit.Audit
import zio._
val jobName: String = "hello"
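
// Create the Kubernetes job, wait for it to finish, fetch its logs, then delete it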
val programK8S: RIO[K8S with Audit, Unit] = for {
_ <- K8SJobTask(
name = "CreateKubeJobTask",
jobName = jobName,
image = "busybox:1.28",
command = Some(List("/bin/sh", "-c", "sleep 5; ls /etc/key; date; echo Hello from the Kubernetes cluster"))
).toZIO
_ <- K8STrackJobTask("TrackKubeJobTask", jobName).toZIO
_ <- K8SJobLogTask("GetKubeJobLogTask", jobName).toZIO
_ <- K8SDeleteJobTask("DeleteKubeJobTask", jobName).toZIO
} yield ()
programK8S.provide(K8S.live() ++ audit.noop)
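Because the composed workflow is an ordinary ZIO effect, standard ZIO combinators such as retry schedules apply directly. A minimal sketch with arbitrary schedule values; provide the same layers as above to run it:

// Retry the whole workflow up to 3 times with exponential backoff starting at 5 seconds
val programK8SRetried: RIO[K8S with Audit, Unit] =
  programK8S.retry(Schedule.exponential(5.seconds) && Schedule.recurs(3))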
Check this for a complete example.
JDBC
// Todo
Http
// Todo
Email
// Todo
AWS
// Todo
Redis
// Todo
Spark
// Todo
Contributions
Please feel free to open issues to report bugs or propose new features.