Spark-like datasets in Unison

Distributed and parallel reductions

by
  • Rebecca Mark
  • Paul Chiusano

In the previous part of this series, we introduced a simple distributed tree type and showed how to implement transformation functions like lib.Tree.map in a way that "brings the computation to the data". We saw how functions like lib.Tree.map are lazy: they don't do any work when called but merely set up a pending transformation that will be applied as the data structure is forced.

In this part of the series we'll focus on functions for our Tree data type that evaluate or force the data structure in some way. We'll use a reduce function as an example. Again, this will illuminate how small tweaks to the code can cause different runtime behavior.

sequential.Tree.reduce zero combine tree has three parameters: a "zero" value to return if a subtree is empty, a combine function for combining the results of two subtrees, and the tree to be reduced. Reduce functions for in-memory data structures deal with a set of familiar concerns: do you reduce the left or the right subtree first? Should the implementation be tail-recursive or keep state on the stack? But because we're now working with distributed computations, our reduce implementation needs to manage the additional dimensions of where and when the combine function for reduce should be run.

We could write a reduce function that behaves in either one of the following ways:

  • Send the function down: Push the combining function down the tree to the data, and send the resulting reduced Nat value back to the parent for combining.
  • Send the subtrees up: Send each forced Tree Nat up to its parent Tree.Two, which calls reduce on each subtree, then combines the two Nat results.

We ultimately want the send the function down option, since sending a Nat to the parent will be cheaper than sending a Tree Nat (only to immediately reduce that to a Nat), but we'll illustrate both here. Take a look at the recursive case processing the Tree.Two branch in the following implementation:

sequential.Tree.reduce :
  a -> (a ->{Remote} a ->{Remote} a) -> Tree a ->{Remote} a
sequential.Tree.reduce zero combine = cases
  Tree.Empty -> zero
  Tree.One valueA -> Value.get valueA
  Tree.Two leftValue rightValue ->
    combine
      (sequential.Tree.reduce zero combine (Value.get leftValue))
      (sequential.Tree.reduce zero combine (Value.get rightValue))

It does the following:

  1. Evaluate the left subtree and send it to the current location.
  2. Evaluate the right subtree and send it to the current location.
  3. Recursively reduce the left subtree.
  4. Recursively reduce the right subtree.
  5. Apply the combine function to the two results from (3) and (4).
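
For example, calling it to sum a distributed tree of naturals might look like this (sumTree is a hypothetical helper; the pure Nat.+ works fine as the combine function):

sumTree : Tree Nat ->{Remote} Nat
sumTree tree =
  use Nat +
  -- 0 is the zero for an empty subtree; (+) combines the two sides
  sequential.Tree.reduce 0 (a b -> a + b) tree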

We've implemented the send the subtrees up approach. If the subtrees are at the same location as the parent, this is fine. But since this is meant to be used in situations where the data cannot fit at one location, there will be nodes in the tree where the parent resides at a different location than one of its subtrees. In these places we're sending more data over the network than we should.

There's another problem with the send the subtrees up approach: the reducing and combining always happens where the parent resides. Since this is a recursive function, this means that all the work is ultimately happening at whatever location calls sequential.Tree.reduce. That is going to be bad when we try to add parallelism later: we can't have one location doing all the work!

Let's try to write a version of reduce that implements the send the function down approach, using Value.map instead:

withMap.Tree.reduce :
  a -> (a ->{Remote} a ->{Remote} a) -> Tree a ->{Remote} a
withMap.Tree.reduce zero combine = cases
  Tree.Empty                    -> zero
  Tree.One valueA               -> Value.get valueA
  Tree.Two leftValue rightValue ->
    use Value get map
    use withMap.Tree reduce
    left' = map (t -> reduce zero combine t) leftValue
    right' = map (t -> reduce zero combine t) rightValue
    combine (get left') (get right')

This version will:

  1. Reduce the left subtree at its location.
  2. Reduce the right subtree at its location.
  3. Send the reduced left value to the parent.
  4. Send the reduced right value to the parent.
  5. Combine the two reduced values at the parent.

This is the send the function down approach. Notice that at no point are we sending a Tree to the parent (there are no calls to Value.get that return a Tree, only calls to Value.get that return reduced values).

While this is an improvement in execution strategy over sequential.Tree.reduce, we are still reducing the left and the right subtrees sequentially: first the left, then the right. Why not reduce the two subtrees in parallel?

To make this into a parallel reduce, we can use Remote.fork to start a computation running in a background Task. An await blocks until the forked Task completes and returns its result (or raises a failure if the forked task failed).
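
In isolation, the pattern looks something like this minimal sketch (parSum and its two delayed computations are hypothetical):

parSum : '{Remote} Nat -> '{Remote} Nat ->{Remote} Nat
parSum left right =
  use Nat +
  -- start both computations running in background Tasks
  leftTask = Remote.fork here! left
  rightTask = Remote.fork here! right
  -- block on each Task and combine the results
  await leftTask + await rightTask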

Using Remote.fork and await in our reduce function yields something like this:

parallel.Tree.reduce :
  a -> (a ->{Remote} a ->{Remote} a) -> Tree a ->{Remote} a
parallel.Tree.reduce zero combine = cases
  Tree.Empty -> zero
  Tree.One valueA -> Value.get valueA
  Tree.Two leftValue rightValue ->
    use Remote fork
    use Value get map
    use parallel.Tree reduce
    leftTask = 
      fork here! '(get (map (t -> reduce zero combine t) leftValue))
    rightTask = 
      fork here! '(get (map (t -> reduce zero combine t) rightValue))
    left' = await leftTask
    right' = await rightTask
    combine left' right'

Our left and right Tree branches are now being reduced in parallel through the use of Remote.fork and await. Moreover, all the pending transformations that have been applied to the tree (for instance via lib.Tree.map) will be forced in parallel in the same pass as the parallel.Tree.reduce.
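
For instance, assuming the lib.Tree.map from the previous part of this series, a map followed by a parallel reduce is still a single parallel pass over the data (sumOfSquares is a hypothetical example):

sumOfSquares : Tree Nat ->{Remote} Nat
sumOfSquares tree =
  use Nat * +
  -- map is lazy: this just registers a pending transformation
  squared = lib.Tree.map (x -> x * x) tree
  -- the pending squaring is forced in parallel as part of the reduce
  parallel.Tree.reduce 0 (a b -> a + b) squared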

๐Ÿคฏ

Wowser! So our distributed map and parallel reduce functions are just a few lines of code each. It's remarkable that we can obtain exactly the runtime behavior we want just by phrasing our functions in the right way.

This is what writing distributed programs is like in Unison. You have to consider where and when you want computations to happen, and express those decisions with code, but it's a tiny amount of code that deals with the essence of the important decisions, not serialization or networking boilerplate, or deployment scripts or YAML files or building containers...

Operations that only partially evaluate a structure

Suppose we want to write a function that doesn't require us to fully evaluate the tree. Let's say we want a function Tree.take that lists the first n elements it finds in the tree. Let's write it so that it may not need to force the right subtree at all:

Tree.take : Nat -> Tree a ->{Remote} [a]
Tree.take n = cases
  Tree.Empty                -> []
  Tree.One valueA           -> List.singleton (Value.get valueA)
  Tree.Two leftVal rightVal ->
    use List ++ size
    use Nat - >=
    use Value get map
    combine l r =
      if size l >= n then List.take n l
      else
        nextN = n - size l
        l ++ get (map (Tree.take nextN) r)
    combine (get (map (Tree.take n) leftVal)) rightVal

The trick is that we have to guard the right branch from being evaluated by keeping it wrapped in a distributed.Value. So the function that joins together the left and right branches has to be more careful about the circumstances in which it evaluates the right branch via calls to Value.get.

An exercise for the reader 😎
๐Ÿ““

Exercise: Implement a lazy reduce

Try to generalize the lazy Tree.take function into a lazy reduce function:

lazy.Tree.reduce : a -> (a -> distributed.Value a ->{Remote} a) -> Tree a ->{Remote} a
Here's one possible implementation:
lazy.Tree.reduce :
  a -> (a -> distributed.Value a ->{Remote} a) -> Tree a ->{Remote} a
lazy.Tree.reduce zero combine = cases
  Tree.Empty                    -> zero
  Tree.One valueA               -> Value.get valueA
  Tree.Two leftValue rightValue ->
    use Value map
    use lazy.Tree reduce
    left' = map (t -> reduce zero combine t) leftValue
    right' = map (t -> reduce zero combine t) rightValue
    combine (Value.get left') right'
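
As a sanity check, Tree.take can be recovered from lazy.Tree.reduce by mapping every element to a singleton list and short-circuiting in the combine function. Here's a sketch, again assuming the lib.Tree.map from the previous part (lazyTake is a hypothetical name):

lazyTake : Nat -> Tree a ->{Remote} [a]
lazyTake n tree =
  use List ++ size
  use Nat - >=
  combine l rightVal =
    -- only force the right branch if the left didn't supply enough elements
    if size l >= n then List.take n l
    else l ++ List.take (n - size l) (Value.get rightVal)
  lazy.Tree.reduce [] combine (lib.Tree.map List.singleton tree)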

Takeaways

  • Value.get will force the evaluation of a remote Value, bringing it to the location of the caller. You'll see it in functions that interpret the distributed data structure.
  • Use Value.map and Value.get in tandem to control where and when a computation should be run.
  • Unison's Remote.fork and await functions provide a way to introduce parallelism into remote computations.

Whatever runtime behavior you want for your distributed computations can be achieved with only tiny code changes, and the decisions you make are then codified in reusable functions that others can use without needing to be experts in distributed systems.

In the next part, we'll go further, showing how computations on distributed data structures can be made incremental, avoiding work that has already been done in a previous execution.

If you're interested in using Unison at work for this sort of thing, we'd love to chat with you.