Skip to content

Commit

Permalink
[query] Call-Cache CollectDistributedArray (rfc-0000) (#12954)
Browse files Browse the repository at this point in the history
Adds the ability to rerun/retry queries from the nearest
`CollectDistributedArray` (`CDA`) `IR` site.

Computes a "Semantic Hash" of the top-level `IR` which is used to
generate a key for the various constituent `CDA` calls in a query. The
implementation for CDA, `BackendUtils.collectDArray`, uses that key to
look into an the execution cache for the results of each partition for
that call and uses/updates the cache with successful partition
computations.

The nature of the staged- lower and execute model means we don't know
how many `CDA` calls that will be generated ahead of time. Thus we treat
the "Semantic Hash" in a similar way to an RNG state variable and
generate a key from the Semantic Hash every time every time we encounter
a `CDA`.

The execution cache is implemented on-top of a local or remote
filesystem (configurable via the `HAIL_CACHE_DIR` environment variable).
This defaults to `{tmpdir}/hail/{pip-version}`.
  • Loading branch information
ehigham authored Sep 15, 2023
1 parent 795c8fb commit 7a7bf28
Show file tree
Hide file tree
Showing 43 changed files with 1,928 additions and 388 deletions.
316 changes: 316 additions & 0 deletions dev-docs/hail-query/fast-restarts.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
====================================
CollectDistributedArray Call-Caching
====================================

Introduction
==========
See `Fast Restarts for Failed Queries <https://github.com/hail-is/hail-rfcs/pull/1>`_.

Proposed Change Specification
=============================

Experience tells us that the majority of time spent on expensive and
scientifically interesting queries is within the tasks generated by
:code:`CollectDistributedArray` (:code:`CDA`).
This is because many of the table operations in hail's :code:`IR` are lowered
into one or more :code:`CDA` operations.
Consequently, we focus our attention on caching the intermediate results of
these tasks.

:code:`CDA` can be thought of as a distributed map-reduce operation, from some
input "context" for each partition in a table (eg, the path to the file
where the partition is serialised), a computation on that partition, and some
combiner for the results of those computations.
For what follows, let an *activation* be a particular invocation of a
:code:`CDA` pipeline (implemented via :code:`collectDArray`).

At a high-level, when the driver performs an *activation*, it will look in its
*execution cache* to see if it had successfully performed that *activation*
in the past.
The *cache* contains the results for all the successful partition computations.
The driver compares the tasks for each partition with the results in the cache
and removes those tasks that have already been completed.
It then executes any remaining work and updates the execution cache with their
results.
If all the work completes successfully, the driver returns the now-cached
results to be used in the the rest of the query.
The driver will cache the results of successful *activations* only.
Failed *activations* (ie. those that errored) will be handled in the usual way,
potentially failing the query.

We require two things to determine if the driver had successfully executed an
operation:

1. a way of looking up *activations* in a *cache*, and
2. then design of the execution cache itself

Semantic Hashing
----------------
To lookup operations in the cache, we need a way of producing an identifier
that uniquely represents a particular *activation*.
We do this by defining a *semantic hash* for the activation, comprised of:

a) a *static* component computed from the :code:`IR` that generated the
operation
b) a *dynamic* component for the particular activation instance.

For most :code:`IR` nodes, the *static* component can be computed purely from
their inputs plus some contribution uniquely representing the semantics of that
class of :code:`IR`.
For :code:`IR` nodes that read external files, we have to be a little more
cautious and ensure that those files haven't changed since we last read them.
Thus, we need to include some kind of checksum or digest of that file.
This static component can be passed down the lowering pipeline to the code
generator and driver, which, when performing an activation, can mix the static
component with a dynamically generated activation id to form the semantic hash.

Execution Cache
---------------

Users will "bring their own"\ :sup:`TM` cache directory where cached
computations will be stored.
This cache dir will be an prefix in local or cloud storage.
The driver will store cache files named ``{cachedir}/{semhash}``, where

- `cachdir` is a user-defined location, defaulting to
`{tmp}/hail/{hail-pip-version}`
- `tmp` is either the local tempdir for spark and local backends, or the
remote tempdir for `QoB`.

These files will contain accumulated activation results, indexed by their
partition number.

Examples
========

To opt in or out of fast-restarts, users will set hail flags in their python
client:

.. code-block:: python
>> hl._set_flags(use_fast_restarts='1')
>> hl._set_flags(cachedir='gs://my-bucket/cache/0')
Alternatively, users can set the corresponding environment variables at the
command line prior to starting their python session:

.. code-block:: sh
>> HAIL_USE_FAST_RESTARTS=1 HAIL_CACHE_DIR='gs://my-bucket/cache/0' ipython
Notes:

- The definition of the ``cachedir`` does not imply
``use_fast_restarts``.
- If ``use_fast_restarts`` is defined, hail will write cache entries to
a subfolder of the ``tmpdir`` by default.


Implementation Description
==========================

The reader should note that implementation examples below are for illustrative
purposes only and that the real implementation may differ slightly.

Semantic Hashes
---------------

Computing Static Component
^^^^^^^^^^^^^^^^^^^^^^^^^^

See :code:`SemanticHash.scala`.

The static component of a semantic hash for the :code:`IR` is computed in a
level-order traversal of the nodes in the :code:`IR`.
The particular ordering itself doesn't matter, only that an ordering is defined.
We also need to keep track of :code:`IR` shape when traversing;
it's possible to define two :code:`IR` trees with different shape but look
identical when flattened to a list.
We'll include an encoding of the node's trace (the path from the root node) to
account for this.

.. code-block:: scala
def levelOrder(node: BaseIR): Iterator[(BaseIR, Trace)]
Since the ``IR`` contains references and compiler-generated names, we need to
normalise the names in the :code:`IR` (see :scala:`NormalizeNames.scala`)
to get consistent hashes.

The semantic hash is defined for the whole :code:`IR` (as apposed to prefixes
of the :code:`IR` tree, see Alternatives below).
Thus, we'll compute the hash as early as possible to minimise the computational
cost as the :scala:`IR` gets lowered and expanded.
This also reduces the number of :code:`BaseIR` operations we need to define
semantic hashes for (ie. only those that can be constructed in python).

Generally, a hash function takes a seed and some data (typically a stream of
numbers or bytes) and produces a hash.
That hash can be extended with more data by feeding it back to the hash function
as the seed.
What's needed is a way to encode the :code:`IR` as a byte stream.
A simple :code:`toString` is not sufficient as some nodes read data from
external blob-storage;
we need to ensure that the data hasn't changed since we last ran the query.
Furthermore, we can't define an encoding for some :code:`IR` nodes, so we need
a way to bail out:


.. code-block:: scala
def encode(fs: FS, ir: BaseIR, trace: Trace): Option[Array[Byte]] = {
val buffer =
Array.newBuilder[Byte] ++= encodeTrace(trace)
ir match {
case Ref(name, _) =>
buffer ++=
encodeClass(classOf[Ref]) ++=
name.getBytes
case TableRead(_, _, reader) =>
buffer ++=
encodeClass(classOf[TableRead]) ++=
encodeClass(reader.getClass)
reader.pathsUsed.foreach { p =>
// encode the contents of the file (md5 digest, etag, other)
// to ensure it hasn't been modified since last time the query
// was ran (if ever).
buffer ++= encodeFile(fs, p)
}
case ir if DependsOnlyOnInputs(ir) =>
buffer ++= encodeClass(ir.getClass)
case _ if DontKnowHowToDefineSemhash(ir) =>
return None
case ... =>
}
Some(buffer)
}
Then, assuming we have an appropriate hashing algorithm, seed and a way of
combining hashes, we can create and extend the hash with the contribution of
each node:

.. code-block:: scala
var hash = Algorithm.SEED
for ((node, trace) <- levelOrder(nameNormalizedIr)) {
encode(fs, node, trace) match {
case Some(bytes) => hash = Algorithm.extend(hash, bytes)
case _ => return None
}
}
Some(hash)
Observations:

- For all :code:`IR` nodes that depend only on their children and have no
additional parameterisation, their semantic hash is simply some unique
encoding for what that node means.

- Implemented this as the hash code of the :scala:`IR`'s class
- :code:`Class.hashCode` is repeatable across JVM sessions

- Note that the node's children will be hashed in the traversal
- There are times when we can't define a semantic hash (such as reading a
table from a RVD). In these cases, we'll just return :scala:`None`.


Computing Dynamic Component
^^^^^^^^^^^^^^^^^^^^^^^^^^^

The query driver is a single-threaded system that compiles and executes the
same queries in a repeatable way.
That is, if a query generates one or more :code:`CDA` nodes, those nodes will be
emitted in the same order.
This, we can use the static component in the same way as random number
generator state:

- When a :scala:`CDA` node is emitted, we can fork the semhash key-value
- We "mix" one value with the :code:`CDA`'s dynamic id to generate the semantic
hash for that particular activation
- and update the static component state with the forked value for the next
:code:`CDA` node.

To do this, we can add the function :code:`nextHash` to the
:code:`ExecuteContext` that returns a new `Hash` value to be mixed with the
dynamic component and updates internal state:

.. code-block:: scala
final case class IrMetadata(semhash: Option[Int]) {
private[this] var counter: Int = 0
def nextHash: Option[Int] = {
val bytes = intToBytes(counter)
counter += 1
semhash.map(Algorithm.extend(_, bytes))
}
}
Then, in :scala:`Emit.scala`:

.. code-block:: scala
case cda: CollectDistributedArray =>
...
semhash <- newLocal[Integer]("semhash")
emitI(dynamicID).consume(
ifMissing = nextHash.foreach { hash =>
assign(semhash, boxToInteger(hash))
},
ifPresent = { dynamicID =>
nextHash.foreach { staticHash =>
val dynamicHash =
invokeScalaObject(
String.getClass,
"getBytes",
Array(classOf[String]),
Array(dynamicID.loadString(cb))
)
val combined =
invokeScalaObject(
Algorithm.getClass,
"extend",
Array(classOf[Int], classOf[Array[Byte]]),
Array(staticHash, dynamicHash)
)
assign(semhash, boxToInteger(combined))
}
}
)
// call `collectDArray` with semhash
Using :code:`Option` allows us to encode if we can compute a semantic hash
for the given :code:`IR`.
In the case when one cannot be computed, :code:`collectDArray` simply skips
reading and updating a cache.


Execution Cache
---------------

Given an interface for an :code:`ExecutionCache`` of the form:

.. code-block:: code
trait ExecutionCache {
def lookup(h: SemanticHash): Array[(Int, Array[Byte])]
def put(h: SemanticHash, res: Array[(Int, Array[Byte])]): Unit
}
We can implement a file-system cache that uses a file prefix plus the current
version of Hail to generate a "root" directory, under which all cache files are
stored by their semantic hash.
2 changes: 1 addition & 1 deletion hail/generate_splits.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def partition(k: int, ls: List[T]) -> List[List[T]]:
for x in foo]
classes = [cls for cls in classes
if not cls.startswith('is.hail.services')
if not cls.startswith('is.hail.fs')]
if not cls.startswith('is.hail.io.fs')]

random.shuffle(classes, lambda: 0.0)

Expand Down
2 changes: 2 additions & 0 deletions hail/python/hail/backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class Backend(abc.ABC):
"index_branching_factor": ("HAIL_INDEX_BRANCHING_FACTOR", None),
"rng_nonce": ("HAIL_RNG_NONCE", "0x0"),
"profile": ("HAIL_PROFILE", None),
"use_fast_restarts": ("HAIL_USE_FAST_RESTARTS", None),
"cachedir": ("HAIL_CACHE_DIR", None),
}

def _valid_flags(self) -> AbstractSet[str]:
Expand Down
20 changes: 20 additions & 0 deletions hail/python/test/hail/test_call_caching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import hail as hl

from hail.utils.misc import new_temp_file
from .helpers import with_flags

def test_execution_cache_creation():
"""Asserts creation of execution cache folder"""
folder = new_temp_file('hail-execution-cache')
fs = hl.current_backend().fs
assert not fs.exists(folder)

@with_flags(use_fast_restarts='1', cachedir=folder)
def test():
(hl.utils.range_table(10)
.annotate(another_field=5)
._force_count())
assert fs.exists(folder)
assert len(fs.ls(folder)) > 0

test()
Loading

0 comments on commit 7a7bf28

Please sign in to comment.