Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: use objstorage to find obsolete objects #2347

Merged
merged 1 commit into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 43 additions & 20 deletions cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestArchiveCleaner(t *testing.T) {
func TestCleaner(t *testing.T) {
dbs := make(map[string]*DB)
defer func() {
for _, db := range dbs {
Expand All @@ -26,19 +26,14 @@ func TestArchiveCleaner(t *testing.T) {

mem := vfs.NewMem()
var memLog base.InMemLogger
opts := (&Options{
Cleaner: ArchiveCleaner{},
FS: vfs.WithLogging(mem, memLog.Infof),
WALDir: "wal",
}).WithFSDefaults()

fs := vfs.WithLogging(mem, memLog.Infof)
datadriven.RunTest(t, "testdata/cleaner", func(t *testing.T, td *datadriven.TestData) string {
memLog.Reset()
switch td.Cmd {
case "batch":
if len(td.CmdArgs) != 1 {
return "batch <db>"
}
memLog.Reset()
d := dbs[td.CmdArgs[0].String()]
b := d.NewBatch()
if err := runBatchDefineCmd(td, b); err != nil {
Expand All @@ -53,7 +48,6 @@ func TestArchiveCleaner(t *testing.T) {
if len(td.CmdArgs) != 1 {
return "compact <db>"
}
memLog.Reset()
d := dbs[td.CmdArgs[0].String()]
if err := d.Compact(nil, []byte("\xff"), false); err != nil {
return err.Error()
Expand All @@ -64,13 +58,24 @@ func TestArchiveCleaner(t *testing.T) {
if len(td.CmdArgs) != 1 {
return "flush <db>"
}
memLog.Reset()
d := dbs[td.CmdArgs[0].String()]
if err := d.Flush(); err != nil {
return err.Error()
}
return memLog.String()

case "close":
if len(td.CmdArgs) != 1 {
return "close <db>"
}
dbDir := td.CmdArgs[0].String()
d := dbs[dbDir]
if err := d.Close(); err != nil {
return err.Error()
}
delete(dbs, dbDir)
return memLog.String()

case "list":
if len(td.CmdArgs) != 1 {
return "list <dir>"
Expand All @@ -83,26 +88,44 @@ func TestArchiveCleaner(t *testing.T) {
return fmt.Sprintf("%s\n", strings.Join(paths, "\n"))

case "open":
if len(td.CmdArgs) != 1 && len(td.CmdArgs) != 2 {
return "open <dir> [readonly]"
if len(td.CmdArgs) < 1 || len(td.CmdArgs) > 3 {
return "open <dir> [archive] [readonly]"
}
opts.ReadOnly = false
if len(td.CmdArgs) == 2 {
if td.CmdArgs[1].String() != "readonly" {
return "open <dir> [readonly]"
dir := td.CmdArgs[0].String()
opts := (&Options{
FS: fs,
WALDir: dir + "_wal",
}).WithFSDefaults()

for i := 1; i < len(td.CmdArgs); i++ {
switch td.CmdArgs[i].String() {
case "readonly":
opts.ReadOnly = true
case "archive":
opts.Cleaner = ArchiveCleaner{}
default:
return "open <dir> [archive] [readonly]"
}
opts.ReadOnly = true
}

memLog.Reset()
dir := td.CmdArgs[0].String()
d, err := Open(dir, opts)
if err != nil {
return err.Error()
}
dbs[dir] = d
return memLog.String()

case "create-bogus-file":
if len(td.CmdArgs) != 1 {
return "create-bogus-file <db/file>"
}
dst, err := fs.Create(td.CmdArgs[0].String())
require.NoError(t, err)
_, err = dst.Write([]byte("bogus data"))
require.NoError(t, err)
require.NoError(t, dst.Sync())
require.NoError(t, dst.Close())
return memLog.String()

default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
}
Expand Down
23 changes: 16 additions & 7 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3268,20 +3268,29 @@ func (d *DB) scanObsoleteFiles(list []string) {
}
obsoleteOptions = append(obsoleteOptions, fi)
case fileTypeTable:
// TODO(radu): use the objstorage provider to find obsolete tables instead.
if _, ok := liveFileNums[fileNum]; ok {
// Objects are handled through the objstorage provider below.
default:
// Don't delete files we don't know about.
}
}

objects := d.objProvider.List()
for _, obj := range objects {
switch obj.FileType {
case fileTypeTable:
if _, ok := liveFileNums[obj.FileNum]; ok {
continue
}
fileMeta := &fileMetadata{
FileNum: fileNum,
FileNum: obj.FileNum,
}
if stat, err := d.opts.FS.Stat(filename); err == nil {
fileMeta.Size = uint64(stat.Size())
if size, err := d.objProvider.Size(obj); err == nil {
fileMeta.Size = uint64(size)
}
obsoleteTables = append(obsoleteTables, fileMeta)

default:
// Don't delete files we don't know about.
continue
// Ignore object types we don't know about.
}
}

Expand Down
41 changes: 32 additions & 9 deletions objstorage/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package objstorage
import (
"io"
"os"
"sort"
"sync"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -306,15 +307,6 @@ func (p *Provider) LinkOrCopyFromLocal(
panic("unimplemented")
}

// Path returns an internal, implementation-dependent path for the object. It is
// meant to be used for informational purposes (like logging).
func (p *Provider) Path(meta ObjectMetadata) string {
if !meta.IsShared() {
return p.vfsPath(meta.FileType, meta.FileNum)
}
panic("unimplemented")
}

// Lookup returns the metadata of an object that is already known to the Provider.
// Does not perform any I/O.
func (p *Provider) Lookup(fileType base.FileType, fileNum base.FileNum) (ObjectMetadata, error) {
Expand All @@ -337,6 +329,37 @@ func (p *Provider) Lookup(fileType base.FileType, fileNum base.FileNum) (ObjectM
return meta, nil
}

// Path reports an internal, implementation-dependent location for the given
// object. The result is intended for informational use only (e.g. logging).
// Shared objects are not supported yet and cause a panic.
func (p *Provider) Path(meta ObjectMetadata) string {
	if meta.IsShared() {
		panic("unimplemented")
	}
	return p.vfsPath(meta.FileType, meta.FileNum)
}

// Size reports the size of the given object, or an error if the size could
// not be determined. Shared objects are not supported yet and cause a panic.
func (p *Provider) Size(meta ObjectMetadata) (int64, error) {
	if meta.IsShared() {
		panic("unimplemented")
	}
	return p.vfsSize(meta.FileType, meta.FileNum)
}

// List returns the metadata of every object currently known to the provider,
// ordered by ascending file number. Does not perform any I/O.
//
// The returned slice is freshly allocated and holds exactly one entry per
// known object.
func (p *Provider) List() []ObjectMetadata {
	p.mu.RLock()
	defer p.mu.RUnlock()
	// Allocate with zero length and full capacity: the loop below appends, so
	// a non-zero initial length would leave that many zero-valued entries at
	// the front of the result.
	res := make([]ObjectMetadata, 0, len(p.mu.knownObjects))
	for _, meta := range p.mu.knownObjects {
		res = append(res, meta)
	}
	sort.Slice(res, func(i, j int) bool {
		return res[i].FileNum < res[j].FileNum
	})
	return res
}

func (p *Provider) addMetadata(meta ObjectMetadata) {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
9 changes: 9 additions & 0 deletions objstorage/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,12 @@ func (p *Provider) vfsFindExisting(ls []string) []ObjectMetadata {
}
return res
}

// vfsSize stats the file that backs the object identified by fileType and
// fileNum on the provider's filesystem and returns the size reported by the
// stat result. Any stat error is returned unchanged with a zero size.
func (p *Provider) vfsSize(fileType base.FileType, fileNum base.FileNum) (int64, error) {
	info, err := p.st.FS.Stat(p.vfsPath(fileType, fileNum))
	if err != nil {
		return 0, err
	}
	return info.Size(), nil
}
Loading