Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Wrap content to support cache #170

Merged
merged 2 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions misc/config/config.nydus.ref.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ provider:
gcpolicy:
# size threshold that triggers GC, the oldest used blobs will be reclaimed if exceeds the size.
threshold: 1000MB
# remote cache record capacity of converted layers, default is 200.
cache_size: 200

converter:
# number of worker for executing conversion task
Expand Down Expand Up @@ -61,3 +63,5 @@ converter:
rules:
# add suffix to tag of source image reference as target image reference
- tag_suffix: -nydus-oci-ref
# add suffix to tag of source image reference as remote cache reference, leave empty to disable remote cache.
- cache_tag_suffix: -nydus-cache
4 changes: 4 additions & 0 deletions misc/config/config.nydus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ provider:
gcpolicy:
# size threshold that triggers GC, the oldest used blobs will be reclaimed if exceeds the size.
threshold: 1000MB
# remote cache record capacity of converted layers, default is 200.
cache_size: 200

converter:
# number of worker for executing conversion task
Expand Down Expand Up @@ -100,3 +102,5 @@ converter:
rules:
# add suffix to tag of source image reference as target image reference
- tag_suffix: -nydus
# add suffix to tag of source image reference as remote cache reference, leave empty to disable remote cache.
- cache_tag_suffix: -nydus-cache
16 changes: 13 additions & 3 deletions pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) {
return nil, errors.Wrap(err, "invalid platform configuration")
}

provider, content, err := content.NewLocalProvider(cfg.Provider.WorkDir, cfg.Provider.GCPolicy.Threshold, cfg.Host, platformMC)
provider, content, err := content.NewLocalProvider(cfg, platformMC)
if err != nil {
return nil, errors.Wrap(err, "create content provider")
}
Expand Down Expand Up @@ -95,15 +95,25 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) {
}

func (adp *LocalAdapter) Convert(ctx context.Context, source string) error {
target, err := adp.rule.Map(source)
target, err := adp.rule.Map(source, TagSuffix)
if err != nil {
if errors.Is(err, errdefs.ErrAlreadyConverted) {
logrus.Infof("image has been converted: %s", source)
return nil
}
return errors.Wrap(err, "create target reference by rule")
}
if _, err = adp.cvt.Convert(ctx, source, target); err != nil {
cacheRef, err := adp.rule.Map(source, CacheTagSuffix)
if err != nil {
if errors.Is(err, errdefs.ErrIsRemoteCache) {
logrus.Infof("image was remote cache: %s", source)
return nil
}
}
if err = adp.content.NewRemoteCache(cacheRef); err != nil {
return err
}
if _, err = adp.cvt.Convert(ctx, source, target, cacheRef); err != nil {
return err
}
if err := adp.content.GC(ctx); err != nil {
Expand Down
38 changes: 30 additions & 8 deletions pkg/adapter/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"github.com/goharbor/acceleration-service/pkg/errdefs"
)

const (
TagSuffix = "tag_suffix"
CacheTagSuffix = "cache_tag_suffix"
)

// Add suffix to source image reference as the target
// image reference, for example:
// Source: 192.168.1.1/nginx:latest
Expand All @@ -47,16 +52,33 @@ type Rule struct {

// Map maps the source image reference to a new one according to
// a rule, the new one will be used as the reference of target image.
func (rule *Rule) Map(ref string) (string, error) {
for _, item := range rule.items {
if item.TagSuffix != "" {
if strings.HasSuffix(ref, item.TagSuffix) {
// FIXME: To check if an image has been converted, a better solution
// is to use the annotation on image manifest.
return "", errdefs.ErrAlreadyConverted
func (rule *Rule) Map(ref, opt string) (string, error) {
switch opt {
case TagSuffix:
for _, item := range rule.items {
if item.TagSuffix != "" {
if strings.HasSuffix(ref, item.TagSuffix) {
// FIXME: To check if an image has been converted, a better solution
// is to use the annotation on image manifest.
return "", errdefs.ErrAlreadyConverted
}
return addSuffix(ref, item.TagSuffix)
}
}
case CacheTagSuffix:
for _, item := range rule.items {
if item.CacheTagSuffix != "" {
if strings.HasSuffix(ref, item.CacheTagSuffix) {
// FIXME: Ditto.A better way is to use the annotation on image manifest.
return "", errdefs.ErrIsRemoteCache
}
return addSuffix(ref, item.CacheTagSuffix)
}
return addSuffix(ref, item.TagSuffix)
}
// CacheTagSuffix empty means do not provide remote cache, just return empty string.
return "", nil
default:
return "", fmt.Errorf("unsupported map option: %s", opt)
}
return "", errors.New("not found matched conversion rule")
}
19 changes: 15 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ type MetricConfig struct {
}

type ProviderConfig struct {
Source map[string]SourceConfig `yaml:"source"`
WorkDir string `yaml:"work_dir"`
GCPolicy GCPolicy `yaml:"gcpolicy"`
Source map[string]SourceConfig `yaml:"source"`
WorkDir string `yaml:"work_dir"`
GCPolicy GCPolicy `yaml:"gcpolicy"`
CacheSize int `yaml:"cache_size"`
}

type GCPolicy struct {
Expand All @@ -66,7 +67,8 @@ type SourceConfig struct {
}

type ConversionRule struct {
TagSuffix string `yaml:"tag_suffix"`
TagSuffix string `yaml:"tag_suffix"`
CacheTagSuffix string `yaml:"cache_tag_suffix"`
}

type ConverterConfig struct {
Expand Down Expand Up @@ -137,3 +139,12 @@ func (cfg *Config) Host(ref string) (remote.CredentialFunc, bool, error) {
return ary[0], ary[1], nil
}, auth.Insecure, nil
}

func (cfg *Config) EnableRemoteCache() bool {
for _, rule := range cfg.Converter.Rules {
if rule.CacheTagSuffix != "" {
return true
}
}
return false
}
60 changes: 60 additions & 0 deletions pkg/content/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (

"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
"github.com/goharbor/acceleration-service/pkg/remote"
lru "github.com/hashicorp/golang-lru/v2"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// This is not thread-safe, which means it will depend on the parent implementation to do the locking mechanism.
Expand Down Expand Up @@ -122,3 +124,61 @@ func (leaseCache *leaseCache) remove(key string, usedCount string) {
func (leaseCache *leaseCache) Len() int {
return leaseCache.size
}

type RemoteCache struct {
// remoteCache is an LRU cache for caching target layer descriptors, the cache key is the source layer digest,
// and the cache value is the target layer descriptor after conversion.
remoteCache *lru.Cache[string, ocispec.Descriptor]
PerseidMeteor marked this conversation as resolved.
Show resolved Hide resolved
// cacheRef is the remote cache reference.
cacheRef string
// host is a func to provide registry credential by host name.
host remote.HostFunc
// cacheSize is the remote cache record capacity of converted layers.
cacheSize int
}

func NewRemoteCache(cacheSize int, host remote.HostFunc) (*RemoteCache, error) {
remoteCache, err := lru.New[string, ocispec.Descriptor](cacheSize)
if err != nil {
return nil, err
}
return &RemoteCache{
remoteCache: remoteCache,
host: host,
cacheSize: cacheSize,
}, nil
}

func (rc *RemoteCache) Values() []ocispec.Descriptor {
return rc.remoteCache.Values()
}

func (rc *RemoteCache) Get(key string) (ocispec.Descriptor, bool) {
return rc.remoteCache.Get(key)
}

func (rc *RemoteCache) Add(key string, value ocispec.Descriptor) {
rc.remoteCache.Add(key, value)
}

func (rc *RemoteCache) Remove(key string) {
rc.remoteCache.Remove(key)
}

// Size returns the number of items in the cache.
func (rc *RemoteCache) Size() int {
return rc.remoteCache.Len()

}

func (rc *RemoteCache) NewLRUCache(cacheSize int, cacheRef string) error {
if rc != nil {
remoteCache, err := lru.New[string, ocispec.Descriptor](cacheSize)
if err != nil {
return err
}
rc.remoteCache = remoteCache
rc.cacheRef = cacheRef
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/content/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestLeaseCache(t *testing.T) {
func TestLeaseCacheInit(t *testing.T) {
os.MkdirAll("./tmp", 0755)
defer os.RemoveAll("./tmp")
content, err := NewContent("./tmp", "./tmp", "100MB")
content, err := NewContent("./tmp", "./tmp", "100MB", false, 200, nil)
require.NoError(t, err)
testDigest := []string{
"sha256:9bb13890319dc01e5f8a4d3d0c4c72685654d682d568350fd38a02b1d70aee6b",
Expand Down
Loading
Loading