Spark-like datasets in Unison

Incremental evaluation via memoization

by
  • Rebecca Mark
  • Paul Chiusano

Let's imagine the following scenario: you're MegaCorp's Chief WordCount Scientist. Every day you receive a file containing a list of all Shakespeare quotations found when crawling the internet, each paired with the URL where it was spotted. For instance, this article quotes the line "Shall I compare thee to a summer's day?"... right now, in the sentence you are currently reading.

You're charged with computing various statistics about this dataset (say, a histogram for the top 10 Shakespeare quotes), and enabling MegaCorp's Business Intelligence team to answer Very Important Questions ("does Shakespeare get quoted more often on Twitter, reddit, the NY Times, or in YouTube comment threads?").

Now, since there isn't any new Shakespeare being written (he is dead after all), and people admittedly aren't dropping Shakespeare quotes all that often in new content posted to the internet, it's likely much of the dataset will be the same from one day to the next. It would be nice to not have to process it from scratch every time if not much has changed.

More generally, we'd prefer it if incremental changes to a dataset required incremental computation to produce an answer. We don't want to rerun an entire batch job just because one or two new pieces of data filtered in! In this part, we'll show a nice solution to this problem for computations defined on the Tree type we've developed so far.

This issue also arises for batch jobs that run on what is effectively an "append-only" dataset, like a collection of log files.

While it's possible to rewrite such batch jobs in a way that takes advantage of the append-only nature of a dataset, this can require awkward rephrasing of the computation, not to mention serialization boilerplate.

A good solution for computing results incrementally can be a huge deal for larger batch jobs: the difference between a job that finishes in seconds and gets run and refined interactively vs a job that's run overnight. In principle, incremental evaluation "just" requires caching deterministic computations whose results you've computed before.

Doing this robustly and accurately requires a way of uniquely identifying computations that will change whenever the computation or any of its dependencies change.

In Unison, we'll use the hash of a computation as its identity. The hash of a function incorporates the hashes of all of its dependencies and will change if any of its dependencies change.
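As a rough single-process sketch of this identity scheme (not Unison's actual hashing machinery), one can hash a function's source together with the hashes of its dependencies, Merkle-style, so that a change anywhere below propagates up into every hash that depends on it:

```python
import hashlib

def computation_hash(source: str, dep_hashes: list[str]) -> str:
    """Identity of a computation: a hash of its own source combined
    with the hashes of everything it depends on."""
    h = hashlib.sha256()
    h.update(source.encode())
    for d in dep_hashes:  # dependency hashes feed into this hash
        h.update(d.encode())
    return h.hexdigest()

leaf = computation_hash("def f(x): return x + 1", [])
top = computation_hash("def g(x): return f(x) * 2", [leaf])

# Changing a dependency changes every hash that incorporates it.
leaf2 = computation_hash("def f(x): return x + 2", [])
top2 = computation_hash("def g(x): return f(x) * 2", [leaf2])
assert top != top2
```

Here `g`'s hash changes even though `g`'s own source is untouched, which is exactly the property a cache key needs.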

Our Tree is nicely set up to take advantage of caching. Imagine we're computing a reduce of the tree. If any subtree of the reduction has been seen before (if it has the same hash), we can just return the reduced result immediately from the cache rather than descending into that subtree.

The same idea also applies to, say, a src.Tree.map. Suppose we have src.Tree.map expensiveFn t. If expensiveFn has already been applied at some subtree and cached, the resulting transformed tree can be retrieved from the cache.
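A single-process sketch of that idea (the names here are invented for illustration): key the cache by the pair of function hash and input hash, so re-mapping an unchanged input is just a lookup.

```python
cache = {}
calls = 0

def cached_apply(fn_hash, fn, arg_hash, arg):
    """Apply fn to arg, memoized under the key (hash of fn, hash of arg)."""
    global calls
    key = (fn_hash, arg_hash)
    if key not in cache:
        calls += 1          # count how often real work happens
        cache[key] = fn(arg)
    return cache[key]

def expensive(x):
    return x * x

assert cached_apply("f1", expensive, "a", 7) == 49
assert cached_apply("f1", expensive, "a", 7) == 49  # cache hit, no new call
assert calls == 1
assert cached_apply("f1", expensive, "b", 8) == 64  # new input: real work
assert calls == 2
```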

Fortunately, distributed.Value includes a function for memoizing remote values, caching them at the location where the data resides, and we can use it to implement a "memoized reduce" and other operations:

Value.memo loc v returns a new distributed.Value that intercepts calls to Value.get on v. Such calls first consult a cache, keyed by the hash of v. If a result is found in the cache, it's returned immediately. Otherwise it forces v and inserts the result in the cache for next time.

The caching happens at the location of v, and if that location doesn't support this capability, another Location within loc will be consulted as a fallback. The exact ability required here is Scratch, an abstract API for ephemeral local caching (using a combination of RAM and/or disk), but the basic logic of Value.memo would look similar when the cache provider is some other ability besides Scratch.

📓
The Location type is used throughout the Remote ability to represent "places where data can live or computations can happen" in an abstract way. It will be mapped to, say, a hostname and port by a Remote handler that does actual distributed execution.

Let's look at an implementation ofreducethat produces memoized results. Such an implementation will support efficient incremental recomputation when the input changes slightly, because the results of subtrees will be cached.

memo1.Tree.reduce : Location {Scratch, g}
                    -> a
                    -> (a -> '{Remote} a ->{Remote} a)
                    -> Tree a
                    ->{Remote} a
memo1.Tree.reduce scratch a combine = cases
  Tree.Empty      -> a
  Tree.One valueA -> Value.get (Value.memo scratch valueA)
  Tree.Two l r    ->
    use Remote await fork
    use Value get map memo
    use memo1.Tree reduce
    lValue =
      memo
        scratch
        (map (leftTree -> reduce scratch a combine leftTree) l)
    rValue =
      memo
        scratch
        (map (rightTree -> reduce scratch a combine rightTree) r)
    l' = fork here! '(get lValue)
    r' = fork here! '(get rValue)
    combine (await l') '(await r')

The call to Value.memo happens when we handle the Tree.One and Tree.Two cases. So this reduce function caches the values at the leaf nodes, but it also caches the computation of each branch in the tree by wrapping the calls to Value.map in Value.memo.

When run a second time, recursion into subtrees will be cut off whenever a subtree has been previously computed and resides in the cache.
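To make the incremental payoff concrete, here is an illustrative Python model of a hash-keyed tree reduce (the tree encoding and names are invented for this sketch): a second run over a tree that shares subtrees with the first only does work on the changed parts.

```python
import hashlib

def h(*parts) -> str:
    """Hash of a node, built from its constructor tag and children's hashes."""
    m = hashlib.sha256()
    for p in parts:
        m.update(str(p).encode())
    return m.hexdigest()

# Tree a = Empty | One a | Two (Tree a) (Tree a), each node carrying its hash
Empty = ("Empty", h("Empty"))
def One(a):    return ("One", h("One", a), a)
def Two(l, r): return ("Two", h("Two", l[1], r[1]), l, r)

cache, forced = {}, []

def reduce(zero, combine, t):
    key = t[1]
    if key in cache:
        return cache[key]      # subtree seen before: cut off recursion
    if t[0] == "Empty":
        result = zero
    elif t[0] == "One":
        forced.append(key)     # track which leaves did real work
        result = t[2]
    else:
        result = combine(reduce(zero, combine, t[2]),
                         reduce(zero, combine, t[3]))
    cache[key] = result
    return result

shared = Two(One(1), One(2))
t1 = Two(shared, One(3))
assert reduce(0, int.__add__, t1) == 6
work_before = len(forced)
t2 = Two(shared, One(4))       # only the right leaf changed
assert reduce(0, int.__add__, t2) == 7
assert len(forced) == work_before + 1  # only the new leaf was forced
```

The second reduce reuses the cached result for the shared left subtree and forces just the one new leaf, which is the cutoff behavior described above.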

This reduce function also requires a Location argument, now that we're calling Value.memo, which requires one.

An exercise: Memoize the tree
📓

Exercise: Implement a Tree.memo function

Write a Tree.memo function which at every level memoizes the evaluation of a Tree.

Show me the answer
answers.Tree.memo : Location {Scratch} -> Tree a -> Tree a
answers.Tree.memo location = cases
  Tree.Empty      -> Tree.Empty
  Tree.One valueA -> Tree.One (Value.memo location valueA)
  Tree.Two l r    ->
    use Value map
    use answers Tree.memo
    Tree.Two
      (Value.memo location (map (Tree.memo location) l))
      (Value.memo location (map (Tree.memo location) r))

After a particularly expensive computation runs on the Tree, one thing you might do to speed up subsequent computations is call the answers.Tree.memo function written above as a caching strategy. In addition to caching already-computed data between jobs, you might use answers.Tree.memo between transformations in a data pipeline so sub-stages of the pipeline don't need to recompute data.

Try your own map-reduce
📓

Exercise: Write a test program using the Remote.pure.run interpreter

This exercise is up to the reader 😎. See if you can use the functions we've written on Tree to write a simple map-reduce program. Remote.pure.run is an in-memory interpreter for the Remote ability.

The use of Value.memo provides clear efficiency gains: cached data means faster jobs. But we're also saving on engineering costs: we don't need to contort our code or deal with manual serialization of cached state, nor do we need additional infrastructure beyond the general infrastructure used to run Remote computations.

Conclusions

Phew! We've covered a lot of ground in this article. Here's a quick recap:

  • We got a preview of the distributed Seq type and how it enables Spark-like distributed computations. The type is just a few lines of code but lets us nicely express many distributed operations like Seq.map, Seq.reduce, and lots more.
  • We learned the general method for making any immutable data structure distributed, and how to implement functions like map and reduce in a way that "brings the computation to the data".
  • We showed how computations over distributed datasets could be memoized to enable efficient incremental evaluation.

We hope you enjoyed this article. It's the first of a series on compositional distributed systems, each showing how powerful libraries for distributed computing can be assembled from reusable components in a tiny amount of Unison code. The examples shown in this series aren't big frameworks siloed from your "regular" code. These are just ordinary libraries that are a function call away, and that can be combined with the aid of a typechecker rather than a mess of glue and duct tape.

Distributed programming can be fun and approachable. While there are new things to think about ("where do I want the data to live and the computation to happen?"), Unison helps you say exactly what you mean with a minimum of ceremony, letting you focus on the interesting parts and not on the tedious particulars of simply moving data and computations from one place to another.

We'll cover some additional topics in the last section. Read on to learn about approaches to error handling, smarter chunking and granularity of parallelism, and more.

Got questions or comments? Feel free to open a discussion tagged 'distributed-api' here or chat with us in the #distributed channel of the Unison Slack.

If you're interested in using Unison at work for this sort of thing, we'd love to chat with you.