Let's imagine the following scenario: you're MegaCorp's Chief WordCount Scientist. Every day you receive a file containing a list of all Shakespeare quotations found when crawling the internet, each paired with the URL where it was spotted. For instance, this article quotes the line "Shall I compare thee to a summer's day?"... right now, in the sentence you are currently reading.
You're charged with computing various statistics about this dataset (say, a histogram for the top 10 Shakespeare quotes), and enabling MegaCorp's Business Intelligence team to answer Very Important Questions ("does Shakespeare get quoted more often on Twitter, reddit, the NY Times, or in YouTube comment threads?").
Now, since there isn't any new Shakespeare being written (he is dead after all), and people admittedly aren't dropping Shakespeare quotes all that often in new content posted to the internet, it's likely much of the dataset will be the same from one day to the next. It would be nice to not have to process it from scratch every time if not much has changed.
More generally, we'd prefer it if incremental changes to a dataset required only incremental computation to produce an answer. We don't want to rerun an entire batch job just because one or two new pieces of data trickled in! In this part, we'll show a nice solution to this problem for computations defined on the Tree type we've developed so far.
This issue also arises for batch jobs that run on what is effectively an "append-only" dataset, like a collection of log files.
While it's possible to rewrite such batch jobs in a way that takes advantage of the append-only nature of a dataset, this can require awkward rephrasing of the computation, not to mention serialization boilerplate.
A good solution for computing results incrementally can be a huge deal for larger batch jobs: the difference between a job that finishes in seconds and gets run and refined interactively vs a job that's run overnight. In principle, incremental evaluation "just" requires caching deterministic computations whose results you've computed before.
Doing this robustly and accurately requires a way of uniquely identifying computations that will change whenever the computation or any of its dependencies change. In Unison, we'll use the hash of a computation as its identity. The hash of a function incorporates the hashes of all of its dependencies and will change if any of its dependencies change.
Our Tree is nicely set up to take advantage of caching. Imagine we're computing a reduce of the tree. If any subtree of the reduction has been seen before (if it has the same hash), we can just return the reduced result immediately from the cache rather than descending into that subtree.
The same idea also applies to, say, a lib.Tree.map. Suppose we have lib.Tree.map expensiveFn t. If expensiveFn has already been applied at some subtree and cached, the resulting transformed tree can be retrieved from the cache.
Fortunately, distributed.Value includes a function for memoizing remote values, caching them at the location where the data resides, and we can use it to implement a "memoized reduce" and other operations:
Value.memo : Location {g, Scratch}
          -> distributed.Value a
          -> distributed.Value a
Value.memo loc v returns a new distributed.Value that intercepts calls to Value.get on v. Such calls first consult a cache, keyed by the hash of v. If a result is found in the cache, it's returned immediately. Otherwise the call forces v and inserts the result in the cache for next time.
The caching happens at the location of v, and if that location doesn't support this capability, another Location within loc will be consulted as a fallback. The exact ability required here is Scratch, an abstract API for ephemeral local caching (using a combination of RAM and/or disk), but the basic logic of Value.memo would look similar when the cache provider is some other ability besides Scratch.
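As a quick illustration, here's a minimal sketch of using Value.memo directly. The names costlyFn, someValue, and scratch are hypothetical, not part of the library:

```unison
-- Hypothetical setup: someValue is an existing distributed.Value,
-- costlyFn is an expensive function, scratch is a Location {Scratch}.
cached : distributed.Value Nat
cached = Value.memo scratch (Value.map costlyFn someValue)

-- The first get runs costlyFn and stores the result at the data's
-- location, keyed by the value's hash; later gets are cache hits.
result : '{Remote} Nat
result = '(Value.get cached)
```

Note that nothing runs until the delayed computation is forced; Value.memo just wraps the value so that forcing it consults the cache first.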
Let's look at an implementation of reduce that produces memoized results. Such an implementation will support efficient incremental recomputation when the input changes slightly, because the results of subtrees will be cached.
memo1.Tree.reduce :
  Location {Scratch, g}
  -> a
  -> (a -> '{Remote} a ->{Remote} a)
  -> Tree a
  ->{Remote} a
memo1.Tree.reduce scratch a combine = cases
  Tree.Empty -> a
  -- A leaf: memoize the leaf's value, keyed by its hash
  Tree.One valueA -> Value.get (Value.memo scratch valueA)
  Tree.Two l r ->
    use Remote fork
    use Value get map memo
    use memo1.Tree reduce
    -- Memoize the reduction of each branch, so unchanged subtrees
    -- are served from the cache instead of being recomputed
    lValue =
      memo scratch (map (leftTree -> reduce scratch a combine leftTree) l)
    rValue =
      memo scratch (map (rightTree -> reduce scratch a combine rightTree) r)
    -- Reduce both branches in parallel, then combine the results
    l' = fork here! '(get lValue)
    r' = fork here! '(get rValue)
    combine (await l') '(await r')
The call to Value.memo happens when we handle the Tree.One and Tree.Two cases. So this reduce function caches the values at the leaf nodes, but it also caches the computation of each branch in the tree by wrapping the calls to Value.map in Value.memo.
This reduce function also requires a Location argument, now that we're calling Value.memo, which needs one.
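The same pattern extends to other Tree operations. Here's a sketch of a memoized map, a hypothetical companion to the reduce above, assuming the same Tree representation in which One and Two hold distributed.Value fields:

```unison
-- Hypothetical memoized map: each transformed subtree is cached by
-- hash, so mapping again after a small change to the tree reuses
-- the cached results for all unchanged subtrees.
memo1.Tree.map : Location {Scratch, g}
              -> (a ->{Remote} b)
              -> Tree a
              ->{Remote} Tree b
memo1.Tree.map scratch f = cases
  Tree.Empty -> Tree.Empty
  -- Cache the transformed leaf value
  Tree.One v -> Tree.One (Value.memo scratch (Value.map f v))
  -- Cache each transformed branch
  Tree.Two l r ->
    l' = Value.memo scratch (Value.map (t -> memo1.Tree.map scratch f t) l)
    r' = Value.memo scratch (Value.map (t -> memo1.Tree.map scratch f t) r)
    Tree.Two l' r'
```

As with the reduce, no work happens eagerly: the memoized Values are only forced (and their caches consulted) when something later calls Value.get on them.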
After a particularly expensive computation runs on the Tree, one thing you might do to speed up subsequent computations is call the answers.Tree.memo function written above as a caching strategy. Besides caching already-computed data between jobs, you might use answers.Tree.memo between transformations in a data pipeline so sub-stages of the pipeline don't need to recompute data.
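For instance, an incremental job over the quotes dataset might look like this sketch. The names scratch and quoteCounts are hypothetical (a Location {Scratch} and a Tree Nat of per-document quote counts), and the combine function forces its delayed second argument as the signature of memo1.Tree.reduce requires:

```unison
-- Hypothetical incremental total: re-running this after a small
-- change to quoteCounts only recomputes subtrees whose hashes
-- have changed; everything else is served from the Scratch cache.
totalQuotes : '{Remote} Nat
totalQuotes =
  '(memo1.Tree.reduce scratch 0 (acc rest -> acc + !rest) quoteCounts)
```

The first run pays the full cost and populates the cache; subsequent runs are proportional to the size of the change, not the size of the dataset.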
The use of Value.memo provides clear efficiency gains: cached data means faster jobs. But we're also saving on engineering costs: we don't need to contort our code or deal with manual serialization of cached state, nor do we need additional infrastructure beyond the general infrastructure used to run Remote computations.
Conclusions
Phew! We've covered a lot of ground in this article. Here's a quick recap:
- We got a preview of the distributed Seq type and how it enables Spark-like distributed computations. The type is just a few lines of code but lets us nicely express many distributed operations like Seq.map, Seq.reduce, and lots more.
- We learned the general method for making any immutable data structure distributed, and how to implement functions like map and reduce in a way that "brings the computation to the data".
- We showed how computations over distributed datasets can be memoized to enable efficient incremental evaluation.
We hope you enjoyed this article. It's the first of a series on compositional distributed systems, each showing how powerful libraries for distributed computing can be assembled from reusable components in a tiny amount of Unison code. The examples shown in this series aren't big frameworks siloed from your "regular" code. These are just ordinary libraries that are a function call away, and that can be combined with the aid of a typechecker rather than a mess of glue and duct tape.
Distributed programming can be fun and approachable. While there are new things to think about ("where do I want the data to live and the computation to happen?"), Unison helps you say exactly what you mean with a minimum of ceremony, letting you focus on the interesting parts and not on the tedious particulars of simply moving data and computations from one place to another.
We'll cover some additional topics in the last section. Read on to learn about approaches to error handling, smarter chunking and granularity of parallelism, and more.
Got questions or comments? Feel free to open a discussion tagged 'distributed-api' here, or chat with us in the #distributed channel of the Unison Slack.