Add support for resumable downloads; clean up ingest directory; add --concurrency flag #408

Merged
merged 9 commits into jozu-ai:main from resumable-download
Jul 16, 2024

Conversation

amisevsk
Contributor

Description

Add support for resuming downloads that were cancelled/hit an error in an earlier invocation. Doing this required implementing our own Pull routine, separate from oras.Copy, which is used elsewhere.

If a server responds with header Accept-Ranges: "bytes", treat the download as resumable; otherwise, pull behaves as before. For resumable downloads:

  • Files are saved to $KITOPS_HOME/storage/ingest/<blob-digest>
  • If an ingest file already exists, we first read it to compute the digest of the bytes downloaded so far, then seek the download reader to that offset and resume downloading
  • Once the download completes, we verify the digest and move the file to its normal location

There's some risk of corruption if e.g. two separate kit pulls are pulling the same resumable download simultaneously, though I think the risk is fairly low and I'm not sure we care to support running kit in parallel at this moment.

When a pull is completed, all data in the ingest directory is removed. This is to ensure we don't leave files lying around with no easy path to their removal, which can happen with non-resumable downloads that are e.g. cancelled via ctrl-C. This means that completing a pull effectively cancels all other downloads.

Finally, since I had to reimplement concurrency for the pull operation, I've added a --concurrency flag to allow configuring it. It's available on all commands that take the standard network options, but is currently only used for pull and push.

Linked issues

Closes #311
Closes #387

Contributor

@gorkem gorkem left a comment

I am still testing but a few small issues to fix

@@ -43,6 +44,7 @@ func (o *NetworkOptions) AddNetworkFlags(cmd *cobra.Command) {
fmt.Sprintf("Path to client certificate used for authentication (can also be set via environment variable %s)", constants.ClientCertEnvVar))
cmd.Flags().StringVar(&o.ClientCertKeyPath, "key", "",
fmt.Sprintf("Path to client certificate key used for authentication (can also be set via environment variable %s)", constants.ClientCertKeyEnvVar))
cmd.Flags().IntVar(&o.Concurrency, "concurrency", 5, "Maximum number of simultaeous uploads/downloads")
Contributor

typo

Suggested change
- cmd.Flags().IntVar(&o.Concurrency, "concurrency", 5, "Maximum number of simultaeous uploads/downloads")
+ cmd.Flags().IntVar(&o.Concurrency, "concurrency", 5, "Maximum number of simultaneous uploads/downloads")

@@ -58,6 +60,9 @@ func (o *NetworkOptions) Complete(ctx context.Context, args []string) error {
if certKeyPath := os.Getenv(constants.ClientCertKeyEnvVar); certKeyPath != "" {
o.ClientCertKeyPath = certKeyPath
}
if o.Concurrency < 1 {
Contributor

Is there a max concurrency?

Contributor Author

@amisevsk amisevsk Jul 15, 2024

Not really. We don't have a lot of the usual number-of-threads concurrency worries since there's at most one goroutine per layer and if it crashes you can just rerun it with a sane number.

We're not allocating a threadpool or anything, so saying 999999 here would have no real effect.

modelRef *registry.Reference
configHome string
modelRef *registry.Reference
Concurrency int64
Contributor

could this be int?

Contributor Author

Ah this one is actually unused/redeclares the one in the NetworkOptions block.

Contributor Author

Removed

}
logger.Wait()

return desc, err
Contributor

Suggested change
- return desc, err
+ return desc, nil

@@ -123,7 +125,7 @@ func (r *localRepo) GetRepoName() string {
}

func (r *localRepo) BlobPath(desc ocispec.Descriptor) string {
- return filepath.Join(r.storagePath, "blobs", "sha256", desc.Digest.Encoded())
+ return filepath.Join(r.storagePath, ocispec.ImageBlobsDir, desc.Digest.Algorithm().String(), desc.Digest.Encoded())
Contributor

This is something I want to check, if we can easily support different digest levels. Filed #414


func (l *localRepo) ensurePullDirs() error {
blobsPath := filepath.Join(l.storagePath, ocispec.ImageBlobsDir, "sha256")
if err := os.MkdirAll(blobsPath, 0777); err != nil {
Contributor

Would it work if we had 0755 as permissions?

Contributor Author

I think it would, this is just copying what oras-go does.

func (l *localRepo) resumeAndDownloadFile(desc ocispec.Descriptor, blob io.ReadSeekCloser, p *output.PullProgress) error {
ingestDir := constants.IngestPath(l.storagePath)
ingestFilename := filepath.Join(ingestDir, desc.Digest.Encoded())
ingestFile, err := os.OpenFile(ingestFilename, os.O_CREATE|os.O_RDWR, 0666)
Contributor

Would 0644 work as permission?

Contributor

@gorkem gorkem left a comment

works as expected.

Add basic implementation for PullModel, which pulls a modelkit from a
remote reference. This is basically equivalent to the Copy() utility
function in the oras library, which uses Fetch() and Push(); however,
this base allows us control over how files are saved and enables
implementing better cleanup and resumable downloads.

If a remote responds with Accept-Ranges: "bytes", treat downloads as
resumable. In this case:

* Temporary ingest file is saved with the digest name in ingest
  directory
* If a file already exists, seek to the end of it and seek the download
  to the same offset
* If a download is cancelled or encounters an error, the ingest file is
  not removed

To avoid excessive slowdown, instead of seeking to the end of the file,
we read it into a digester to resume calculating the digest of the
downloaded data at the same time.

If a pull is cancelled via signal (instead of an error), the ingest file
will not be cleaned up. To avoid accrual of partially-downloaded ingest
files (generated by the non-resumable download case), clear the ingest
directory on every successful pull.

This may remove a partially-completed resumable download, but this
should only occur if the user downloads some _other_ modelkit before
attempting to resume the download.

Since we're no longer using a basic copy operation, we need to redo
progress bars for pull from scratch.

Add flag --concurrency to allow configuring maximum number of
simultaneous uploads/downloads. Default value is 5.
@amisevsk amisevsk merged commit fc06105 into jozu-ai:main Jul 16, 2024
3 checks passed
@amisevsk amisevsk deleted the resumable-download branch July 16, 2024 17:19