anicolaspp / minikaf   1.0.0

MIT License GitHub

Minikaf is a minimal implementation of publish/subscribe for in-memory communication.

Scala versions: 2.12

Minikaf

libraryDependencies += "com.github.anicolaspp" % "minikaf_2.12" % "x.y.z"

The main idea of this library is to provide a topic-based pub/sub mechanism, similar to Kafka, in which the topic is selected automatically from the type of the data being sent.

Minikaf is based on ScalaRx for in-memory message passing. However, it removes the need to connect publishers and subscribers explicitly. There is a central, transparent hub that publishers write to and that completely decoupled subscribers read from.

With plain ScalaRx, publishing a message and having it processed requires a Subject[A], which we create as follows.

val subject = Subject[Int]

Then we need a subscriber that listens to the Subject[A], connecting both ends, publisher and subscriber.

val subscriber = subject.subscribe(event => someFunction(event))

Once we have created this connection, the publisher uses .onNext to push messages to the Stream that the connection represents.

subject.onNext(5)

Once the message is pushed, the defined function someFunction will be executed.
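To make the cost of this explicit wiring concrete, here is a minimal sketch that uses only the Scala standard library. TinySubject and TinySubjectDemo are hypothetical names written for illustration; this is not the ScalaRx class, it only mimics the subscribe / onNext shape described above. The point to notice is that both the publishing code and the subscribing code must hold a reference to the same subject value.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a reactive subject (an assumption for illustration,
// NOT the ScalaRx implementation).
final class TinySubject[A] {
  private val handlers = ListBuffer.empty[A => Unit]

  // Connect a subscriber: the handler runs for every later onNext call.
  def subscribe(handler: A => Unit): Unit = {
    handlers += handler
    ()
  }

  // Push a value to every connected handler, in subscription order.
  def onNext(value: A): Unit = handlers.foreach(h => h(value))
}

object TinySubjectDemo {
  // Returns everything the subscriber recorded.
  def run(): List[Int] = {
    val seen = ListBuffer.empty[Int]
    val subject = new TinySubject[Int]

    subject.onNext(1) // in this sketch, nobody is connected yet, so the value is dropped
    subject.subscribe { e => seen += e * 2; () }
    subject.onNext(5) // the handler runs and records 10

    seen.toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Every publisher and every subscriber has to be handed the same subject, which is the coupling Minikaf sets out to remove.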

Minikaf eliminates the problem of connecting both ends by allowing us to define publishers and subscribers anywhere in our code without explicitly connecting them.

Our subscription model works similarly to Kafka: messages are published to and consumed from topics, and publishers and subscribers remain completely independent of each other.

The specific topic used to send and receive messages is automatically extracted from the type of the message being sent.

val subscriber = Subscriber()
subscriber.subscribe[Int](e => println((e.topic, e.value)))

val publisher = Publisher()
publisher.publish(5)
publisher.publish("hello")

The result will be:

(Int, 5)

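When we publish the value 5, we extract the type Int from it and use it as the topic. We do the same when we subscribe: the type parameter is the topic for our messages. The string "hello" goes to the String topic, which has no subscriber in this example, so nothing is printed for it.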
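One way such type-based topic selection could work is sketched below. This is an assumption for illustration, not Minikaf's actual source: SketchHub and SketchHubDemo are hypothetical names, and the sketch uses the standard library's ClassTag to turn the static message type into a map key. It also creates the hub explicitly, whereas Minikaf hides its hub from the caller.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical sketch (NOT Minikaf's implementation): handlers are stored
// per topic, and the topic is the runtime class of the message type.
final class SketchHub {
  private val handlers = mutable.Map.empty[Class[_], List[Any => Unit]]

  // The topic comes from the type parameter A via its ClassTag.
  def subscribe[A](handler: A => Unit)(implicit tag: ClassTag[A]): Unit = {
    val topic: Class[_] = tag.runtimeClass
    val erased: Any => Unit = msg => handler(msg.asInstanceOf[A])
    handlers(topic) = handlers.getOrElse(topic, Nil) :+ erased
  }

  // The topic comes from the static type of the published value.
  def publish[A](value: A)(implicit tag: ClassTag[A]): Unit =
    handlers.getOrElse(tag.runtimeClass, Nil).foreach(h => h(value))
}

object SketchHubDemo {
  // Returns the lines the Int subscriber would have printed.
  def run(): List[String] = {
    val seen = mutable.ListBuffer.empty[String]
    val hub = new SketchHub

    hub.subscribe[Int] { e => seen += s"(Int, $e)"; () }
    hub.publish(5)       // topic Int: delivered
    hub.publish("hello") // topic String: no subscriber, silently ignored

    seen.toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

In this sketch the topic is fixed by the static type at the call site, so a value typed as Any would land on a different topic than the same value typed as Int. Whether Minikaf behaves the same way is not stated here.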

As you might have noticed, we are not explicitly connecting the subscriber and the publisher; all of this machinery happens behind the scenes.

Multiple subscribers and publishers can be created and they work in total isolation.

val intSubscriber = Subscriber()
intSubscriber.subscribe[Int](e => println(s"I am an Int: ${e.value}"))

val stringSubscriber = Subscriber()
stringSubscriber.subscribe[String](e => println(s"I am a String: ${e.value}"))

Publisher().publish(5)
Publisher().publish("hello")

Each subscriber will get only the messages that it is subscribed to.

Sometimes, being able to unsubscribe from a particular topic is as important as the subscription itself. This is an easy operation in Minikaf.

stringSubscriber.unsubscribe()

After the call to .unsubscribe, the subscriber in question no longer receives or processes messages.
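As a rough illustration of what unsubscribing involves, the sketch below removes a registered handler so that later messages skip it. It is a standard-library-only assumption, not Minikaf's code: SketchTopic and UnsubscribeDemo are hypothetical names, and here subscribe returns a cancel function rather than exposing an .unsubscribe method on a subscriber object.

```scala
import scala.collection.mutable

// Hypothetical sketch (NOT Minikaf's implementation) of a single topic
// whose handlers can be removed again. Not thread-safe.
final class SketchTopic[A] {
  private val handlers = mutable.LinkedHashMap.empty[Int, A => Unit]
  private var nextId = 0

  // Registers a handler and returns a function that removes it.
  def subscribe(handler: A => Unit): () => Unit = {
    val id = nextId
    nextId += 1
    handlers(id) = handler
    () => { handlers.remove(id); () }
  }

  // Iterates over a snapshot so a handler may cancel itself while running.
  def publish(value: A): Unit = handlers.values.toList.foreach(h => h(value))
}

object UnsubscribeDemo {
  // Returns the messages the subscriber received.
  def run(): List[String] = {
    val seen = mutable.ListBuffer.empty[String]
    val topic = new SketchTopic[String]

    val unsubscribe = topic.subscribe { e => seen += e; () }
    topic.publish("before") // delivered
    unsubscribe()
    topic.publish("after")  // no handler left, nothing is recorded

    seen.toList
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Publishing after the handler is removed is not an error in this sketch; the message simply has no recipient.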