fix: metacache should only rename entries during cleanup (minio#11503)
To avoid large delays in metacache cleanup, use rename
instead of recursive delete calls; renames are cheaper.
Move the content to minioMetaTmpBucket and then clean up
that folder once every 24 hours instead.

If the new cache can replace an existing one, let it
replace it, since it is currently being saved anyway.
This avoids a pile-up of thousands of metacache entries
for the same listing calls that do not need to be stored
on disk.
harshavardhana authored Feb 11, 2021
1 parent 0ef3e35 commit b3c56b5
Showing 10 changed files with 110 additions and 68 deletions.
14 changes: 0 additions & 14 deletions cmd/bucket-listobjects-handlers.go
@@ -26,7 +26,6 @@ import (
"github.com/minio/minio/cmd/logger"

"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/handlers"
"github.com/minio/minio/pkg/sync/errgroup"
)

@@ -295,10 +294,6 @@ func proxyRequestByNodeIndex(ctx context.Context, w http.ResponseWriter, r *http
return proxyRequest(ctx, w, r, ep)
}

func proxyRequestByStringHash(ctx context.Context, w http.ResponseWriter, r *http.Request, str string) (success bool) {
return proxyRequestByNodeIndex(ctx, w, r, crcHashMod(str, len(globalProxyEndpoints)))
}

// ListObjectsV1Handler - GET Bucket (List Objects) Version 1.
// --------------------------
// This implementation of the GET operation returns some or all (up to 10000)
@@ -337,15 +332,6 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
return
}

// Forward the request using Source IP or bucket
forwardStr := handlers.GetSourceIPFromHeaders(r)
if forwardStr == "" {
forwardStr = bucket
}
if proxyRequestByStringHash(ctx, w, r, forwardStr) {
return
}

listObjects := objectAPI.ListObjects

// Initiate a list objects operation based on the input params.
48 changes: 42 additions & 6 deletions cmd/erasure-multipart.go
@@ -24,6 +24,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/minio/minio-go/v7/pkg/set"
@@ -91,12 +92,47 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri
// Clean-up the old multipart uploads. Should be run in a Go routine.
func (er erasureObjects) cleanupStaleUploads(ctx context.Context, expiry time.Duration) {
// run multiple cleanup's local to this server.
var wg sync.WaitGroup
for _, disk := range er.getLoadBalancedLocalDisks() {
if disk != nil {
er.cleanupStaleUploadsOnDisk(ctx, disk, expiry)
return
wg.Add(1)
go func(disk StorageAPI) {
defer wg.Done()
er.cleanupStaleUploadsOnDisk(ctx, disk, expiry)
}(disk)
}
}
wg.Wait()
}

func (er erasureObjects) renameAll(ctx context.Context, bucket, prefix string) {
var wg sync.WaitGroup
for _, disk := range er.getDisks() {
if disk == nil {
continue
}
wg.Add(1)
go func(disk StorageAPI) {
defer wg.Done()
disk.RenameFile(ctx, bucket, prefix, minioMetaTmpBucket, mustGetUUID())
}(disk)
}
wg.Wait()
}

func (er erasureObjects) deleteAll(ctx context.Context, bucket, prefix string) {
var wg sync.WaitGroup
for _, disk := range er.getDisks() {
if disk == nil {
continue
}
wg.Add(1)
go func(disk StorageAPI) {
defer wg.Done()
disk.Delete(ctx, bucket, prefix, true)
}(disk)
}
wg.Wait()
}

// Remove the old multipart uploads on the given disk.
@@ -118,7 +154,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
continue
}
if now.Sub(fi.ModTime) > expiry {
er.deleteObject(ctx, minioMetaMultipartBucket, uploadIDPath, fi.Erasure.DataBlocks+1)
er.renameAll(ctx, minioMetaMultipartBucket, uploadIDPath)
}
}
}
@@ -127,12 +163,12 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
return
}
for _, tmpDir := range tmpDirs {
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false)
vi, err := disk.StatVol(ctx, pathJoin(minioMetaTmpBucket, tmpDir))
if err != nil {
continue
}
if now.Sub(fi.ModTime) > expiry {
er.deleteObject(ctx, minioMetaTmpBucket, tmpDir, fi.Erasure.DataBlocks+1)
if now.Sub(vi.Created) > expiry {
er.deleteAll(ctx, minioMetaTmpBucket, tmpDir)
}
}
}
26 changes: 14 additions & 12 deletions cmd/erasure-server-pool.go
@@ -1121,22 +1121,24 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, fo
// data is not distributed across sets.
// Errors are logged but individual disk failures are not returned.
func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix string) {
var wg sync.WaitGroup
for _, servers := range z.serverPools {
for _, set := range servers.sets {
for _, disk := range set.getDisks() {
if disk == nil {
continue
}
wg.Add(1)
go func(disk StorageAPI) {
defer wg.Done()
disk.Delete(ctx, bucket, prefix, true)
}(disk)
}
set.deleteAll(ctx, bucket, prefix)
}
}
}

// renameAll will rename bucket+prefix unconditionally across all disks to
// minioMetaTmpBucket + unique uuid.
// Note that set distribution is ignored so it should only be used in cases where
// data is not distributed across sets. Errors are logged but individual
// disk failures are not returned.
func (z *erasureServerPools) renameAll(ctx context.Context, bucket, prefix string) {
for _, servers := range z.serverPools {
for _, set := range servers.sets {
set.renameAll(ctx, bucket, prefix)
}
}
wg.Wait()
}

// This function is used to undo a successful DeleteBucket operation.
11 changes: 5 additions & 6 deletions cmd/metacache-bucket.go
@@ -64,7 +64,7 @@ func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
ez, ok := objAPI.(*erasureServerPools)
if ok {
ctx := context.Background()
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
}
}
return &bucketMetacache{
@@ -292,7 +292,7 @@ func (b *bucketMetacache) cleanup() {
caches, rootIdx := b.cloneCaches()

for id, cache := range caches {
if b.transient && time.Since(cache.lastUpdate) > 15*time.Minute && time.Since(cache.lastHandout) > 15*time.Minute {
if b.transient && time.Since(cache.lastUpdate) > 10*time.Minute && time.Since(cache.lastHandout) > 10*time.Minute {
// Keep transient caches only for 10 minutes.
remove[id] = struct{}{}
continue
@@ -361,7 +361,7 @@ func (b *bucketMetacache) cleanup() {
})
// Keep first metacacheMaxEntries...
for _, cache := range remainCaches[metacacheMaxEntries:] {
if time.Since(cache.lastHandout) > time.Hour {
if time.Since(cache.lastHandout) > 30*time.Minute {
remove[cache.id] = struct{}{}
}
}
@@ -409,7 +409,6 @@ func (b *bucketMetacache) updateCacheEntry(update metacache) (metacache, error)
defer b.mu.Unlock()
existing, ok := b.caches[update.id]
if !ok {
logger.Info("updateCacheEntry: bucket %s list id %v not found", b.bucket, update.id)
return update, errFileNotFound
}
existing.update(update)
@@ -465,7 +464,7 @@ func (b *bucketMetacache) deleteAll() {
b.updated = true
if !b.transient {
// Delete all.
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
b.caches = make(map[string]metacache, 10)
b.cachesRoot = make(map[string][]string, 10)
return
@@ -477,7 +476,7 @@
wg.Add(1)
go func(cache metacache) {
defer wg.Done()
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id))
ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id))
}(b.caches[id])
}
wg.Wait()
19 changes: 13 additions & 6 deletions cmd/metacache-entries.go
@@ -330,16 +330,23 @@ func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, aft
}

fiv, err := entry.fileInfoVersions(bucket)
if err != nil {
continue
}

fiVersions := fiv.Versions
if afterV != "" {
// Forward first entry to specified version
fiv.forwardPastVersion(afterV)
vidMarkerIdx := fiv.findVersionIndex(afterV)
if vidMarkerIdx >= 0 {
fiVersions = fiVersions[vidMarkerIdx+1:]
}
afterV = ""
}
if err == nil {
for _, version := range fiv.Versions {
versions = append(versions, version.ToObjectInfo(bucket, entry.name))
}

for _, version := range fiVersions {
versions = append(versions, version.ToObjectInfo(bucket, entry.name))
}

continue
}

5 changes: 2 additions & 3 deletions cmd/metacache-manager.go
@@ -92,7 +92,6 @@ func (m *metacacheManager) initManager() {
}
m.mu.Unlock()
}
m.getTransient().deleteAll()
}()
}

@@ -124,11 +123,11 @@ func (m *metacacheManager) updateCacheEntry(update metacache) (metacache, error)
}

b, ok := m.buckets[update.bucket]
m.mu.RUnlock()
if ok {
m.mu.RUnlock()
return b.updateCacheEntry(update)
}
m.mu.RUnlock()

// We should have either a trashed bucket or this
return metacache{}, errVolumeNotFound
}
20 changes: 20 additions & 0 deletions cmd/metacache-server-pool.go
@@ -19,7 +19,9 @@ package cmd
import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
"strings"
"sync"
@@ -28,6 +30,24 @@ import (
"github.com/minio/minio/cmd/logger"
)

func renameAllBucketMetacache(epPath string) error {
// Rename all previous `.minio.sys/buckets/<bucketname>/.metacache` to
// `.minio.sys/tmp/` for deletion.
return readDirFilterFn(pathJoin(epPath, minioMetaBucket, bucketMetaPrefix), func(name string, typ os.FileMode) error {
if typ == os.ModeDir {
tmpMetacacheOld := pathJoin(epPath, minioMetaTmpBucket+"-old", mustGetUUID())
if err := renameAll(pathJoin(epPath, minioMetaBucket, metacachePrefixForID(name, slashSeparator)),
tmpMetacacheOld); err != nil && err != errFileNotFound {
return fmt.Errorf("unable to rename (%s -> %s) %w",
pathJoin(epPath, minioMetaBucket+metacachePrefixForID(minioMetaBucket, slashSeparator)),
tmpMetacacheOld,
osErrToFileErr(err))
}
}
return nil
})
}

// listPath will return the requested entries.
// If no more entries are in the listing io.EOF is returned,
// otherwise nil or an unexpected error is returned.
9 changes: 5 additions & 4 deletions cmd/metacache.go
@@ -123,7 +123,7 @@ func (m *metacache) matches(o *listPathOptions, extend time.Duration) bool {
}
if time.Since(m.lastUpdate) > metacacheMaxRunningAge+extend {
// Cache ended within bloom cycle, but we can extend the life.
o.debugf("cache %s ended (%v) and beyond extended life (%v)", m.id, m.lastUpdate, extend+metacacheMaxRunningAge)
o.debugf("cache %s ended (%v) and beyond extended life (%v)", m.id, m.lastUpdate, metacacheMaxRunningAge+extend)
return false
}
}
@@ -151,8 +151,8 @@ func (m *metacache) worthKeeping(currentCycle uint64) bool {
// Cycle is too old to be valuable.
return false
case cache.status == scanStateError || cache.status == scanStateNone:
// Remove failed listings after 10 minutes.
return time.Since(cache.lastUpdate) < 10*time.Minute
// Remove failed listings after 5 minutes.
return time.Since(cache.lastUpdate) < 5*time.Minute
}
return true
}
@@ -170,8 +170,9 @@ func (m *metacache) canBeReplacedBy(other *metacache) bool {
if m.status == scanStateStarted && time.Since(m.lastUpdate) < metacacheMaxRunningAge {
return false
}

// Keep it around a bit longer.
if time.Since(m.lastHandout) < time.Hour || time.Since(m.lastUpdate) < metacacheMaxRunningAge {
if time.Since(m.lastHandout) < 30*time.Minute || time.Since(m.lastUpdate) < metacacheMaxRunningAge {
return false
}

12 changes: 2 additions & 10 deletions cmd/prepare-storage.go
@@ -125,16 +125,8 @@ func formatErasureCleanupTmpLocalEndpoints(endpoints Endpoints) error {
osErrToFileErr(err))
}

// Move .minio.sys/buckets/.minio.sys/metacache transient list cache
// folder to speed up startup routines.
tmpMetacacheOld := pathJoin(epPath, minioMetaTmpBucket+"-old", mustGetUUID())
if err := renameAll(pathJoin(epPath, minioMetaBucket, metacachePrefixForID(minioMetaBucket, "")),
tmpMetacacheOld); err != nil && err != errFileNotFound {
return fmt.Errorf("unable to rename (%s -> %s) %w",
pathJoin(epPath, minioMetaBucket+metacachePrefixForID(minioMetaBucket, "")),
tmpMetacacheOld,
osErrToFileErr(err))
}
// Renames and schedules all bucket metacaches for purging.
renameAllBucketMetacache(epPath)

// Removal of tmp-old folder is backgrounded completely.
go removeAll(pathJoin(epPath, minioMetaTmpBucket+"-old"))
14 changes: 7 additions & 7 deletions cmd/storage-datatypes.go
@@ -85,18 +85,18 @@ type FileInfoVersions struct {
Versions []FileInfo
}

// forwardPastVersion will truncate the result to only contain versions after 'v'.
// If v is empty or the version isn't found no changes will be made.
func (f *FileInfoVersions) forwardPastVersion(v string) {
if v == "" {
return
// findVersionIndex will return the version index where the version
// was found. Returns -1 if not found.
func (f *FileInfoVersions) findVersionIndex(v string) int {
if f == nil || v == "" {
return -1
}
for i, ver := range f.Versions {
if ver.VersionID == v {
f.Versions = f.Versions[i+1:]
return
return i
}
}
return -1
}

// FileInfo - represents file stat information.
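For illustration, a self-contained sketch of how the new findVersionIndex helper is meant to be used by callers, mirroring the change in cmd/metacache-entries.go: instead of mutating the version list in place as the removed forwardPastVersion did, the caller looks up the marker index and slices. The types below are simplified stand-ins, not the real FileInfo definitions.

```go
package main

import "fmt"

// Simplified stand-ins for FileInfo / FileInfoVersions, for illustration only.
type fileInfo struct{ VersionID string }

type fileInfoVersions struct{ Versions []fileInfo }

// findVersionIndex returns the index of version v, or -1 if not found,
// matching the semantics of the new helper in cmd/storage-datatypes.go.
func (f *fileInfoVersions) findVersionIndex(v string) int {
	if f == nil || v == "" {
		return -1
	}
	for i, ver := range f.Versions {
		if ver.VersionID == v {
			return i
		}
	}
	return -1
}

func main() {
	fiv := &fileInfoVersions{Versions: []fileInfo{{"v1"}, {"v2"}, {"v3"}}}
	afterV := "v2"

	// Caller-side replacement for the removed forwardPastVersion: look up the
	// marker and slice, leaving fiv.Versions itself untouched.
	versions := fiv.Versions
	if idx := fiv.findVersionIndex(afterV); idx >= 0 {
		versions = versions[idx+1:]
	}
	fmt.Println(versions) // [{v3}]
}
```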
