Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream directly to packed during verdi archive import #6417

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

GeigerJ2
Copy link
Contributor

@GeigerJ2 GeigerJ2 commented May 24, 2024

This is a very early proof-of-concept implementation of streaming repository files directly to packed during an archive import.

Especially for large archives, in particular, with many (small) repository files, this could provide significant speed-up as it reduces the required file open and close operations when writing to the target repository. While many such operations to obtain the streams from the ZipfileBackendRepository can probably not be avoided, they can be significantly reduced by streaming to packed instead of loose, as the latter would require the same number again as for reading the contents, while the former would require just one per pack (~4gb file).

I just ran the command for the
acwf-verification_unaries-verification-PBE-v1-results_quantum_espresso-SSSP-1.3-PBE-precision.aiida archive (2gb, ~60k repository files)
and the
MC3D-provenance.aiida archive (12gb, ~840k repository files)
In the first case, the total runtime of the verdi archive import command was 176s and 107s when streaming to loose or packed respectively, while in the second case they were 787s and 728s. Here, I was expecting a larger speed-up, but this might originate from the non-ideal preliminary implementation and benchmarking. Overall, I think this could be a promising feature when done properly.

A few notes regarding the implementation:

  • I first obtain all hashes from the source repository (ZipFileBackendRepository) via repository_from.list_objects() and then retrieve the contents as BytesIO via repository_from.get_object_content(). I didn't use the repository_from.iter_object_streams() as that returns ZipExtFiles which couldn't be opened outside the scope of a context manager, and, importantly, when passing to Container.add_streamed_objects_to_pack().
  • While absolutely necessary, no validation on the size is currently being done. Therefore, all streams are opened and in memory. This works on the PSI workstation with ~250gb of memory, but would otherwise crash most normal workstations. I haven't found a method of the ZipfileBackendRepository that returns the hashes and contents in batches (rather than one by one, such as iter_object_streams()) so that might be something we have to implement from scratch for this feature, either based on the number of files, or their total size for each batch, using sensible defaults.

The remaining todos to have a proper implementation (ties in with the notes above):

  • Use more suitable ZipFileBackendRepository and disk_objectstore Container methods if applicable and available (e.g. I saw there is also add_objects_to_pack in addition to add_streamed_objects_to_pack)
  • Return hashes and file contents from the source repository in batches to avoid memory overloads and improve performance, as well as implement safety measures
  • Proper benchmarking on the time savings that streaming to packed can provide (only timing the repository file additions)
  • Logic that somewhat automatically determines where to stream based on the total file size and/or number of repository files that are being imported (while still being able to manually overwrite it?)
  • Properly check if the profile is being locked during the operation (this is something I have just ignored until now. It might already automatically being taken care of by the existing infrastructure?)
  • Ensure that all repository connections are properly being opened and closed, e.g. by using the relevant context managers

Also note that I added the verdi profile flush command as a convenience command to clean the data from a profile, which I use now during development. This can be moved into a separate PR if we deem it useful, or eventually be removed from this PR if not.

Pinging @khsrali as he voiced his interest in being involved in the disk_objectstore, @agoscinski as we looked into this together for a while, @mbercx to keep in the loop, and @sphuber and @giovannipizzi with expertise on the backend implementations who can likely provide crucial pointers. I'm still familiarizing myself with the backend repository implementations and the disk_objectstore, so any such pointers are more than welcome. I'll keep working on these to-dos on my branch, updating the PR here along the way.

@GeigerJ2 GeigerJ2 force-pushed the feature/import-stream-to-packed branch from 573218d to 5a2aea5 Compare May 24, 2024 16:12
@GeigerJ2 GeigerJ2 self-assigned this May 24, 2024
@giovannipizzi
Copy link
Member

Thanks a lot! I'm happy to provide you with more technical details. In particular I think we need to properly take care of locking, it will not be automatic. But most importantly, it's essential to go via streams and not via get_object, as you say it will put all in memory and it will crash for few large objects in a machine with limited memory (think to a 4gb file on a small machine). With streams properly used, memory is limited to few MB. I think I had to solve the same problem of using properly context managers in disk object store, so we can look at the issue together. Also, the performance benefit for me is just not yet enough to merit making aiida more complex on this, I was hoping something better? If we really only gain 10%, let's discuss but we maybe drop it. If instead we can play a bit and get significant benefits, then it becomes relevant. Maybe instead of working directly on aiida, we can do a mock example where we just stream the files from the AiiDA zip file to a disk objectstore with the two approaches, to see if there is a major improvement or not. Happy to discuss this further

@giovannipizzi
Copy link
Member

giovannipizzi commented May 29, 2024

I decided to go to the lowest level and do a bit of time comparison, to see if it's worth looking into writing (optionally) directly to packs when importing (that will however mean you can't do much on the profile, but it's OK when importing large DBs e.g. right after setting up a profile, or in special cases when people know what they want to do).

And I hope I didn't burn my laptop HD doing these tests :-D (as a reference I used a SDD, not a rotating HD, and this was on a Intel 2019 MacBook Pro, with 32GB of RAM).

TL;DR: I think it's worth to do it, and if done properly (do not read objects in memory but use properly streams), you can get a speed up of ~3x (between 1.7x and 7x depending on what you measure).

Results of my analysis, where I consider the MC3D-provenance.aiida file mentioned by Julian, after migrating it with a recent version of AiiDA (AiiDA version 2.5.1.post0).

The code that I used (possibly changing the values of the variables directly_to_packs and add_max) is this:

import pathlib
import zipfile
import disk_objectstore as dostore
import time

directly_to_packs = False

#file = pathlib.Path('test.aiida')
file = pathlib.Path('MC3D-provenance-migrated.aiida')
add_max = 1000000 # Just if you want to try the code on a few objects first

container = dostore.Container('test_container')
container.init_container(clear=True)

subfolder = 'repo/'

if False:
    with zipfile.ZipFile(file) as myzip:
        added_counter = 0
        for fname in myzip.namelist():
            if not fname.startswith(subfolder):
                continue
            added_counter += 1
    print(f"Number of files in the .aiida file: {added_counter}")

t = time.time()
with zipfile.ZipFile(file) as myzip:
    added_counter = 0
    if directly_to_packs:
        # Create list of file paths - TODO: batch in groups - TODO: check if `.namelist` reads in the order files are written on disk in the .zip
        fhs = []
        for fname in myzip.namelist():
            if not fname.startswith(subfolder):
                continue
            fhs.append(myzip.open(fname))
            added_counter += 1
            if added_counter >= add_max:
                break
        container.add_streamed_objects_to_pack(fhs)
        # Close all now
        for fh in fhs:
            fh.close()
    else:
        for fname in myzip.namelist():
            if not fname.startswith(subfolder):
                continue
            added_counter += 1
            if added_counter >= add_max:
                break

            # In addition to different speed, here we are putting all in memory! Not very good
            with myzip.open(fname) as fh:
                data = fh.read()
            container.add_object(data)
elapsed = time.time() - t
print(f"Elapsed time: {elapsed} s")

print(container.count_objects())

Results

As a note, the total number of files/objects in the .aiida file: 837443

Limiting to 100,000 objects only

For time reasons, I first do most test limiting only to the first 100,000 objects.
At the end I just some more tests on the whole file.

Directly adding to packs

Note: it's not compressing anything, the heuristics is not implemented in the add_streamed_objects_to_pack function yet, so anyway it's a fair comparison with importing to loose (not clear if the performance loss for making the heuristics is worth it, wrt just asking to repack full at the end, since anyway add_streamed_objects_to_pack could leave some "holes" in the packs - so it's possible it's not something we want to necessarily implement).

  • Run 1: Elapsed time: 49.087523221969604 s
  • Run 2: Elapsed time: 42.49984383583069 s
  • Run 3: Elapsed time: 41.42906975746155 s
  • Run 4: Elapsed time: 42.04715013504028 s
  • Run 5: Elapsed time: 41.114356994628906 s

Final check on dostore: ObjectCount(packed=100000, loose=0, pack_files=2)

Output of dostore status:

"size": {
    "total_size_packed": 5772426617,
    "total_size_packed_on_disk": 5772426617,
    "total_size_packfiles_on_disk": 5772426617,
    "total_size_packindexes_on_disk": 17350656,
    "total_size_loose": 0
}

As an order of magnitude, running once full repacking with heuristics.
Note that the dostore cli only supports always compressing or always not compressing (to be improved!) so I use this in python instead:

import disk_objectstore as dostore
import time

container = dostore.Container('test_container')
t = time.time()
container.repack(compress_mode=dostore.CompressMode.AUTO)
elapsed = time.time() - t
print(f"Elapsed time: {elapsed} s")

Result: Elapsed time: 118.65187788009644 s
and the result do dostore status is:

"size": {
    "total_size_packed": 5772426617,
    "total_size_packed_on_disk": 1636178318,
    "total_size_packfiles_on_disk": 1636178318,
    "total_size_packindexes_on_disk": 16228352,
    "total_size_loose": 0
}

(note: if you do the repacking with dostore.CompressMode.KEEP, repacking only takes ~11s)

As a comparison, compressing EVERYTHING directly from the beginning, with the test speed script (setting compress=True in the call to add_streamed_objects_to_pack: Elapsed time: 89.36299085617065 s
And size statistics give:

  "size": {
    "total_size_packed": 5772426617,
    "total_size_packed_on_disk": 1624178250,
    "total_size_packfiles_on_disk": 1624178250,
    "total_size_packindexes_on_disk": 17141760,
    "total_size_loose": 0
  }

(The heuristics takes time as it has to try partial compression, with the hope of avoiding to compress certain files and thus speeding up any further reading from those - particularly valuable for large already compressed files, in the other cases it's not so important)

To loose directly (without any final repacking)

  • Run 1: Elapsed time: 122.33692216873169 s
  • Run 2: Elapsed time: 125.46979999542236 s

Final check on do store: ObjectCount(packed=0, loose=99999, pack_files=0)
From dostore status ("compression": "zlib+1")

"size": {
    "total_size_packed": 0,
    "total_size_packed_on_disk": 0,
    "total_size_packfiles_on_disk": 0,
    "total_size_packindexes_on_disk": 12288,
    "total_size_loose": 5772425753
}

As an order of magnitude, running once time dostore -p test_container optimize -n (note that this also compresses!):

Initial container size: 5772438.23 Mb
Final container size: 1657726.76 Mb

real	2m52.818s (172.8s)

After repacking:

"size": {
    "total_size_packed": 5772425753,
    "total_size_packed_on_disk": 1624178095,
    "total_size_packfiles_on_disk": 1624178095,
    "total_size_packindexes_on_disk": 16228352,
    "total_size_loose": 0
}

Summary on 100,000 objects

  • Only considering writing: 123/42 = 2.9x slower without directly to packs
  • Considering writing + packing (and compressing only in the first case): (123+172)/42 = 7x slower without directly to packs
  • Comparing writing + packing (+compressing), wrt directly to packs + repacking with heuristics: (123+172)/(42+118) = 295/160 = 1.8x slower without directly to packs

All objects

Directly adding to packs

  • Run 1: Elapsed time: 378.8354449272156 s (6 min 18 sec)

Final check on dostore: ObjectCount(packed=837443, loose=0, pack_files=13)

Output of dostore status:

"size": {
    "total_size_packed": 52052617561,
    "total_size_packed_on_disk": 52052617561,
    "total_size_packfiles_on_disk": 52052617561,
    "total_size_packindexes_on_disk": 145559552,
    "total_size_loose": 0
}

To loose directly (without any final repacking)

  • Run 1: Elapsed time: 1118.8799998760223 s

Final check on dostore: ObjectCount(packed=0, loose=837443, pack_files=0)

Output of dostore status (that, BTW, takes several tens of seconds to run (more than 1 min), I guess because it has to run through folders with almost 1 million loose objects - showing once more the importance of disk-objectstore and packing for large repos, and that probably a fair comparison is indeed the one where, after importing to loose, you then also repack the repo):

"size": {
    "total_size_packed": 0,
    "total_size_packed_on_disk": 0,
    "total_size_packfiles_on_disk": 0,
    "total_size_packindexes_on_disk": 12288,
    "total_size_loose": 52052617561
}

Summary on all 837,443 objects

  • Only considering writing: 1119/379 = 2.9x slower without directly to packs
    Note: Did not collect data yet on the other cases discussed above (including repacking for loose, and repacking with AUTO compression heuristics for directly_to_packs), but from the result above I think we would get very similar speed-ups/slow-downs.

As an order of magnitude, running once time dostore -p test_container optimize -n (note that this also compresses!):

Initial container size: 52052671.01 Mb
Final container size: 14956169.24 Mb

real    41m14.106s (~2474s)

Therefore: Considering writing + packing (and compressing only in the first case): (2474+1119)/379 = 9.5x slower without directly to packs (but a bit unfair as in the first case I'm also compressing)

After repacking:

"size": {
    "total_size_packed": 52052617561,
    "total_size_packed_on_disk": 14680432820,
    "total_size_packfiles_on_disk": 14680432820,
    "total_size_packindexes_on_disk": 137297920,
    "total_size_loose": 0
}

Conclusion

To conclude, I suggest that you try to do another test properly using the methods above (specifically add_streamed_objects_to_pack passing a list of open handles to the files inside the zip, as I do above) and see if you start getting a 2-3x speedup (otherwise we need to see where is the slowdown).

IMPORTANT NOTE: it would be very important to check that the order in which you get the list of files (not only directly with zipfile.ZipFile.namelist(), but also via the current AiiDA wrappers around the .aiida file). It must be the same order in which compressed files are written inside the .zip file. If you start reading in a random order from disk instead, you easily can lose even an order of magnitude or more of time (I had to optimise this in disk-objectstore as well, this is why some methods are a bit more complex to use, we need the LazyOpener etc. - this should be documented I think)

@GeigerJ2
Copy link
Contributor Author

Wow, thanks a lot, @giovannipizzi! This is extremely useful! Despite the initial timings, I've also been quite convinced that the problem was the early implementation and that it would be worth it, if done properly. This definitely gives me a good state to continue with it. I'll report back here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants