Concurrency made easy with Clojure and Pulsar

In the last couple of months I've been playing around with different solutions for actor based programming.

To recap - actors are lightweight processes that communicate asynchronously by sending messages between each other. You can read more about the actor model here.

Earlier I've covered how to do this using Erlang, Akka and Elixir. Now it's time to take a look at how it could be done in Clojure using Pulsar.

Pulsar is a Clojure API that wraps Quasar - a library for creating lightweight threads and actors on the JVM.

Pulsar stays true to Erlang's way of doing things, so you'll see a lot of similarities.

So what are we creating?

To show how Pulsar works, we'll go through the same example used in the posts about Erlang, Akka and Elixir.

We'll created a small chatroom where we have two types of actors - server actors and client actors. Clients will then be able to connect to the server in order to participate in the chatroom.

First, let's take a look at the server actor

(defn- get-name [ref clients]
  (-> (first (filter #(= (:ref %) ref) clients))

(defn- broadcast [msg clients]
  (doseq [c clients]
    (! (:ref c) msg)))

(defsfn server [clients]
   [:join ref name] (do
                      (link! ref)
                       [:info (str name " joined the chat")]
                      (recur (conj clients {:name name :ref ref})))
   [:send ref msg]  (do
                       [:new-msg (get-name ref clients) msg]
                      (recur clients))
   [:exit _ ref _]  (let [updated-clients (remove #(= (:ref %) ref) clients)]
                       [:info (str (get-name ref clients) " left the chat")]
                      (recur updated-clients))
   :shutdown        (println "Shutting down")))

(defn create-server []
  (spawn :trap true server '()))

The action starts in the create-server function.

create-server creates an actor of our function server using spawn. After creating the actor, spawn will put it into a new fiber - which is a lightweight thread. For now, let's ignore the :trap flag and move on to our actor function.

The first thing to notice about our server function, is that we call a function called receive. This function will listen for new messages in the actor's mailbox.

When a message is received, it'll start matching it against the parameters it got. As you can see - the parameter list is pairs of patterns and actions. If a pattern matches the message, the associated action will be executed.

The second thing to notice is that the function is recursive. If you just let the function return, the actor will die.

Let's go through the different messages

In the server actor, we've defined four types of messages. Let's take a look at what they do.

:join - Cool, someone is trying to join our chat!

Someone is connecting and we need to properly monitor them. To do this we use link!, which creates a symmetrical link between the actors. This gives us some advantages that we'll look at under the :exit section.

It's worth mentioning that you could use watch! instead of link! if you just want the monitoring one way and not the other.

Next we notify all the logged in clients by sending them a message using the ! function in our broadcast function.

:send - A new message for the chatroom!

We received a message for the chatroom and again we use the broadcast function to spread the word.

:exit??? - Aww, someone disconnected.

By adding the link to the actors when they joined, we'll get an exception if one of the actors dies.

Exceptions are no good, so by adding the :trap flag when spawning the actors, we're able to get the news about the dying actor as a message instead.

So if an :exit message is received, we know that the actor referred to has died and we should notify the others about the sad news before removing it from the client list.

:shutdown - Server is going down.

:shutdown simply doesn't do a recursive call, resulting in the server shutting down.

Next up - the client actor

(defn- client-prn [name msg]
  (println (format "[%s's client] - %s" name msg)))

(defn- prn-msg [name from msg]
  (client-prn name (format "%s: %s" from msg)))

(defsfn client [name server]
   [:new-msg from msg] (do (prn-msg name from msg)
                           (recur name server)) 
   [:info msg] (do
                 (client-prn name msg)
                 (recur name server))
   [:send msg] (do
                 (! server [:send @self msg])
                 (recur name server))
   :disconnect (client-prn name "Disconnected")
   [:exit _ _ _] (client-prn name "Lost connection. Shutting down...")))

(defn create-client [name server]
  (let [c (spawn :trap true client name server)]
    (! server [:join c name])

As you can see, the structure is pretty much the same as in the server actor.

We got the receive function, the different types of messages with their associated actions, and we got the simple error handling if the server goes down.

Let's take our chatroom for a spin

Now that we have our actors in place, let's try them out!

First, let's start our repl before firing up the server actor.

$ lein repl

=> (def s (create-server))

Next, let's get a client in there!

=> (def c1 (create-client "Sam" s))

=> (! c1 [:send "Hi, anyone here?"])
  [Sam's client] - Sam: Hi, anyone here?

Let's bring in someone for Sam to talk to.

=> (def c2 (create-client "Mia" s))
  [Sam's client] - Mia joined the chat

=> (def c3 (create-client "Luke" s))
  [Sam's client] - Luke joined the chat
  [Mia's client] - Luke joined the chat

=> (! c2 [:send "Hello!"])
  [Mia's client] - Mia: Hello!
  [Luke's client] - Mia: Hello!
  [Sam's client] - Mia: Hello!

As you can see, the messages about new connections and the messages themselves gets delivered to all the clients. But what about when a client actor dies?

=> (! c3 :disconnect)
  [Luke's client] - Disconnected
  [Mia's client] - Luke left the chat.
  [Sam's client] - Luke left the chat.

As expected, the server noticed that Luke's client died, and notifies the others.

Now, if we shutdown the server, the two remaining clients should shutdown as well.

=> (! s :shutdown)
  Shutting down...
  [Mia's client] - Lost connection. Shutting down...
  [Sam's client] - Lost connection. Shutting down...

You can find the source code of the example on GitHub.

For further reading...

Other articles on actor based programming using this example