Kubernetes Operator in Scala for Kerberos Keytab Management
Alexey Novakov published on
8 min, 1452 words
Categories: scala
Tags: kerberos kubernetes operator devops
Kubernetes has built-in controllers to handle its native resources, such as:
- Pod
- Service
- Deployment
- etc.
What if you want a completely new resource type, which would describe some new abstraction in a clear and concise way? Such a new resource would describe everything in one single type that would otherwise require 5-10 separate native Kubernetes resources.
The good news is that such custom resource creation is possible in Kubernetes. However, who would process such custom resources?
The answer is a Custom Controller: it processes the new custom resources, and this is the official way Kubernetes allows its users to extend the Kubernetes API-Server:
- kind: CustomResourceDefinition, apiVersion: apiextensions.k8s.io/v1
- Custom Controller, which is a dedicated program usually running inside the Kubernetes cluster
Operator Pattern
A Custom Controller is also called a Kubernetes Operator. The Operator became a design pattern to automate application deployment operations by running a custom program, which deploys Kubernetes native resources based on code logic. So you can say that the Operator pattern is the next level of automation which DevOps applies to Kubernetes. Of course, you could also go with the Helm package manager and deploy/change your set of native resources via a command like helm update. However, that requires a full-time engineer to sit and perform such operations by hand.
The operator workflow can be illustrated as the following 3 steps:
- A user creates a new custom resource instance via the Kubernetes API, for example using the kubectl CLI
- Once the custom resource is created, this event is sent to the custom controller, since the latter is subscribed to custom resource type updates
- Once the custom controller gets the event, it automatically executes one or multiple CRUD commands towards the API-Server
Later, users can modify or delete custom resources like any other native resources. The custom controller will handle these events as well. Based on its programmed logic, it decides what to do in case of custom resource modification or deletion. The custom controller can watch native resources as well; there are no restrictions on that.
Use Case: Kerberos Operator
Let's look at how to develop a new operator from scratch in Scala using the Kubernetes-Client library.
Our desired custom resources are going to be the following:
KrbServer
apiVersion: krb-operator.novakov-alexey.github.io/v1
kind: KrbServer
metadata:
  name: my-krb
  namespace: test
spec:
  realm: EXAMPLE.COM
Principals
apiVersion: krb-operator.novakov-alexey.github.io/v1
kind: Principals
metadata:
  name: my-krb1
  namespace: test
  labels:
    krb-operator.novakov-alexey.github.io/server: my-krb # reference to KrbServer
spec:
  list:
    - name: client1
      password:
        type: static
        value: mypass
      keytab: cluster.keytab
      secret:
        type: Keytab
        name: cluster-keytab
    - name: user2
      keytab: cluster.keytab
      secret:
        type: KeytabAndPassword
        name: cluster-keytab
Based on the above resource examples, our operator will eventually create/modify/delete the following native resources:
- Service to expose Kerberos server
- Pod to launch Kerberos container
- Secrets to keep Kerberos principal keytabs or their credentials
The operator will have native resource templates in its code and apply them when it gets a custom resource instance. Such native templates are used to create native resources based on the values available in the custom resources. This way we get a logical mapping between a particular custom resource instance and the set of native resources which the user expects to be created/modified/deleted in the Kubernetes namespace(s), as shown in the sketch below.
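For illustration, here is a minimal, hypothetical sketch of such a template: it derives a native Secret from a Principal of our custom resource. The helper function and its parameters are assumptions for demonstration purposes; the field names follow the generated io.k8s model classes that come with the Kubernetes-Client library.
import io.k8s.api.core.v1.Secret
import io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta
import java.util.Base64

// Hypothetical template: map a Principal from the custom resource spec
// to a native Kubernetes Secret holding the generated keytab file
def keytabSecret(
    principal: Principal,
    namespace: String,
    keytabBytes: Array[Byte]
): Secret =
  Secret(
    metadata = Some(
      ObjectMeta(
        name = Some(principal.secret.name),
        namespace = Some(namespace)
      )
    ),
    // Secret data values are base64-encoded file contents
    data = Some(
      Map(principal.keytab -> Base64.getEncoder.encodeToString(keytabBytes))
    ),
    `type` = Some("Opaque")
  )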
Implementation
Libraries
Below are the main Scala libraries used to implement the new operator app:
lazy val kubernetesClient = "com.goyeau" %% "kubernetes-client" % "0.8.0"
lazy val circeVersion = "0.14.1"
lazy val circeExtra = "io.circe" %% "circe-generic-extras" % circeVersion
lazy val circeCore = "io.circe" %% "circe-core" % circeVersion
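These values can then be wired into the sbt build. A minimal sketch of the project settings (the project name here is an assumption):
lazy val operator = (project in file("."))
  .settings(
    libraryDependencies ++= Seq(kubernetesClient, circeExtra, circeCore)
  )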
Resource Model
Custom resource specification is modeled as a hierarchy of Scala case classes:
sealed trait Password
final case class Static(value: String) extends Password
case object Random extends Password
sealed trait Secret {
val name: String
}
final case class Keytab(name: String) extends Secret
final case class KeytabAndPassword(name: String) extends Secret
final case class Principal(
name: String,
password: Password = Random,
keytab: String,
secret: Secret
)
final case class Principals(list: List[Principal])
final case class PrincipalsStatus(
processed: Boolean,
lastPrincipalCount: Int,
totalPrincipalCount: Int,
error: String = ""
)
final case class KrbServer(realm: String)
final case class KrbServerStatus(processed: Boolean, error: String = "")
As a result, we get two custom resources: Principals
and KrbServer
. Each of them has a status property, PrincipalsStatus
and KrbServerStatus
respectively. The status property of a custom resource is an additional way to communicate from the operator back to the user who submits CR instances and changes to them. The structure of the status property can contain anything the operator author decides to put there.
Both statuses and the main CR specifications are validated against an OpenAPI JSON schema, which the operator author provides when the operator is installed.
Operators can also install CRDs automatically. In order to allow that, the service account used by the operator has to have special RBAC permissions.
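We will see below that the operator's main class mixes in a Codecs trait, which is not shown in this post. A minimal sketch of what such a trait might provide, assuming circe-generic-extras with a type discriminator for the Password and Secret hierarchies (the exact constructor-name mapping in the real code may differ):
import io.circe.generic.extras.Configuration

trait Codecs {
  // Use a "type" field to select the concrete Password/Secret variant,
  // as seen in the YAML examples above
  implicit val circeConfig: Configuration =
    Configuration.default.withDiscriminator("type")
}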
Operator Main Class
Let's import the following classes into the operator main class:
// Cats
import cats.effect._
import cats.implicits._
// Kubernetes Client
import com.goyeau.kubernetes.client.EventType.{ADDED, DELETED, ERROR, MODIFIED}
import com.goyeau.kubernetes.client._
import com.goyeau.kubernetes.client.crd.{CrdContext, CustomResource}
import io.k8s.apiextensionsapiserver.pkg.apis.apiextensions.v1.CustomResourceDefinition
// FS2 from Kubernetes Client
import fs2.{Pipe, Stream}
import io.circe.generic.extras.auto._
import io.circe.{Decoder, Encoder}
// Operator's other classes
import krboperator.Controller.NewStatus
import krboperator.service.Template.implicits._
import krboperator.service.{Kadmin, Secrets, Template}
// Log4cats
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
// Scala, Java std libs
import java.io.File
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.reflect.ClassTag
In the operator main class, we will instantiate the Kubernetes client and start watching for KrbServer
and Principals
custom resources.
object KrbOperator extends IOApp with Codecs {
implicit def unsafeLogger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]
val kubernetesClient =
KubernetesClient[IO](
KubeConfig.fromFile[IO](
new File(s"${System.getProperty("user.home")}/.kube/config")
)
)
override def run(args: List[String]): IO[ExitCode] = ???
....
Watching Resource
The Kubernetes-Client library brings the FS2 library as a direct dependency to watch Kubernetes resources. That gives a lot of power to handle events in a fault-tolerant way. Also, FS2 nicely composes with Cats-Effect.
def watchCr[F[_]: Sync,
Resource: Encoder: Decoder: ClassTag,
Status: Encoder: Decoder](
ctx: CrdContext
)(implicit
controller: Controller[F, Resource, Status],
kubernetesClient: KubernetesClient[F]
) = {
val context = crds.context[Resource]
Stream.eval(
Logger[F].info(s"Watching context: $context")
) >> kubernetesClient
.customResources[Resource, Status](context)
.watch()
.through(handler[F, Resource, Status])
.through(submitStatus[F, Resource, Status](ctx))
}
The above code opens a long-lived HTTP connection to the Kubernetes API-Server, which is going to listen for custom resource updates. Until this connection is closed by the
operator or by the API-Server, the operator will receive custom resource events containing the current resource state and pass them to a function called handler
. Its output
will be passed further to the submitStatus
function. These two functions will be called on every event sent by the API-Server to the watching operator. Now, our goal is to
react to these events and make something happen in the Kubernetes cluster. In our case, we need to create a Kerberos pod, service and a bunch of secrets.
Also, our Principals
resource means that the principal list must be registered in the Kerberos database using one of the Kerberos processes, called KDC. If you have never worked with
Kerberos, think of it as a database which we need to deploy to Kubernetes and in which we create new users with the requested credentials. Later, someone
will mount the newly created secrets with keytabs into some user's Pod and authenticate to Kerberos via a keytab file. A keytab is something like a
password saved into a binary file with additional meta information.
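One detail worth noting: the crds.context helper used in watchCr is not shown in this post. As a rough sketch, the CrdContext it produces for KrbServer could look like the following (group and version come from the YAML above, the plural name is an assumption):
import com.goyeau.kubernetes.client.crd.CrdContext

// Hypothetical, hand-written equivalent of crds.context[KrbServer]
val serverContext = CrdContext(
  group = "krb-operator.novakov-alexey.github.io",
  version = "v1",
  plural = "krbservers"
)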
Handler Function
We are coming closer to processing the custom resource events. Below is an FS2 Pipe which takes the CustomResource type and returns an optional status, which is going to be added to a copy of the original custom resource event. Then, we take this optional status and pass it to the next function to submit it to the API-Server.
def handler[F[_]: Sync, Resource, Status](implicit
controller: Controller[F, Resource, Status]
): Pipe[F, Either[String, WatchEvent[
CustomResource[Resource, Status]
]], Option[CustomResource[Resource, Status]]] =
_.flatMap { s =>
val action = s match {
case Left(err) =>
Logger[F].error(s"Error on watching events: $err") *> Sync[F].pure(
none
)
case Right(event) =>
Logger[F].debug(s"Received event: $event") >> {
val status =
event.`type` match {
case ADDED =>
controller
.onAdd(event.`object`)
.map(addStatus(_, event.`object`))
case MODIFIED =>
controller
.onModify(event.`object`)
.map(addStatus(_, event.`object`))
case DELETED =>
controller.onDelete(event.`object`).map(_ => none)
case ERROR =>
Logger[F].error(
s"Received error event for ${event.`object`}"
) *> Sync[F].pure(none)
}
status.handleErrorWith { e =>
Logger[F].error(e)(
s"Controller failed to handle event type ${event.`type`}"
) *> Sync[F].pure(none)
}
}
}
Stream.eval(action)
}
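The handler refers to an addStatus helper which is not shown in this post. A minimal sketch of what it might do, assuming it simply copies the incoming custom resource with the newly computed status:
// Hypothetical helper: attach the computed status (if any) to a copy
// of the original custom resource, so it can be submitted back later
def addStatus[Resource, Status](
    status: Option[Status],
    cr: CustomResource[Resource, Status]
): Option[CustomResource[Resource, Status]] =
  status.map(s => cr.copy(status = Some(s)))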
In the handler's match statement, we call different methods of a controller object, which we will define right now:
import cats.effect.Sync
import cats.syntax.all._
import com.goyeau.kubernetes.client.crd.CustomResource
import krboperator.Controller.{NewStatus, NoStatus, noStatus}
import scala.language.implicitConversions
object Controller {
type NewStatus[U] = Option[U]
type NoStatus = NewStatus[Unit]
def noStatus[F[_], U](implicit F: Sync[F]): F[Option[U]] = F.pure(none[U])
}
abstract class Controller[F[_], T, U](implicit val F: Sync[F]) {
implicit def unitToNoStatus(unit: F[Unit]): F[NoStatus] =
unit *> noStatus
def onAdd(resource: CustomResource[T, U]): F[NewStatus[U]] = noStatus
def onModify(resource: CustomResource[T, U]): F[NewStatus[U]] = noStatus
def reconcile(resource: CustomResource[T, U]): F[NewStatus[U]] = noStatus
def onError(resource: CustomResource[T, U]): F[NewStatus[U]] = noStatus
def onDelete(resource: CustomResource[T, U]): F[Unit] = F.unit
}
As you might have noticed, the main class has the word Operator
in its name, while the event handlers are called controllers.
Basically, our entire application is called an operator. However, this operator app has its internal logic wrapped into Controller
classes.
In our case, we will have two controllers, each handling its own custom resource: KrbServer
and Principals
.
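As an illustration, a minimal controller for KrbServer could look like the sketch below. It only logs the events and sets the processed flag; the LoggingServerController name is hypothetical, and the real ServerController in the repository additionally creates the Kerberos Pod, Service and secrets via the Template and Secrets services.
import cats.effect.Sync
import cats.syntax.all._
import com.goyeau.kubernetes.client.crd.CustomResource
import krboperator.Controller.NewStatus
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

// Hypothetical, simplified controller for the KrbServer custom resource
class LoggingServerController[F[_]: Sync]
    extends Controller[F, KrbServer, KrbServerStatus] {

  implicit val logger: Logger[F] = Slf4jLogger.getLogger[F]

  override def onAdd(
      resource: CustomResource[KrbServer, KrbServerStatus]
  ): F[NewStatus[KrbServerStatus]] =
    Logger[F].info(s"KrbServer added, realm: ${resource.spec.realm}") *>
      F.pure(Some(KrbServerStatus(processed = true)))

  override def onDelete(
      resource: CustomResource[KrbServer, KrbServerStatus]
  ): F[Unit] =
    Logger[F].info(s"KrbServer deleted: ${resource.metadata.flatMap(_.name)}")
}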
Submit Status
The submitStatus
function below adds the status value and submits it back to the API-Server:
def submitStatus[F[_],
Resource: Encoder: Decoder,
Status: Encoder: Decoder](
ctx: CrdContext
)(implicit
client: KubernetesClient[F],
F: Sync[F]
): Pipe[F, Option[CustomResource[Resource, Status]], Unit] =
_.flatMap { cr =>
val action = cr match {
case Some(r) =>
for {
name <- F.fromOption(
r.metadata.flatMap(_.name),
new RuntimeException(s"Resource name is empty: $r")
)
namespace <- F.fromOption(
r.metadata.flatMap(_.namespace),
new RuntimeException(s"Resource namespace is empty: $r")
)
_ <- client
.customResources[Resource, Status](ctx)
.namespace(namespace)
.updateStatus(name, r)
.flatMap(status =>
if (status.isSuccess) F.unit
else
Logger[F].error(new RuntimeException(status.toString()))(
"Status updated failed on Kubernetes side"
)
)
} yield ()
case None => F.unit
}
Stream.eval(action.handleErrorWith { e =>
Logger[F].error(e)(
s"Failed to submit status"
) *> F.unit
})
}
Watchers and Reconcilers
We need to create a pair of watchers and return them to the Cats-Effect IOApp to launch the entire application.
We are almost ready to define the run
method of the IOApp:
override def run(args: List[String]): IO[ExitCode] =
kubernetesClient
.use { implicit client =>
for {
operatorCfg <- IO.fromEither(
AppConfig.load
.leftMap(e => new RuntimeException(s"failed to load config: $e"))
)
serverCtx = crds.context[KrbServer]
_ <- createCrdIfAbsent[IO, KrbServer](
serverCtx,
crds.server.definition(serverCtx)
)
principalCtx = crds.context[Principals]
_ <- createCrdIfAbsent[IO, Principals](
principalCtx,
crds.principal.definition(principalCtx)
)
_ <- createWatchers(
operatorCfg,
serverCtx,
principalCtx
).parJoinUnbounded.compile.drain
} yield ()
}
.as(ExitCode.Success)
def createWatchers(
operatorCfg: KrbOperatorCfg,
serverCtx: CrdContext,
principalCtx: CrdContext
)(implicit client: KubernetesClient[IO]) = {
val secrets = new Secrets(client, operatorCfg)
val kadmin = new Kadmin(client, operatorCfg)
val template = new Template(client, secrets, operatorCfg)
implicit lazy val serverController =
new ServerController(
template,
secrets,
client,
crds.context[KrbServer]
)
implicit lazy val principalController =
new PrincipalController(
secrets,
kadmin,
client,
serverController,
operatorCfg
)
Stream(
watchCr[IO, KrbServer, KrbServerStatus](serverCtx),
watchCr[IO, Principals, PrincipalsStatus](principalCtx),
reconcile[IO, KrbServer, KrbServerStatus](serverCtx),
reconcile[IO, Principals, PrincipalsStatus](principalCtx)
)
}
Before starting our watchers, we check whether our CRDs are already registered in the API-Server. If they are not, the operator makes an attempt to register them:
def createCrdIfAbsent[F[_]: Sync, Resource](
ctx: CrdContext,
customDefinition: CustomResourceDefinition,
attempts: Int = 1
)(implicit client: KubernetesClient[F]): F[Unit] = {
val crdName = crds.crdName(ctx)
for {
found <- client.customResourceDefinitions
.get(crdName)
.map(_ => true)
.recoverWith { case _ =>
false.pure[F]
}
_ <-
if (!found && attempts > 0)
client.customResourceDefinitions.create(
customDefinition
) *> createCrdIfAbsent(ctx, customDefinition, attempts - 1)
else if (!found)
Sync[F].raiseError(new RuntimeException(s"CRD '$crdName' not found"))
else Sync[F].unit
} yield ()
}
If the operator fails to create a CRD, it crashes the entire application, because it will not be able to open a watch connection later for a non-existing CRD.
Finally, let's look at what the reconcile
function does:
def reconcile[F[_]: Async,
Resource: Encoder: Decoder: ClassTag,
Status: Encoder: Decoder](
ctx: CrdContext,
every: FiniteDuration = 30.seconds
)(implicit
client: KubernetesClient[F],
controller: Controller[F, Resource, Status]
): Stream[F, Unit] =
Stream
.awakeEvery(every)
.evalMap { _ =>
Logger[F].debug(
s"Reconciling events for ${implicitly[ClassTag[Resource]].runtimeClass.getSimpleName}"
) *> client.customResources[Resource, Status](ctx).list()
}
.through(
_.evalMap { crList =>
Logger[F].debug(
s"Got ${crList.items.length} custom resources"
) *> crList.items
.map(cr => controller.reconcile(cr).map(addStatus(_, cr)))
.sequence
}.flatMap(Stream.emits)
)
.through(submitStatus[F, Resource, Status](ctx))
Every 30 seconds it calls the API-Server to list all existing CR instances of a specific custom resource type. Then every instance in the list is checked to see whether its state is handled correctly according to the business logic of the operator. The main idea of the reconciliation loop is to have a chance to recover from failures which may occur in the main handler stream.
Finish
At this point we have all the pieces needed to implement any operator. Just plug in your own controller logic. In our case, the main business logic is hidden in these classes:
val secrets = new Secrets(client, operatorCfg)
val kadmin = new Kadmin(client, operatorCfg)
val template = new Template(client, secrets, operatorCfg)
The full source of the working operator can be found here: https://github.com/novakov-alexey/krb-operator2
Summary
We have created an operator skeleton in Scala using Kubernetes-Client, which can be used to build any other operator. Thanks to Cats-Effect and FS2, we can handle the custom resource event stream by easily composing functions.
A Kubernetes Operator is a dedicated application which automates the deployment of complex stateful and stateless applications. Such an application can be implemented in any language, since the API-Server exposes custom resources via a REST API using JSON payloads. There is no need for a special operator framework or library to enable developers to create new operators. Just take a showcase app like the one we have developed here and re-use its code to start another operator when needed. Basically, all you need is a decent Kubernetes client library like the one we used, and you are good to go.