
Sharding Prototype I: implementation as translating Store #876

Closed
wants to merge 11 commits into from

Conversation

jstriebel
Member

@jstriebel jstriebel commented Nov 18, 2021

This PR is for an early prototype of sharding support, as described in the corresponding issue #877. It serves mainly to discuss the overall implementation approach for sharding. This PR is not (yet) meant to be merged.

This prototype

  • allows specifying shards as the number of chunks that should be contained in a shard (e.g. using arr.zeros((20, 3), chunks=(3, 3), shards=(2, 2), …); see the usage sketch after this list).
    One shard corresponds to one storage key, but can contain multiple chunks:
    [sharding diagram]
  • ensures that this setting is persisted in the .zarray config and loaded when opening an array again, adding two entries:
    • "shard_format": "indexed" specifies the binary format of the shards and allows to extend sharding with other formats later
    • "shards": [2, 2] specifies how many chunks are contained in a shard,
  • adds an IndexedShardedStore class that is used to wrap the chunk-store when sharding is enabled. This store handles the grouping of multiple chunks into one shard and transparently reads and writes them via the inner store in a binary format which is specified below. The original store API does not need to be adapted; it just stores shards instead of chunks, which are translated back to chunks by the IndexedShardedStore.
  • adds a small script chunking_test.py for demonstration purposes; this is not meant to be merged but serves to illustrate the changes.
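For illustration, a minimal usage sketch of the proposed API (the shards= argument and the resulting .zarray entries are specific to this prototype and may change; the store path is made up):

import numpy as np
import zarr

# Create an array whose chunks are grouped into shards of 2x2 chunks each
# (shards= is the argument proposed by this prototype).
store = zarr.DirectoryStore("data/sharding_demo.zarr")
z = zarr.zeros((20, 3), chunks=(3, 3), shards=(2, 2), store=store,
               overwrite=True, compressor=None)
z[...] = np.arange(20 * 3).reshape(20, 3)

# The persisted .zarray config now carries the two additional entries
# described above, e.g. "shard_format": "indexed" and "shards": [2, 2].
print(store[".zarray"].decode())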

The currently implemented file format is still up for discussion. It implements "Format 2", which @jbms describes in #876 (comment).

Chunks are written successively in a shard (unused space between them is allowed), followed by an index referencing them.
The index holds an offset, length pair of little-endian uint64 per chunk; the chunk order in the index is row-major (C) order,
e.g. for (2, 2) chunks per shard an index would look like:

| chunk (0, 0)    | chunk (0, 1)    | chunk (1, 0)    | chunk (1, 1)    |
| offset | length | offset | length | offset | length | offset | length |
| uint64 | uint64 | uint64 | uint64 | uint64 | uint64 | uint64 | uint64 |

Empty chunks are denoted by setting both offset and length to 2^64 - 1. The index always has the full shape of all possible chunks per shard, even if they are outside of the array size.

For the default order of the actual chunk-content in a shard I'd propose to use Morton order, but this can easily be changed and customized, since any order can be read.
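To make the byte layout concrete, here is a small sketch (not the PR's actual code) that assembles one shard in this format: chunk payloads concatenated in some order, followed by the (offset, length) index in row-major chunk order, with 2^64 - 1 marking missing chunks:

import numpy as np

MISSING = 2**64 - 1  # sentinel for "chunk not present"

def build_shard(chunk_bytes, chunks_per_shard=(2, 2)):
    """chunk_bytes maps a chunk position within the shard, e.g. (0, 1),
    to its encoded bytes; positions that are absent are empty chunks."""
    index = np.full(chunks_per_shard + (2,), MISSING, dtype="<u8")
    payload = bytearray()
    for position, data in chunk_bytes.items():
        index[position] = (len(payload), len(data))  # offset, length
        payload += data
    return bytes(payload) + index.tobytes()  # index sits at the end of the shard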


If the overall direction of this PR is pursued, the following steps (and possibly more) are missing:

  • Functionality
    • Use a default write-order (Morton) of chunks in a shard and allow customization
    • Support deletion in the ShardedStore
    • Group chunk-wise operations in Array where possible (e.g. in digest & _resize_nosync)
    • Consider locking mechanisms to guard against concurrency issues within a shard
    • Allow partial reads and writes when the wrapped store supports them
    • Add support for prefixes before the chunk-dimensions in the storage key, e.g. for arrays that are contained in a group
    • Add warnings for inefficient reads/writes (might be configured)
    • Maybe use the new partial read method on the Store also for the current PartialReadBuffer usage (to detect if this is possible and reading via it)
  • Tests
    • Add unit tests and/or doctests in docstrings
    • Test coverage is 100% (Codecov passes)
  • Documentation
    • also document optional optimization possibilities on the Store or BaseStore class, such as getitems or partial reads
    • Add docstrings and API docs for any new/modified user-facing classes and functions
    • New/modified features documented in docs/tutorial.rst
    • Changes documented in docs/release.rst

changed 2021-12-07: added file format description and updated TODOs

@pep8speaks

pep8speaks commented Nov 18, 2021

Hello @jstriebel! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found:

Line 36:45: E211 whitespace before '['
Line 44:101: E501 line too long (104 > 100 characters)
Line 47:101: E501 line too long (115 > 100 characters)

Comment last updated at 2021-12-22 11:34:19 UTC

@jstriebel jstriebel changed the title from Sharding to Initial Sharding Prototype on Nov 18, 2021
@jstriebel jstriebel mentioned this pull request Nov 18, 2021
@codecov

codecov bot commented Nov 18, 2021

Codecov Report

Merging #876 (2d1fea0) into master (b80c0c4) will decrease coverage by 0.06%.
The diff coverage is 96.00%.

@@            Coverage Diff             @@
##           master     #876      +/-   ##
==========================================
- Coverage   99.94%   99.88%   -0.07%     
==========================================
  Files          32       34       +2     
  Lines       11216    11382     +166     
==========================================
+ Hits        11210    11369     +159     
- Misses          6       13       +7     
Impacted Files Coverage Δ
zarr/_storage/store.py 100.00% <ø> (ø)
zarr/util.py 98.95% <73.33%> (-1.05%) ⬇️
zarr/_storage/sharded_store.py 97.02% <97.02%> (ø)
chunking_test.py 100.00% <100.00%> (ø)
zarr/core.py 100.00% <100.00%> (ø)
zarr/creation.py 100.00% <100.00%> (ø)
zarr/meta.py 100.00% <100.00%> (ø)
zarr/storage.py 100.00% <100.00%> (ø)

@jbms

jbms commented Nov 18, 2021

I think this is sorely-needed functionality. Can you please write some documentation of the data format and metadata, so that I don't have to infer it from the code?

@jbms

jbms commented Nov 18, 2021

Taking a quick look at the code, I see that there isn't much to the data format --- you add a shards key to the metadata and the shard file is just the concatenation of the individual chunks, which must be fixed size.

Naturally you will need an index in the shard to support variable-size chunks. I would suggest making the definition of that format a top priority, as it makes sense to decide on the format before worrying too much about the implementation.

@jbms

jbms commented Nov 18, 2021

I would also suggest supporting missing chunks within a shard.

@jbms

jbms commented Nov 18, 2021

Another thing to consider is putting the shard index in a separate file/key, and allow it to specify the key for individual chunks. That way you have some flexibility in which chunks are stored together. For example you can make the shard much larger that way. The downside is that there is a possibility of dead space and you might need to do a compaction step afterwards to reclaim that space.

If you do put the index in the shard file itself, I'd suggest making it a fixed size and at the end of the file. That way you can easily request it via a single HTTP range request from GCS or S3.
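For example, a trailing fixed-size index can be fetched with a single HTTP suffix range request (sketch only; the URL is made up, and 16 bytes of index per chunk is an assumption matching the offset/length layout discussed in this thread):

import numpy as np
import requests

chunks_per_shard = (2, 2)                            # hypothetical shard shape
index_nbytes = 16 * int(np.prod(chunks_per_shard))   # offset + length per chunk

# Suffix range request: ask only for the last index_nbytes bytes of the shard object.
resp = requests.get("https://storage.example.com/bucket/array/0.0",
                    headers={"Range": f"bytes=-{index_nbytes}"})
index = np.frombuffer(resp.content, dtype="<u8").reshape(chunks_per_shard + (2,))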

@jakirkham
Member

Based on Jeremy's comments, am almost wondering if we should have a MutableMapping that describes the chunks in a shard as well. Then users can choose what that might be. It can also handle missing chunks relatively easily.

Thinking of existing MutableMappings we support, this could be something like ZipStore, DBMStore, LMDBStore, or SQLiteStore.

Though maybe there are better options than these that we could also consider in the future. Basing sharding on a MutableMapping interface should allow for that flexibility.

@jbms

jbms commented Nov 18, 2021

First of all, the spec version will probably have to be bumped to version 3, since it is an incompatible change --- version 2 implementations that attempt to read the metadata will ignore the shards key and will not work properly.

I think it would be very useful to fully-specify the format of the shards in the .zarray metadata file itself, at least in the common case. Otherwise you have to have some out-of-band way to indicate the shard format as well, which is inconvenient and leads to implementation divergence (i.e. same issues as occurred with dimension_separator previously).

There is another consideration regarding making the shard format a mutable mapping: for reading, you need to support random access, and mutable mapping is a reasonable interface for reading, though for efficiency you will want to cache the shard index content, to avoid having to re-read it for every chunk individually. For writing, though, you need an interface where you stream out a sequence of chunks, then finalize it by writing the index. I guess that kind of fits the MutableMapping interface, except that you need to somehow handle the finalize step. If you did use a format like LMDB or Sqlite for the shard, and are storing the shard on a local filesystem or other storage system that allows writes to arbitrary byte ranges, then you could indeed just support the MutableMapping interface without need for a finalize step. But that would not work for S3, GCS and similar stores that are high latency and don't allow writes to existing files, and those stores were a key motivation for sharding in the first place.

I think an important question regarding allowing pluggable shard formats is what the benefit would be. Essentially it is already possible via a custom store to implement sharding. The key advantage of this proposal is that the sharding is directly specified in the .zarray file, which allows for standardization.

As a simple starting point, here are two possible shard formats:

Format 1 (chunks are in fixed order):

  • chunks stored in fixed order (e.g. lexicographical by chunk position)
  • chunk index, which consists of prod(shards) uint64le values (8 * prod(shards) bytes) specifying the size in bytes of each chunk. The special value of 2^64-1 indicates a missing chunk. You compute the cumulative sum (skipping missing chunks) to determine the starting and ending offsets of each chunk.

Format 2 (chunks are in arbitrary order):

  • chunks stored in any arbitrary order, empty space is also allowed
  • chunk index, which consists of prod(shards) * 2 uint64le values (16 * prod(shards) bytes) specifying the starting and ending offset of each chunk. The special start/end pair (2^64-1, 2^64-1) indicates a missing chunk.

With either format, to read a chunk you would first read the entire chunk index, then read the appropriate byte range for the chunk. When reading from an HTTP store, this would mean 1 byte range request for the index, plus one byte range request for the chunk.

Zip format would work fairly similarly to format 2, except that you need one more byte range request to first read the End of Central Directory record.

I guess so far zarr itself has avoided specifying any binary formats --- only the .zarray metadata file itself has a specified format; the format of individual chunks is determined entirely by the codec. I can see therefore the reluctance to specify a binary format for shards. But I think this feature would be much more valuable with a defined format.
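For illustration, a reader could decode the two index variants along these lines (a sketch, following the byte layouts described above):

import numpy as np

MISSING = 2**64 - 1

def decode_format1(index_bytes):
    """Format 1: per-chunk sizes in fixed chunk order; offsets follow from a
    cumulative sum that skips missing chunks. Returns (offset, length) or None."""
    sizes = np.frombuffer(index_bytes, dtype="<u8")
    ranges, offset = [], 0
    for size in sizes:
        if size == MISSING:
            ranges.append(None)
        else:
            ranges.append((offset, int(size)))
            offset += int(size)
    return ranges

def decode_format2(index_bytes):
    """Format 2: explicit (start, end) offsets per chunk in fixed chunk order."""
    pairs = np.frombuffer(index_bytes, dtype="<u8").reshape(-1, 2)
    return [None if start == MISSING else (int(start), int(end))
            for start, end in pairs]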

@jakirkham
Member

On immutable object stores, Jeremy, you might be interested in reading issue ( zarr-developers/zarr-specs#82 )

Member

@joshmoore joshmoore left a comment


For anyone reading along,

(base) /opt/zarr-python $ python pr876.py
ONDISK ['.zarray', '0.0', '1.0', '2.0', '3.0']
STORE ['.zarray', '1.0', '2.0', '3.0', '0.0']
CHUNKSTORE (SHARDED) ['.zarray', '2.0', '2.1', '3.0', '3.1', '4.0', '4.1', '5.0', '5.1', '6.0', '6.1', '7.0', '7.1', '0.0', '0.1', '1.0', '1.1']

(base) /opt/zarr-python $ tree chunking_test.zarr/
chunking_test.zarr/
├── 0.0
├── 1.0
├── 2.0
└── 3.0

0 directories, 4 files

and with dimension_separator=="/":

├── 0
│   └── 0
├── 1
│   └── 0
├── 2
│   └── 0
└── 3
    └── 0

4 directories, 4 files

As a minor side note, I almost wonder if the naming of the files shouldn't be different for shards.

chunking_test.py Outdated
import zarr

store = zarr.DirectoryStore("data/chunking_test.zarr")
z = zarr.zeros((20, 3), chunks=(3, 3), shards=(2, 2), store=store, overwrite=True, compressor=None)

Pondering the feedback on #877, I wonder if rather than the relatively hard-coded shards=..., an implementation of some interface here might not be able to give us the flexibility for different backends. i.e. basically passing in chunk_store directly, but that would raise the question of how to serialize the state into .zarray for other processes.

@jstriebel
Member Author

jstriebel commented Nov 26, 2021

Thanks everybody for the feedback and sorry for the late response! I just pushed one more commit that makes the actual shard-format configurable by adding e.g. "shard_format": "morton_order" to the .zarray json or setting shard_format during construction. Also I added a bitmask for the uncompressed chunks, just to showcase how some binary metadata can be used in the shard.

I'll try to go through the threads and summarize different decisions we should make along the way and add my opinion:


@joshmoore

add interface to specify different backends for sharding

Totally agree! What do you think about some additional string and/or config for the shard-format, which might be expanded later on, e.g. as I added in 7e2768a? I agree that it might make sense to allow different formats here, it should especially be future-proof for new ones. This could be something like blosc2, zip, sqlite, … The only requirement I see is that it would need to be backed by the normal underlying store.


Re: index-format @jbms @jakirkham

I would suggest making the definition of that format a top priority, as it makes sense to decide on the format before worrying too much about the implementation.

Agreed 👍

wondering if we should have a MutableMapping that describes the shards in a chunk as well.
Thinking of existing MutableMappings we support, this could be something like ZipStore, DBMStore, LMDBStore, or SQLiteStore.

I'm wondering how the current implementations would then be mapped to the storage underneath. I'd assume that we would need to rewrite most of the logic, since those mutable mappings don't operate on binary blobs which we need for the interface of the underlying storage. But I agree, zips or DBs might also be an option, see my proposal about the shard_format key above. I think for now a simple index together with the chunks might be the easiest option, but anything along those lines could be easily added later without breaking compatibility.

As a simple starting point, here are two possible shard formats:

Format 1 (chunks are in fixed order):

  • chunks stored in fixed order (e.g. lexicographical by chunk position)
  • chunk index, which consists of prod(shards) uint64le values (8 * prod(shards) bytes) specifying the size in bytes of each chunk. The special value of 2^64-1 indicates a missing chunk. You compute the cumulative sum (skipping missing chunks) to determine the starting and ending offsets of each chunk.

Format 2 (chunks are in arbitrary order):

  • chunks stored in any arbitrary order, empty space is also allowed
  • chunk index, which consists of prod(shards) * 2 uint64le values (16 * prod(shards) bytes) specifying the starting and ending offset of each chunk. The special start/end pair (2^64-1, 2^64-1) indicates a missing chunk.

Great question. In the second format with arbitrary order, a reader would not need to know anything about the semantic order the chunks were written in, and strategies such as "append-only" would work. Fixed order would shorten the index a bit, and would also guarantee some properties, such as no empty space and the order of the chunks themselves. Especially for the latter I tend towards the fixed-order format, since Morton order gives advantages when reading neighborhoods of chunks in a shard. I'm also wondering if we should make the order itself configurable in this case, or if Morton order is fine for now. Anything else might still be added later via the shard_format string.

For fixed-length chunks (uncompressed and not of dtype object) I would propose to have a simple bitmask to indicate if a chunk is missing, e.g. as implemented in 7e2768a. Another question I was wondering about here: If chunks are fixed-length, should we still write missing chunks in a shard? This would allow that partial writes of any chunk are possible later without rewriting the whole file (just the chunk and the bitmask itself).

One more question that came up: Should the index/bitmask be a separate file, or be at the start or at the end of the shard file? I'd integrate it into the shard-file directly, since it's one file-handle/entry less per shard and (I think) does not bring any downsides (partial reads / writes would still be necessary to read/write subchunks of a shard).
I'm not sure about the implications for putting the index at the start vs end of the file for http-range requests. Putting it at the end would help when streaming the compressed chunks while still compressing latter chunks, so that the index can be computed and written asynchronously at the end, as @jbms wrote already. That's nothing I'd aim for at this stage in the implementation yet, but might be something we want to add later (or is useful for other implementations). I was wondering if putting the index at the start of the file brings benefits for reads on disk and possibly more services, since the cache might rather populate "forward"? Just guessing here, tbh.

To summarize the decisions about the index format:

  • Do we want to use a simple custom binary format vs. something like zip? (Assuming yes atm for the other questions.)
  • Should the order of the chunks be fixed? This determines if we need to store only the chunk-size or also the start-position of each chunk. If yes, which order (probably Morton)?
  • Should we use a simple bitmask in the case of fixed-length chunks? This would trade a little complexity against index-space.
  • For fixed-length chunks, should we still write non-existing chunks to allow partial writes to new chunks?
  • Should the index/bitmask be outside the file, or at the start or at the end of the file?

@jbms

the spec version will probably have to be bumped to version 3, since it is an incompatible change --- version 2 implementations that attempt to read the metadata will ignore the shards key and will not work properly.

Not sure what incompatible means in this case, but new clients can read all files, just old clients cannot read arrays in the new sharded format. In terms of semantic versioning, this would be a minor version bump in the software, which would probably be a bump of the spec version here.

@jbms

jbms commented Nov 26, 2021

As far as Morton order, whether it is advantageous depends on the access pattern. There are also many variations possible, e.g. how the dimensions are ordered, whether some dimensions should be kept in lexicographic order as inner or outer dimensions. If the index does not require a fixed order, then it is possible to later add a metadata field that indicates a preferred order for writing without breaking compatibility.

I am inclined to think that having a separate index format in the case of fixed-size uncompressed chunks to save 8-16 bytes per chunk is not worth the added complexity.

A separate index file makes it possible to append chunks without retaining a wasted copy of the index, and also allows you to compress the index without needing an additional read to determine the size of the index. But it also makes it impossible to safely modify a shard using atomic single-file writes.

@chris-allan

Firstly, I'd just like to echo @jbms in saying that this is sorely-needed functionality. It's exciting to see thought and real effort being put into chunk localization.

I would however just like to remind everyone of the discussion in #515 as it has been a while. Specifically, I'd like to draw attention to the historical differences in technical approaches to addressing use cases between Zarr and other efforts like TileDB. That is, Zarr has a history of taking a path that is simple, relatively easy to understand, and quick to implement. This allows the Zarr runtime to be, in comparison to something like TileDB, very lightweight. In my opinion, it has also allowed Zarr to be able to quite nicely balance read-heavy as well as write-heavy use cases when using object storage in particular.

I think everyone here understands the draw of localizing chunks in a single file or object for read-heavy use cases. However, the complexity really starts to ramp up when we start considering writing. This is especially true for object storage where, as @jbms and @jakirkham have already mentioned, objects are immutable. In the functionality section of this PR's description, as well as throughout the comments, a number of such concerns have already been touched upon.

There is substantial overlap between these complex software engineering concerns and the concerns of a database project or a storage engine project like TileDB. TileDB's Data Format in particular addresses:

  • Sparse and dense fragments
  • Immutability
  • Storage order
  • Metadata format

TileDB's runtime also specifically addresses synchronization and time travel, and TileDB is a permissively (MIT) licensed project with excellent Python bindings.

Instead of attempting to design and implement many of the aforementioned features in the Zarr core would it be worthwhile evaluating a TileDB based storage implementation as an alternative?

@jbms

jbms commented Dec 1, 2021

I agree with @chris-allan that zarr has focused on simplicity of the data format and implementation, and as that has served a class of use cases well it may well be wise to keep zarr aligned with that focus.

It is true that this discussion has touched on many different points, some of which surely depart quite far from the present simplicity of implementation and data format.

However, I think at the core of this proposal there is the possibility of a relatively simple design that has different trade-offs compared to TileDB and serves different use cases:

  • Suppose we make shards immutable once written, with a simple binary format (defined by the zarr specification) where the (compressed) chunks may be written at arbitrary offsets, but the end of the file must contain 16 * number_of_chunks bytes that specify the offset and length of each chunk within the shard.
  • Then writing, even partial writing, is essentially very similar to the existing write support in zarr, except that it is done at the granularity of entire shards rather than chunks.
  • Reading is a bit different as it requires support from the underlying store to read byte ranges rather than the entire value.

This design allows efficient concurrent writes as long as they are shard aligned. It also allows efficient concurrent reads. The file format is still quite simple, and the only addition to the .zarray metadata format is the shards member.

My understanding of TileDB is that it maintains a single index, which is ordered first by time-of-write, and then spatially. That makes concurrent writing efficient and relatively simple to implement, and also naturally supports versioning. However, for reading, it is necessary to read at least the bounds of every fragment, as there is no top-level spatial index. That works fine if you have a small number of fragments, or a small number of long-lived readers that can keep a spatial index cached in memory.

However, in the case where you have a large number of fragments, and a large number of possibly short-lived readers, there is a problem. For example, suppose we have written a 1PB dataset as 1 million fragments of 1GB each.

  • If we then wish to read from this dataset from 10000 worker machines in parallel, each of the 10000 machines will have to read the bounds for all 1 million fragments, a total of 10 billion read requests, just to figure out which fragments actually need to be read.
  • A viewer like Neuroglancer, that is directly accessing the storage via a web server, would likewise need to read the bounds of all 1 million fragments, which would take a large amount of time, before it could begin loading any data.

This limitation could be avoided by adding an additional spatial index to TileDB, but that would have its own set of tradeoffs.

@mkitti

mkitti commented Dec 3, 2021

Format 2 (chunks are in arbitrary order):

  • chunks stored in any arbitrary order, empty space is also allowed
  • chunk index, which consists of prod(shards) * 2 uint64le values (16 * prod(shards) bytes) specifying the starting and ending offset of each chunk. The special start/end pair (2^64-1, 2^64-1) indicates a missing chunk.

Format 2 is amenable to quickly and simply writing compressed chunks in parallel.

My perspective is from creating software to write shards at data acquisition. Compressing the data and then writing it may be faster than writing raw uncompressed data to a local disk array. Also in this environment, the cost of creating many files on a local filesystem may be high and add unnecessary latency. Thus, shards are relevant to help mitigate that cost. The actual size of the shard file on disk is a secondary priority to storing the data as quickly to disk as possible. The shard file could be repacked to eliminate unused space at a later time or at a slower rate than acquisition speeds. Alternatively, repacking may occur while transferring the data off the acquisition system. A format that allows for empty space and supports writing compressed chunks in parallel to a shard as quickly as possible would be useful at data acquisition, since data write speed may be more important than file compactness.

One scheme to compress and write the chunks in parallel is as follows. The mechanics of this scheme involve receiving a pointer to contiguous data from an acquisition device (e.g. a camera), chunking that data, and compressing the data in parallel across multiple threads. A large shard file may be quickly created by opening the file, seeking to an upper bound of the potential file size, and writing a small amount of data. To allow parallel writes to a single shard file, the file is memory mapped, and the chunks are copied to predetermined locations in the mapped memory so that the parallel threads do not need to communicate. These predetermined locations are calculated based on the uncompressed size of the data with potential overhead. Thus, there may be empty space between compressed chunks. Memory mapping is useful here since memory page files are often lazily allocated by the operating system and are flushed to disk in an asynchronous manner. Memory mapping is also a useful generic abstraction so that operating system specific APIs for asynchronous unbuffered writes are not necessary. In summary, the above scheme allows for independent parallel writes to a single shard using memory mapping, but requires the potential for empty space in the file.

There are optimization details for the above scheme, but the lack of need for synchronization across threads is a major advantage and allows for simple concurrent programming. Each thread just requires a set of input and output buffers, one or more of which may be mapped to a file. Buffer-to-buffer APIs are provided by many compression libraries. Additionally, this scheme resembles that used to write chunks to separate files in situations where the cost of file creation is low or less of a priority.

Being able to write data efficiently to a Zarr compatible format at data acquisition would eliminate the need for intermediate formats.
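A rough sketch of the write pattern described above (illustrative only: the slot sizing, compressor choice, and file name are assumptions, and writing the trailing shard index is omitted):

import mmap
from concurrent.futures import ThreadPoolExecutor

import numcodecs
import numpy as np

codec = numcodecs.Blosc()                 # any buffer-to-buffer compressor works here
chunks = [np.random.rand(64, 64) for _ in range(4)]   # the chunks of one shard
slot_size = chunks[0].nbytes + 4096       # upper bound per chunk, leaving some slack

with open("shard.bin", "wb") as f:        # pre-size the file, then memory-map it
    f.truncate(slot_size * len(chunks))

with open("shard.bin", "r+b") as f:
    mm = mmap.mmap(f.fileno(), 0)

    def write_chunk(i):
        # Each thread compresses into its own predetermined slot; no locking needed.
        data = codec.encode(chunks[i])
        offset = i * slot_size
        mm[offset:offset + len(data)] = data
        return offset, len(data)          # would later go into the shard index

    with ThreadPoolExecutor() as pool:
        index_entries = list(pool.map(write_chunk, range(len(chunks))))

    mm.flush()
    mm.close()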

@jbms

jbms commented Dec 6, 2021

It would be great if the zarr specification owners weigh in on this. I think a sharded format would be tremendously useful and it would be great to have it ASAP.

I would suggest we go with "format 2", which @mkitti also seems to find satisfactory.

For zarr python we will need to add byte range reading support to the mutable mapping API, perhaps via a new optional method. For mutable mappings that don't support this new method, it can be (inefficiently) emulated by doing a full read and then throwing out the unneeded bytes. Or alternatively it could be handled at a higher level by just reading at full shard granularity.

It would be really great if we can reach agreement on the format and try to proceed with the implementation quickly.

I can implement support in Neuroglancer and TensorStore; given the existing support for the neuroglancer precomputed sharded format that will not be too difficult.

@normanrz
Member

normanrz commented Dec 7, 2021

I agree to go forward with "format 2". I think it is powerful enough to solve all the current needs for storage, visualization (e.g. neuroglancer and webKnossos) and processing of very large datasets, which were the primary goals of the sharding feature. I think the format is also simple enough to be widely implemented and fits well with the zarr philosophy.

@jstriebel
Member Author

jstriebel commented Dec 7, 2021

Thanks for the input! Going forward with "Format 2" sounds great! I just added a commit to support this format:
The (possibly compressed) chunks are currently written in arbitrary order, followed by an index referencing them. The index holds an offset, length pair of little-endian uint64 per chunk; the chunk order in the index is row-major (C) order.
For (2, 2) chunks per shard an index would look like:

| chunk (0, 0)    | chunk (0, 1)    | chunk (1, 0)    | chunk (1, 1)    |
| offset | length | offset | length | offset | length | offset | length |
| uint64 | uint64 | uint64 | uint64 | uint64 | uint64 | uint64 | uint64 |

For now, I left the shard_format key in the .zarray specification and called the format indexed. For the default order of the actual chunk-content in a shard I'd propose to use Morton order, but this can easily be changed and customized, since all orders can be read. I'd propose to make the write-order easily configurable in the API, but to leave it out of the array spec in .zarray, which allows for arbitrary customizations without the need to serialize this setting.
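For reference, a small sketch of a 2-D Morton (Z-order) key via bit interleaving, which could be used to determine such a default write order of chunks within a shard (not part of the committed code):

def morton_key_2d(y, x, bits=16):
    """Interleave the bits of (y, x) into a single Z-order key."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)
        key |= ((y >> i) & 1) << (2 * i + 1)
    return key

# Chunk positions within a shard, sorted into Morton write order:
positions = [(y, x) for y in range(4) for x in range(4)]
print(sorted(positions, key=lambda p: morton_key_2d(*p))[:6])
# -> [(0, 0), (0, 1), (1, 0), (1, 1), (0, 2), (0, 3)]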

If we can agree on this or a variation of this format, the following steps would be missing for the zarr-python implementation:

  • Use a default write-order (Morton) of chunks in a shard and allow customization
  • Support deletion in the ShardedStore
  • Group chunk-wise operations in Array where possible (e.g. in digest & _resize_nosync)
  • Consider locking mechanisms to guard against concurrency issues within a shard
  • Allow partial reads and writes when the wrapped store supports them
  • Add support for prefixes before the chunk-dimensions in the storage key, e.g. for arrays that are contained in a group
  • Add warnings for inefficient reads/writes (might be configured)
  • Maybe use the new partial read method on the Store also for the current PartialReadBuffer usage (to detect if this is possible and reading via it)
  • Tests
  • Documentation

I also updated the PR description in the top accordingly.

What would be the process for the next steps besides the implementation?

@chris-allan

If we've settled our collective intent to be solely addressing the read-optimized, chunk-localized use case I will definitely table the TileDB discussion in favour of not creating additional noise here. If anyone is interested in looking into it I'd be happy to explore it on another issue.

"Format 2" certainly seems to have some consensus and I'll add my +1. Perhaps the two objects might have zbin and zidx suffixes? Is there a preference for a single object? It would also open the door for adding zhash or zpar extensions to the format going forward for those who might be interested in integrity hash and/or data recovery redundancy parity functionality.

Of the very large data set use cases already mentioned here I'll add a general one: moving and deleting the many files that make up a Zarr based imaging data set can be a real pain under a lot of circumstances. Users of even not so large imaging data sets already feel this pain with Zarr and they do not feel it with traditional single file, proprietary or not, file formats. I believe @normanrz has already referenced the PR on at least one forum.image.sc thread with concerns surrounding exactly this.

Furthermore, as @mkitti has already outlined, chunk-localization is of high interest to the instrumentation community. That community is incredibly diverse and often operates very close to the metal because it has to in order to achieve the write speeds required to not have acquisition hardware waiting on I/O. They would not be making mention of memory mapped and zero-copy I/O, operating system semantics, or repacking otherwise. I would argue that Zarr's current design is predicated on not having the user be even tangentially concerned about any of these things. This is especially true when object storage is in use and half of them aren't even possible.

Consequently, with respect to the zarr-python API, I think I'd move forward with caution and focus on very high quality documentation and extremely well defined semantics surrounding this functionality. It's going to be very tempting to have leaky abstractions to allow for very high performance, operating system specific or storage subsystem specific use cases and I don't think that is a good idea. If, as @jbms has been focusing on, we get the format right and that format continues in the Zarr spirit of simplicity then I think that gives freedom for those types of users to write to the format as they see fit.

One might even argue that the API additions be read-only, at least initially, as reconciling a Zarr user's current expectation of chunk aligned writes vs. shard aligned writes may be very difficult. I expect we can all agree that if we're not careful, as chunk-localization is very attractive to a lot of people, we will be presented with a significant number of users with write performance horror stories and that will have a real detrimental effect on the community perception of Zarr.

@rabernat
Contributor

rabernat commented Dec 7, 2021

It would be great if the zarr specification owners weigh in on this.

I just read through this long and thoughtful thread. Thanks to everyone participating. I am 👍 on format 2.

One might even argue that the API additions be read-only, at least initially, as reconciling a Zarr user's current expectation of chunk aligned writes vs. shard aligned writes may be very difficult.

Geoscience Use Cases

From the perspective of geospatial data, our use cases tend to be "write once, read many." The data generation process can almost always be tuned to write shards (or what we currently call chunks) contiguously, e.g. one shard per task. The main performance benefits from sharding will be for data users who want to make reads at the sub-shard level--currently they have to read the whole V2 chunk, but this new feature will enable much more granular access. So I would personally be fine with our first releases of this new feature simply punting on supporting parallel writes to a single shard.

Optimizing Reads

To optimize reads, particularly on high-latency stores like S3, it will be key to implement intelligent fusion of contiguous chunks into a single read operation. For a given slice of a sharded array (e.g. 0:20, 100:200), we would need to:

  • Figure out which chunks intersect with the slice (N chunks)
  • Look up the byte ranges of those N chunks from the index
  • Perform some kind of optimization to fuse the reads into a smaller number (< N) of range requests. Depending on the latency / concurrency / throughput profile of the storage system, it may even be optimal to read more chunks than are strictly needed in order to reduce the number of requests. I assume folks here (e.g. @jbms in Neuroglancer) have solved some version of this problem already.
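A sketch of that fusion step over (offset, length) byte ranges, merging requests whose gap is below a configurable threshold (threshold and tuple layout are illustrative):

def fuse_ranges(ranges, max_gap=64 * 1024):
    """Merge (offset, length) byte ranges whose gap is at most max_gap bytes,
    trading a little over-read for fewer requests on high-latency stores."""
    fused = []
    for offset, length in sorted(ranges):
        if fused and offset - (fused[-1][0] + fused[-1][1]) <= max_gap:
            prev_offset, _ = fused[-1]
            fused[-1] = (prev_offset, offset + length - prev_offset)
        else:
            fused.append((offset, length))
    return fused

# Three nearby chunks collapse into one range request; the distant one stays separate.
print(fuse_ranges([(0, 100), (150, 100), (300, 50), (10_000_000, 100)]))
# -> [(0, 350), (10000000, 100)]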

Locking

Once we are ready to fully support concurrent writes to the sharded format, we may need to get more serious about locking. In the Pangeo Forge project, we have an internal class called a ChunkGrid which figures out the granular locking needed to write a source array with a particular chunk scheme to a target array with a different chunk scheme. That could be pulled out and turned into a standalone thing that would be useful here.

@martindurant
Member

I'd just like to mention that the simplest and perhaps most useful compression to consider for sharding, and one that is quite common for kerchunk, is None. In that case, we already know all the offsets! I think blosc has a decent framing format for getting at blocks and so does, for example, zstd - but those are only useful (of course) if the internal block size is small enough to have several blocks in the chunk, but big enough to still give good compression. Is finely controlling the blocksize on write a necessity?

I don't know if it has been mentioned as part of this discussion, but the ability for concurrent reads via fsspec makes smaller zarr chunks more feasible and puts less pressure to make big, shardable chunks. That's from a raw throughput perspective anyway; making thousands of files has other downsides. I am thinking, though, that we need to be careful that the sharding index (compressed offset versus stream offset for all blocks of all chunks) is big enough that we'll notice on open.

It's perhaps interesting that this idea is something that kerchunk is completely independently interested in: for example being able to find the offset within a compressed CSV file, from which you can start reading and know that the first newline character is a real record terminator. Unfortunately, gz is by far the commonest compressor, with the least well-defined blocks, probably impossible to index.

@jbms

jbms commented Dec 16, 2021

@martindurant wrote:

I'd just like to mention that the simplest and perhaps most useful compression to consider for sharding, and one that is quite common for kerchunk, is None. In that case, we already know all the offsets!

That was indeed the initial implementation in this PR. However, it definitely depends on the application whether uncompressed is desirable. Certainly for my own applications it is not. You also lose support for missing chunks.

I don't know if it has been mentioned as part of this discussion, but the ability for concurrent reads via fsspec makes smaller zarr chunks more feasible and puts less pressure to make big, shardable chunks. That's from a raw throughput perspective anyway; making thousands of files has other downsides.

My interest in this proposal is for use with large datasets where storing many small files would be impractical.

I am thinking, though, that we need to be careful that the sharding index (compressed offset versus stream offset for all blocks of all chunks) is big enough that we'll notice on open.

Can you explain what you mean by that?

It's perhaps interesting that this idea is something that kerchunk is completely independently interested in: for example being able to find the offset within a compressed CSV file, from which you can start reading and know that the first newline character is a real record terminator. Unfortunately, gz is by far the commonest compressor, with the least well-defined blocks, probably impossible to index.

I can see that you could use a 1-d zarr array to store your csv data, in which case the topic of this sharding proposal may be relevant to parallel reading, but that seems rather roundabout. For your case of wanting parallel access/seeking in a compressed file there are already numerous formats that support that and which would also provide better compression than gzip, e.g. zstd.

@martindurant
Member

Can you explain what you mean by that?

Simply that downloading and interpreting the shard would add additional overhead when opening a dataset, depending on the size of that sharding information.

The thought about CSVs was not meant to be related to zarr as such, just a related other application of block-seeking in compressed files.

wanting parallel access/seeking in a compressed file there are already numerous formats

I know, I mention zstd, but often the consumer does not control the file format. Also, this needs to be done remotely, without downloading the whole file.

@joshmoore
Member

(For some reason, I see neither the "Update branch" nor the "Conflicts" button on this PR)

@normanrz
Member

(For some reason, I see neither the "Update branch" nor the "Conflicts" button on this PR)

Maybe because this PR originates from our fork?

@jstriebel
Member Author

  1. Read .zarray key from store to obtain metadata, same as existing implementation.
  2. The .zarray metadata contains the information to calculate shard keys and decode chunks.
  3. a. When the user issues a read request, the array calculates which shards and which chunks within those shards are required.
     b. Array reads the shard index for each required shard via a byte range request to the store.
     c. Array decodes the shard index for each required shard, and determines the byte ranges of each required chunk.
  4. Array reads from the store the byte ranges of each required chunk.
  5. Array decodes each chunk according to the compressor/filter specified in the .zarray metadata, same as existing implementation.

Regarding the array protocol / sharding procedure, I wanted to highlight that the steps 2-5 here can run in isolation of the other slicing/chunking logic of the Array, basically as a transparent translation layer. The API of this layer is basically the same as the one of a store (get, set, delete of chunks), which is why I modelled it as a Store class here, but in fact it's simply a translation layer between the array and the actual underlying store. I'll also provide a PR where this is implemented closer to the Array class, but this won't change anything in the protocol.
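To illustrate the translation-layer idea, a heavily simplified read-only sketch of such a wrapper (the actual IndexedShardedStore in this PR additionally handles writes, deletes, dimension separators and non-chunk keys):

import numpy as np

MISSING = 2**64 - 1

class SimpleShardedReader:
    """Translates a chunk key like "4.1" into a shard key plus a byte range
    looked up in the shard's trailing (offset, length) index. Sketch only."""

    def __init__(self, inner_store, shards=(2, 2)):
        self.inner = inner_store
        self.shards = shards

    def __getitem__(self, chunk_key):
        chunk_pos = tuple(int(i) for i in chunk_key.split("."))
        shard_key = ".".join(str(i // s) for i, s in zip(chunk_pos, self.shards))
        subchunk = tuple(i % s for i, s in zip(chunk_pos, self.shards))
        shard = self.inner[shard_key]  # full read; a partial read would suffice
        index_nbytes = 16 * int(np.prod(self.shards))
        index = np.frombuffer(shard[-index_nbytes:], dtype="<u8")
        index = index.reshape(self.shards + (2,))
        offset, length = (int(v) for v in index[subchunk])
        if offset == MISSING:
            raise KeyError(chunk_key)
        return shard[offset:offset + length]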

Commenting on the next steps:

  • Another PR evaluating the changes starting from Array. e.g. a ShardedArray subclass

Sounds good, I'll add a PR 👍

  • Discuss whether and if so how caterva/blosc2 would make use of the API.

Do you mean caterva/blosc2 as a sharding-container? I guess one could choose caterva chunks as a container format.

  • Consider necessary changes to the current V3 PRs (cc: @grlee77)
  • Capture design choices in v2.rst
  • Explore the same changes in the clean-room Zarrita (V3) (cc: @alimanfoo)

All of those points (also the two above) sound great! Are they already prioritized? Then I'd like to start working on them tomorrow and after the holidays again, hoping that this will allow for a better informed decision.

  • Evaluate whether or not the new API addition would allow for a more complete implementation of read_partial (cc: @andrewfulton9 @martindurant), e.g. by detecting a blosc-compressed chunk and returning an offset.

I think the sharding implementation would help with read_partial, since the store (and maybe array) implementations will need to support this for sharding support. For even better read_partial support the compressor should be able to request specific ranges to read particular offsets, which the Array would need to query before requesting the data from the store. But I think this discussion should be decoupled from sharding.

@jstriebel
Member Author

@joshmoore To provide a preview of an implementation of sharding in the Array class I added a new branch:
master...scalableminds:sharding-on-array

To keep the communication focused in a single place I did not open a PR (yet), but feel free to open this branch as a PR if you want to comment there directly. It's just a prototype like this PR and not fully fleshed out. What do you think about it?

Also, I spent some time digging into the caterva and blosc2 python interfaces. Using them "only" for sharding is hard to achieve, since they (or at least their current python interface) don't allow storing arbitrary-length bytearrays. So there are two ways that might make sense, both coupling sharding directly to the blosc2 compression:

  • zarr could try to hook into intermediate representations of caterva and blosc2 (e.g. superchunk-buffers) and interpret and concatenate those the same way caterva does. This would mean introducing a number of special cases and re-writing logic from caterva to achieve a similar result, but using the zarr storage logic.
  • zarr could use caterva (or just blosc2 if sharding is not needed there) as another compression, also with sharding possibilities. Sharding will only have a major effect if the partial_read codepath is adapted to caterva, so the caterva indexing logic should either be exposed via a Python interface or needs to be reimplemented.

I'll try to work on another prototype for the latter possibility, but without the partial_read support so far.
The step afterwards will be to adapt zarrita. I guess adapting v2.rst might only make sense when we settled on the design.

@joshmoore
Member

joshmoore commented Jan 17, 2022

@jstriebel: apologies for dropping the ball on this. I imagine it will be best to open it as a PR so the same doesn't happen to others (and to proactively encourage comments). Probably best if you could do that so you can also close, re-open, etc. but reading has commenced. 👍

Edit: there was also some interest in possibly discussing during the Jan. 26 community meeting if that would be an option.

@jstriebel jstriebel changed the title from Initial Sharding Prototype to Sharding Prototype I: implementation as translating Store on Jan 19, 2022
@jstriebel
Member Author

@joshmoore Sure, I opened #947 with the implementation on the Array.

Regarding the community call: I'd be happy to join! At what time is the call? Unfortunately I'm only available before 19:00 CET next Wednesday, or after 22:00.

@joshmoore
Member

joshmoore commented Jan 20, 2022

Unfortunately I'm only available before 19:00 CET next Wednesday, or after 22:00.

Ah, too bad. It's at 20:00 CET. (Other coordinates are in zarr-developers/community#1)

@jstriebel
Member Author

jstriebel commented Jan 25, 2022

@joshmoore I added the meeting in 2 weeks to my calendar 📆

Explore the same changes in the clean-room Zarrita (V3) (cc: @alimanfoo)

To keep it simple I added a minimal implementation of sharding support in alimanfoo/zarrita#40.

@mkitti

mkitti commented Apr 5, 2022

Another thing to consider is putting the shard index in a separate file/key, and allow it to specify the key for individual chunks.

I was just reading that @ajelenak had previously proposed .zchunkstore in #556 where offset and size are detailed within a JSON file.

@manzt has previously written about that proposal and used it to implement OME-TIFF support via Zarr.

Given the similarities between this and @jbms's Format 2 proposal, I wonder if some convergence is possible even though the prior proposal is talking about file chunk stores representing the whole array as opposed to shards representing part of an array.

@martindurant
Member

You might find this thread on creating zarr chunks by concatenating several other byte chunks interesting - it is primarily meant for uncompressed arrays: fsspec/kerchunk#134

@jbms

jbms commented Apr 5, 2022

As far as using a separate key/file in the underlying store for the shard index, rather than putting it at the beginning or end of the data file:

An early version of the neuroglancer_precomputed sharded format did that: there was xx.shard and xx.index.

The major downside is that when overwriting an existing shard, assuming you can't atomically update multiple keys at once (a limitation of many underlying stores) then you have to write one key or the other first, and if a failure then happens before writing the second key you end up in an inconsistent state.

There are ways to mitigate this issue, e.g. having the index file specify the name of the data file rather than just the offset and size but I think it is valuable to keep this initial sharding format simple.

@mkitti

mkitti commented Apr 5, 2022

I am mainly thinking about this in terms of deconfliction and normalization between the two proposals, and perhaps even fsspec/kerchunk#134, since #556 is still an open issue. It looks like an internal chunk index is technically superior as @jbms detailed.

Perhaps there is some utility for some applications to export the chunk index as JSON to facilitate some read-only applications?

To deconflict and normalize the proposals:

  1. If both the internal chunk index and a .zchunkindex exists, the internal chunk index always takes precedence.
  2. If only one index exists, it should be the internal chunk index.
  3. Normalize the term "length" over "size" to describe the number of bytes contained within a chunk.

Alternatively, we could close #556 and see this issue and #877 as superseding that proposal entirely.

@mkitti

mkitti commented Apr 5, 2022

3. Normalize the term "length" over "size" to describe the number of bytes contained within a chunk.

Perhaps nbytes would be the more descriptive and less ambiguous than either term? nbytes also has a precedent.

@jstriebel
Member Author

Regarding compatibility / deconflicting the proposals @mkitti: The storage transformer specification from zarr-developers/zarr-specs#134 could handle this quite nicely. If #556 would be added as a transformer, we could (theoretically) use both techniques on top of each other (both ways), as well as having either one or the other, if the json mirrors the sharding-indices laid out here. For pure sharding, I'm not quite sure how useful this could be (but I like that the sharding transformer terminology can describe the possible interactions well nonetheless). #556 seems relevant beyond the sharding feature when addressing compatibility with other formats, as arbitrary offsets can be added to existing files this way.

Perhaps nbytes would be the more descriptive and less ambiguous than either term? nbytes also has a precedent.

Sounds good 👍 For the moment, I assume that the most important PR is zarr-developers/zarr-specs#134, I'll adapt it there tomorrow. Also happy about other feedback there! Once the spec is ready, I'll add another PR here, building on top of v3 and using the terminology from the specs.

@jstriebel
Member Author

A similar proposal is now formalized as a Zarr Enhancement Proposal, ZEP 2. The basic sharding concept and the binary representation is the same, however it is formalized as a storage transformer for Zarr v3.

Further feedback is very welcome!

I'll close this PR to consolidate the open sharding threads, please feel free to continue any discussions or add feedback on the PRs mentioned above.

@jstriebel jstriebel closed this Aug 22, 2022