Gcp4zio is simple Scala interface to Google Cloud API based on ZIO.
Add the latest release as a dependency to your project
Module | Latest Version | Documentation | Scala Versions |
---|---|---|---|
Google Cloud Storage | |||
Dataproc | |||
BigQuery | |||
PubSub | |||
Cloud Monitoring |
This project is tested with scala versions 2.13.12, 3.3.1 and java versions 17, 21
SBT
libraryDependencies ++= List(
"com.github.tharwaninitin" %% "gcp4zio-gcs" % "1.5.1",
"com.github.tharwaninitin" %% "gcp4zio-dp" % "1.5.1",
"com.github.tharwaninitin" %% "gcp4zio-bq" % "1.5.1",
"com.github.tharwaninitin" %% "gcp4zio-pubsub" % "1.5.1",
"com.github.tharwaninitin" %% "gcp4zio-monitoring" % "1.5.1"
)
Maven
<dependency>
<groupId>com.github.tharwaninitin</groupId>
<artifactId>gcp4zio-gcs_2.12</artifactId>
<version>1.5.1</version>
</dependency>
# To run all these examples set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the location of the service account json key.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json
import gcp4zio.gcs._
import java.nio.file.Paths
import zio._
// Upload single object from local to provided bucket at provided prefix
val localPath1 = Paths.get("/local/path/to/file1.csv")
GCS.putObject("targetBucket", "temp/gcs/prefix/file1.csv", localPath1)
// Download single object from bucket at provided prefix to local
val localPath2 = Paths.get("/local/path/to/file2.csv")
GCS.getObject("srcBucket", "temp/gcs/prefix/file1.csv", localPath2)
// Delete single object from bucket at provided prefix
GCS.deleteObject("gcsBucket", "temp/gcs/prefix/file1.csv")
import gcp4zio.gcs._
import com.google.cloud.storage.Storage.BlobWriteOption
val src = GCS.getObject("gcsBucket", "temp/gcs/prefix/file1.csv", 4096)
val opts = List(BlobWriteOption.doesNotExist())
val sink = GCS.putObject("gcsBucket", "temp/test/ratings2.csv", opts)
src.run(sink)
import gcp4zio.gcs._
// Copy single object from source bucket to target bucket
GCS.copyObjectsGCStoGCS(
srcBucket = "srcBucket",
srcPrefix = Some("temp/gcs/prefix/file1.csv"),
targetBucket = "targetBucket",
targetPrefix = Some("temp2/gcs/prefix/file1.csv")
)
// Copy all objects from source bucket to target bucket
GCS.copyObjectsGCStoGCS(
srcBucket = "srcBucket",
targetBucket = "targetBucket"
)
// Copy all objects from source bucket with prefix to target bucket
GCS.copyObjectsGCStoGCS(
srcBucket = "srcBucket",
srcPrefix = Some("temp/gcs/prefix"),
targetBucket = "targetBucket"
)
import gcp4zio.dp._
// Create Dataproc Cluster Properties
val props = new ClusterProps("dataproc-logs")
// Create Dataproc Cluster
val createTask = DPCluster.createDataproc("cluster1", props)
// Delete Dataproc Cluster
val deleteTask = DPCluster.deleteDataproc("dpCluster")
(createTask *> deleteTask).provide(DPCluster.live("gcpProject", "gcpRegion"))
import gcp4zio.dp._
val libs = List("file:///usr/lib/spark/examples/jars/spark-examples.jar")
val conf = Map("spark.executor.memory" -> "1g", "spark.driver.memory" -> "1g")
val mainClass = "org.apache.spark.examples.SparkPi"
val job = DPJob.executeSparkJob(List("1000"), "mainClass", libs, conf)
job.provide(DPJob.live("dpCluster", "gcpProject", "gcpRegion", "dpEndpoint"))
import gcp4zio.bq._
// Execute DML/DDL query on Bigquery
val task1: RIO[BQ, Unit] = BQ.executeQuery("CREATE TABLE dataset1.test1 (column1 STRING)").unit
val task2: RIO[BQ, Unit] = BQ.executeQuery(""" INSERT INTO dataset1.test1 VALUES ("value1") """).unit
// Fetching data from Bigquery
val task3: RIO[BQ, Iterable[String]] = BQ.fetchResults("SELECT * FROM dataset1.test1")(rs => rs.get("column1").getStringValue)
(task1 *> task2 *> task3).provide(BQ.live())
import gcp4zio.bq._
import gcp4zio.bq.FileType.PARQUET
// Load PARQUET file into Bigquery
val step = BQ.loadTable("inputFilePathParquet", PARQUET, Some("gcpProject"), "outputDataset", "outputTable")
step.provide(BQ.live())
import gcp4zio.pubsub.topic._
// Create PubSub Topic
PSTopic.createTopic(project = "gcsProjectId", topic = "topicName")
// Add IAM Policy Binding to existing Topic, where you grant basic pubsub role to a member
PSTopic.addIAMPolicyBindingToTopic(project = "gcsProjectId", topic = "topicName", member = "[email protected]", role = "roles/<IAM_Role>")
// Delete PubSub Topic
PSTopic.deleteTopic(project = "gcsProjectId", topic = "topicName")
import gcp4zio.pubsub.subscription._
// Create Pull Subscription
PSSubscription.createPullSubscription(
project = "gcsProjectId",
subscription = "subName",
topic = "topicName",
ackDeadlineSeconds = 20 // default 10 seconds
)
// Create Push Subscription
PSSubscription.createPushSubscription(
project = "gcsProjectId",
subscription = "subName",
topic = "topicName",
ackDeadlineSeconds = 20, // default 10 seconds
pushEndpoint = "https://example.com/push"
)
// Create Bigquery Subscription
PSSubscription.createBQSubscription(
project = "gcsProjectId",
subscription = "subName",
topic = "topicName",
bqTableId = "projectId:datasetId.tableId"
)
// Delete Subscription
PSSubscription.deleteSubscription(
project = "gcsProjectId",
subscription = "subName"
)
import gcp4zio.pubsub.publisher._
// Create encoder for sending String messages to Topic
implicit val encoder: MessageEncoder[String] = (a: String) => Right(a.getBytes(java.nio.charset.Charset.defaultCharset()))
// Publish message to topic
val publishMsg = PSPublisher.produce[String]("String Message")
// Provide Publisher layer
publishMsg.provide(PSPublisher.live("gcsProjectId", "topic"))
import gcp4zio.pubsub.subscriber._
import zio._
// Create stream to consume messages from the subscription
val subscriberStream = PSSubscriber.subscribe("gcsProjectId", "subscription")
// Print first 10 messages from stream to console
val task = subscriberStream.mapZIO { msg =>
ZIO.logInfo(msg.value.toString) *> msg.ack
}
.take(10)
.runDrain
Check this example to use PubSub APIs
import gcp4zio.monitoring._
import com.google.monitoring.v3.TimeInterval
// Get GCS Cloud Monitoring metric data (time-series data)
Monitoring.getMetric(
project = "gcsProjectId",
metric = "compute.googleapis.com/instance/cpu/usage_time",
interval = TimeInterval.getDefaultInstance // Provide TimeInterval with start and end time
)