From ad0ee26c64ebc05be9d1e6fa16a632970b7d4933 Mon Sep 17 00:00:00 2001 From: KN4CK3R Date: Fri, 25 Nov 2022 06:47:46 +0100 Subject: [PATCH] Workaround for container registry push/pull errors (#21862) This PR addresses #19586 I added a mutex to the upload version creation which will prevent the push errors when two requests try to create these database entries. I'm not sure if this should be the final solution for this problem. I added a workaround to allow a reupload of missing blobs. Normally a reupload is skipped because the database knows the blob is already present. The workaround checks if the blob exists on the file system. This should not be needed anymore with the above fix so I marked this code to be removed with Gitea v1.20. Co-authored-by: Lunny Xiao --- integrations/api_packages_container_test.go | 28 +++++++++++++++++++ modules/packages/content_store.go | 7 +++++ routers/api/packages/container/blob.go | 31 ++++++++++++++++++++- routers/api/packages/container/container.go | 27 ++++++++++++++++-- routers/api/packages/container/manifest.go | 11 ++++++++ 5 files changed, 100 insertions(+), 4 deletions(-) diff --git a/integrations/api_packages_container_test.go b/integrations/api_packages_container_test.go index ad987a662e979..08ca49cd6697f 100644 --- a/integrations/api_packages_container_test.go +++ b/integrations/api_packages_container_test.go @@ -6,10 +6,12 @@ package integrations import ( "bytes" + "crypto/sha256" "encoding/base64" "fmt" "net/http" "strings" + "sync" "testing" "code.gitea.io/gitea/models/db" @@ -549,6 +551,32 @@ func TestPackageContainer(t *testing.T) { }) } + // https://github.com/go-gitea/gitea/issues/19586 + t.Run("ParallelUpload", func(t *testing.T) { + defer PrintCurrentTest(t)() + + url := fmt.Sprintf("%sv2/%s/parallel", setting.AppURL, user.Name) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + + content := []byte{byte(i)} + digest := fmt.Sprintf("sha256:%x", sha256.Sum256(content)) + + go func() { + defer wg.Done() + + req := NewRequestWithBody(t, "POST", fmt.Sprintf("%s/blobs/uploads?digest=%s", url, digest), bytes.NewReader(content)) + addTokenAuthHeader(req, userToken) + resp := MakeRequest(t, req, http.StatusCreated) + + assert.Equal(t, digest, resp.Header().Get("Docker-Content-Digest")) + }() + } + wg.Wait() + }) + t.Run("OwnerNameChange", func(t *testing.T) { defer PrintCurrentTest(t)() diff --git a/modules/packages/content_store.go b/modules/packages/content_store.go index a3a5d1a6663c8..b0e653a102018 100644 --- a/modules/packages/content_store.go +++ b/modules/packages/content_store.go @@ -30,6 +30,13 @@ func (s *ContentStore) Get(key BlobHash256Key) (storage.Object, error) { return s.store.Open(KeyToRelativePath(key)) } +// FIXME: Workaround to be removed in v1.20 +// https://github.com/go-gitea/gitea/issues/19586 +func (s *ContentStore) Has(key BlobHash256Key) error { + _, err := s.store.Stat(KeyToRelativePath(key)) + return err +} + // Save stores a package blob func (s *ContentStore) Save(key BlobHash256Key, r io.Reader, size int64) error { _, err := s.store.Save(KeyToRelativePath(key), r, size) diff --git a/routers/api/packages/container/blob.go b/routers/api/packages/container/blob.go index 8a9cbd4a15fbf..33710472ceb47 100644 --- a/routers/api/packages/container/blob.go +++ b/routers/api/packages/container/blob.go @@ -7,8 +7,11 @@ package container import ( "context" "encoding/hex" + "errors" "fmt" + "os" "strings" + "sync" "code.gitea.io/gitea/models/db" packages_model "code.gitea.io/gitea/models/packages" @@ -19,6 +22,8 @@ import ( packages_service "code.gitea.io/gitea/services/packages" ) +var uploadVersionMutex sync.Mutex + // saveAsPackageBlob creates a package blob from an upload // The uploaded blob gets stored in a special upload version to link them to the package/image func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_service.PackageInfo) (*packages_model.PackageBlob, error) { @@ -28,6 +33,11 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic contentStore := packages_module.NewContentStore() + var uploadVersion *packages_model.PackageVersion + + // FIXME: Replace usage of mutex with database transaction + // https://github.com/go-gitea/gitea/pull/21862 + uploadVersionMutex.Lock() err := db.WithTx(func(ctx context.Context) error { created := true p := &packages_model.Package{ @@ -68,11 +78,30 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic } } + uploadVersion = pv + + return nil + }) + uploadVersionMutex.Unlock() + if err != nil { + return nil, err + } + + err = db.WithTx(func(ctx context.Context) error { pb, exists, err = packages_model.GetOrInsertBlob(ctx, pb) if err != nil { log.Error("Error inserting package blob: %v", err) return err } + // FIXME: Workaround to be removed in v1.20 + // https://github.com/go-gitea/gitea/issues/19586 + if exists { + err = contentStore.Has(packages_module.BlobHash256Key(pb.HashSHA256)) + if err != nil && errors.Is(err, os.ErrNotExist) { + log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256) + exists = false + } + } if !exists { if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), hsr, hsr.Size()); err != nil { log.Error("Error saving package blob in content store: %v", err) @@ -83,7 +112,7 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic filename := strings.ToLower(fmt.Sprintf("sha256_%s", pb.HashSHA256)) pf := &packages_model.PackageFile{ - VersionID: pv.ID, + VersionID: uploadVersion.ID, BlobID: pb.ID, Name: filename, LowerName: filename, diff --git a/routers/api/packages/container/container.go b/routers/api/packages/container/container.go index b961cd4afb393..acbe9670004e6 100644 --- a/routers/api/packages/container/container.go +++ b/routers/api/packages/container/container.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "net/url" + "os" "regexp" "strconv" "strings" @@ -193,7 +194,7 @@ func InitiateUploadBlob(ctx *context.Context) { mount := ctx.FormTrim("mount") from := ctx.FormTrim("from") if mount != "" { - blob, _ := container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{ + blob, _ := workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{ Image: from, Digest: mount, }) @@ -361,7 +362,7 @@ func getBlobFromContext(ctx *context.Context) (*packages_model.PackageFileDescri return nil, container_model.ErrContainerBlobNotExist } - return container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{ + return workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{ OwnerID: ctx.Package.Owner.ID, Image: ctx.Params("image"), Digest: digest, @@ -503,7 +504,7 @@ func getManifestFromContext(ctx *context.Context) (*packages_model.PackageFileDe return nil, container_model.ErrContainerBlobNotExist } - return container_model.GetContainerBlob(ctx, opts) + return workaroundGetContainerBlob(ctx, opts) } // https://github.com/opencontainers/distribution-spec/blob/main/spec.md#checking-if-content-exists-in-the-registry @@ -643,3 +644,23 @@ func GetTagList(ctx *context.Context) { Tags: tags, }) } + +// FIXME: Workaround to be removed in v1.20 +// https://github.com/go-gitea/gitea/issues/19586 +func workaroundGetContainerBlob(ctx *context.Context, opts *container_model.BlobSearchOptions) (*packages_model.PackageFileDescriptor, error) { + blob, err := container_model.GetContainerBlob(ctx, opts) + if err != nil { + return nil, err + } + + err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(blob.Blob.HashSHA256)) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + log.Debug("Package registry inconsistent: blob %s does not exist on file system", blob.Blob.HashSHA256) + return nil, container_model.ErrContainerBlobNotExist + } + return nil, err + } + + return blob, nil +} diff --git a/routers/api/packages/container/manifest.go b/routers/api/packages/container/manifest.go index 8beed3dbb7296..c63d9363dfe28 100644 --- a/routers/api/packages/container/manifest.go +++ b/routers/api/packages/container/manifest.go @@ -6,8 +6,10 @@ package container import ( "context" + "errors" "fmt" "io" + "os" "strings" "code.gitea.io/gitea/models/db" @@ -403,6 +405,15 @@ func createManifestBlob(ctx context.Context, mci *manifestCreationInfo, pv *pack log.Error("Error inserting package blob: %v", err) return nil, false, "", err } + // FIXME: Workaround to be removed in v1.20 + // https://github.com/go-gitea/gitea/issues/19586 + if exists { + err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(pb.HashSHA256)) + if err != nil && errors.Is(err, os.ErrNotExist) { + log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256) + exists = false + } + } if !exists { contentStore := packages_module.NewContentStore() if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), buf, buf.Size()); err != nil {