Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time based cache cleanup #369

Merged
merged 2 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 48 additions & 31 deletions cmds/ocm/commands/cachecmds/clean/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@ package clean

import (
"fmt"
"sync"
"time"

"github.com/mandelsoft/vfs/pkg/vfs"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/open-component-model/ocm/cmds/ocm/commands/cachecmds/names"
"github.com/open-component-model/ocm/cmds/ocm/commands/verbs"
"github.com/open-component-model/ocm/cmds/ocm/pkg/utils"
"github.com/open-component-model/ocm/pkg/common"
"github.com/open-component-model/ocm/pkg/common/accessio"
"github.com/open-component-model/ocm/pkg/contexts/clictx"
"github.com/open-component-model/ocm/pkg/contexts/oci/attrs/cacheattr"
"github.com/open-component-model/ocm/pkg/errors"
"github.com/open-component-model/ocm/pkg/out"
utils2 "github.com/open-component-model/ocm/pkg/utils"
)

var (
Expand All @@ -33,7 +35,11 @@ type Cache interface {

type Command struct {
utils.BaseCommand
cache Cache
cache accessio.CleanupCache

duration string
before time.Time
dryrun bool
}

// NewCommand creates a new artifact command.
Expand All @@ -55,54 +61,65 @@ $ ocm clean cache
}
}

func (o *Command) AddFlags(fs *pflag.FlagSet) {
o.BaseCommand.AddFlags(fs)
fs.StringVarP(&o.duration, "before", "b", "", "time since last usage")
fs.BoolVarP(&o.dryrun, "dry-run", "s", false, "show size to be removed")
}

func (o *Command) Complete(args []string) error {
c := cacheattr.Get(o.Context)
if c == nil {
return errors.Newf("no blob cache configured")
}
r, ok := c.(Cache)
r, ok := c.(accessio.CleanupCache)
if !ok {
return errors.Newf("only filesystem based caches are supported")
return errors.Newf("cache implementation does not support cleanup")
}
o.cache = r
if o.duration != "" {
if t, err := utils2.ParseDeltaTime(o.duration, true); err == nil {
o.before = t
} else {
t, err := time.Parse(time.RFC3339, o.duration)
if err != nil {
t, err = time.Parse(o.duration, o.duration)
}
if err != nil {
return fmt.Errorf("invalid lifetime %q", o.duration)
}
o.before = t
}
}
return nil
}

func (o *Command) Run() error {
var size int64
var fsize int64
cnt := 0
errs := 0

if l, ok := o.cache.(sync.Locker); ok {
l.Lock()
defer l.Unlock()
}
path, fs := o.cache.Root()
cnt, ncnt, fcnt, size, nsize, fsize, err := o.cache.Cleanup(common.NewPrinter(o.Context.StdErr()), &o.before, o.dryrun)

entries, err := vfs.ReadDir(fs, path)
if err != nil {
return err
}
for _, e := range entries {
err := fs.RemoveAll(vfs.Join(fs, path, e.Name()))
if err != nil {
out.Errf(o.Context, "cannot delete %q: %s\n", e.Name(), err)
errs++
fsize += e.Size()
if !o.before.IsZero() {
if o.dryrun {
out.Outf(o.Context, "Matching %d/%d entries [%.3f/%.3f MB]\n", cnt, ncnt+cnt, float64(size)/1024/1024, float64(size+nsize)/1024/1024)
} else {
cnt++
size += e.Size()
out.Outf(o.Context, "Successfully deleted %d/%d entries [%.2f/%.3f MB]\n", cnt, ncnt+cnt, float64(size)/1024/1024, float64(size+nsize)/1024/1024)
}
}
if cnt == 0 && errs > 0 {
return fmt.Errorf("Failed to delete %d entries [%.2f MB]\n", cnt, float64(fsize)/1024/1024)
}
if errs == 0 {
out.Outf(o.Context, "Successfully deleted %d entries [%.2f MB]\n", cnt, float64(size)/1024/1024)
} else {
out.Outf(o.Context, "Deleted %d entries [%.2f MB]\n", cnt, float64(size)/1024/1024)
out.Outf(o.Context, "Failed to delete %d entries [%.2f MB]\n", cnt, float64(fsize)/1024/1024)
if o.dryrun {
out.Outf(o.Context, "Would remove %d entries [%.3f MB]\n", cnt, float64(size)/1024/1024)
} else {
out.Outf(o.Context, "Successfully deleted %d entries [%.3f MB]\n", cnt, float64(size)/1024/1024)
}
}
if fcnt > 0 {
if o.dryrun {
out.Outf(o.Context, "Failed to check %d entries [%.3f MB]\n", fcnt, float64(fsize)/1024/1024)
} else {
out.Outf(o.Context, "Failed to delete %d entries [%.3f MB]\n", fcnt, float64(fsize)/1024/1024)
}
}
return nil
}
46 changes: 14 additions & 32 deletions cmds/ocm/commands/cachecmds/info/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
package info

import (
"sync"

"github.com/mandelsoft/vfs/pkg/vfs"
"github.com/spf13/cobra"

"github.com/open-component-model/ocm/cmds/ocm/commands/cachecmds/names"
Expand All @@ -25,14 +22,9 @@ var (
Verb = verbs.Info
)

type Cache interface {
accessio.BlobCache
accessio.RootedCache
}

type Command struct {
utils.BaseCommand
cache Cache
cache accessio.BlobCache
}

// NewCommand creates a new artifact command.
Expand All @@ -55,38 +47,28 @@ $ ocm cache info
}

func (o *Command) Complete(args []string) error {
c := cacheattr.Get(o.Context)
if c == nil {
o.cache = cacheattr.Get(o.Context)
if o.cache == nil {
return errors.Newf("no blob cache configured")
}
r, ok := c.(Cache)
if !ok {
return errors.Newf("only filesystem based caches are supported")
}
o.cache = r
return nil
}

func (o *Command) Run() error {
var size int64
cnt := 0

if l, ok := o.cache.(sync.Locker); ok {
l.Lock()
defer l.Unlock()
if r, ok := o.cache.(accessio.RootedCache); ok {
path, fs := r.Root()
out.Outf(o.Context, "Used cache directory %s [%s]\n", path, fs.Name())
}
path, fs := o.cache.Root()

entries, err := vfs.ReadDir(fs, path)
if err != nil {
return err
}
for _, e := range entries {
cnt++
size += e.Size()
if r, ok := o.cache.(accessio.CleanupCache); ok {
cnt, _, _, size, _, _, err := r.Cleanup(nil, nil, true)
if err != nil {
return err
}
out.Outf(o.Context, "Total cache size %d entries [%.3f MB]\n", cnt, float64(size)/1024/1024)
} else {
out.Outf(o.Context, "Cache does not support more info\n")
}

out.Outf(o.Context, "Used cache directory %s [%s]\n", path, fs.Name())
out.Outf(o.Context, "Total cache size %d entries [%.2f MB]\n", cnt, float64(size)/1024/1024)
return nil
}
4 changes: 3 additions & 1 deletion docs/reference/ocm_clean_cache.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ ocm clean cache [<options>]
### Options

```
-h, --help help for cache
-b, --before string time since last usage
-s, --dry-run show size to be removed
-h, --help help for cache
```

### Description
Expand Down
76 changes: 76 additions & 0 deletions pkg/common/accessio/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"fmt"
"io"
"os"
"strings"
"sync"
"time"

"github.com/mandelsoft/vfs/pkg/osfs"
"github.com/mandelsoft/vfs/pkg/projectionfs"
Expand Down Expand Up @@ -132,6 +134,15 @@ type RootedCache interface {
Root() (string, vfs.FileSystem)
}

type CleanupCache interface {
// Cleanup can be implemented to offer a cache reorg.
// It returns the number and size of
// - handled entries (cnt, size)
// - not handled entries (ncnt, nsize)
// - failing entries (fcnt, fsize)
Cleanup(p common.Printer, before *time.Time, dryrun bool) (cnt int, ncnt int, fcnt int, size int64, nsize int64, fsize int64, err error)
}

type BlobCache interface {
BlobSource
BlobSink
Expand All @@ -149,6 +160,12 @@ var (
_ RootedCache = (*blobCache)(nil)
)

// ACCESS_SUFFIX is the suffix of an additional blob related
// file used to track the last access time by its modification time,
// because Go does not support a platform independent way to access the
// last access time attribute of a filesystem.
const ACCESS_SUFFIX = ".acc"

func NewDefaultBlobCache(fss ...vfs.FileSystem) (BlobCache, error) {
var err error
fs := DefaultedFileSystem(nil, fss...)
Expand Down Expand Up @@ -190,6 +207,61 @@ func (c *blobCache) Unlock() {
c.lock.Unlock()
}

func (c *blobCache) Cleanup(p common.Printer, before *time.Time, dryrun bool) (cnt int, ncnt int, fcnt int, size int64, nsize int64, fsize int64, err error) {
c.Lock()
defer c.Unlock()

if p == nil {
p = common.NewPrinter(nil)
}
path, fs := c.Root()

entries, err := vfs.ReadDir(fs, path)
if err != nil {
return 0, 0, 0, 0, 0, 0, err
}
for _, e := range entries {
if strings.HasSuffix(e.Name(), ACCESS_SUFFIX) {
continue
}
base := vfs.Join(fs, path, e.Name())
if before != nil && !before.IsZero() {
fi, err := fs.Stat(base + ACCESS_SUFFIX)
if err != nil {
if !vfs.IsErrNotExist(err) {
if p != nil {
p.Printf("cannot stat %q: %s", e.Name(), err)
}
fcnt++
fsize += e.Size()
continue
}
} else {
if fi.ModTime().After(*before) {
ncnt++
nsize += e.Size()
continue
}
}
}
if !dryrun {
err := fs.RemoveAll(base)
if err != nil {
if p != nil {
p.Printf("cannot delete %q: %s", e.Name(), err)
}
fcnt++
fsize += e.Size()
continue
}
fs.RemoveAll(base + ACCESS_SUFFIX)
}
cnt++
size += e.Size()
}
return cnt, ncnt, fcnt, size, nsize, fsize, nil
}

func (c *blobCache) cleanup() error {
return vfs.Cleanup(c.cache)
}
Expand All @@ -204,6 +276,9 @@ func (c *blobCache) GetBlobData(digest digest.Digest) (int64, DataAccess, error)
path := common.DigestToFileName(digest)
fi, err := c.cache.Stat(path)
if err == nil {
vfs.WriteFile(c.cache, path+ACCESS_SUFFIX, []byte{}, 0o600)
// now := time.Now()
// c.cache.Chtimes(path+ACCESS_SUFFIX, now, now)
return fi.Size(), DataAccessForFile(c.cache, path), nil
}
if os.IsNotExist(err) {
Expand Down Expand Up @@ -272,6 +347,7 @@ func (c *blobCache) AddBlob(blob BlobAccess) (int64, digest.Digest, error) {
err = c.cache.Rename(tmp, target)
}
c.cache.Remove(tmp)
vfs.WriteFile(c.cache, target+ACCESS_SUFFIX, []byte{}, 0o600)
return size, digest, err
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/errors/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (l *ErrorList) Len() int {
return len(l.errors)
}

func (l *ErrorList) Entries() []error {
return l.errors
}

func (l *ErrorList) Result() error {
if l == nil || len(l.errors) == 0 {
return nil
Expand Down
46 changes: 46 additions & 0 deletions pkg/utils/time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Open Component Model contributors.
//
// SPDX-License-Identifier: Apache-2.0

package utils

import (
"strconv"
"time"

"github.com/open-component-model/ocm/pkg/errors"
"github.com/open-component-model/ocm/pkg/generics"
)

// ParseDeltaTime parses a time diff relative to the actual
// time and returns the resulting time.
func ParseDeltaTime(s string, past bool) (time.Time, error) {
mandelsoft marked this conversation as resolved.
Show resolved Hide resolved
var t time.Time

f := int64(generics.Conditional(past, -1, 1))

if len(s) < 2 {
return t, errors.Newf("invalid time diff %q", s)
}
i, err := strconv.ParseInt(s[:len(s)-1], 10, 64)
if err != nil {
return t, errors.Wrapf(err, "invalid time diff %q", s)
}

d := scale[s[len(s)-1:]]
if d == nil {
return t, errors.Newf("invalid time diff %q", s)
}
return d(i*f, time.Now()), nil
}

type timeModifier func(d int64, t time.Time) time.Time

var scale = map[string]timeModifier{
"s": func(d int64, t time.Time) time.Time { return t.Add(time.Duration(d) * time.Second) },
"m": func(d int64, t time.Time) time.Time { return t.Add(time.Duration(d) * time.Minute) },
"h": func(d int64, t time.Time) time.Time { return t.Add(time.Duration(d) * time.Hour) },
"d": func(d int64, t time.Time) time.Time { return t.AddDate(0, 0, int(d)) },
"M": func(d int64, t time.Time) time.Time { return t.AddDate(0, int(d), 0) },
"y": func(d int64, t time.Time) time.Time { return t.AddDate(int(d), 0, 0) },
}