Skip to content

Commit

Permalink
New Datastore abstraction for Metaflow (#580)
Browse files Browse the repository at this point in the history
* Add external_field to parameter

* Fix timedate conversion from service to client

* New datastore implementation

This commit contains only the new datastore code and none of the backend implementations.
The datastore is now split into different files:
  - flow_datastore.py contains the top-level FlowDataStore implementation
  - task_datastore.py contains the task-level datastore implementation
  - content_addressed_store.py contains the underlying content addressed store used by
    both previous data-stores.

* Local backend for the datastore.

The local backend is used to save and load local files.

* Datatools performance improvements

Datatools will now cache the s3 client it uses for single operations
resulting in faster operation times.

Another optimization (non advertised) is that datatools can now take
an IOBase directly to avoid having an additional copy.

Finally, __del__ now performs a close so the S3 datatool can be used
as a regular object as opposed to just within a context.

* Added S3 datastore backend.

This backend allows the datastore to interface with S3.

* Pure non-semantic changes (name changes and comment changes).

One tiny semantic change in the way a Tar file is read (using the recommended
open method instead of TarFile object)

* Integrate the new datastore implementation into the current code

* Remove no longer needed files for the datastore.

* New features for the S3 datatools:
  - support for range queries (in get and get_many)
  - support for content-type and user metadata in put, put_many and put_files
    (metadata can also be retrieved using any of the get calls)
  - support for info and info_many to retrieve information about a file without
    fetching it.

* Fix metadata to allow for non-string arguments

* [WIP] Modify new datastore to use metadata in backend storage

Instead of encoding needed information directly in the file, we now
encode this information as file metadata leveraging the support for metadata
for the S3 datatools and implementing support for it in the local
filesystem (creating a separate file)

* Properly use download_fileobj when no range is specified

* Re-add intermediate compatibility mode for loading blobs from the CAS

* Remove print message; fix local_backend in some cases

* [WIP] Datatools: See if moving info to worker makes a performance difference

* Fix issue with s3 datastore backend for multiple files

* Fix issue with s3 datastore backend for multiple files

* Addressed comments

* Improve import of metaflow_custom to allow for top-level imports

* Fix small issue with sidecar message types

* Fix tags for attempt-done in task-datastore

* Fix small issue with sidecar message types

* Addressed comments -- minor tweaks and file refactoring

* Addressed comment: removed handling of ephemeral CAS encoding directly in file

* Addressed comments; refactored use of pipes

Addressed the minor style comments.
Removed the use of a pipe to communicate between workers and master.
Communication now occurs via a file.

* Forgot file in previous commit

* Typo in parentheses causing issues with put of multiple files

* Fix issue with S3PutObject

* Fix bugs due to int/string conversion issues with new file mechanism

* Fix issue with is_none and py2 vs py3

* Fix issue with metaflow listing all flows

* Handle non-integer task/run IDs in local metadata

* WIP: MFLog on convergence branch

This is still in progress but pushing for reference.

* Fixups to track master MFLog and specific convergence fixups

* Fix to Batch logging

* Add support for getting all artifacts from the filecache

* Improve MFLog:
  - Save logs later in runtime to get all logs (even messages printed on error)
  - Allow for escaping variables in bash commands

* Fix a bug when missing data is requested from the datastore

* Trigger test on convergence branch

* Fix failing tests

* Fix issue caused by merge_artifacts now having artifacts read from datastore in write mode (#577)

* Convergence master merge (#578)

* UBF Implementation (#554)

Unbounded Foreach Implementation

Co-authored-by: Savin <savingoyal@gmail.com>

* Check if R step names are valid Python identifiers (#573)

* Check if R step names are valid Python identifiers

* Accidentally changed a docstring step_name to validator. Reverting.

* get rid of  _METAFLOW_RESUMED_RUN (#575)

* Add Unbounded Foreach support

Co-authored-by: Savin <savingoyal@gmail.com>
Co-authored-by: David Neuzerling <mdneuzerling@users.noreply.github.com>
Co-authored-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>

* Revert "Add external_field to parameter"

This reverts commit b118685.

* Small MFlog refactor

* Add a way to pass an external client to S3()

* Cache the S3 boto connection in the datastore (#570)

* The datastore now caches the S3 boto connection and passes it to the S3 client

This allows us to use S3() using with (and therefore properly clean up the temp
directory) but also benefit from the connection reuse.

As part of this change, we also had to change the interface to load_bytes. This
change is invisible to the user as it is 100% internal to the datastore.

Cleaned up the s3tail.py and s3util.py files to co-locate them with the
datatools instead of the datastore (since they were no longer used directly
in the datastore). This simplified the import story.

* Addressed comments and simplified the code a bit

- Use a single CloseAfterUse class.
- Remove the use of caching on write (in artifact and blob cache); the cache
  now only caches things that are read which is the way it is being used
- Simplified the types of caches used in datastore_set.py

* Remove no longer needed file

* Clean up usage of CloseAfterUse

* remove artifact_cache, simplify blob_cache

* get rid of redundant LazyFile

* fix FileBlobCache root

* fix paths in content_addressed_store

* Addressed comments

* Move check on file existence in CAS to backend for better efficiency

* Fix modes for datastore due to merge_artifacts functionality

* Ignore error when overwrite is False and file exists

Co-authored-by: Ville Tuulos <tuulos@gmail.com>

* Add an option to define your own AWS client provider (#611)

* Add an option to define your own AWS client provider

You can now specify a function that returns an AWS client. This is
useful if you want to use something other than boto3 and can be used
with the metaflow_custom mechanism to provide your own authentication

* Typo

* Do not remove _expected_extensions since it is used in a function

* Another typo

* Addressed comments

* Add inputs to task_pre_step (#616)

* Tweaks and optimizations to datastore

* Revert "Move check on file existence in CAS to backend for better efficiency"

This reverts commit d0ac6fe.

* Change encoding type to gzip+pickle to keep forward compatibility

* Reduce memory usage of artifact save and load

We ensure that we do not keep around temporaries and allow the
GC to remove objects that are no longer needed

* Make is_file parallel

We can now call is_file in parallel. It is unclear if this helps a lot given the
overhead of loading the s3op and starting them all. It may help in the case of
a very large number of artifacts

* Set overwrite=True when uploading metadata

* Improve fetching info and file at the same time

* reduce duplicate copies in datastore by using generators

* minimize memory consumption by handling artifacts one by one

* fix docstrings and documentation

* fix @Batch to work with the new datastore

* do not implicitly assume a correct key order in flow_datastore.load_data

* Other minor consistency tweaks

Co-authored-by: Romain Cledat <rcledat@netflix.com>

* Merge master into convergence

* Add inputs to task_pre_step (#615)

* Add inputs to task_pre_step

* Addressed comments

* Forgot to remove an import

* Refactor @resources decorator (#617)

* Refactor @resources decorator

@resources decorator is shared by all compute related decorators -
@Batch, @lambda, @K8s, @titus. This patch moves it out of
batch_decorator.py so that other decorators can cleanly reference
it.

* Update __init__.py

* Add an option to define your own AWS client provider (#620)

You can now specify a function that returns an AWS client. This is
useful if you want to use something other than boto3 and can be used
with the metaflow_custom mechanism to provide your own authentication

* Add S3 tests (#613)

* Add S3 tests

* Addressed comments

* silence tar timestamp warnings (#627)

* Handle None as default parameters properly in AWS Step Functions (#630)

A parameter specification like -
```
Parameter(name="test_param", type=int, default=None)
```
will result in an error even though the default has been specified
```
Flow failed:
    The value of parameter test_param is ambiguous. It does not have a default and it is not required.
```

This PR fixes this issue.

* IncludeFile now returns the included file in the client (#607)

* DRAFT: IncludeFile now returns the included file in the client and CLI

THIS IS NOT FINISHED; DO NOT MERGE AS IS.

* Fix the tests

* Forgot to update type check for multiple encoding

* Add resource tags to AWS Batch jobs (#631)

* Add resource tags to AWS Batch jobs

This PR assumes that Batch:TagResource is whitelisted for the
role which submits the AWS Batch job.

* propagate tags

* add env var

* better commens

* add production token

* New patch release (#633)

* Bug fix with s3tail exception (#635)

* Revert "IncludeFile now returns the included file in the client (#607)" (#637)

This reverts commit 0575f8d.

* patch release (#639)

* Update Unbounded Foreach Test in the case of a Conda environment (#626)

Co-authored-by: Savin <savingoyal@gmail.com>
Co-authored-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>

* Fix Py2 compatibility

* Fix typo in s3op.py

* Pin test executions to master branch

* Fix names of metadata file to include  ending

* Renamed metaflow_custom to metaflow_extensions

* Remove duplicate code

* remove .json suffix from metadata

* test commit

* remove spurious commits

* Fix merge

* remove new line

* Fixed batch related bug in convergence. (#710)

* fix merge conflicts

* Batch fixes for convergence branch

* fixup aws batch

* fix whitespace

* fix imports

* fix newline

* fix merge

* fix local metadata for aws batch

* merge mflog

* stylistic fixes

* Added preliminary documentation on the datastore (#593)

* Added preliminary documentation on the datastore

* Update datastore.md

Update with new name for the backend.

* Final set of patches to convergence (#714)

* convergence fixes

* fixes

* gzip ts changes

* fix logs subcommand

* typo

* Address comments; add support for var_transform in bash_capture_logs

* Fix local metadata sync

* Forgot to remove duplicate sync_metadata

Co-authored-by: Romain Cledat <rcledat@netflix.com>
Co-authored-by: Romain <romain-intel@users.noreply.github.com>

* Typo in error message

Co-authored-by: Savin <savingoyal@gmail.com>
Co-authored-by: David Neuzerling <mdneuzerling@users.noreply.github.com>
Co-authored-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
Co-authored-by: Ville Tuulos <tuulos@gmail.com>
Co-authored-by: Valay Dave <valaygaurang@gmail.com>
  • Loading branch information
6 people authored Sep 24, 2021
1 parent 2410ec0 commit 4c8c470
Show file tree
Hide file tree
Showing 45 changed files with 2,517 additions and 1,728 deletions.
216 changes: 216 additions & 0 deletions docs/datastore.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# Datastore design
## Motivation

The datastore is a crucial part of the Metaflow architecture and deals with
storing and retrieving data, be they artifacts (data produced or consumed within
user steps), logs, metadata information used by Metaflow itself to track execution
or other data like code packages.

One of the key benefits of Metaflow is the ease with which users can access the
data; it is made available to steps of a flow that need it and users can access
it using the Metaflow client API.

This documentation provides a brief overview of Metaflow's datastore implementation
and points out ways in which it can be extended to support, for example, other
storage systems (like GCS instead of S3).

## High-level design

### Design principles
A few principles were followed in designing this datastore. They are listed here
for reference and to help explain some of the choices made.

#### Backward compatibility
The new datastore should be able to read and interact with data stored using
an older implementation of the datastore. While we do not guarantee forward
compatibility, currently, older datastores should be able to read most of the data
stored using the newer datastore.

#### Batch operations
Where possible, APIs are batch friendly and should be used that way. In other
words, it is typically more efficient to call an API once, passing it all the
items to operate on (for example, all the keys to fetch) than to call the same
API multiple times with a single key at a time. All APIs are designed with
batch processing in mind where it makes sense.

#### Separation of responsabilities
Each class implements few functionalities and we attempted to maximize reuse.
The idea is that this will also help in developing newer implementations going
forward and being able to surgically change a few things while keeping most of
the code the same.

### Storage structure
Before going into the design of the datastore itself, it is worth considering
**where** Metaflow stores its information. Note that, in this section, the term
`directory` can also refer to a `prefix` in S3 for example.

Metaflow considers a datastore to have a `datastore_root` which is the base
directory of the datastore. Within that directory, Metaflow will create multiple
sub-directories, one per flow (identified by the name of the flow). Within each
of those directories, Metaflow will create one directory per run as well as
a `data` directory which will contain all the artifacts ever produced by that
flow.

The datastore has several components (starting at the lowest-level):
- a `DataStoreStorage` which abstracts away a storage system (like S3 or
the local filesystem). This provides very simple methods to read and write
bytes, obtain metadata about a file, list a directory as well as minor path
manipulation routines. Metaflow provides sample S3 and local filesystem
implementations. When implementing a new backend, you should only need to
implement the methods defined in `DataStoreStorage` to integrate with the
rest of the Metaflow datastore implementation.
- a `ContentAddressedStore` which implements a thin layer on top of a
`DataStoreStorage` to allow the storing of byte blobs in a content-addressable
manner. In other words, for each `ContentAddressedStore`, identical objects are
stored once and only once, thereby providing some measure of de-duplication.
This class includes the determination of what content is the same or not as well
as any additional encoding/compressing prior to storing the blob in the
`DataStoreStorage`. You can extend this class by providing alternate methods of
packing and unpacking the blob into bytes to be saved.
- a `TaskDataStore` is the main interface through which the rest of Metaflow
interfaces with the datastore. It includes functions around artifacts (
`persisting` (saving) artifacts, loading (getting)), logs and metadata.
- a `FlowDataStore` ties everything together. A `FlowDataStore` will include
a `ContentAddressedStore` and all the `TaskDataStore`s for all the tasks that
are part of the flow. The `FlowDataStore` includes functions to find the
`TaskDataStore` for a given task as well as save and load data directly (
this is used primarily for data that is not tied to a single task, for example
code packages which are more tied to runs).

From the above description, you can see that there is one `ContentAddressedStore`
per flow so artifacts are de-duplicated *per flow* but not across all flows.

## Implementation details

In this section, we will describe each individual class mentioned above in more
detail

### `DataStoreStorage` class

This class implements low-level operations directly interacting with the
file-system (or other storage system such as S3). It exposes a file and
directory like abstraction (with functions such as `path_join`, `path_split`,
`basename`, `dirname` and `is_file`).

Files manipulated at this level are byte objects; the two main functions `save_bytes`
and `load_bytes` operate at the byte level. Additional metadata to save alongside
the file can also be provided as a dictionary. The backend does not parse or
interpret this metadata in any way and simply stores and retrieves it.

The `load_bytes` has a particularity in the sense that it returns an object
`CloseAfterUse` which must be used in a `with` statement. Any bytes loaded
will not be accessible after the `with` statement terminates and so must be
used or copied elsewhere prior to termination of the `with` scope.

### `ContentAddressedStore` class

The content addressed store also handles content as bytes but performs two
additional operations:
- de-duplicates data based on the content of the data (in other words, two
identical blobs of data will only be stored once
- transforms the data prior to storing; we currently only compress the data but
other operations are possible.

Data is always de-duplicated but you can choose to skip the transformation step
by telling the content address store that the data should be stored `raw` (ie:
with no transformation). Note that the de-duplication logic happens *prior* to
any transformation (so the transformation itself will not impact the de-duplication
logic).

Content stored by the content addressed store is addressable using a `key` which is
returned when `save_blobs` is called. `raw` objects can also directly be accessed
using a `uri` (also returned by `save_blobs`); the `uri` will point to the location
of the `raw` bytes in the underlying `DataStoreStorage` (so for exmaple a local
filesystem path or a S3 path). Objects that are not `raw` do not return a `uri`
as they should only be accessed through the content addressed store.

The symmetrical function to `save_blobs` is `load_blobs` which takes a list of
keys (returned by `save_blobs`) and loads all the objects requested. Note that
at this level of abstraction, there is no `metadata` for the blobs; other
mechanisms exist to store, for example, task metadata or information about
artifacts.

#### Implementation detail

The content addressed store contains several (well currently only a pair) of
functions named `_pack_vX` and `_unpack_vX`. They effectively correspond to
the transformations (both transformation to store and reverse transformation
to load) the data undergoes prior to being stored. The `X` corresponds to the
version of the transformation allowing new transformations to be added easily.
A backward compatible `_unpack_backward_compatible` method also allows this
datastore to read any data that was stored with a previous version of the
datastore. Note that going forward, if a new datastore implements `_pack_v2` and
`_unpack_v2`, this datastore would not be able to unpack things packed with
`_pack_v2` but would throw a clear error as to what is happening.

### `TaskDataStore` class

This is the meatiest class and contains most of the functionality that an executing
task will use. The `TaskDataStore` is also used when accessing information and
artifacts through the Metaflow Client.

#### Overview

At a high level, the `TaskDataStore` is responsible for:
- storing artifacts (functions like `save_artifacts`, `persist` help with this)
- storing other metadata about the task execution; this can include logs,
general information about the task, user-level metadata and any other information
the user wishes the persist about the task. Functions for this include
`save_logs` and `save_metadata`. Internally, functions like `done` will
also store information about the task.

Artifacts are stored using the `ContentAddressedStore` that is common to all
tasks in a flow; all other data and metadata is stored using the `DataStoreStorage`
directly at a location indicated by the `pathspec` of the task.

#### Saving artifacts

To save artifacts, the `TaskDataStore` will first pickle the artifacts, thereby
transforming a Python object into bytes. Those bytes will then be passed down
to the `ContentAddressedStore`. In other words, in terms of data transformation:
- Initially you have a pickle-able Python object
- `TaskDataStore` pickles it and transforms it to `bytes`
- Those `bytes` are then de-duped by the `ContentAddressedStore`
- The `ContentAddressedStore` will also gzip the `bytes` and store them
in the storage backend.

Crucially, the `TaskDataStore` takes (and returns when loading artifacts)
Python objects whereas the `ContentAddressedStore` only operates with bytes.

#### Saving metadata and logs

Metadata and logs are stored directly as files using the `DataStoreStorage` to create
and write to a file. The name of the file is something that `TaskDataStore`
determines internally.

### `FlowDataStore` class

The `FlowDataStore` class doesn't do much except give access to `TaskDataStore`
(in effect, it creates the `TaskDataStore` objects to use) and also allows
files to be stored in the `ContentAddressedStore` directly. This is used to
store, for example, code packages. File stored using the `save_data` method
are stored in `raw` format (as in, they are not further compressed). They will,
however, still be de-duped.

### Caching

The datastore allows the inclusion of caching at the `ContentAddressedStore` level:
- for blobs (basically the objects returned by `load_blobs` in the
`ContentAddressedStore`). Objects in this cache have gone through: reading
from the backend storage system and the data transformations in
`ContentAddressedStore`.

The datastore does not determine how and where to cache the data and simply
calls the functions `load_key` and `store_key` on a cache configured by the user
using `set_blob_cache`.
`load_key` is expected to return the object in the cache (if present) or None otherwise.
`store_key` takes a key (the one passed to `load`) and the object to store. The
outside cache is free to implement its own policies and/or own behavior for the
`load_key` and `store_key` functions.

As an example, the `FileCache` uses the `blob_cache` construct to write to
a file anything passed to `store_key` and returns it by reading from the file
when `load_key` is called. The persistence of the file is controlled by the
`FileCache` so an artifact `store_key`ed may vanish from the cache and would
be re-downloaded by the datastore when needed (and then added to the cache
again).
Loading

0 comments on commit 4c8c470

Please sign in to comment.