Kubernetes Client for Scala

kubernetes-client Scala version support

A pure functional client for Kubernetes.

Installation

Mill:

ivy"com.goyeau::kubernetes-client:<latest version>"

or

SBT:

"com.goyeau" %% "kubernetes-client" % "<latest version>"

Usage

Client configuration example

Standard configuration "chain"

import cats.effect.IO
import com.goyeau.kubernetes.client.*
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

val kubernetesClient =
  KubernetesClient[IO](
    KubeConfig.standard[IO]
  )

The standard configuration mimics the way ClientBuilder.standard from the official Java k8s client works:

  • if KUBECONFIG env variable is set, and the file exists - it will be used; the 'current-context' specified in the file will be used
  • otherwise, if ~/.kube/config file exists - it will be used; the 'current-context' specified in the file will be used
  • otherwise, if cluster configuration is found - use it

Cluster configuration is defined by:

  • /var/run/secrets/kubernetes.io/serviceaccount/ca.crt certificate file
  • /var/run/secrets/kubernetes.io/serviceaccount/token token file
  • KUBERNETES_SERVICE_HOST env variable (https protocol is assumed)
  • KUBERNETES_SERVICE_PORT env variable

Manually providing the configuration

import cats.effect.IO
import com.goyeau.kubernetes.client.*
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import java.io.File
import org.http4s.AuthScheme
import org.http4s.Credentials.Token
import org.http4s.headers.Authorization
import org.http4s.implicits.*
import scala.concurrent.ExecutionContext
import scala.io.Source

implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

val kubernetesClient =
  KubernetesClient[IO](
    KubeConfig.of[IO](
      server = uri"https://k8s.goyeau.com",
      authorization = Option(IO.pure(Authorization(Token(AuthScheme.Bearer, Source.fromFile("/var/run/secrets/kubernetes.io/serviceaccount/token").mkString)))),
      caCertFile = Option(new File("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"))
    )
  )
import cats.effect.IO
import com.goyeau.kubernetes.client.*
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import java.io.File
import scala.concurrent.ExecutionContext

implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

val kubernetesClient =
  KubernetesClient[IO](KubeConfig.fromFile[IO](new File(s"${System.getProperty("user.home")}/.kube/config")))

Authorization caching

It is possible (and recommended) to configure the kubernetes client to cache the authorization (and renew it, when/if it expires).

import cats.effect.IO
import com.goyeau.kubernetes.client.*
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import scala.concurrent.duration._

implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

val kubernetesClient =
  KubernetesClient[IO](
    KubeConfig.standard[IO].map(_.withDefaultAuthorizationCache(5.minutes))
  )

When authorization cache is configured, the client attempts to derive the expiration time of the token:

  • if it's a raw authorization header (provided directly, or from the token file inside the cluster), we attempt to decode it as a JWT and take the exp field from it;
  • if authorization is provided by the auth plugin in the kube config file – the auth plugin provides the expiration alongside the token.

The cache works this way:

The first time the token is "requested" by the client, it will unconditionally delegate to the underlying F[Authorization], and will cache the token.

  • if the underlying F[Authorization] "throws", the cache throws as well.

When the token is requested subsequently:

  • if the expiration time is not present (the token was not a JWT, the auth plugin did not specify expiration, etc) the cached authorization will be re-used forever
  • if the expiration time is present, but it's far enough into the future (later than now + refreshTokenBeforeExpiration), the cached authorization will be re-used
  • if the expiration time is present, and it's soon enough (sooner than now + refreshTokenBeforeExpiration), the underlying F[Authorization] will be evaluated
    • if it's successful, the new authorization is cached
    • if not, but the cached token is still valid, the cached token is re-used otherwise, it will raise an error.

If the cache is not configured (which is by default), the authorization will never be updated and might expire eventually.

Requests

import cats.effect.IO
import com.goyeau.kubernetes.client.*
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import io.k8s.api.apps.v1.*
import io.k8s.api.core.v1.*
import io.k8s.apimachinery.pkg.api.resource.Quantity
import io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta
import java.io.File
import scala.concurrent.ExecutionContext

implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

val kubernetesClient =
  KubernetesClient(KubeConfig.fromFile[IO](new File(s"${System.getProperty("user.home")}/.kube/config")))

val deployment = Deployment(
  metadata = Option(ObjectMeta(name = Option("web-backend"), namespace = Option("my-namespace"))),
  spec = Option(
    DeploymentSpec(
      selector = null,
      strategy = Option(
        DeploymentStrategy(
          `type` = Option("RollingUpdate"),
          rollingUpdate = Option(RollingUpdateDeployment(Option(StringValue("10%")), Option(StringValue("50%"))))
        )
      ),
      template = PodTemplateSpec(
        metadata = Option(
          ObjectMeta(
            labels = Option(Map("app" -> "web", "tier" -> "frontend", "environment" -> "myenv"))
          )
        ),
        spec = Option(
          PodSpec(
            containers = Seq(
              Container(
                name = "nginx",
                image = Option("nginx"),
                resources = Option(
                  ResourceRequirements(
                    Option(Map("cpu" -> Quantity("100m"), "memory" -> Quantity("128Mi"))),
                    Option(Map("cpu" -> Quantity("80m"), "memory" -> Quantity("64Mi")))
                  )
                ),
                volumeMounts = Option(Seq(VolumeMount(name = "nginx-config", mountPath = "/etc/nginx/conf.d"))),
                ports = Option(Seq(ContainerPort(name = Option("http"), containerPort = 8080)))
              )
            ),
            volumes = Option(
              Seq(
                Volume(
                  name = "nginx-config",
                  configMap = Option(ConfigMapVolumeSource(name = Option("nginx-config")))
                )
              )
            )
          )
        )
      )
    )
  )
)

kubernetesClient.use { client =>
  client.deployments.namespace("my-namespace").create(deployment)
}

Raw requests

In case a particular K8S API endpoint is not explicitly supported by this library, there is an escape hatch that you can use in order to run a raw request or open a raw WS connection.

Here's an example of how you can get a list of nodes using a raw request:

import cats.effect.*
import org.http4s.implicits.*
import com.goyeau.kubernetes.client.*
import org.http4s.*

val kubernetesClient: KubernetesClient[IO] = ???

val response: IO[(Status, String)] =
  kubernetesClient
          .raw.runRequest(
            Request[IO](
              uri = uri"/api" / "v1" / "nodes"
            )
          )
          .use { response =>
            response.bodyText.foldMonoid.compile.lastOrError.map { body =>
              (response.status, body)
            }
          }

Similarly, you can open a WS connection (org.http4s.jdkhttpclient.WSConnectionHighLevel):

import cats.effect.*
import org.http4s.implicits.*
import com.goyeau.kubernetes.client.*
import org.http4s.*
import org.http4s.jdkhttpclient.*

val connection: Resource[IO, WSConnectionHighLevel[IO]] =
  kubernetesClient.raw.connectWS(
    WSRequest(
      uri = (uri"/api" / "v1" / "my-custom-thing") +? ("watch" -> "true")
    )
  )

Development

Pre-requisites

  • Java 11 or higher
  • Docker

IntelliJ

Generate a BSP configuration:

./mill mill.bsp.BSP/install

Compiling

./mill kubernetes-client[2.13.10].compile

Running the tests

All tests:

./mill kubernetes-client[2.13.10].test

A specific test:

./mill kubernetes-client[2.13.10].test.testOnly 'com.goyeau.kubernetes.client.api.PodsApiTest'

minikube has to be installed and running.

Before opening a PR:

Check and fix formatting:

./mill __.style

Related projects

Why Kubernetes Client for Scala?

You might wonder why using this library instead of Skuber for example? Kubernetes Client is a pure functional based on Cats and Http4s.
Another benefit of Kubernetes Client is that (like the Kubernetes Client for Java) it is generating all the payload case classes by just ingesting the swagger api provided by Kubernetes' main repo. That means this project will always remain up to date with the latest Kubernetes API.

Adopters/Projects