Skip to content

Commit

Permalink
objstorage: keep track of objects in the Provider
Browse files Browse the repository at this point in the history
This change modifies the objstorage API and restructures some of its
code to allow for adding support for shared objects.

The main change is that the Provider maintains its own list of known
objects. We populate the initial list from the local FS listing; for
shared objects the information will come from the shared object
catalog.

This change includes some cleanup in db.Open. We now open the
directories first and move more code up before the `DB` object
initialization (after which cleanup becomes more complicated).
  • Loading branch information
RaduBerinde committed Feb 13, 2023
1 parent 0568b5f commit 7a29c5b
Show file tree
Hide file tree
Showing 16 changed files with 430 additions and 198 deletions.
22 changes: 16 additions & 6 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2575,7 +2575,7 @@ func (d *DB) runCompaction(
pendingOutputs = append(pendingOutputs, fileMeta)
d.mu.Unlock()

writable, err := d.objProvider.Create(fileTypeTable, fileNum)
writable, objMeta, err := d.objProvider.Create(fileTypeTable, fileNum)
if err != nil {
return err
}
Expand All @@ -2587,7 +2587,7 @@ func (d *DB) runCompaction(
d.opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
Reason: reason,
Path: d.objProvider.Path(fileTypeTable, fileNum),
Path: d.objProvider.Path(objMeta),
FileNum: fileNum,
})
writable = &compactionWritable{
Expand Down Expand Up @@ -3113,6 +3113,7 @@ func (d *DB) scanObsoleteFiles(list []string) {
}
obsoleteOptions = append(obsoleteOptions, fi)
case fileTypeTable:
// TODO(radu): use the objstorage provider to find obsolete tables instead.
if _, ok := liveFileNums[fileNum]; ok {
continue
}
Expand Down Expand Up @@ -3381,10 +3382,19 @@ func (d *DB) deleteObsoleteObject(fileType fileType, jobID int, fileNum FileNum)
panic("not an object")
}

path := d.objProvider.Path(fileType, fileNum)
err := d.objProvider.Remove(fileType, fileNum)
if objstorage.IsNotExistError(err) {
return
var path string
meta, err := d.objProvider.Lookup(fileType, fileNum)
if err != nil {
// The provider is not aware of this object. This shouldn't normally happen,
// so expose the error.
path = "<nil>"
} else {
path = d.objProvider.Path(meta)

err = d.objProvider.Remove(fileType, fileNum)
if objstorage.IsNotExistError(err) {
return
}
}

switch fileType {
Expand Down
5 changes: 4 additions & 1 deletion compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,10 @@ func TestCompaction(t *testing.T) {
}
ss := []string(nil)
v := d.mu.versions.currentVersion()
provider := objstorage.New(objstorage.DefaultSettings(mem, "" /* dirName */))
provider, err := objstorage.Open(objstorage.DefaultSettings(mem, "" /* dirName */))
if err != nil {
t.Fatalf("%v", err)
}
for _, levelMetadata := range v.Levels {
iter := levelMetadata.Iter()
for meta := iter.First(); meta != nil; meta = iter.Next() {
Expand Down
4 changes: 2 additions & 2 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func ingestLink(
jobID int, opts *Options, objProvider *objstorage.Provider, paths []string, meta []*fileMetadata,
) error {
for i := range paths {
err := objProvider.LinkOrCopyFromLocal(opts.FS, paths[i], fileTypeTable, meta[i].FileNum)
objMeta, err := objProvider.LinkOrCopyFromLocal(opts.FS, paths[i], fileTypeTable, meta[i].FileNum)
if err != nil {
if err2 := ingestCleanup(objProvider, meta[:i]); err2 != nil {
opts.Logger.Infof("ingest cleanup failed: %v", err2)
Expand All @@ -280,7 +280,7 @@ func ingestLink(
opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
Reason: "ingesting",
Path: objProvider.Path(fileTypeTable, meta[i].FileNum),
Path: objProvider.Path(objMeta),
FileNum: meta[i].FileNum,
})
}
Expand Down
41 changes: 24 additions & 17 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ func TestIngestLink(t *testing.T) {
opts := &Options{FS: vfs.NewMem()}
opts.EnsureDefaults().WithFSDefaults()
require.NoError(t, opts.FS.MkdirAll(dir, 0755))
objProvider := objstorage.New(objstorage.DefaultSettings(opts.FS, dir))
objProvider, err := objstorage.Open(objstorage.DefaultSettings(opts.FS, dir))
require.NoError(t, err)

paths := make([]string, 10)
meta := make([]*fileMetadata, len(paths))
Expand All @@ -314,7 +315,7 @@ func TestIngestLink(t *testing.T) {
opts.FS.Remove(paths[i])
}

err := ingestLink(0 /* jobID */, opts, objProvider, paths, meta)
err = ingestLink(0 /* jobID */, opts, objProvider, paths, meta)
if i < count {
if err == nil {
t.Fatalf("expected error, but found success")
Expand Down Expand Up @@ -372,7 +373,11 @@ func TestIngestLinkFallback(t *testing.T) {

opts := &Options{FS: errorfs.Wrap(mem, errorfs.OnIndex(0))}
opts.EnsureDefaults().WithFSDefaults()
objProvider := objstorage.New(objstorage.DefaultSettings(opts.FS, ""))
objSettings := objstorage.DefaultSettings(opts.FS, "")
// Prevent the provider from listing the dir (where we may get an injected error).
objSettings.FSDirInitialListing = []string{}
objProvider, err := objstorage.Open(objSettings)
require.NoError(t, err)

meta := []*fileMetadata{{FileNum: 1}}
err = ingestLink(0, opts, objProvider, []string{"source"}, meta)
Expand Down Expand Up @@ -1637,7 +1642,7 @@ func TestIngestCleanup(t *testing.T) {
testCases := []struct {
closeFiles []base.FileNum
cleanupFiles []base.FileNum
wantErr error
wantErr string
}{
// Close and remove all files.
{
Expand All @@ -1648,44 +1653,45 @@ func TestIngestCleanup(t *testing.T) {
{
closeFiles: fns,
cleanupFiles: []base.FileNum{3},
wantErr: oserror.ErrNotExist,
wantErr: "unknown to the provider",
},
// Remove a file that has not been closed.
{
closeFiles: []base.FileNum{0, 2},
cleanupFiles: fns,
wantErr: oserror.ErrInvalid,
wantErr: oserror.ErrInvalid.Error(),
},
// Remove all files, one of which is still open, plus a file that does not exist.
{
closeFiles: []base.FileNum{0, 2},
cleanupFiles: []base.FileNum{0, 1, 2, 3},
wantErr: oserror.ErrInvalid, // The first error encountered is due to the open file.
wantErr: oserror.ErrInvalid.Error(), // The first error encountered is due to the open file.
},
}

for _, tc := range testCases {
t.Run("", func(t *testing.T) {
mem := vfs.NewMem()
mem.UseWindowsSemantics(true)
objProvider := objstorage.New(objstorage.DefaultSettings(mem, ""))
objProvider, err := objstorage.Open(objstorage.DefaultSettings(mem, ""))
require.NoError(t, err)

// Create the files in the VFS.
metaMap := make(map[base.FileNum]vfs.File)
metaMap := make(map[base.FileNum]objstorage.Writable)
for _, fn := range fns {
path := base.MakeFilepath(mem, "", base.FileTypeTable, fn)
f, err := mem.Create(path)
metaMap[fn] = f
w, _, err := objProvider.Create(base.FileTypeTable, fn)
require.NoError(t, err)

metaMap[fn] = w
}

// Close a select number of files.
for _, m := range tc.closeFiles {
f, ok := metaMap[m]
w, ok := metaMap[m]
if !ok {
continue
}
require.NoError(t, f.Close())
require.NoError(t, w.Close())
}

// Cleanup the set of files in the FS.
Expand All @@ -1694,9 +1700,10 @@ func TestIngestCleanup(t *testing.T) {
toRemove = append(toRemove, &fileMetadata{FileNum: fn})
}

err := ingestCleanup(objProvider, toRemove)
if tc.wantErr != nil {
require.Equal(t, tc.wantErr, err)
err = ingestCleanup(objProvider, toRemove)
if tc.wantErr != "" {
require.Error(t, err, "got no error, expected %s", tc.wantErr)
require.Contains(t, err.Error(), tc.wantErr)
} else {
require.NoError(t, err)
}
Expand Down
Loading

0 comments on commit 7a29c5b

Please sign in to comment.