From 63ea05c67875b6c613953c84bea69923ecd4dcf0 Mon Sep 17 00:00:00 2001 From: Yan Song Date: Tue, 20 Dec 2022 09:54:18 +0000 Subject: [PATCH 1/3] export converter as a usable package So that we can import the `pkg/converter` package into other projects to integrate the image conversion ability. This patch is no logic changes. Signed-off-by: Yan Song --- go.mod | 22 ++- go.sum | 44 ++++- pkg/adapter/adapter.go | 140 ++++++++++++++++ .../annotation/annotation.go | 0 pkg/{converter => adapter}/rule.go | 2 +- pkg/{converter => adapter}/worker.go | 2 +- pkg/config/config.go | 47 ++++++ pkg/content/content.go | 62 +++---- pkg/converter/converter.go | 157 +++++------------- pkg/converter/opts.go | 51 ++++++ pkg/driver/driver.go | 11 +- pkg/driver/nydus/parser/parser.go | 2 +- pkg/handler/handler.go | 12 +- pkg/remote/resolve.go | 34 +--- 14 files changed, 382 insertions(+), 204 deletions(-) create mode 100644 pkg/adapter/adapter.go rename pkg/{converter => adapter}/annotation/annotation.go (100%) rename pkg/{converter => adapter}/rule.go (99%) rename pkg/{converter => adapter}/worker.go (98%) create mode 100644 pkg/converter/opts.go diff --git a/go.mod b/go.mod index ce198627..a30b6d8d 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,25 @@ require ( github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5 // indirect github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect github.com/astaxie/beego v1.12.1 // indirect + github.com/aws/aws-sdk-go-v2 v1.17.2 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect + github.com/aws/aws-sdk-go-v2/config v1.18.4 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 // indirect + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.43 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.17 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.21 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.20 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.29.5 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 // indirect + github.com/aws/smithy-go v1.13.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect @@ -54,6 +73,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/gomodule/redigo v2.0.0+incompatible // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.15.12 // indirect github.com/lib/pq v1.10.0 // indirect github.com/mattn/go-colorable v0.1.11 // indirect @@ -101,4 +121,4 @@ require ( ) // It will be updated to official repo once nydus-snapshotter release. -replace github.com/containerd/nydus-snapshotter => github.com/imeoer/nydus-snapshotter v0.3.20 +replace github.com/containerd/nydus-snapshotter => github.com/imeoer/nydus-snapshotter v0.3.22 diff --git a/go.sum b/go.sum index e85bdb79..de277eb0 100644 --- a/go.sum +++ b/go.sum @@ -177,6 +177,44 @@ github.com/aws/aws-sdk-go v1.17.7/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= +github.com/aws/aws-sdk-go-v2 v1.17.2 h1:r0yRZInwiPBNpQ4aDy/Ssh3ROWsGtKDwar2JS8Lm+N8= +github.com/aws/aws-sdk-go-v2 v1.17.2/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 h1:dK82zF6kkPeCo8J1e+tGx4JdvDIQzj7ygIoLg8WMuGs= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10/go.mod h1:VeTZetY5KRJLuD/7fkQXMU6Mw7H5m/KP2J5Iy9osMno= +github.com/aws/aws-sdk-go-v2/config v1.18.4 h1:VZKhr3uAADXHStS/Gf9xSYVmmaluTUfkc0dcbPiDsKE= +github.com/aws/aws-sdk-go-v2/config v1.18.4/go.mod h1:EZxMPLSdGAZ3eAmkqXfYbRppZJTzFTkv8VyEzJhKko4= +github.com/aws/aws-sdk-go-v2/credentials v1.13.4 h1:nEbHIyJy7mCvQ/kzGG7VWHSBpRB4H6sJy3bWierWUtg= +github.com/aws/aws-sdk-go-v2/credentials v1.13.4/go.mod h1:/Cj5w9LRsNTLSwexsohwDME32OzJ6U81Zs33zr2ZWOM= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20 h1:tpNOglTZ8kg9T38NpcGBxudqfUAwUzyUnLQ4XSd0CHE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.20/go.mod h1:d9xFpWd3qYwdIXM0fvu7deD08vvdRXyc/ueV+0SqaWE= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.43 h1:+bkAMTd5OGyHu2nwNOangjEsP65fR0uhMbZJA52sZ64= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.43/go.mod h1:sS2tu0VEspKuY5eM1vQgy7P/hpZX8F62o6qsghZExWc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26 h1:5WU31cY7m0tG+AiaXuXGoMzo2GBQ1IixtWa8Yywsgco= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.26/go.mod h1:2E0LdbJW6lbeU4uxjum99GZzI0ZjDpAb0CoSCM0oeEY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20 h1:WW0qSzDWoiWU2FS5DbKpxGilFVlCEJPwx4YtjdfI0Jw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.20/go.mod h1:/+6lSiby8TBFpTVXZgKiN/rCfkYXEGvhlM4zCgPpt7w= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27 h1:N2eKFw2S+JWRCtTt0IhIX7uoGGQciD4p6ba+SJv4WEU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.27/go.mod h1:RdwFVc7PBYWY33fa2+8T1mSqQ7ZEK4ILpM0wfioDC3w= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.17 h1:5tXbMJ7Jq0iG65oiMg6tCLsHkSaO2xLXa2EmZ29vaTA= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.17/go.mod h1:twV0fKMQuqLY4klyFH56aXNq3AFiA5LO0/frTczEOFE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 h1:y2+VQzC6Zh2ojtV2LoC0MNwHWc6qXv/j2vrQtlftkdA= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11/go.mod h1:iV4q2hsqtNECrfmlXyord9u4zyuFEJX9eLgLpSPzWA8= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.21 h1:77b1GfaSuIok5yB/3HYbG+ypWvOJDQ2rVdq943D17R4= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.21/go.mod h1:sPOz31BVdqeeurKEuUpLNSve4tdCNPluE+070HNcEHI= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20 h1:jlgyHbkZQAgAc7VIxJDmtouH8eNjOk2REVAQfVhdaiQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.20/go.mod h1:Xs52xaLBqDEKRcAfX/hgjmD3YQ7c/W+BEyfamlO/W2E= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.20 h1:4K6dbmR0mlp3o4Bo78PnpvzHtYAqEeVMguvEenpMGsI= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.20/go.mod h1:1XpDcReIEOHsjwNToDKhIAO3qwLo1BnfbtSqWJa8j7g= +github.com/aws/aws-sdk-go-v2/service/s3 v1.29.5 h1:nRSEQj1JergKTVc8RGkhZvOEGgcvo4fWpDPwGDeg2ok= +github.com/aws/aws-sdk-go-v2/service/s3 v1.29.5/go.mod h1:wcaJTmjKFDW0s+Se55HBNIds6ghdAGoDDw+SGUdrfAk= +github.com/aws/aws-sdk-go-v2/service/sso v1.11.26 h1:ActQgdTNQej/RuUJjB9uxYVLDOvRGtUreXF8L3c8wyg= +github.com/aws/aws-sdk-go-v2/service/sso v1.11.26/go.mod h1:uB9tV79ULEZUXc6Ob18A46KSQ0JDlrplPni9XW6Ot60= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9 h1:wihKuqYUlA2T/Rx+yu2s6NDAns8B9DgnRooB1PVhY+Q= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.9/go.mod h1:2E/3D/mB8/r2J7nK42daoKP/ooCwbf0q1PznNc+DZTU= +github.com/aws/aws-sdk-go-v2/service/sts v1.17.6 h1:VQFOLQVL3BrKM/NLO/7FiS4vcp5bqK0mGMyk09xLoAY= +github.com/aws/aws-sdk-go-v2/service/sts v1.17.6/go.mod h1:Az3OXXYGyfNwQNsK/31L4R75qFYnO641RZGAoV3uH1c= +github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd/go.mod h1:1b+Y/CofkYwXMUU0OhQqGvsY2Bvgr4j6jfT699wyZKQ= @@ -851,8 +889,8 @@ github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= -github.com/imeoer/nydus-snapshotter v0.3.20 h1:kjkZpPg5hYgmrCVkCHgrS+QVVwFCv8qcumml53NnuJc= -github.com/imeoer/nydus-snapshotter v0.3.20/go.mod h1:P2SwdbwhVg8A2RkG3EjpZ6STXYZcUa8nl411rkqHaLw= +github.com/imeoer/nydus-snapshotter v0.3.22 h1:Vg+dKVTmmiNGWCJ/4KMCYHIgAx3demSPXvxnBLv2UJ0= +github.com/imeoer/nydus-snapshotter v0.3.22/go.mod h1:xRVCi0yITol7k6yzjGY1tdlYZZVy5Qn+7YVLDjZ+0AE= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/intel/goresctrl v0.2.0/go.mod h1:+CZdzouYFn5EsxgqAQTEzMfwKwuc0fVdMrT9FCCAVRQ= @@ -888,7 +926,9 @@ github.com/jinzhu/now v1.0.0/go.mod h1:oHTiXerJ20+SfYcrdlBO7rzZRJWGwSTQ0iUY2jI6G github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20160803190731-bd40a432e4c7/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= diff --git a/pkg/adapter/adapter.go b/pkg/adapter/adapter.go new file mode 100644 index 00000000..c5de6af0 --- /dev/null +++ b/pkg/adapter/adapter.go @@ -0,0 +1,140 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adapter + +import ( + "context" + "fmt" + + "github.com/containerd/containerd" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "github.com/goharbor/acceleration-service/pkg/config" + "github.com/goharbor/acceleration-service/pkg/converter" + "github.com/goharbor/acceleration-service/pkg/errdefs" + "github.com/goharbor/acceleration-service/pkg/metrics" + "github.com/goharbor/acceleration-service/pkg/task" +) + +type Adapter interface { + // Dispatch dispatches a conversion task to worker queue + // by specifying source image reference, the conversion is + // asynchronous, and if the sync option is specified, + // Dispatch will be blocked until the conversion is complete. + Dispatch(ctx context.Context, ref string, sync bool) error + // CheckHealth checks the containerd client can successfully + // connect to the containerd daemon and the healthcheck service + // returns the SERVING response. + CheckHealth(ctx context.Context) error +} + +type LocalAdapter struct { + cfg *config.Config + client *containerd.Client + rule *Rule + worker *Worker + cvt *converter.LocalConverter +} + +func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { + client, err := containerd.New( + cfg.Provider.Containerd.Address, + containerd.WithDefaultNamespace("harbor-acceleration-service"), + ) + if err != nil { + return nil, errors.Wrap(err, "create containerd client") + } + + cvt, err := converter.NewLocalConverter( + converter.WithClient(client), + converter.WithDriver(cfg.Converter.Driver.Type, cfg.Converter.Driver.Config), + converter.WithHosts(cfg.Host), + ) + if err != nil { + return nil, err + } + + worker, err := NewWorker(cfg.Converter.Worker) + if err != nil { + return nil, errors.Wrap(err, "create worker") + } + + rule := &Rule{ + items: cfg.Converter.Rules, + } + + handler := &LocalAdapter{ + cfg: cfg, + client: client, + rule: rule, + worker: worker, + cvt: cvt, + } + + return handler, nil +} + +func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { + target, err := adp.rule.Map(source) + if err != nil { + if errors.Is(err, errdefs.ErrAlreadyConverted) { + logrus.Infof("image has been converted: %s", source) + return nil + } + return errors.Wrap(err, "create target reference by rule") + } + + return adp.cvt.Convert(ctx, source, target) +} + +func (adp *LocalAdapter) Dispatch(ctx context.Context, ref string, sync bool) error { + taskID := task.Manager.Create(ref) + + if sync { + // FIXME: The synchronous conversion task should also be + // executed in a limited worker queue. + return metrics.Conversion.OpWrap(func() error { + err := adp.Convert(ctx, ref) + task.Manager.Finish(taskID, err) + return err + }, "convert") + } + + adp.worker.Dispatch(func() error { + return metrics.Conversion.OpWrap(func() error { + err := adp.Convert(context.Background(), ref) + task.Manager.Finish(taskID, err) + return err + }, "convert") + }) + + return nil +} + +func (adp *LocalAdapter) CheckHealth(ctx context.Context) error { + health, err := adp.client.IsServing(ctx) + + msg := "containerd service is unhealthy" + if err != nil { + return errors.Wrap(err, msg) + } + + if !health { + return fmt.Errorf(msg) + } + + return nil +} diff --git a/pkg/converter/annotation/annotation.go b/pkg/adapter/annotation/annotation.go similarity index 100% rename from pkg/converter/annotation/annotation.go rename to pkg/adapter/annotation/annotation.go diff --git a/pkg/converter/rule.go b/pkg/adapter/rule.go similarity index 99% rename from pkg/converter/rule.go rename to pkg/adapter/rule.go index 8d438f5a..ccbb8e0a 100644 --- a/pkg/converter/rule.go +++ b/pkg/adapter/rule.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package converter +package adapter import ( "errors" diff --git a/pkg/converter/worker.go b/pkg/adapter/worker.go similarity index 98% rename from pkg/converter/worker.go rename to pkg/adapter/worker.go index fea4c587..650431aa 100644 --- a/pkg/converter/worker.go +++ b/pkg/adapter/worker.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package converter +package adapter import ( "errors" diff --git a/pkg/config/config.go b/pkg/config/config.go index c818c6d8..dd832480 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -15,8 +15,13 @@ package config import ( + "encoding/base64" + "fmt" "io/ioutil" + "net/url" + "strings" + "github.com/goharbor/acceleration-service/pkg/remote" "github.com/pkg/errors" "gopkg.in/yaml.v3" ) @@ -89,3 +94,45 @@ func Parse(configPath string) (*Config, error) { return &config, nil } + +func (cfg *Config) Host(ref string) (remote.CredentialFunc, bool, error) { + authorizer := func(ref string) (*SourceConfig, error) { + refURL, err := url.Parse(fmt.Sprintf("dummy://%s", ref)) + if err != nil { + return nil, errors.Wrap(err, "parse reference of source image") + } + + auth, ok := cfg.Provider.Source[refURL.Host] + if !ok { + return nil, fmt.Errorf("not found matched hostname %s in config", refURL.Host) + } + + return &auth, nil + } + + auth, err := authorizer(ref) + if err != nil { + return nil, false, err + } + + return func(host string) (string, string, error) { + auth, err := authorizer(host) + if err != nil { + return "", "", err + } + + // Leave auth empty if no authorization be required + if strings.TrimSpace(auth.Auth) == "" { + return "", "", nil + } + decoded, err := base64.StdEncoding.DecodeString(auth.Auth) + if err != nil { + return "", "", errors.Wrap(err, "decode base64 encoded auth string") + } + ary := strings.Split(string(decoded), ":") + if len(ary) != 2 { + return "", "", errors.New("invalid base64 encoded auth string") + } + return ary[0], ary[1], nil + }, auth.Insecure, nil +} diff --git a/pkg/content/content.go b/pkg/content/content.go index 9592e4a4..d7576a7e 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "net/url" "github.com/containerd/containerd" "github.com/containerd/containerd/content" @@ -26,12 +25,10 @@ import ( "github.com/containerd/containerd/labels" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/remotes" - "github.com/containerd/containerd/snapshots" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "github.com/goharbor/acceleration-service/pkg/config" nydusUtils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" "github.com/goharbor/acceleration-service/pkg/remote" "github.com/goharbor/acceleration-service/pkg/utils" @@ -45,7 +42,7 @@ type Provider interface { // Use plain HTTP to communicate with registry. UsePlainHTTP() // Resolve attempts to resolve the reference into a name and descriptor. - Resolver(ctx context.Context, ref string) (remotes.Resolver, error) + Resolver(ref string) (remotes.Resolver, error) // Pull pulls source image from remote registry by specified reference. // This pulls all platforms of the image but Image() returns containerd.Image for // the default platform. @@ -56,8 +53,6 @@ type Provider interface { // Image gets the source image object. Image() containerd.Image - // Snapshotter gets the snapshotter object of containerd. - Snapshotter() snapshots.Snapshotter // ContentStore gets the content store object of containerd. ContentStore() content.Store // Client gets the raw containerd client. @@ -66,42 +61,21 @@ type Provider interface { type LocalProvider struct { image containerd.Image - cfg *config.ProviderConfig - snapshotter snapshots.Snapshotter client *containerd.Client usePlainHTTP bool + hosts remote.HostFunc } func NewLocalProvider( - cfg *config.ProviderConfig, client *containerd.Client, snapshotter snapshots.Snapshotter, + client *containerd.Client, + hosts remote.HostFunc, ) (Provider, error) { return &LocalProvider{ - cfg: cfg, - snapshotter: snapshotter, - client: client, + client: client, + hosts: hosts, }, nil } -func (pvd *LocalProvider) UsePlainHTTP() { - pvd.usePlainHTTP = true -} - -func (pvd *LocalProvider) Resolver(ctx context.Context, ref string) (remotes.Resolver, error) { - refURL, err := url.Parse(fmt.Sprintf("dummy://%s", ref)) - if err != nil { - return nil, errors.Wrap(err, "parse reference of source image") - } - - auth, ok := pvd.cfg.Source[refURL.Host] - if !ok { - return nil, fmt.Errorf("not found matched hostname %s in config", refURL.Host) - } - - resolver := remote.NewResolver(auth.Insecure, pvd.usePlainHTTP, remote.NewBasicAuthCredFunc(auth.Auth)) - - return resolver, nil -} - func (pvd *LocalProvider) updateLayerDiffID(ctx context.Context, image ocispec.Descriptor) error { cs := pvd.ContentStore() @@ -149,10 +123,22 @@ func (pvd *LocalProvider) updateLayerDiffID(ctx context.Context, image ocispec.D return nil } +func (pvd *LocalProvider) UsePlainHTTP() { + pvd.usePlainHTTP = true +} + +func (pvd *LocalProvider) Resolver(ref string) (remotes.Resolver, error) { + credFunc, insecure, err := pvd.hosts(ref) + if err != nil { + return nil, err + } + return remote.NewResolver(insecure, pvd.usePlainHTTP, credFunc), nil +} + func (pvd *LocalProvider) Pull(ctx context.Context, ref string) error { - resolver, err := pvd.Resolver(ctx, ref) + resolver, err := pvd.Resolver(ref) if err != nil { - return errors.Wrapf(err, "get resolver for %s", ref) + return err } // TODO: enable configuring the target platforms. @@ -191,9 +177,9 @@ func (pvd *LocalProvider) Pull(ctx context.Context, ref string) error { } func (pvd *LocalProvider) Push(ctx context.Context, desc ocispec.Descriptor, ref string) error { - resolver, err := pvd.Resolver(ctx, ref) + resolver, err := pvd.Resolver(ref) if err != nil { - return errors.Wrapf(err, "get resolver for %s", ref) + return err } // TODO: sets max concurrent uploaded layer limit by containerd.WithMaxConcurrentUploadedLayers. @@ -204,10 +190,6 @@ func (pvd *LocalProvider) Image() containerd.Image { return pvd.image } -func (pvd *LocalProvider) Snapshotter() snapshots.Snapshotter { - return pvd.snapshotter -} - func (pvd *LocalProvider) ContentStore() content.Store { return pvd.client.ContentStore() } diff --git a/pkg/converter/converter.go b/pkg/converter/converter.go index e8d2a2c4..b859942c 100644 --- a/pkg/converter/converter.go +++ b/pkg/converter/converter.go @@ -20,108 +20,77 @@ import ( "time" "github.com/containerd/containerd" - "github.com/containerd/containerd/snapshots" + "github.com/containerd/containerd/defaults" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "github.com/goharbor/acceleration-service/pkg/config" "github.com/goharbor/acceleration-service/pkg/content" - "github.com/goharbor/acceleration-service/pkg/converter/annotation" "github.com/goharbor/acceleration-service/pkg/driver" "github.com/goharbor/acceleration-service/pkg/errdefs" - "github.com/goharbor/acceleration-service/pkg/metrics" - "github.com/goharbor/acceleration-service/pkg/task" ) var logger = logrus.WithField("module", "converter") -type Converter interface { - // Dispatch dispatches a conversion task to worker queue - // by specifying source image reference, the conversion is - // asynchronous, and if the sync option is specified, - // Dispatch will be blocked until the conversion is complete. - Dispatch(ctx context.Context, ref string, sync bool) error - // CheckHealth checks the containerd client can successfully - // connect to the containerd daemon and the healthcheck service - // returns the SERVING response. - CheckHealth(ctx context.Context) error -} - type LocalConverter struct { - cfg *config.Config - rule *Rule - worker *Worker - client *containerd.Client - snapshotter snapshots.Snapshotter - driver driver.Driver + client *containerd.Client + driver driver.Driver + provider content.Provider + opts ConvertOpts } -func NewLocalConverter(cfg *config.Config) (*LocalConverter, error) { - client, err := containerd.New( - cfg.Provider.Containerd.Address, - containerd.WithDefaultNamespace("harbor-acceleration-service"), - ) - if err != nil { - return nil, errors.Wrap(err, "create containerd client") +func NewLocalConverter(opts ...ConvertOpt) (*LocalConverter, error) { + var options ConvertOpts + for _, opt := range opts { + if err := opt(&options); err != nil { + return nil, fmt.Errorf("failed to apply option: %w", err) + } } - snapshotter := client.SnapshotService(cfg.Provider.Containerd.Snapshotter) - driver, err := driver.NewLocalDriver(&cfg.Converter.Driver) - if err != nil { - return nil, errors.Wrap(err, "create driver") + if options.client == nil { + client, err := containerd.New(defaults.DefaultAddress) + if err != nil { + return nil, errors.Wrapf(err, "connect to containerd address %s", defaults.DefaultAddress) + } + options.client = client } - worker, err := NewWorker(cfg.Converter.Worker) + provider, err := content.NewLocalProvider( + options.client, + options.hosts, + ) if err != nil { - return nil, errors.Wrap(err, "create worker") + return nil, errors.Wrap(err, "create content provider") } - rule := &Rule{ - items: cfg.Converter.Rules, + driver, err := driver.NewLocalDriver(options.driverType, options.driverConfig) + if err != nil { + return nil, errors.Wrap(err, "create driver") } handler := &LocalConverter{ - cfg: cfg, - rule: rule, - worker: worker, - client: client, - snapshotter: snapshotter, - driver: driver, + client: options.client, + driver: driver, + provider: provider, + opts: options, } return handler, nil } -func (cvt *LocalConverter) Convert(ctx context.Context, source string) error { +func (cvt *LocalConverter) Convert(ctx context.Context, source, target string) error { ctx, done, err := cvt.client.WithLease(ctx) if err != nil { return errors.Wrap(err, "create lease") } defer done(ctx) - target, err := cvt.rule.Map(source) - if err != nil { - if errors.Is(err, errdefs.ErrAlreadyConverted) { - logrus.Infof("image has been converted: %s", source) - return nil - } - return errors.Wrap(err, "create target reference by rule") - } - - content, err := content.NewLocalProvider( - &cvt.cfg.Provider, cvt.client, cvt.snapshotter, - ) - if err != nil { - return errors.Wrap(err, "create content provider") - } - logger.Infof("pulling image %s", source) start := time.Now() - if err := content.Pull(ctx, source); err != nil { + if err := cvt.provider.Pull(ctx, source); err != nil { if errdefs.NeedsRetryWithHTTP(err) { logger.Infof("try to pull with plain HTTP for %s", source) - content.UsePlainHTTP() - if err := content.Pull(ctx, source); err != nil { + cvt.provider.UsePlainHTTP() + if err := cvt.provider.Pull(ctx, source); err != nil { return errors.Wrap(err, "try to pull image") } } else { @@ -132,32 +101,19 @@ func (cvt *LocalConverter) Convert(ctx context.Context, source string) error { logger.Infof("converting image %s", source) start = time.Now() - desc, err := cvt.driver.Convert(ctx, content) + desc, err := cvt.driver.Convert(ctx, cvt.provider) if err != nil { return errors.Wrap(err, "convert image") } - - if cvt.cfg.Converter.HarborAnnotation { - // Append extra annotations to converted image for harbor usage. - // FIXME: implement a containerd#converter.ConvertFunc to avoid creating the new manifest/index. - desc, err = annotation.Append(ctx, content, desc, annotation.Appended{ - DriverName: cvt.driver.Name(), - DriverVersion: cvt.driver.Version(), - SourceDigest: content.Image().Target().Digest.String(), - }) - if err != nil { - return errors.Wrap(err, "append annotations") - } - } logger.Infof("converted image %s, elapse %s", target, time.Since(start)) start = time.Now() logger.Infof("pushing image %s", target) - if err := content.Push(ctx, *desc, target); err != nil { + if err := cvt.provider.Push(ctx, *desc, target); err != nil { if errdefs.NeedsRetryWithHTTP(err) { logger.Infof("try to push with plain HTTP for %s", target) - content.UsePlainHTTP() - if err := content.Push(ctx, *desc, target); err != nil { + cvt.provider.UsePlainHTTP() + if err := cvt.provider.Push(ctx, *desc, target); err != nil { return errors.Wrap(err, "try to push image") } } else { @@ -168,42 +124,3 @@ func (cvt *LocalConverter) Convert(ctx context.Context, source string) error { return nil } - -func (cvt *LocalConverter) Dispatch(ctx context.Context, ref string, sync bool) error { - taskID := task.Manager.Create(ref) - - if sync { - // FIXME: The synchronous conversion task should also be - // executed in a limited worker queue. - return metrics.Conversion.OpWrap(func() error { - err := cvt.Convert(ctx, ref) - task.Manager.Finish(taskID, err) - return err - }, "convert") - } - - cvt.worker.Dispatch(func() error { - return metrics.Conversion.OpWrap(func() error { - err := cvt.Convert(context.Background(), ref) - task.Manager.Finish(taskID, err) - return err - }, "convert") - }) - - return nil -} - -func (cvt *LocalConverter) CheckHealth(ctx context.Context) error { - health, err := cvt.client.IsServing(ctx) - - msg := "containerd service is unhealthy" - if err != nil { - return errors.Wrap(err, msg) - } - - if !health { - return fmt.Errorf(msg) - } - - return nil -} diff --git a/pkg/converter/opts.go b/pkg/converter/opts.go new file mode 100644 index 00000000..cc110528 --- /dev/null +++ b/pkg/converter/opts.go @@ -0,0 +1,51 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package converter + +import ( + "github.com/containerd/containerd" + "github.com/goharbor/acceleration-service/pkg/remote" +) + +type ConvertOpts struct { + client *containerd.Client + driverType string + driverConfig map[string]string + hosts remote.HostFunc +} + +type ConvertOpt func(opts *ConvertOpts) error + +func WithClient(client *containerd.Client) ConvertOpt { + return func(opts *ConvertOpts) error { + opts.client = client + return nil + } +} + +func WithDriver(typ string, config map[string]string) ConvertOpt { + return func(opts *ConvertOpts) error { + opts.driverType = typ + opts.driverConfig = config + return nil + } +} + +func WithHosts(hosts remote.HostFunc) ConvertOpt { + return func(opts *ConvertOpts) error { + opts.hosts = hosts + return nil + } +} diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 7117a19f..b752a3da 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -20,7 +20,6 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/goharbor/acceleration-service/pkg/config" "github.com/goharbor/acceleration-service/pkg/content" "github.com/goharbor/acceleration-service/pkg/driver/estargz" "github.com/goharbor/acceleration-service/pkg/driver/nydus" @@ -45,13 +44,13 @@ type Driver interface { Version() string } -func NewLocalDriver(cfg *config.DriverConfig) (Driver, error) { - switch cfg.Type { +func NewLocalDriver(typ string, config map[string]string) (Driver, error) { + switch typ { case "nydus": - return nydus.New(cfg.Config) + return nydus.New(config) case "estargz": - return estargz.New(cfg.Config) + return estargz.New(config) default: - return nil, fmt.Errorf("unsupported driver %s", cfg.Type) + return nil, fmt.Errorf("unsupported driver %s", typ) } } diff --git a/pkg/driver/nydus/parser/parser.go b/pkg/driver/nydus/parser/parser.go index 7b42591f..ef801b22 100644 --- a/pkg/driver/nydus/parser/parser.go +++ b/pkg/driver/nydus/parser/parser.go @@ -47,7 +47,7 @@ func (parser *Parser) PullAsChunkDict(ctx context.Context, ref string, usePlainH if usePlainHTTP { parser.content.UsePlainHTTP() } - resolver, err := parser.content.Resolver(ctx, ref) + resolver, err := parser.content.Resolver(ref) if err != nil { return nil, nil, errors.Wrapf(err, "get resolver for %s", ref) } diff --git a/pkg/handler/handler.go b/pkg/handler/handler.go index 21074726..f0f8f682 100644 --- a/pkg/handler/handler.go +++ b/pkg/handler/handler.go @@ -21,8 +21,8 @@ import ( "github.com/pkg/errors" + "github.com/goharbor/acceleration-service/pkg/adapter" "github.com/goharbor/acceleration-service/pkg/config" - "github.com/goharbor/acceleration-service/pkg/converter" ) const healthCheckTimeout = time.Second * 5 @@ -44,18 +44,18 @@ type Handler interface { type LocalHandler struct { cfg *config.Config - cvt converter.Converter + adp adapter.Adapter } func NewLocalHandler(cfg *config.Config) (*LocalHandler, error) { - cvt, err := converter.NewLocalConverter(cfg) + adp, err := adapter.NewLocalAdapter(cfg) if err != nil { return nil, errors.Wrap(err, "create converter") } handler := &LocalHandler{ cfg: cfg, - cvt: cvt, + adp: adp, } return handler, nil @@ -75,11 +75,11 @@ func (handler *LocalHandler) Auth(ctx context.Context, host string, authHeader s } func (handler *LocalHandler) Convert(ctx context.Context, ref string, sync bool) error { - return handler.cvt.Dispatch(ctx, ref, sync) + return handler.adp.Dispatch(ctx, ref, sync) } func (handler *LocalHandler) CheckHealth(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, healthCheckTimeout) defer cancel() - return handler.cvt.CheckHealth(ctx) + return handler.adp.CheckHealth(ctx) } diff --git a/pkg/remote/resolve.go b/pkg/remote/resolve.go index c9a142ef..fc55bfb2 100644 --- a/pkg/remote/resolve.go +++ b/pkg/remote/resolve.go @@ -16,17 +16,14 @@ package remote import ( "crypto/tls" - "encoding/base64" "net" "net/http" "os" - "strings" "time" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" dockerconfig "github.com/docker/cli/cli/config" - "github.com/pkg/errors" ) func newDefaultClient(skipTLSVerify bool) *http.Client { @@ -51,13 +48,17 @@ func newDefaultClient(skipTLSVerify bool) *http.Client { } } -// withCredentialFunc accepts host url parameter and returns with +// CredentialFunc accepts host url parameter and returns with // username, password and error. -type withCredentialFunc = func(string) (string, string, error) +type CredentialFunc = func(string) (string, string, error) + +// HostFunc accepts host url parameter and returns with +// CredentialFunc, insecure and error. +type HostFunc = func(ref string) (CredentialFunc, bool, error) // NewDockerConfigCredFunc attempts to read docker auth config file `$DOCKER_CONFIG/config.json` // to communicate with remote registry, `$DOCKER_CONFIG` defaults to `~/.docker`. -func NewDockerConfigCredFunc() withCredentialFunc { +func NewDockerConfigCredFunc() CredentialFunc { return func(host string) (string, string, error) { // The host of docker hub image will be converted to `registry-1.docker.io` in: // github.com/containerd/containerd/remotes/docker/registry.go @@ -76,26 +77,7 @@ func NewDockerConfigCredFunc() withCredentialFunc { } } -// NewBasicAuthCredFunc parses base64 encoded auth string to communicate with remote registry. -func NewBasicAuthCredFunc(auth string) withCredentialFunc { - return func(host string) (string, string, error) { - // Leave auth empty if no authorization be required - if strings.TrimSpace(auth) == "" { - return "", "", nil - } - decoded, err := base64.StdEncoding.DecodeString(auth) - if err != nil { - return "", "", errors.Wrap(err, "decode base64 encoded auth string") - } - ary := strings.Split(string(decoded), ":") - if len(ary) != 2 { - return "", "", errors.New("invalid base64 encoded auth string") - } - return ary[0], ary[1], nil - } -} - -func NewResolver(insecure, plainHTTP bool, credFunc withCredentialFunc) remotes.Resolver { +func NewResolver(insecure, plainHTTP bool, credFunc CredentialFunc) remotes.Resolver { registryHosts := docker.ConfigureDefaultRegistries( docker.WithAuthorizer(docker.NewAuthorizer( newDefaultClient(insecure), From ee697f30b999462e9987818f32115b517a660fc7 Mon Sep 17 00:00:00 2001 From: Yan Song Date: Wed, 21 Dec 2022 10:58:26 +0000 Subject: [PATCH 2/3] refine content.Provider interface Simplify the content.Provider interface, so that it can be implemented by other implementations. This patch is no logic changes. Signed-off-by: Yan Song --- README.md | 5 +++-- pkg/adapter/adapter.go | 15 +++++++++++++-- pkg/content/content.go | 25 +++++++++++------------- pkg/converter/converter.go | 32 ++----------------------------- pkg/converter/opts.go | 17 ++++------------ pkg/driver/driver.go | 5 +++-- pkg/driver/estargz/estargz.go | 8 ++++++-- pkg/driver/nydus/nydus.go | 14 +++++++++----- pkg/driver/nydus/parser/parser.go | 30 +++++++---------------------- 9 files changed, 58 insertions(+), 93 deletions(-) diff --git a/README.md b/README.md index 4e91e0df..a97fe624 100644 --- a/README.md +++ b/README.md @@ -136,10 +136,11 @@ Acceleration Service Framework provides a built-in extensible method called driv type Driver interface { // Convert converts the source image to target image, where // content parameter provides necessary image utils, image - // content store and so on. If conversion successful, the + // content store and so on, where source parameter is the + // original image reference. If conversion successful, the // converted image manifest will be returned, otherwise a // non-nil error will be returned. - Convert(context.Context, content.Provider) (*ocispec.Descriptor, error) + Convert(ctx context.Context, content content.Provider, source string) (*ocispec.Descriptor, error) // Name gets the driver type name, it is used to identify // different accelerated image formats. diff --git a/pkg/adapter/adapter.go b/pkg/adapter/adapter.go index c5de6af0..343f7476 100644 --- a/pkg/adapter/adapter.go +++ b/pkg/adapter/adapter.go @@ -23,6 +23,7 @@ import ( "github.com/sirupsen/logrus" "github.com/goharbor/acceleration-service/pkg/config" + "github.com/goharbor/acceleration-service/pkg/content" "github.com/goharbor/acceleration-service/pkg/converter" "github.com/goharbor/acceleration-service/pkg/errdefs" "github.com/goharbor/acceleration-service/pkg/metrics" @@ -58,10 +59,14 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { return nil, errors.Wrap(err, "create containerd client") } + provider, err := content.NewLocalProvider(client, cfg.Host) + if err != nil { + return nil, errors.Wrap(err, "create content provider") + } + cvt, err := converter.NewLocalConverter( - converter.WithClient(client), + converter.WithProvider(provider), converter.WithDriver(cfg.Converter.Driver.Type, cfg.Converter.Driver.Config), - converter.WithHosts(cfg.Host), ) if err != nil { return nil, err @@ -97,6 +102,12 @@ func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { return errors.Wrap(err, "create target reference by rule") } + ctx, done, err := adp.client.WithLease(ctx) + if err != nil { + return errors.Wrap(err, "create lease") + } + defer done(ctx) + return adp.cvt.Convert(ctx, source, target) } diff --git a/pkg/content/content.go b/pkg/content/content.go index d7576a7e..fead671d 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -41,6 +41,7 @@ var logger = logrus.WithField("module", "content") type Provider interface { // Use plain HTTP to communicate with registry. UsePlainHTTP() + // Resolve attempts to resolve the reference into a name and descriptor. Resolver(ref string) (remotes.Resolver, error) // Pull pulls source image from remote registry by specified reference. @@ -51,18 +52,15 @@ type Provider interface { // the desc parameter represents the manifest of targe image. Push(ctx context.Context, desc ocispec.Descriptor, ref string) error - // Image gets the source image object. - Image() containerd.Image + // Image gets the source image descriptor. + Image(ctx context.Context, ref string) (*ocispec.Descriptor, error) // ContentStore gets the content store object of containerd. ContentStore() content.Store - // Client gets the raw containerd client. - Client() *containerd.Client } type LocalProvider struct { - image containerd.Image - client *containerd.Client usePlainHTTP bool + client *containerd.Client hosts remote.HostFunc } @@ -171,8 +169,6 @@ func (pvd *LocalProvider) Pull(ctx context.Context, ref string) error { return errors.Wrap(err, "update layer diff id") } - pvd.image = containerd.NewImageWithPlatform(pvd.client, image, platformMatcher) - return nil } @@ -186,14 +182,15 @@ func (pvd *LocalProvider) Push(ctx context.Context, desc ocispec.Descriptor, ref return pvd.client.Push(ctx, ref, desc, containerd.WithResolver(resolver)) } -func (pvd *LocalProvider) Image() containerd.Image { - return pvd.image +func (pvd *LocalProvider) Image(ctx context.Context, ref string) (*ocispec.Descriptor, error) { + image, err := pvd.client.GetImage(ctx, ref) + if err != nil { + return nil, err + } + target := image.Target() + return &target, nil } func (pvd *LocalProvider) ContentStore() content.Store { return pvd.client.ContentStore() } - -func (pvd *LocalProvider) Client() *containerd.Client { - return pvd.client -} diff --git a/pkg/converter/converter.go b/pkg/converter/converter.go index b859942c..37cadb60 100644 --- a/pkg/converter/converter.go +++ b/pkg/converter/converter.go @@ -19,8 +19,6 @@ import ( "fmt" "time" - "github.com/containerd/containerd" - "github.com/containerd/containerd/defaults" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -32,10 +30,8 @@ import ( var logger = logrus.WithField("module", "converter") type LocalConverter struct { - client *containerd.Client driver driver.Driver provider content.Provider - opts ConvertOpts } func NewLocalConverter(opts ...ConvertOpt) (*LocalConverter, error) { @@ -46,44 +42,20 @@ func NewLocalConverter(opts ...ConvertOpt) (*LocalConverter, error) { } } - if options.client == nil { - client, err := containerd.New(defaults.DefaultAddress) - if err != nil { - return nil, errors.Wrapf(err, "connect to containerd address %s", defaults.DefaultAddress) - } - options.client = client - } - - provider, err := content.NewLocalProvider( - options.client, - options.hosts, - ) - if err != nil { - return nil, errors.Wrap(err, "create content provider") - } - driver, err := driver.NewLocalDriver(options.driverType, options.driverConfig) if err != nil { return nil, errors.Wrap(err, "create driver") } handler := &LocalConverter{ - client: options.client, driver: driver, - provider: provider, - opts: options, + provider: options.provider, } return handler, nil } func (cvt *LocalConverter) Convert(ctx context.Context, source, target string) error { - ctx, done, err := cvt.client.WithLease(ctx) - if err != nil { - return errors.Wrap(err, "create lease") - } - defer done(ctx) - logger.Infof("pulling image %s", source) start := time.Now() if err := cvt.provider.Pull(ctx, source); err != nil { @@ -101,7 +73,7 @@ func (cvt *LocalConverter) Convert(ctx context.Context, source, target string) e logger.Infof("converting image %s", source) start = time.Now() - desc, err := cvt.driver.Convert(ctx, cvt.provider) + desc, err := cvt.driver.Convert(ctx, cvt.provider, source) if err != nil { return errors.Wrap(err, "convert image") } diff --git a/pkg/converter/opts.go b/pkg/converter/opts.go index cc110528..754411d1 100644 --- a/pkg/converter/opts.go +++ b/pkg/converter/opts.go @@ -15,22 +15,20 @@ package converter import ( - "github.com/containerd/containerd" - "github.com/goharbor/acceleration-service/pkg/remote" + "github.com/goharbor/acceleration-service/pkg/content" ) type ConvertOpts struct { - client *containerd.Client + provider content.Provider driverType string driverConfig map[string]string - hosts remote.HostFunc } type ConvertOpt func(opts *ConvertOpts) error -func WithClient(client *containerd.Client) ConvertOpt { +func WithProvider(provider content.Provider) ConvertOpt { return func(opts *ConvertOpts) error { - opts.client = client + opts.provider = provider return nil } } @@ -42,10 +40,3 @@ func WithDriver(typ string, config map[string]string) ConvertOpt { return nil } } - -func WithHosts(hosts remote.HostFunc) ConvertOpt { - return func(opts *ConvertOpts) error { - opts.hosts = hosts - return nil - } -} diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index b752a3da..64b52c10 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -30,10 +30,11 @@ import ( type Driver interface { // Convert converts the source image to target image, where // content parameter provides necessary image utils, image - // content store and so on. If conversion successful, the + // content store and so on, where source parameter is the + // original image reference. If conversion successful, the // converted image manifest will be returned, otherwise a // non-nil error will be returned. - Convert(context.Context, content.Provider) (*ocispec.Descriptor, error) + Convert(ctx context.Context, content content.Provider, source string) (*ocispec.Descriptor, error) // Name gets the driver type name, it is used to identify // different accelerated image formats. diff --git a/pkg/driver/estargz/estargz.go b/pkg/driver/estargz/estargz.go index 20fd74b8..0d9ebd04 100644 --- a/pkg/driver/estargz/estargz.go +++ b/pkg/driver/estargz/estargz.go @@ -35,14 +35,18 @@ func New(cfg map[string]string) (*Driver, error) { return &Driver{cfg}, nil } -func (d *Driver) Convert(ctx context.Context, p content.Provider) (*ocispec.Descriptor, error) { +func (d *Driver) Convert(ctx context.Context, p content.Provider, ref string) (*ocispec.Descriptor, error) { opts, docker2oci, err := getESGZConvertOpts(d.cfg) if err != nil { return nil, errors.Wrap(err, "parse estargz conversion options") } platformMC := platforms.All // TODO: enable to configure the target platforms + image, err := p.Image(ctx, ref) + if err != nil { + return nil, errors.Wrap(err, "get source image") + } return converter.DefaultIndexConvertFunc(estargzconvert.LayerConvertFunc(opts...), docker2oci, platformMC)( - ctx, p.ContentStore(), p.Image().Target()) + ctx, p.ContentStore(), *image) } func (d *Driver) Name() string { diff --git a/pkg/driver/nydus/nydus.go b/pkg/driver/nydus/nydus.go index 09988dd0..7a1d13bc 100644 --- a/pkg/driver/nydus/nydus.go +++ b/pkg/driver/nydus/nydus.go @@ -139,18 +139,22 @@ func (d *Driver) Version() string { return "" } -func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider) (*ocispec.Descriptor, error) { - desc, err := d.convert(ctx, provider) +func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, source string) (*ocispec.Descriptor, error) { + image, err := provider.Image(ctx, source) + if err != nil { + return nil, errors.Wrap(err, "get source image") + } + desc, err := d.convert(ctx, provider, *image) if err != nil { return nil, err } if d.mergeManifest { - return d.makeManifestIndex(ctx, provider.ContentStore(), provider.Image().Target(), *desc) + return d.makeManifestIndex(ctx, provider.ContentStore(), *image, *desc) } return desc, err } -func (d *Driver) convert(ctx context.Context, provider accelcontent.Provider) (*ocispec.Descriptor, error) { +func (d *Driver) convert(ctx context.Context, provider accelcontent.Provider, source ocispec.Descriptor) (*ocispec.Descriptor, error) { cs := provider.ContentStore() chunkDictPath := "" @@ -190,7 +194,7 @@ func (d *Driver) convert(ctx context.Context, provider accelcontent.Provider) (* platforms.DefaultStrict(), convertHooks, ) - return indexConvertFunc(ctx, cs, provider.Image().Target()) + return indexConvertFunc(ctx, cs, source) } func (d *Driver) makeManifestIndex(ctx context.Context, cs content.Store, oci, nydus ocispec.Descriptor) (*ocispec.Descriptor, error) { diff --git a/pkg/driver/nydus/parser/parser.go b/pkg/driver/nydus/parser/parser.go index ef801b22..b80ae7ed 100644 --- a/pkg/driver/nydus/parser/parser.go +++ b/pkg/driver/nydus/parser/parser.go @@ -17,20 +17,15 @@ package parser import ( "context" - "github.com/containerd/containerd" imageContent "github.com/containerd/containerd/content" - "github.com/containerd/containerd/images" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" - "github.com/sirupsen/logrus" "github.com/goharbor/acceleration-service/pkg/content" nydusUtils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" "github.com/goharbor/acceleration-service/pkg/utils" ) -var logger = logrus.WithField("module", "nydus-driver") - type Parser struct { content content.Provider } @@ -47,29 +42,18 @@ func (parser *Parser) PullAsChunkDict(ctx context.Context, ref string, usePlainH if usePlainHTTP { parser.content.UsePlainHTTP() } - resolver, err := parser.content.Resolver(ref) - if err != nil { - return nil, nil, errors.Wrapf(err, "get resolver for %s", ref) - } - opts := []containerd.RemoteOpt{ - containerd.WithPlatformMatcher(nydusUtils.NydusPlatformComparer{}), - containerd.WithImageHandler(images.HandlerFunc( - func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - if images.IsLayerType(desc.MediaType) { - logger.Debugf("pulling chunk dict image layer %s", desc.Digest) - } - return nil, nil - }, - )), - containerd.WithResolver(resolver), + + if err := parser.content.Pull(ctx, ref); err != nil { + return nil, nil, errors.Wrap(err, "pull chunk dict image") } - image, err := parser.content.Client().Fetch(ctx, ref, opts...) + + image, err := parser.content.Image(ctx, ref) if err != nil { - return nil, nil, errors.Wrapf(err, "pull chunk dict image %s", ref) + return nil, nil, errors.Wrap(err, "get image from content store") } manifest := ocispec.Manifest{} - _, err = utils.ReadJSON(ctx, cs, &manifest, image.Target) + _, err = utils.ReadJSON(ctx, cs, &manifest, *image) if err != nil { return nil, nil, errors.Wrap(err, "read manifest json") } From ab7979ef2d3653dbb034345023ef1188e35ea62d Mon Sep 17 00:00:00 2001 From: Yan Song Date: Thu, 22 Dec 2022 07:59:24 +0000 Subject: [PATCH 3/3] converter: normalizes tagged or digested reference So that we can convert `localhost:5000/busybox` reference (with no tag) to `localhost:5000/busybox:latest` (tagged) automatically. Signed-off-by: Yan Song --- pkg/converter/converter.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/converter/converter.go b/pkg/converter/converter.go index 37cadb60..cda2b85d 100644 --- a/pkg/converter/converter.go +++ b/pkg/converter/converter.go @@ -22,6 +22,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/containerd/containerd/reference/docker" "github.com/goharbor/acceleration-service/pkg/content" "github.com/goharbor/acceleration-service/pkg/driver" "github.com/goharbor/acceleration-service/pkg/errdefs" @@ -56,6 +57,17 @@ func NewLocalConverter(opts ...ConvertOpt) (*LocalConverter, error) { } func (cvt *LocalConverter) Convert(ctx context.Context, source, target string) error { + sourceNamed, err := docker.ParseDockerRef(source) + if err != nil { + return errors.Wrap(err, "parse source reference") + } + targetNamed, err := docker.ParseDockerRef(target) + if err != nil { + return errors.Wrap(err, "parse target reference") + } + source = sourceNamed.String() + target = targetNamed.String() + logger.Infof("pulling image %s", source) start := time.Now() if err := cvt.provider.Pull(ctx, source); err != nil {