Spark-like datasets in Unison

Additional topics

by
  • Rebecca Mark
  • Paul Chiusano

Here we'll cover some additional topics you may have wondered about while reading the article:

Error handling

While the Remote ability does support fine-grained error handling via functions like try and tryAwait, for batch computing workloads like the one we've developed here, coarse-grained error handling policies applied to an entire computation are often appropriate. Some example policies:

  • In the event of a failure, retry repeatedly (possibly with exponential backoff). If the failure keeps happening, a human will eventually intervene to kill the job.
  • In the event of a failure, retry up to 4 times, then reraise the failure.

These sorts of general policies can be provided by "middleware" handlers of Remote rather than needing to pollute nice implementations like Seq.reduce with retry logic.
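For instance, a "retry up to n times" policy can be an ordinary function that wraps a whole job. Here's a minimal sketch (not part of the distributed library), assuming base's catch (which returns an Either Failure a) and Exception.raise; an exponential backoff variant would additionally sleep between attempts:

-- Sketch only: run the job, and on failure retry up to `attempts` more
-- times, re-raising the last failure once the attempts are exhausted.
retry : Nat -> '{Remote, Exception} a ->{Remote, Exception} a
retry attempts job = match catch job with
  Right a -> a
  Left failure ->
    if attempts == 0 then Exception.raise failure
    else retry (Nat.drop attempts 1) job

An entire computation, such as a call to Seq.reduce, can then be wrapped in retry without touching the implementation of Seq.reduce itself.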

Tree vs Seq and controlling granularity of parallelism

The Tree we used in the tutorial is a bit different from the Seq used in the distributed library. This is subtle stuff. Let's take a look at the differences:

structural type Tree a
  = Empty
  | One (Value a)
  | Two (Value (Tree a)) (Value (Tree a))
structural type Seq k a
  = lib.distributed.Seq.Seq
      (distributed.Value (Two Mode a (Seq k a)))

What's going on here? Well, first, ignore that Seq has an extra k parameter. Also ignore for now the Mode parameter for controlling granularity of parallelism; we'll discuss that shortly.

More substantially, Seq is defined via mutual recursion between two types, Seq and Two. This is a little brain-bending if you haven't seen this sort of thing before, but the usage here just enforces that there is a distributed.Value wrapping each level of the tree, including the root. Recall that distributed.Value is lazy, so this means we can't know whether a Seq is empty, a leaf, or a branch without first forcing the distributed.Value.
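The definition of Two isn't shown above, but from how it's used you can picture its shape roughly like this (a sketch; the actual definition in the distributed library may differ in its details):

-- Sketch of Two's shape, inferred from its use in Seq: a level of the tree
-- is either empty, a single element, or a Mode-annotated branch whose
-- children are again lazy Seqs.
structural type Two mode a r = Empty | One a | Two mode r r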

It's instructive to compare how the two types represent a few example trees:

use Value pure
emptyTree = Tree.Empty
emptySeq = Seq (pure Two.Empty)
leafTree = Tree.One (pure "A")
leafSeq = Seq (pure (Two.One "A"))
branchTree = Tree.Two (pure leafTree) (pure leafTree)
branchSeq leafSeq mode = Seq (pure (Two.Two mode leafSeq leafSeq))

Tree isn't fully lazy. Without looking inside any distributed.Value, we can tell whether the tree is empty, a leaf, or a branch. The laziness only kicks in when we try to see what is inside a leaf, or what subtrees are one level down from a Tree.Two.

In contrast, Seq is fully lazy: we can't obtain any information about its structure without calling either Value.map or Value.get. We can't even tell if the data structure is empty or not! We can tell that it is a Seq, but any information about its internal structure is guarded by a distributed.Value.
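To make this concrete, here's a hypothetical emptiness check for each type (neither function is from the article). Tree can be pattern matched directly, while Seq first has to force the Value at its root, which requires Remote:

-- Sketch: Tree exposes its top-level shape without forcing anything.
Tree.isEmpty : Tree a -> Boolean
Tree.isEmpty = cases
  Tree.Empty -> true
  _ -> false

-- Sketch: Seq reveals nothing until we force the Value at its root.
Seq.isEmpty : Seq k a ->{Remote} Boolean
Seq.isEmpty s = match Value.get (Seq.unwrap s) with
  Two.Empty -> true
  _ -> false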

structural type Tree a
  = Empty
  | One (Value a)
  | Two (Value (Tree a)) (Value (Tree a))
structural type Seq k a
  = lib.distributed.Seq.Seq
      (distributed.Value (Two Mode a (Seq k a)))

For an ordinary in-memory data structure, the difference between these two phrasings might not matter much in practice. For distributed data structures, it's a more substantive decision. Consider the function Seq.flatMap, and imagine calling it with a function that does some serious work:

Seq.flatMap :
  (a ->{Exception, Remote} Seq Defer b) -> Seq k a -> Seq Defer b
Seq.flatMap f s =
  use Seq unwrap
  use Two Empty
  use Value pure
  go :
    Two Mode a (Seq k a)
    -> distributed.Value (Two Mode b (Seq Defer b))
  go = cases
    Empty -> pure Empty
    Two.One a -> unwrap (f a)
    Two.Two mode l r ->
      pure (Two.Two mode (Seq.flatMap f l) (Seq.flatMap f r))
  Seq (Value.flatMap go (unwrap s))

The above implementation is nicely lazy and does no work until the sequence is later forced (by a Seq.reduce, say). In contrast, imagine writing flatMap for Tree. If the tree happens to be a leaf, it will have to strictly apply the function, which means that flatMap now requires access to Remote.

This is a bit of an awkward programming model, where some computations run strictly while others are evaluated lazily only when needed, and the difference depends on the size of the tree! Seq has a more uniform representation that ensures no work ever happens until the Seq is forced by a function like Seq.reduce, and this is often a good choice.
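For comparison, here's a sketch of what a flatMap for Tree might look like (hypothetical, not from the library). The leaf case forces the Value and applies f immediately, which is what drags the Remote requirement into the signature, while the branch case stays lazy:

-- Sketch only: the leaf case is strict (it forces the Value and runs f
-- right away, hence the {Remote} requirement); the branch case defers the
-- recursion inside Value.map.
Tree.flatMap : (a ->{Remote} Tree b) -> Tree a ->{Remote} Tree b
Tree.flatMap f = cases
  Tree.Empty -> Tree.Empty
  Tree.One va -> f (Value.get va)
  Tree.Two l r ->
    Tree.Two (Value.map (Tree.flatMap f) l) (Value.map (Tree.flatMap f) r)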

Controlling chunking and the granularity of parallelism

Let's look at the implementation of parallel.Tree.reduce:

parallel.Tree.reduce :
  a -> (a ->{Remote} a ->{Remote} a) -> Tree a ->{Remote} a
parallel.Tree.reduce zero combine = cases
  Tree.Empty -> zero
  Tree.One valueA -> Value.get valueA
  Tree.Two leftValue rightValue ->
    use Remote fork
    use Value get map
    use parallel.Tree reduce
    leftTask = 
      fork here! '(get (map (t -> reduce zero combine t) leftValue))
    rightTask = 
      fork here! '(get (map (t -> reduce zero combine t) rightValue))
    left' = await leftTask
    right' = await rightTask
    combine left' right'

This does a parallel reduce of the tree, forking threads at each Tree.Two. This is fine if each subtree represents a large chunk of work. But for, say, a Tree Nat, close to the leaves of such a tree there's so little data that the overhead of forking a thread to process it may not pay for itself, even with Unison's lightweight threads.

The Seq type allows branches to be annotated with a Mode indicating whether they should be Parallel or Sequential. Functions like Seq.reduce respect the annotation and only fork threads if the annotation indicates it's worthwhile, and functions like Seq.map leave the annotation alone:

Seq.reduce : m -> (m -> m ->{Remote} m) -> Seq k m ->{Remote} m
Seq.reduce z op s =
  use Remote fork
  use Seq reduce
  use Two Two
  go = cases
    Two.Empty          -> z
    Two.One a          -> a
    Two Sequential l r -> op (reduce z op l) (reduce z op r)
    Two Parallel l r   ->
      tl = fork here! '(reduce z op l)
      tr = fork here! '(reduce z op r)
      op (await tl) (await tr)
  Value.get (Value.map go (Seq.unwrap s))
Seq.map : (a ->{Exception, Remote} b) -> Seq k a -> Seq Defer b
Seq.map f s =
  use Two Empty One Two
  go = cases
    Empty        -> Empty
    One a        -> One (f a)
    Two mode l r -> Two mode (Seq.map f l) (Seq.map f r)
  Seq (Value.map go (Seq.unwrap s))

When building up a distributed sequence, you can control the granularity of parallelism by picking Parallel or Sequential for constructed Two.Two nodes, and functions like fromChunkedList will do this automatically. We'll talk more about constructing sequences in the next section.

In addition to using the Mode annotation, you can also work with trees whose leaf values are some chunk type. So rather than working with a Tree Nat, you could instead work with a Tree [Nat] or use some more specialized chunk type. This sort of explicit chunking isn't as nice to program with, but it can offer better performance.
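As a sketch of the idea (sumChunked is a hypothetical helper, shown here over Seq rather than Tree): sum each chunk with an ordinary local fold, so that the distributed reduce only has to combine the per-chunk sums:

-- Sketch: each leaf holds a [Nat] chunk. Summing within a chunk is plain
-- local list folding; only the per-chunk sums flow through Seq.reduce.
sumChunked : Seq k [Nat] ->{Remote} Nat
sumChunked chunks =
  chunkSums = Seq.map (chunk -> List.foldLeft (acc n -> acc + n) 0 chunk) chunks
  Seq.reduce 0 (acc n -> acc + n) chunkSums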

Creating distributed sequences

It's nice that we can write functions like Seq.map that preserve the existing partitioning of the data onto multiple nodes. But how do we actually create a distributed sequence in the first place? This section gives the basic idea of how to construct distributed sequences in terms of the Location type used within Remote.

Distributed sequences will typically be created lazily, by repeatedly splitting some state in half and producing a leaf or the empty sequence once the state hits some base case. For instance, here's a function that produces a Seq from a list, placing every chunkSize-element subtree together at a single location:

fromListAt : Location g -> Nat -> [a] -> Seq k a
fromListAt region chunkSize as =
  use List halve
  use Nat * <= >=
  use Two Two
  step isRoot count as =
    if List.size as <= chunkSize then
      Value.at region (Seq.unwrap (Seq.fromList Sequential as))
        |> Value.join
    else
      if (count >= chunkSize) || isRoot then
        delayAt region do
          (l, r) = halve as
          Two Parallel (Seq (step false 2 l)) (Seq (step false 2 r))
      else
        (l, r) = halve as
        count' = count * 2
        Value.pure
          (Two
            Parallel
            (Seq (step false count' l))
            (Seq (step false count' r)))
  Seq (step true 1 as)

As a sample use case, we might call this with a list of URLs or file names from S3. A subsequent Seq.map or Seq.flatMap can then be used to fetch (or "hydrate") the contents of those URLs. The loading happens lazily, only when the resulting Seq is forced.
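For instance (a sketch: fetchObject stands in for whatever hypothetical function downloads a single object, and 128 is an arbitrary chunk size):

-- Sketch: nothing is downloaded here; fetching happens only when the
-- resulting Seq is forced by something like Seq.reduce.
hydrate :
  Location g -> [Text] -> (Text ->{Exception, Remote} Bytes) -> Seq Defer Bytes
hydrate region keys fetchObject =
  Seq.map fetchObject (fromListAt region 128 keys)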

If the sequence is so large that not even the names of all the files can fit in memory, we might recursively build the sequence using fromListAt (to list top-level directory names) and Seq.flatMap (to recursively build a sequence for the files under each directory).
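A sketch of that recursive construction, where filesUnder is a hypothetical function listing the file names beneath a single directory (again, the chunk sizes are arbitrary):

-- Sketch: dirs is the (small) list of top-level directory names; each
-- directory is expanded into its files lazily, when the Seq is forced.
allFiles :
  Location g -> [Text] -> (Text ->{Exception, Remote} [Text]) -> Seq Defer Text
allFiles region dirs filesUnder =
  dirSeq = fromListAt region 16 dirs
  Seq.flatMap (dir -> fromListAt region 128 (filesUnder dir)) dirSeq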

We can also use more general combinators like Seq.unfold or skewUnfoldAt.

If you're interested in using Unison at work for this sort of thing, we'd love to chat with you.