Unison Cloud comes with several unique features: transparently shipping code to different nodes, programmable transactional storage, and typed persistence.
This post aims to show how this feature set radically simplifies the implementation of sophisticated distributed systems, and we'll use a real system as our case study: @systemfw/volturno, a distributed stream processing engine.
The main goal of Volturno is to provide stateful streaming without the headache:
- No infrastructure to setup, and deploying happens with a function call.
- Both data streams and streaming pipelines are typed, and don't require explicit encoding and decoding.
- Pipelines guarantee exactly-once state-message consistency, so you don't need to write any state synchronisation code.
- The data plane is based on object storage, which significantly reduces cost and frees you from thinking about disk utilisation.
- Scales to multiple nodes via Unison's unique approach to distribution, which doesn't require an external coordination layer like Zookeeper.
I'm going to assume basic knowledge of the Unison language and the Cloud's core apis such as Remote and cloud.Storage, but no deep distributed systems knowledge. Let's dive in.
The Volturno api
We begin with a short introduction to the core concepts of the library.
Data is made available for processing by publishing it to a KLog k v, a persistent, keyed log hosted by a Broker. A KStream k v is an ephemeral stream of data which initially flows out of a KLog and gets then transformed into other KStreams to define streaming computations. Each computation runs continuously as a Pipeline, a named streaming job with persistent state and exactly-once processing.
KStream computations happen in the KOps ability, whose core idea is to define a sequential transformation over all the messages of a KStream that share the same key. These transformations are stateful and can emit further messages downstream, and users can change the key of a message to route it to the appropriate stateful transform. Transformations pertaining different keys are processed concurrently, allowing for scalability.
Let's look at an example computation. It consumes two KLog Text Nat, which emit lowercase and uppercase word count events respectively. We want to log a running aggregate count for each word, in a case insensitive way.
wordCount : KLog Text Nat -> KLog Text Nat ->{KOps} ()
wordCount upper lower =
use Nat +
KStream.merge [source upper, source lower]
|> partitionKeys (k -> Text.toLowercase k)
|> (loop 0 do
count = Step.state() + input()
state.set count
output count)
|> sink
(word count ->
toRemote do
Log.json
(Json.object
[ ("word", Json.text word)
, ("count", Json.nat count)
]))The example above showcases all the key operations in KOps:
Reads data from aKLog. Streaming computations start with one or more calls tosource.
Runs the provided function on each message emitted byKStream.
Creates a singleKStream.merge : [KStream k v] ->{KOps} KStream k vKStreamby nondeterministic interleaving of the messages from its input KStreams.
Transforms the key of each message emitted by apartitionKeys : (k -> k2) -> KStream k v ->{KOps} KStream k2 vKStream, allowing for rerouting to the appropriateloop.
Transforms messages emitted by aKStream. The transformation is stateful and it's encoded by a computation in theStepability, which has functions to access the current input message, emit new messages, and access and modify the state for the key that's being processed.
Here's the same logic in a fully complete example: we start a Broker; create a couple of KLogs; create, define and run the Pipeline; then publish some events to our KLogs.
volturnoDesign.ex : '{IO, Exception} ()volturnoDesign.ex : '{IO, Exception} ()
volturnoDesign.ex = do
use Cloud submit
shards = 10
Cloud.run do
env = Environment.default()
broker = Broker.named "broker"
broker |> Broker.start Broker.Settings.default
upper = submit env do KLog.named broker "upper" shards
lower = submit env do KLog.named broker "lower" shards
pipeline = Pipeline.named "word-count"
prepare pipeline shards do wordCount upper lower
pipeline |> Pipeline.run Pipeline.Settings.default
event name count source = KLog.publish source [(name, count)]
submit env do
upper |> event "F" 1
upper |> event "M" 3
lower |> event "f" 4
upper |> event "P" 2Compiling pipelines
Let's have a look at how to run pipelines. For now we'll assume we are operating on a single node, without any processing guarantees.
A KOps computation consists of a series of transformations over KStreams, but note how the shape of the overall program is completely static, which means we can also interpret a KOps program as a graph of processing stages, connected by KStreams.
This is more evident if we name each KStream in our example:
wordCount2 : KLog k v -> KLog k v ->{KOps} ()So, we can write an ability handler for KOps that produces a blueprint for the computation: each KStream will be represented as the hash of the processing stage it represents, and these hashes will be inserted into a map representing the links between the stages. For example:
type KStream k v = KStream KStreamId
type KStreamId = KStreamId Bytes
stages: Map KStreamId [KStreamId]
--^ Links a stage to all its downstream stages
buildGraph stages kOps = handle kOps() with cases
{ a } -> stages
{ partitionImpl f (KStream in) -> resume } ->
stage = KStreamId (blake2b_256 (Partition, in, f))
stages' = stages |> insertIfNotPresent stage |> connect in stage
buildGraph stages' do resume (KStream stage)
...
buildGraph Map.empty kOpsArmed with our blueprint, we can then write another handler for KOps to run the computational graph: we will create an in-memory channel for each KStream, and spawn a long running thread for each stage. Each thread queries the blueprint to get a reference to its upstream and downstream channels, then it repeatedly reads from its input channels, does the relevant processing, and sends further messages downstream. Stateful loop stages simply keep a Map k s that they update on each iteration.
But what about source stages that have no upstream channel to read from? They read directly from KLogs by polling the Broker for new data: messages in a KLog are stored durably in object storage, indexed by an integer offset, so source asks for a batch of data at a given offset, and then keeps track on each iteration of the next offset it needs to read from.
Distribution model
Since our semantics do not guarantee order among different keys, we can distribute their processing to different nodes. Each node will have a worker: a parent thread that runs the computational graph as above.
We want each worker to pull a subset of messages from its input KLogs, but remember that we need to guarantee sequential processing for messages with the same key, so we need those to be on the same node. We achieve that by sharding each KLog into several totally ordered loglets. Loglets are identified by an integer, and are typically in the order of tens for a given KLog. When publishing to a KLog we compute the target loglet with hash(key) % totalLoglets, which guarantees that the same key ends up on the same loglet. Consumption happens as before, except offsets are tracked at the loglet level, rather than the whole KLog.
We are now free to spawn several pipeline workers and, provided that each worker only consumes a subset of loglets, we have a very clean scaling model which still respects per-key ordering.
There is one snag however, because partitionKeys can change the key of a message during processing to route it to the appropriate stateful loop. The word routing should now be taken literally: a partitionKeys stage running on Worker A might produce a message whose new key is assigned to Worker B, so we need to route it there.
This means that instead of having one channel per stage, we need to have a channel per stage per worker, and make sure we select the right channel, again by hash(key) % totalWorkers.
But here comes the magic of Unison Cloud: we don't have to change the implementation of our channels.
The in-memory channels in Volturno are built off the Remote.Ref and Remote.Promise primitives in Remote, which means they are already location-aware: if we try to enqueue a message to a channel that resides on a different machine, Cloud will transparently move it there.
Ok, but what about failures? Networks can be partitioned and nodes can crash, and our cloud programming model does not pretend you can ignore these concerns. Instead, it gives you the tools to address them, let's see how in the next section.
Fault tolerance
Imagine we magically possessed perfect knowledge of the behaviour of our cluster across time, we'd see periods of stable activity delimited by downtime events like crashes or network partitions. Now imagine if each period was populated by a totally different set of workers: from the point of view of those workers no failures ever happen, because they exist as part of a stable view of the system where this property is true by definition. Error handling for any failure then reduces to a view change, where a new stable set of workers takes over.
Let's explore this idea a little more: a reasonable objection is that spinning up new virtual machines every time a failure happens is costly, but here we can once again exploit a unique feature of Unison Cloud, the ability to spawn code on a different node by forking a Remote Thread.
So what we'll do is have a long-lived supervisor thread who's in charge of handling views: it will start a view by spawning our workers onto different nodes, monitor their health via periodic heartbeats, and trigger a view change if a heartbeat is skipped.
When creating a view, the supervisor also distributes loglets to each worker, and performs a handshake so that each worker has a reference to all the worker channels, which we saw is needed for partitionKeys.
This makes the life of a worker much easier: during its whole lifetime it can operate on the assumption that its sibling workers are up and reachable, and it can just crash if that assumption breaks, since the supervisor will then create a new view with new workers. All workers have to do for fault tolerance is sending heartbeats to the supervisor and honouring a request for a view change by terminating.
Error handling in a distributed system is much harder than on a single machine, and this strategy isolates all its complexity into one place: the view change protocol. To give you a sense of some of the scenarios we have to deal with, what if a worker that needs to be informed of a view change is unreachable due to a network partition? What if the supervisor is temporarily down when a worker crashes? What if a supervisor goes temporarily down during a view change, leaving it half-complete?
The full view change protocol is complex, but I want to touch on some of the ideas that make it tick:
- The view state is kept in
cloud.Storage, so it survives crashes and can be modified transactionally. - Instead of a single mutable
View, we have a monotonically increasing view number, and then aTable ViewNumber View. The protocol starts by increasing the view number, and only deletes the old view when the view change is fully complete, which allows a view change to be resumed if it was left partially complete by a supervisor failure, e.g. if we need to retry contacting workers for an old view to inform them of the change. - Workers for
viewNumber + 1are only activated when workers fromviewNumberhave terminated, to prevent messages with the same key from being processed on two different nodes that are operating in different views. - Views are parameterised by a refresh time which informs how often the workers contact
cloud.Storageto check that the view they belong to is still the current view. This means that even if there's a partition between the supervisor and the workers, the supervisor knowns that after the refresh time, workers will notice a view change and shut themselves down.
Finally, the protocol employs a number of fast paths for the vastly more common case where there isn't a network partition involved. For example, it relies on Remote Thread interruption to quickly stop a worker from an old view, often avoiding the need to wait out the view refresh time. Similarly, workers attempt to proactively tell the supervisor of a failure, so that error detection can be prompter than waiting for a missing heartbeat.
Supervisors and leases
Through the use of views and view changes, we have reduced the full spectrum of failures in Volturno to a much smaller problem: we need to have a supervisor thread running.
The system is resilient to the supervisor being temporarily down, so we only need to guarantee that it is up most of the time, which we can do by deploying our supervisor as a Daemon. Daemons are a Unison Cloud feature that allows a given thread to run on all nodes assigned to a given user, by having the runtime automatically restart them on node crashes or redeploys.
We only have one last hurdle to get through: daemons run on all nodes, and although the view change protocol is resilient to multiple supervisors running it simultaneously, doing so by default is wasteful since it will spawn various sets of workers, only one of which will be elected as the next view.
So, what we want to do instead is elect one daemon thread as the leader, with the others on standby but ready to take over in case the leader fails. We will assign each thread a random SupervisorId at startup, and then implement this requirement through a simple but powerful technique called a lease.
The lease is a record in cloud.Storage containing a SupervisorId, a monotonically increasing Nat, and a configurable duration. Supervisors race to acquire the lease with a transaction by writing (theirId, 0, duration), at which point they have leadership for duration. The leader is allowed to renew its lease by increasing the Nat, which basically acts as a leadership term number. On the other hand, it will interrupt its current work if it fails to renew the current lease.
A supervisor that loses the race will sleep for duration, and then try to access the lease again: if it's been renewed or there is a new leader, it goes back to sleep for duration. If not, it tries to take over the lease itself.
Leases are clever because they incorporate both leadership election and failure detection into one mechanism. It's tempting to think that instead of sleeping the followers can just try to contact the leader and take over leadership on failing to do so, but this strategy is unsafe: a leader could be simply partitioned away, at which point there would be two leaders running. Instead, the lease duration acts like a shared contract that's independent of communication, and note that it doesn't require clocks to be synchronised, only that measuring durations is not too skewed.
There are some rare (and unavoidable) corner cases related to events like scheduling pauses, but we aren't concerned with them here since we're using leadership only as an optimisation.
Exactly-once processing
Our system can survive failure, but to be truly resilient it has to preserve correctness throughout any view changes. The behaviour we are after is commonly known as "exactly-once processing", but a more precise term is exactly-once state-message consistency, i.e. the guarantee that each message will affect the pipeline state exactly once. Our basic strategy will be to resume processing after a view change by rewinding back to the latest state that's known to be consistent.
Let's consider a trivial pipeline of shape source --> loop that runs as a single worker and consumes a single loglet. A consistent snapshot for such a pipeline consists of an offset n that's been read from the loglet, plus the state of the loop stage as it is right before receiving the message at offset n + 1 .
Let's say a view change occurs when the offset n + 3 is about to be read , and we resume from our snapshot taken at n. We poll the Broker for messages coming after n, so [n + 1, n + 2, n + 3] aren't lost. Furthermore, even though n + 1 and n + 2 have been received twice, the state has been rolled back to how it was before they were received for the first time, so their effect is only recorded once.
Note how correctness does not depend on the freshness of the snapshot, which means that we don't need to snapshot after every message, and not every snapshot attempt needs to succeed. Of course, it's preferable to have fresh snapshots to minimise the amount of reprocessing required to catch up, so Volturno lets you tune the snapshot frequency, even whilst the pipeline is running.
Let's now generalise our definition of a snapshot to an arbitrary pipeline: we will consider the messages from loglets to source stages, and conceptually delimit their flow over time into epochs, where each epoch marks a snapshot attempt. A snapshot for epoch n can then be defined as the offsets into all the loglets read by the pipeline, plus the state of all its loop stages, across all nodes, as they are right before epoch n + 1 begins.
However, taking such a snapshot seems pretty slow: we'd have to pause all the source stages, wait for all in-flight messages to fully propagate through the entire pipeline, including across nodes via partitionKeys, then upload the set of offsets and the state of all loop stages to object storage, and only then we would be able to resume the source stages.
Async snapshotting
We can make snapshotting much faster by modelling the idea of epochs explicitly, rather than relying on pausing in physical time: to take a snapshot, we will inject special barrier messages into the graph of in-memory channels, which all the stages propagate downstream.
When a stateful node like source or loop receives a barrier for epoch n, it will take an in-memory snapshot of its state, propagate the barrier downstream, and kick off a separate thread to upload its snapshot fragment to object storage. It will then start processing messages of epoch n + 1 without waiting for its upload to complete, much less blocking the whole pipeline.
There is one tricky requirement to ensure the correctness of this process however: barrier alignment. This scenario happens when a stage receives messages from multiple stages (like merge) or multiple nodes (like any stage downstream of a partitionKeys, and other cases): when such a stage receives a barrier from one of its upstream links, it needs to pause processing from that link until it receives the barrier from all its other links as well, and only then it can continue the snapshotting process. This is because after receiving a barrier from link L for epoch n, all subsequent messages on that link belong to epoch n + 1, and shouldn't be considered until the state for epoch n is complete, i.e. until all messages for epoch n have been received, i.e. until barriers for epoch n have been received on all links. Barrier alignment is embedded directly into Volturno's custom channel implementation, so the core logic can be mostly oblivious to it.
Finally, we need a designated process to drive the snapshotting process: periodically injecting barriers, advancing the epoch, and marking a snapshot as complete once all fragments have been uploaded. However, we don't need to worry about any subtleties with leader election here: the rest of our architecture guarantees that a view is consistent, so we can just assign the task of snapshot coordinator to one of the workers in that view. Any errors will be dealt with by view change, as per usual.
Conclusion
The design of Unison Cloud tries to strike a balance between providing high level building blocks, and remaining flexible enough to model various distributed systems techniques and patterns.
It doesn't pretend you can ignore the challenges introduced by distribution, but it gives you a wealth of tools to tackle them with the full expressiveness you're used to having when programming any other type of software.
For distributed systems designers, this means productivity and joy. For users, it means flexibility and convenience, since even highly sophisticated systems like Volturno are just libraries you can import and run on Unison Cloud without any further infrastructure setup.