Skip to content

Commit

Permalink
db: use objstorage to find obsolete objects
Browse files Browse the repository at this point in the history
Ignore SST files in the listing and instead get the list of objects
from the objstorage provider.

Extend the cleaner test to check for removal of extra sst files.
  • Loading branch information
RaduBerinde committed Feb 17, 2023
1 parent b2e4607 commit 1a69ef2
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 57 deletions.
63 changes: 43 additions & 20 deletions cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestArchiveCleaner(t *testing.T) {
func TestCleaner(t *testing.T) {
dbs := make(map[string]*DB)
defer func() {
for _, db := range dbs {
Expand All @@ -26,19 +26,14 @@ func TestArchiveCleaner(t *testing.T) {

mem := vfs.NewMem()
var memLog base.InMemLogger
opts := (&Options{
Cleaner: ArchiveCleaner{},
FS: vfs.WithLogging(mem, memLog.Infof),
WALDir: "wal",
}).WithFSDefaults()

fs := vfs.WithLogging(mem, memLog.Infof)
datadriven.RunTest(t, "testdata/cleaner", func(t *testing.T, td *datadriven.TestData) string {
memLog.Reset()
switch td.Cmd {
case "batch":
if len(td.CmdArgs) != 1 {
return "batch <db>"
}
memLog.Reset()
d := dbs[td.CmdArgs[0].String()]
b := d.NewBatch()
if err := runBatchDefineCmd(td, b); err != nil {
Expand All @@ -53,7 +48,6 @@ func TestArchiveCleaner(t *testing.T) {
if len(td.CmdArgs) != 1 {
return "compact <db>"
}
memLog.Reset()
d := dbs[td.CmdArgs[0].String()]
if err := d.Compact(nil, []byte("\xff"), false); err != nil {
return err.Error()
Expand All @@ -64,13 +58,24 @@ func TestArchiveCleaner(t *testing.T) {
if len(td.CmdArgs) != 1 {
return "flush <db>"
}
memLog.Reset()
d := dbs[td.CmdArgs[0].String()]
if err := d.Flush(); err != nil {
return err.Error()
}
return memLog.String()

case "close":
if len(td.CmdArgs) != 1 {
return "close <db>"
}
dbDir := td.CmdArgs[0].String()
d := dbs[dbDir]
if err := d.Close(); err != nil {
return err.Error()
}
delete(dbs, dbDir)
return memLog.String()

case "list":
if len(td.CmdArgs) != 1 {
return "list <dir>"
Expand All @@ -83,26 +88,44 @@ func TestArchiveCleaner(t *testing.T) {
return fmt.Sprintf("%s\n", strings.Join(paths, "\n"))

case "open":
if len(td.CmdArgs) != 1 && len(td.CmdArgs) != 2 {
return "open <dir> [readonly]"
if len(td.CmdArgs) < 1 || len(td.CmdArgs) > 3 {
return "open <dir> [archive] [readonly]"
}
opts.ReadOnly = false
if len(td.CmdArgs) == 2 {
if td.CmdArgs[1].String() != "readonly" {
return "open <dir> [readonly]"
dir := td.CmdArgs[0].String()
opts := (&Options{
FS: fs,
WALDir: dir + "_wal",
}).WithFSDefaults()

for i := 1; i < len(td.CmdArgs); i++ {
switch td.CmdArgs[i].String() {
case "readonly":
opts.ReadOnly = true
case "archive":
opts.Cleaner = ArchiveCleaner{}
default:
return "open <dir> [archive] [readonly]"
}
opts.ReadOnly = true
}

memLog.Reset()
dir := td.CmdArgs[0].String()
d, err := Open(dir, opts)
if err != nil {
return err.Error()
}
dbs[dir] = d
return memLog.String()

case "create-bogus-file":
if len(td.CmdArgs) != 1 {
return "create-bogus-file <db/file>"
}
dst, err := fs.Create(td.CmdArgs[0].String())
require.NoError(t, err)
_, err = dst.Write([]byte("bogus data"))
require.NoError(t, err)
require.NoError(t, dst.Sync())
require.NoError(t, dst.Close())
return memLog.String()

default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
}
Expand Down
23 changes: 16 additions & 7 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3268,20 +3268,29 @@ func (d *DB) scanObsoleteFiles(list []string) {
}
obsoleteOptions = append(obsoleteOptions, fi)
case fileTypeTable:
// TODO(radu): use the objstorage provider to find obsolete tables instead.
if _, ok := liveFileNums[fileNum]; ok {
// Objects are handled through the objstorage provider below.
default:
// Don't delete files we don't know about.
}
}

objects := d.objProvider.List()
for _, obj := range objects {
switch obj.FileType {
case fileTypeTable:
if _, ok := liveFileNums[obj.FileNum]; ok {
continue
}
fileMeta := &fileMetadata{
FileNum: fileNum,
FileNum: obj.FileNum,
}
if stat, err := d.opts.FS.Stat(filename); err == nil {
fileMeta.Size = uint64(stat.Size())
if size, err := d.objProvider.Size(obj); err == nil {
fileMeta.Size = uint64(size)
}
obsoleteTables = append(obsoleteTables, fileMeta)

default:
// Don't delete files we don't know about.
continue
// Ignore object types we don't know about.
}
}

Expand Down
41 changes: 32 additions & 9 deletions objstorage/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package objstorage
import (
"io"
"os"
"sort"
"sync"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -306,15 +307,6 @@ func (p *Provider) LinkOrCopyFromLocal(
panic("unimplemented")
}

// Path returns an internal, implementation-dependent path for the object. It is
// meant to be used for informational purposes (like logging).
func (p *Provider) Path(meta ObjectMetadata) string {
if !meta.IsShared() {
return p.vfsPath(meta.FileType, meta.FileNum)
}
panic("unimplemented")
}

// Lookup returns the metadata of an object that is already known to the Provider.
// Does not perform any I/O.
func (p *Provider) Lookup(fileType base.FileType, fileNum base.FileNum) (ObjectMetadata, error) {
Expand All @@ -337,6 +329,37 @@ func (p *Provider) Lookup(fileType base.FileType, fileNum base.FileNum) (ObjectM
return meta, nil
}

// Path returns an internal, implementation-dependent path for the object. It is
// meant to be used for informational purposes (like logging).
func (p *Provider) Path(meta ObjectMetadata) string {
if !meta.IsShared() {
return p.vfsPath(meta.FileType, meta.FileNum)
}
panic("unimplemented")
}

// Size returns the size of the object.
func (p *Provider) Size(meta ObjectMetadata) (int64, error) {
if !meta.IsShared() {
return p.vfsSize(meta.FileType, meta.FileNum)
}
panic("unimplemented")
}

// List returns the objects currently known to the provider. Does not perform any I/O.
func (p *Provider) List() []ObjectMetadata {
p.mu.RLock()
defer p.mu.RUnlock()
res := make([]ObjectMetadata, len(p.mu.knownObjects))
for _, meta := range p.mu.knownObjects {
res = append(res, meta)
}
sort.Slice(res, func(i, j int) bool {
return res[i].FileNum < res[j].FileNum
})
return res
}

func (p *Provider) addMetadata(meta ObjectMetadata) {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
9 changes: 9 additions & 0 deletions objstorage/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,12 @@ func (p *Provider) vfsFindExisting(ls []string) []ObjectMetadata {
}
return res
}

func (p *Provider) vfsSize(fileType base.FileType, fileNum base.FileNum) (int64, error) {
filename := p.vfsPath(fileType, fileNum)
stat, err := p.st.FS.Stat(filename)
if err != nil {
return 0, err
}
return stat.Size(), nil
}
Loading

0 comments on commit 1a69ef2

Please sign in to comment.