Advanced Quasar Usage for Kotlin

1. Introduction

We recently looked at Quasar, which gives us tools to make asynchronous programming more accessible and more efficient. We’ve seen the basics of what we can do with it, allowing for lightweight threads and message passing.

In this tutorial, we’re going to see some more advanced things that we can do with Quasar to take our asynchronous programming even further.

2. Actors

Actors are a well-known programming practice for concurrent programming, made especially popular in Erlang. Quasar allows us to define Actors, which are the fundamental building blocks of this form of programming.

Actors can:

  • Start other actors

  • Send messages to other actors

  • Receive messages from other actors that they react to

These three pieces of functionality give us everything we need to build our application.

In Quasar, an actor is represented as a strand — normally a fiber, but threads are also an option if needed — with a channel to get messages in, and some special support for lifecycle management and error handling.

2.1. Adding Actors to the Build

Actors aren’t a core concept in Quasar. Instead, we need to add the dependency that gives us access to them:

<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-actors</artifactId>
    <version>0.8.0</version>
</dependency>

It is important that we use the same version of this dependency as of any other Quasar dependencies in use.

2.2. Creating Actors

We create an actor by subclassing the Actor class, providing a name and a MailboxConfig and implementing the doRun() method:

val actor = object : Actor<Int, String>("noopActor", MailboxConfig(5, Channels.OverflowPolicy.THROW)) {
    @Suspendable
    override fun doRun(): String {
        return "Hello"
    }
}

Both the name and the mailbox configuration are optional — if we don’t specify a mailbox configuration, then the default is an unbounded mailbox.

Note that we need to mark the methods in the actor as @Suspendable manually. Kotlin doesn’t require that we declare exceptions at all, which means that we don’t declare the SuspendException that is on the base class we are extending. This means that Quasar doesn’t see our methods as suspendable without a little more help.

Once we’ve created an actor, we then need to start it — using either the spawn() method to start a new fiber, or spawnThread() to start a new thread. Other than the difference between a fiber and thread, these two work the same way.

Once we spawn the actor, we can treat it to an extent the same as any other strand. This includes being able to call join() to wait for it to finish executing, and get() to retrieve the value from it:

actor.spawn()

println("Noop Actor: ${actor.get()}")

2.3. Sending Messages to Actors

When we spawn a new actor, the spawn() and spawnThread() methods return an ActorRef instance. We can use this to interact with the actor itself, by sending messages for it to receive.

The ActorRef implements the SendPort interface, and as such we can use it the same as we would use the producing half of a Channel. This gives us access to the send and trySend methods that we can use to pass messages into the actor:

val actorRef = actor.spawn()

actorRef.send(1)

2.4. Receiving Messages with Actors

Now that we can pass messages into the actor, we need to be able to do things with them. We do this inside our doRun() method on the actor itself, where we can call the receive() method to get the next message to process:

val actor = object : Actor<Int, Void?>("simpleActor", null) {
    @Suspendable
    override fun doRun(): Void? {
        val msg = receive()
        println("SimpleActor Received Message: $msg")
        return null
    }
}

The receive() method will block inside the actor until a message is available, and then it will allow the actor to process this message as required.

Often, actors will be designed to receive many messages and process them all. As such, actors will typically have an infinite loop inside the doRun() method that will process all messages that come in:

val actor = object : Actor<Int, Void?>("loopingActor", null) {
    @Suspendable
    override fun doRun(): Void? {
        while (true) {
            val msg = receive()

            if (msg > 0) {
                println("LoopingActor Received Message: $msg")
            } else {
                break
            }
        }

        return null
    }
}

This will then keep processing incoming messages until we receive a value of 0.

2.5. Sending Messages Too Fast

In some cases, the actor will process messages slower than they are sent to it. This will cause the mailbox to fill up, and potentially, to overflow.

The default mailbox policy has an unlimited capacity. We can configure this when we create the actor, though, by providing a MailboxConfig. Quasar also offers a configuration for how to react when the mailbox overflows, but at present, this isn’t implemented.

Instead, Quasar will use the policy of THROW, regardless of what we specify:

Actor<Int, String>("backlogActor",
    MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
}

If we specify a mailbox size, and the mailbox overflows, then the receive() method inside of the actor will cause the actor to abort by throwing an exception.

This isn’t something we can handle in any way:

try {
    receive()
} catch (e: Throwable) {
    // This is never reached
}

When this happens, the get() method from outside the actor will also throw an exception, but this can be handled. In this case, we’ll get an ExecutionException that wraps a QueueCapacityExceededException with a stack trace pointing to the send() method that added the overflowing message.

If we know we’re working with an actor that has a limited mailbox size, we can use the trySend() method to send messages to it instead. This won’t cause the actor to fail, but will instead report whether the message was successfully sent or not:

val actor = object : Actor<Int, String>("backlogTrySendActor",
  MailboxConfig(1, Channels.OverflowPolicy.THROW)) {
    @Suspendable
    override fun doRun(): String {
        TimeUnit.MILLISECONDS.sleep(500);
        println("Backlog TrySend Actor Received: ${receive()}")

        return "No Exception"
    }
}

val actorRef = actor.spawn()

actorRef.trySend(1) // Returns True
actorRef.trySend(2) // Returns False

2.6. Reading Messages Too Fast

In the opposite case, we might have an actor that is trying to read messages faster than they are being provided. Normally this is fine — the actor will block until a message is available and then process it.

In some situations, though, we want to be able to handle this in other ways.

When it comes to receiving messages, we have three options available to us:

  • Block indefinitely until a message is available

  • Block until a message is available or until a timeout occurs

  • Don’t block at all

So far, we’ve used the receive() method, which blocks forever.

If necessary, we can provide timeout details to the receive() method. This will cause it to block only for that period before returning — either the received message or null if we timed out:

while(true) {
    val msg = receive(1, TimeUnit.SECONDS)
    if (msg != null) {
        // Process Message
    } else {
        println("Still alive")
    }
}

On rare occasions, we might want not to block at all, and instead, return immediately with a message or null. We can do this with the tryReceive() method instead — as a mirror to the trySend() method we saw above:

while(true) {
    val msg = tryReceive()
    if (msg != null) {
        // Process Message
    } else {
        print(".")
    }
}

2.7. Filtering Messages

So far, our actors have received every single message that was sent to them. We can adjust this if desired, though.

Our doRun() method is designed to represent the bulk of the actor functionality, and the receive() method called from this will give us the next method to work with.

We can also override a method called filterMessage() that will determine if we should process any given message or not. The receive() method calls this for us, and if it returns null, then the message isn’t passed on to the actor. For example, the following will filter out all messages that are odd numbers:

override fun filterMessage(m: Any?): Int? {
    return when (m) {
        is Int -> {
            if (m % 2 == 0) {
                m
            } else {
                null
            }
        } else -> super.filterMessage(m)
    }
}

The filterMessage() method is also able to transform the messages as they come through. The value that we return is the value provided to the actor, so it acts as both filter and map. The only restriction is that the return type must match the expected message type of actor.

For example, the following will filter out all odd numbers, but then multiply all the even numbers by 10:

override fun filterMessage(m: Any?): Int? {
    return when (m) {
        is Int -> {
            if (m % 2 == 0) {
                m * 10
            } else {
                null
            }
        }
        else -> super.filterMessage(m)
    }
}

2.8. Linking Actors and Error Handling

So far, our actors have all worked strictly in isolation. We do have the ability to have actors that watch each other so that one can react to events in the other. We can do this in a symmetric or asymmetric manner as desired.

At present, the only event that we can handle is when an actor exits — either deliberately or because it failed for some reason.

When we link actors using the watch() method, then we are allowing one actor — the watcher — to be informed of lifecycle events in the other — the watched. This is strictly a one-way affair, and the watched actor doesn’t get notified of anything about the watcher:

val watcherRef = watcher.spawn()
val watchedRef = watched.spawn()
watcher.watch(watchedRef)

Alternatively, we can use the link() method, which is the symmetric version. In this case, both actors are informed of lifecycle events in the other, instead of having a watcher and a watched actor:

val firstRef = first.spawn()
val secondRef = second.spawn()
first.watch(secondRef)

In both cases, the effect is the same. Any lifecycle events that occur in the watched actor will cause a special message — of type LifecycleMessage — to be added to the input channel of the watcher actor. This then gets processed by the filterMessage() method as described earlier.

The default implementation will then pass this on to the handleLifecycleMessage() method in our actor instead, which can then process these messages as needed:

override fun handleLifecycleMessage(m: LifecycleMessage?): Int? {
    println("WatcherActor Received Lifecycle Message: ${m}")
    return super.handleLifecycleMessage(m)
}

Here, there’s a subtle difference between link() and watch(). With watch(), the standard handleLifecycleMessage() method does nothing more than remove the listener references, whereas with link(), it also throws an exception that will be received in the doRun() message in response to the receive() call.

This means that using link() automatically causes our doRun() method of the actors to see an exception when any linked actors exit, whereas watch() forces us to implement the handleLifecycleMessage() method to be able to react to the message.

2.9. Registering and Retrieving Actors

So far, we’ve only ever interacted with actors immediately after we’ve created them, so we’ve been able to use the variables in scope to interact with them. Sometimes, though, we need to be able to interact with actors a long way from where we spawned them.

One way that we can do this is by using standard programming practices — pass the ActorRef variable around so that we have access to it from where we need it.

Quasar gives us another way to achieve this. We can register actors with a central ActorRegistry and then access them by name later:

val actorRef = actor.spawn()
actor.register()

val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("theActorName")

assertEquals(actorRef, retrievedRef)

This assumes that we gave the actor a name when we created it and registers it with that name. If the actor wasn’t named — for example, if the first constructor argument was null — then we can pass in a name to the register() method instead:

actor.register("renamedActor")

ActorRegistry.getActor() is static so that we can access this from anywhere in our application.

If we try to retrieve an actor using a name that isn’t known, Quasar will block until such an actor does exist. This can potentially be forever, so we can also give a timeout when we’re retrieving the actor to avoid this. This will then return null on timeout should the requested actor not be found:

val retrievedRef = ActorRegistry.getActor<ActorRef<Int>>("unknownActor", 1, TimeUnit.SECONDS)

Assert.assertNull(retrievedRef)

3. Actor Templates

So far, we have written our actors from first principles. However, there are several common patterns that get used over and over. As such, Quasar has packaged these up in a way that we can easily re-use them.

These templates are often referred to as Behaviors, borrowing the terminology for the same concept used in Erlang.

Many of these templates are implemented as subclasses of Actor and of ActorRef, which add additional features for us to use. This will give additional methods inside the Actor class to override or to call from inside our implemented functionality, and additional methods on the ActorRef class for the calling code to interact with the actor.

3.1. Request/Reply

A common use case for actors is that some calling code will send them a message, and then the actor will do some work and send some result back. The calling code then receives the response and carries on working with it. Quasar gives us the RequestReplyHelper to let us achieve both sides of this easily.

To use this, our messages must all be subclasses of the RequestMessage class. This allows Quasar to store additional information to get the reply back to the correct calling code:

data class TestMessage(val input: Int) : RequestMessage<Int>()

As the calling code, we can use RequestReplyHelper.call() to submit a message to the actor, and then get either the response or an exception back as appropriate:

val result = RequestReplyHelper.call(actorRef, TestMessage(50))

Inside the actor itself, we then receive the message, process it, and use RequestReplyHelper.reply() to send back the result:

val actor = object : Actor<TestMessage, Void?>() {
    @Suspendable
    override fun doRun(): Void {
        while (true) {
            val msg = receive()

            RequestReplyHelper.reply(msg, msg.input * 100)
        }

        return null
    }
}

3.2. Server

The ServerActor is an extension to the above where the request/reply capabilities are part of the actor itself. This gives us the ability to make a synchronous call to the actor and to get a response from it — using the call() method — or to make an asynchronous call to the actor where we don’t need a response — using the cast() method.

We implement this form of an actor by using the ServerActor class and passing an instance of ServerHandler to the constructor. This is generic over the types of message to handle for a synchronous call, to return from a synchronous call, and to handle for an asynchronous call.

When we implement a ServerHandler, then we have several methods that we need to implement:

  • init — Handle the actor starting up

  • terminate — Handle the actor shutting down

  • handleCall — Handle a synchronous call and return the response

  • handleCast — Handle an asynchronous call

  • handleInfo — Handle a message that is neither a Call nor a Cast

  • handleTimeout — Handle when we haven’t received any messages for a configured duration

The easiest way to achieve this is to subclass AbstractServerHandler, which has default implementations of all the methods. This then gives us the ability only to implement the bits we need for our use case:

val actor = ServerActor(object : AbstractServerHandler<Int, String, Float>() {
    @Suspendable
    override fun handleCall(from: ActorRef<*>?, id: Any?, m: Int?): String {
        println("Called with message: " + m + " from " + from)
        return m.toString() ?: "None"
    }

    @Suspendable
    override fun handleCast(from: ActorRef<*>?, id: Any?, m: Float?) {
        println("Cast message: " + m + " from " + from)
    }
})

Our handleCall() and handleCast() methods get called with the message to handle but are also given a reference to where the message came from and a unique ID to identify the call, in case they are important. Both the source ActorRef and the ID are optional and may not be present.

Spawning a ServerActor will return us a Server instance. This is a subclass of ActorRef that gives us additional functionality for call() and cast(), to send messages in as appropriate, and a method to shut the server down:

val server = actor.spawn()

val result = server.call(5)
server.cast(2.5f)

server.shutdown()

3.3. Proxy Server

The Server pattern gives us a specific way to handle messages and given responses. An alternative to this is the ProxyServer, which has the same effect but in a more usable form. This uses Java dynamic proxies to allow us to implement standard Java interfaces using actors.

To implement this pattern, we need to define an interface that describes our functionality:

@Suspendable
interface Summer {
    fun sum(a: Int, b: Int) : Int
}

This can be any standard Java Interface, with whatever functions we need.

We then pass an instance of this to the ProxyServerActor constructor to create the actor:

val actor = ProxyServerActor(false, object : Summer {
    override fun sum(a: Int, b: Int): Int {
        return a + b
    }
})

val summerActor = actor.spawn()

The boolean also passed to ProxyServerActor is a flag to indicate whether to use the actor’s strand for void methods or not. If set to true, then the calling strand will block until the method completes, but there will be no return from it.

Quasar will then ensure that we run the method calls inside the actor as needed, rather than on the calling strand. The instance returned from spawn() or spawnThread() implements both Server — as seen above — and our interface, thanks to the power of Java dynamic proxies:

// Calling the interface method
val result = (summerActor as Summer).sum(1, 2)

// Calling methods on Server
summerActor.shutdown()

Internally, Quasar implements a ProxyServerActor using the Server behavior that we saw earlier, and we can use it in the same way. The use of dynamic proxies simply makes calling methods on it easier to achieve.

3.4. Event Sources

The event source pattern allows us to create an actor where messages sent to it get handled by several event handlers. These handlers get added and removed as necessary. This follows the pattern that we have seen several times for handling asynchronous events. The only real difference here is that our event handlers are run on the actor strand and not the calling strand.

We create an EventSourceActor without any special code and start it running in the standard way:

val actor = EventSourceActor<String>()
val eventSource = actor.spawn()

Once the actor has been spawned, we can then register event handlers against it. The body of these handlers are then executed in the strand of the actor, but they are registered outside of it:

eventSource.addHandler { msg ->
    println(msg)
}

Kotlin allows us to write our event handlers as lambda functions, and to, therefore, use all the functionality we have here. This includes accessing values from outside the lambda function, but these will be accessed across the different strands — so we need to be careful when we do this as in any multi-threaded scenario:

val name = "Baeldung"
eventSource.addHandler { msg ->
    println(name + " " + msg)
}

We also get the major benefit of event-handling code, in that we can register as many handlers as we need whenever we need to, each of which is focused on its one task. All handlers run on the same strand — the one that the actor runs on — so handlers need to take this into account with the processing they do.

As such, it would be common to have these handlers do any heavy processing by passing on to another actor.

3.5. Finite-State Machine

A finite-state machine is a standard construct where we have a fixed number of possible states, and where the processing of one state can switch to a different one. We can represent many algorithms in this way.

Quasar gives us the ability to model a finite-state machine as an actor, so the actor itself maintains the current state, and each state is essentially a message handler.

To implement this, we have to write our actor as a subclass of FiniteStateMachineActor. We then have as many methods as we need, each of which will handle a message and return the new state to transition into:

@Suspendable
fun lockedState() : SuspendableCallable<SuspendableCallable<*>> {
    return receive {msg ->
        when (msg) {
            "PUSH" -> {
                println("Still locked")
                lockedState()
            }
            "COIN" -> {
                println("Unlocking...")
                unlockedState()
            }
            else -> TERMINATE
        }
    }
}

We then also need to implement the initialState() method to tell the actor where to start:

@Suspendable
override fun initialState(): SuspendableCallable<SuspendableCallable<*>> {
    return SuspendableCallable { lockedState() }
}

Each of our state methods will do whatever it needs to do, then return one of three possible values as needed:

  • The new state to use

  • The special token TERMINATE, which indicates that the actor should shut down

  • null, which indicates to not consume this specific message — in this case, the message is available to the next state we transition into

4. Reactive Streams

Reactive Streams are a relatively new standard that is becoming popular across many languages and platforms. This API allows for interoperation between various libraries and frameworks that support asynchronous I/O — including RxJava, Akka, and Quasar, amongst others.

The Quasar implementation allows us to convert between Reactive streams and Quasar channels, which then makes it possible to have events from these streams feed into strands or messages from strands feeding into streams.

Reactive streams have the concept of a Publisher and a Subscriber. A publisher is something that can publish messages to subscribers. Conversely, Quasar uses the concepts of SendPort and ReceivePort, where we use the SendPort to send messages and the ReceivePort to receive those same messages. Quasar also has the concept of a Topic, which is simply a mechanism to allow us to send messages to multiple channels.

These are similar concepts, and Quasar lets us convert one to the other.

4.1. Adding Reactive Streams to the Build

Reactive streams aren’t a core concept in Quasar. Instead, we need to add a dependency that gives us access to them:

<dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-reactive-streams</artifactId>
    <version>0.8.0</version>
</dependency>

It’s important that we use the same version of this dependency as of any other Quasar dependencies in use. It is also important that the dependency is consistent with the Reactive streams APIs that we are using in the application. For example, quasar-reactive-streams:0.8.0 depends on reactive-streams:1.0.2.

If we do not depend on Reactive Streams already, then this is not a concern. We only need to care about this if we’re already depending on Reactive streams, since our local dependency will override the one that Quasar depends on.

4.2. Publishing to a Reactive Stream

Quasar gives us the ability to convert a Channel to a Publisher, such that we can generate messages using a standard Quasar channel, but the receiving code can treat it as a reactive Publisher:

val inputChannel = Channels.newChannel<String>(1)
val publisher = ReactiveStreams.toPublisher(inputChannel)

Once we’ve done this, we can treat our Publisher as if it were any other Publisher instance, meaning that the client code doesn’t need to be aware of Quasar at all, or even that the code is asynchronous.

Any messages that get sent to inputChannel get added to this stream, such that they can be pulled by the subscriber.

At this point, we can only have a single subscriber to our stream. Attempting to add a second subscriber will throw an exception instead.

If we want to support multiple subscribers, then we can use a Topic instead. This looks the same from the Reactive Streams end, but we end up with a Publisher that supports multiple subscribers:

val inputTopic = Topic<String>()
val publisher = ReactiveStreams.toPublisher(inputTopic)

4.3. Subscribing to a Reactive Stream

The opposite side of this is converting a Publisher to a Channel. This allows us to consume messages from a Reactive stream using standard Quasar channels as if it were any other channel:

val channel = ReactiveStreams.subscribe(10, Channels.OverflowPolicy.THROW, publisher)

This gives us a ReceivePort portion of a channel. Once done, we can treat it the same as any other channel, using standard Quasar constructs to consume messages from it. Those messages originate from the Reactive stream, wherever that has come from.

5. Conclusion

We have seen some more advanced techniques that we can achieve using Quasar. These allow us to write better, more maintainable asynchronous code, and to more easily interact with streams that come out of different asynchronous libraries.

Examples of some of the concepts we’ve covered here can be found over on GitHub.

Leave a Reply

Your email address will not be published.