Skip to content

Commit

Permalink
feat: add context usage to libmirror push operations
Browse files Browse the repository at this point in the history
Signed-off-by: Maxim Vasilenko <maksim.vasilenko@flant.com>
  • Loading branch information
Maxim Vasilenko committed Nov 20, 2024
1 parent 5ba39bb commit b83ed67
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 55 deletions.
5 changes: 3 additions & 2 deletions pkg/libmirror/layouts/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package layouts

import (
"context"
"fmt"
"path"
"strings"
Expand Down Expand Up @@ -184,8 +185,8 @@ func PullImageSet(
err = retry.RunTask(
pullCtx.Logger,
fmt.Sprintf("[%d / %d] Pulling %s ", pullCount, totalCount, imageReferenceString),
task.WithConstantRetries(5, 10*time.Second, func() error {
img, err := remote.Image(ref, remoteOpts...)
task.WithConstantRetries(5, 10*time.Second, func(ctx context.Context) error {
img, err := remote.Image(ref, append(remoteOpts, remote.WithContext(ctx))...)
if err != nil {
if errorutil.IsImageNotFoundError(err) && pullOpts.allowMissingTags {
pullCtx.Logger.WarnLn("⚠️ Not found in registry, skipping pull")
Expand Down
93 changes: 60 additions & 33 deletions pkg/libmirror/layouts/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ limitations under the License.
package layouts

import (
"context"
"errors"
"fmt"
"os"
"sync"
"time"

"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/layout"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/hashicorp/go-multierror"
"github.com/samber/lo"
"github.com/samber/lo/parallel"

Expand All @@ -46,6 +48,27 @@ func PushLayoutToRepo(
logger contexts.Logger,
parallelismConfig contexts.ParallelismConfig,
insecure, skipVerifyTLS bool,
) error {
return PushLayoutToRepoContext(
context.Background(),
imagesLayout,
registryRepo,
authProvider,
logger,
parallelismConfig,
insecure,
skipVerifyTLS,
)
}

func PushLayoutToRepoContext(
ctx context.Context,
imagesLayout layout.Path,
registryRepo string,
authProvider authn.Authenticator,
logger contexts.Logger,
parallelismConfig contexts.ParallelismConfig,
insecure, skipVerifyTLS bool,
) error {
refOpts, remoteOpts := auth.MakeRemoteRegistryRequestOptions(authProvider, insecure, skipVerifyTLS)
if parallelismConfig.Blobs != 0 {
Expand Down Expand Up @@ -73,7 +96,9 @@ func PushLayoutToRepo(
tag := manifestSet[0].Annotations["io.deckhouse.image.short_tag"]
imageRef := registryRepo + ":" + tag
logger.InfoF("[%d / %d] Pushing image %s", imagesCount, len(indexManifest.Manifests), imageRef)
pushImage(logger, registryRepo, index, imagesCount, refOpts, remoteOpts)(manifestSet[0], 0)
if err = pushImage(ctx, registryRepo, index, manifestSet[0], refOpts, remoteOpts); err != nil {
return fmt.Errorf("Push Image: %w", err)
}
imagesCount += 1
continue
}
Expand All @@ -84,59 +109,61 @@ func PushLayoutToRepo(
logger.InfoF("- %s", registryRepo+":"+manifest.Annotations["io.deckhouse.image.short_tag"])
}

parallel.ForEach(manifestSet, pushImage(logger, registryRepo, index, imagesCount, refOpts, remoteOpts))
errMu := &sync.Mutex{}
merr := &multierror.Error{}
parallel.ForEach(manifestSet, func(item v1.Descriptor, i int) {
if err = pushImage(ctx, registryRepo, index, item, refOpts, remoteOpts); err != nil {
errMu.Lock()
defer errMu.Unlock()
merr = multierror.Append(merr, err)
}
})

return nil
return merr.ErrorOrNil()
})
if err != nil {
return fmt.Errorf("Push batch of images: %w", err)
}
batchesCount += 1
imagesCount += len(manifestSet)
}

return nil
}

func pushImage(
logger contexts.Logger,
ctx context.Context,
registryRepo string,
index v1.ImageIndex,
imagesCount int,
manifest v1.Descriptor,
refOpts []name.Option,
remoteOpts []remote.Option,
) func(v1.Descriptor, int) {
return func(manifest v1.Descriptor, _ int) {
tag := manifest.Annotations["io.deckhouse.image.short_tag"]
imageRef := registryRepo + ":" + tag
img, err := index.Image(manifest.Digest)
if err != nil {
logger.WarnF("Read image: %v", err)
os.Exit(1)
}
ref, err := name.ParseReference(imageRef, refOpts...)
if err != nil {
logger.WarnF("Parse image reference: %v", err)
os.Exit(1)
}
) error {
tag := manifest.Annotations["io.deckhouse.image.short_tag"]
imageRef := registryRepo + ":" + tag
img, err := index.Image(manifest.Digest)
if err != nil {
return fmt.Errorf("Read image: %v", err)
}
ref, err := name.ParseReference(imageRef, refOpts...)
if err != nil {
return fmt.Errorf("Parse image reference: %v", err)
}

err = retry.RunTask(silentLogger{}, "", task.WithConstantRetries(19, 3*time.Second, func() error {
if err = remote.Write(ref, img, remoteOpts...); err != nil {
err = retry.RunTaskWithContext(
ctx, silentLogger{}, "push",
task.WithConstantRetries(4, 3*time.Second, func(ctx context.Context) error {
if err = remote.Write(ref, img, append(remoteOpts, remote.WithContext(ctx))...); err != nil {
if errorutil.IsTrivyMediaTypeNotAllowedError(err) {
logger.WarnLn(errorutil.CustomTrivyMediaTypesWarning)
os.Exit(1)
return fmt.Errorf(errorutil.CustomTrivyMediaTypesWarning)
}
return fmt.Errorf("Write %s to registry: %w", ref.String(), err)
}
return nil
}))
if err != nil {
logger.WarnF("Push image: %v", err)
os.Exit(1)
}

imagesCount += 1
}),
)
if err != nil {
return fmt.Errorf("Run push task: %v", err)
}
return nil
}

type silentLogger struct{}
Expand Down
33 changes: 23 additions & 10 deletions pkg/libmirror/operations/push.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package operations

import (
"context"
"errors"
"fmt"
"io/fs"
Expand All @@ -19,17 +20,21 @@ import (
)

func PushDeckhouseToRegistry(mirrorCtx *contexts.PushContext) error {
return PushDeckhouseToRegistryContext(context.Background(), mirrorCtx)
}

func PushDeckhouseToRegistryContext(ctx context.Context, mirrorCtx *contexts.PushContext) error {
logger := mirrorCtx.Logger
logger.InfoF("Looking for Deckhouse images to push")
ociLayouts, modulesList, err := findLayoutsToPush(mirrorCtx)
ociLayouts, modulesList, err := findLayoutsToPush(ctx, mirrorCtx)
if err != nil {
return fmt.Errorf("Find OCI Image Layouts to push: %w", err)
}

for repo, ociLayout := range ociLayouts {
logger.InfoLn("Mirroring", repo)
err = layouts.PushLayoutToRepo(
ociLayout, repo,
err = layouts.PushLayoutToRepoContext(
ctx, ociLayout, repo,
mirrorCtx.RegistryAuth,
mirrorCtx.Logger,
mirrorCtx.Parallelism,
Expand All @@ -54,22 +59,22 @@ func PushDeckhouseToRegistry(mirrorCtx *contexts.PushContext) error {
}

logger.InfoLn("Pushing modules tags")
if err = pushModulesTags(&mirrorCtx.BaseContext, modulesList); err != nil {
if err = pushModulesTags(ctx, &mirrorCtx.BaseContext, modulesList); err != nil {
return fmt.Errorf("Push modules tags: %w", err)
}
logger.InfoF("All modules tags are pushed")

return nil
}

func pushModulesTags(mirrorCtx *contexts.BaseContext, modulesList []string) error {
func pushModulesTags(ctx context.Context, mirrorCtx *contexts.BaseContext, modulesList []string) error {
if len(modulesList) == 0 {
return nil
}

logger := mirrorCtx.Logger

refOpts, remoteOpts := auth.MakeRemoteRegistryRequestOptionsFromMirrorContext(mirrorCtx)
remoteOpts = append(remoteOpts, remote.WithContext(ctx))
modulesRepo := path.Join(mirrorCtx.RegistryHost, mirrorCtx.RegistryPath, "modules")
pushCount := 1
for _, moduleName := range modulesList {
Expand All @@ -93,7 +98,7 @@ func pushModulesTags(mirrorCtx *contexts.BaseContext, modulesList []string) erro
return nil
}

func findLayoutsToPush(mirrorCtx *contexts.PushContext) (map[string]layout.Path, []string, error) {
func findLayoutsToPush(ctx context.Context, mirrorCtx *contexts.PushContext) (map[string]layout.Path, []string, error) {
ociLayouts := make(map[string]layout.Path)
bundlePaths := [][]string{
{""}, // Root contains main deckhouse repo
Expand All @@ -107,6 +112,10 @@ func findLayoutsToPush(mirrorCtx *contexts.PushContext) (map[string]layout.Path,
}

for _, bundlePath := range bundlePaths {
if err := ctx.Err(); err != nil {
return nil, nil, err
}

indexRef := path.Join(append([]string{mirrorCtx.RegistryHost + mirrorCtx.RegistryPath}, bundlePath...)...)
layoutFileSystemPath := filepath.Join(append([]string{mirrorCtx.UnpackedImagesPath}, bundlePath...)...)
l, err := layout.FromPath(layoutFileSystemPath)
Expand All @@ -126,12 +135,16 @@ func findLayoutsToPush(mirrorCtx *contexts.PushContext) (map[string]layout.Path,
return nil, nil, err
}

for _, dir := range dirs {
if !dir.IsDir() {
for _, dirEntry := range dirs {
if err = ctx.Err(); err != nil {
return nil, nil, err
}

if !dirEntry.IsDir() {
continue
}

moduleName := dir.Name()
moduleName := dirEntry.Name()
modulesNames = append(modulesNames, moduleName)
moduleRef := path.Join(mirrorCtx.RegistryHost+mirrorCtx.RegistryPath, "modules", moduleName)
moduleReleasesRef := path.Join(mirrorCtx.RegistryHost+mirrorCtx.RegistryPath, "modules", moduleName, "release")
Expand Down
13 changes: 11 additions & 2 deletions pkg/libmirror/util/retry/retry.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,39 @@
package retry

import (
"context"
"fmt"
"time"

"github.com/deckhouse/deckhouse-cli/pkg/libmirror/contexts"
)

type Task interface {
Do(retryCount uint) error
Do(ctx context.Context, retryCount uint) error
Interval(retryCount uint) time.Duration
MaxRetries() uint
}

func RunTask(logger contexts.Logger, name string, task Task) error {
return RunTaskWithContext(context.Background(), logger, name, task)
}

func RunTaskWithContext(ctx context.Context, logger contexts.Logger, name string, task Task) error {
restarts := uint(0)
var lastErr error
for restarts < task.MaxRetries() {
if err := ctx.Err(); err != nil {
return err
}

if restarts > 0 {
interval := task.Interval(restarts)
logger.InfoF("%s failed, next retry in %v", name, interval)
time.Sleep(interval)
}

logger.InfoLn(name)
lastErr = task.Do(restarts)
lastErr = task.Do(ctx, restarts)
if lastErr == nil {
return nil
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/libmirror/util/retry/retry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package retry

import (
"context"
"errors"
"log/slog"
"testing"
Expand All @@ -20,11 +21,13 @@ func TestRunSuccessfulTask(t *testing.T) {
require.Equalf(t, uint(1), task.runCount, "Task should only be called once")
}

var _ Task = &successfulTask{}

type successfulTask struct {
runCount uint
}

func (s *successfulTask) Do(_ uint) error {
func (s *successfulTask) Do(_ context.Context, _ uint) error {
s.runCount += 1
return nil
}
Expand All @@ -44,12 +47,14 @@ func TestRunFailingTask(t *testing.T) {
require.Equalf(t, uint(4), task.reportedRetryCount, "Task should be retried 4 times")
}

var _ Task = &failingTask{}

type failingTask struct {
runCount uint
reportedRetryCount uint
}

func (s *failingTask) Do(retryCount uint) error {
func (s *failingTask) Do(_ context.Context, retryCount uint) error {
s.runCount += 1
s.reportedRetryCount = retryCount
return errors.New("failing task")
Expand All @@ -69,11 +74,13 @@ func TestRunEventuallySuccessfulTask(t *testing.T) {
require.Equalf(t, uint(2), task.runCount, "Task should run 2 times")
}

var _ Task = &eventualSuccessTask{}

type eventualSuccessTask struct {
runCount uint
}

func (s *eventualSuccessTask) Do(_ uint) error {
func (s *eventualSuccessTask) Do(_ context.Context, _ uint) error {
s.runCount += 1
if s.runCount > 0 && s.runCount%2 == 0 {
return nil
Expand Down
13 changes: 8 additions & 5 deletions pkg/libmirror/util/retry/task/constant_interval.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package task

import "time"
import (
"context"
"time"
)

type ConstantRetryIntervalTask struct {
maxRetries uint
waitInterval time.Duration
payload func() error
payload func(context.Context) error
}

func WithConstantRetries(maxRetries uint, waitInterval time.Duration, payload func() error) *ConstantRetryIntervalTask {
func WithConstantRetries(maxRetries uint, waitInterval time.Duration, payload func(ctx context.Context) error) *ConstantRetryIntervalTask {
task := &ConstantRetryIntervalTask{
maxRetries: maxRetries,
waitInterval: waitInterval,
Expand All @@ -25,8 +28,8 @@ func WithConstantRetries(maxRetries uint, waitInterval time.Duration, payload fu
return task
}

func (s *ConstantRetryIntervalTask) Do(_ uint) error {
return s.payload()
func (s *ConstantRetryIntervalTask) Do(ctx context.Context, _ uint) error {
return s.payload(ctx)
}

func (s *ConstantRetryIntervalTask) Interval(_ uint) time.Duration {
Expand Down

0 comments on commit b83ed67

Please sign in to comment.