fix: remove temporary files when multiple write operations conflict #76

Merged · 1 commit · Apr 10, 2020
flatfs.go — 22 changes: 13 additions & 9 deletions
@@ -394,7 +394,7 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error {

var err error
for i := 1; i <= putMaxRetries; i++ {
err = fs.doWriteOp(&op{
_, err = fs.doWriteOp(&op{
typ: opPut,
key: key,
v: value,
@@ -451,16 +451,18 @@ func isTooManyFDError(err error) bool {
// we assume that the first succeeding operation
// on that key was the last one to happen after
// all successful others.
func (fs *Datastore) doWriteOp(oper *op) error {
//
// done is true if we actually performed the operation, false if we skipped or
// failed.
func (fs *Datastore) doWriteOp(oper *op) (done bool, err error) {
keyStr := oper.key.String()

opRes := fs.opMap.Begin(keyStr)
if opRes == nil { // nothing to do, a concurrent op succeeded
return nil
return false, nil
}

// Do the operation
var err error
for i := 0; i < 6; i++ {
err = fs.doOp(oper)

@@ -474,7 +476,7 @@ func (fs *Datastore) doWriteOp(oper *op) error {
// waiting on this result to succeed. Otherwise, they will
// retry.
opRes.Finish(err == nil)
return err
return err == nil, err
}

func (fs *Datastore) doPut(key datastore.Key, val []byte) error {
@@ -623,12 +625,13 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {

// move files to their proper places
for fi, op := range files {
err := fs.doWriteOp(op)
done, err := fs.doWriteOp(op)
if err != nil {
return err
} else if done {
// signify removed
ops[fi] = 2
}
// signify removed
ops[fi] = 2
}

// now sync the dirs for those files
@@ -733,11 +736,12 @@ func (fs *Datastore) Delete(key datastore.Key) error {
return ErrClosed
}

return fs.doWriteOp(&op{
_, err := fs.doWriteOp(&op{
typ: opDelete,
key: key,
v: nil,
})
return err
}

// This function always runs within an opLock for the given
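
The key change above is that doWriteOp now reports, alongside the error, whether it actually performed the operation or skipped it because a concurrent write to the same key had already succeeded. putMany uses that flag to mark a temporary file as consumed only when its op really ran, so skipped ops no longer strand files in the .temp directory. The snippet below is a standalone sketch of that caller-side pattern, not code from this repository: fakeWriteOp and the directory layout are hypothetical stand-ins for fs.doWriteOp and the datastore's sharded on-disk layout.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// fakeWriteOp stands in for doWriteOp: done == false means a concurrent
// operation on the same key already succeeded, so this call never moved
// its temporary file into place.
func fakeWriteOp(alreadyHandled bool) (done bool, err error) {
	return !alreadyHandled, nil
}

func main() {
	tempDir, err := ioutil.TempDir("", "flatfs-sketch-")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(tempDir)

	storeDir := filepath.Join(tempDir, "store")
	if err := os.Mkdir(storeDir, 0755); err != nil {
		panic(err)
	}

	// Two writes race on the same key: the first wins, the second is skipped.
	for i, alreadyHandled := range []bool{false, true} {
		tmpFile := filepath.Join(tempDir, fmt.Sprintf("put-%d.temp", i))
		if err := ioutil.WriteFile(tmpFile, []byte("value"), 0644); err != nil {
			panic(err)
		}

		done, err := fakeWriteOp(alreadyHandled)
		switch {
		case err != nil:
			panic(err)
		case done:
			// The op ran: rename the temp file into its final location.
			dst := filepath.Join(storeDir, "QUUX.data")
			if err := os.Rename(tmpFile, dst); err != nil {
				panic(err)
			}
			fmt.Println("moved into place:", dst)
		default:
			// Skipped: the temp file was never consumed, so remove it
			// instead of leaving it behind (the leak this PR fixes).
			fmt.Println("removing leftover temp file:", tmpFile)
			if err := os.Remove(tmpFile); err != nil {
				panic(err)
			}
		}
	}
}
```

Running the sketch prints one "moved into place" line and one "removing leftover temp file" line, mirroring the winning and skipped writers exercised by the batch test added below.
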
flatfs_test.go — 104 changes: 104 additions & 0 deletions
@@ -23,6 +23,26 @@ import (
"github.com/ipfs/go-ds-flatfs"
)

func checkTemp(t *testing.T, dir string) {
tempDir, err := os.Open(filepath.Join(dir, ".temp"))
if err != nil {
t.Errorf("failed to open temp dir: %s", err)
return
}

names, err := tempDir.Readdirnames(-1)
tempDir.Close()

if err != nil {
t.Errorf("failed to read temp dir: %s", err)
return
}

for _, name := range names {
t.Errorf("found leftover temporary file: %s", name)
}
}

func tempdir(t testing.TB) (path string, cleanup func()) {
path, err := ioutil.TempDir("", "test-datastore-flatfs-")
if err != nil {
@@ -45,9 +65,73 @@ func tryAllShardFuncs(t *testing.T, testFunc func(mkShardFunc, *testing.T)) {

type mkShardFunc func(int) *flatfs.ShardIdV1

func testBatch(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
defer fs.Close()

batches := make([]datastore.Batch, 9)
for i := range batches {
batch, err := fs.Batch()
if err != nil {
t.Fatal(err)
}

batches[i] = batch

err = batch.Put(datastore.NewKey("QUUX"), []byte("foo"))
if err != nil {
t.Fatal(err)
}
err = batch.Put(datastore.NewKey(fmt.Sprintf("Q%dX", i)), []byte(fmt.Sprintf("bar%d", i)))
if err != nil {
t.Fatal(err)
}
}

var wg sync.WaitGroup
wg.Add(len(batches))
for _, batch := range batches {
batch := batch
go func() {
defer wg.Done()
err := batch.Commit()
if err != nil {
t.Error(err)
}
}()
}

check := func(key, expected string) {
actual, err := fs.Get(datastore.NewKey(key))
if err != nil {
t.Fatalf("get for key %s, error: %s", key, err)
}
if string(actual) != expected {
t.Fatalf("for key %s, expected %s, got %s", key, expected, string(actual))
}
}

wg.Wait()

check("QUUX", "foo")
for i := range batches {
check(fmt.Sprintf("Q%dX", i), fmt.Sprintf("bar%d", i))
}
}

func TestBatch(t *testing.T) { tryAllShardFuncs(t, testBatch) }

func testPut(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -71,6 +155,7 @@ func TestPut(t *testing.T) { tryAllShardFuncs(t, testPut) }
func testGet(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -103,6 +188,7 @@ func TestGet(t *testing.T) { tryAllShardFuncs(t, testGet) }
func testPutOverwrite(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -138,6 +224,7 @@ func TestPutOverwrite(t *testing.T) { tryAllShardFuncs(t, testPutOverwrite) }
func testGetNotFoundError(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -162,6 +249,7 @@ type params struct {
func testStorage(p *params, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

target := p.dir + string(os.PathSeparator) + p.key + ".data"
fs, err := flatfs.CreateOrOpen(temp, p.shard, false)
@@ -256,6 +344,7 @@ func TestStorage(t *testing.T) {
func testHasNotFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -277,6 +366,7 @@ func TestHasNotFound(t *testing.T) { tryAllShardFuncs(t, testHasNotFound) }
func testHasFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -303,6 +393,7 @@ func TestHasFound(t *testing.T) { tryAllShardFuncs(t, testHasFound) }
func testGetSizeFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -321,6 +412,7 @@ func TestGetSizeFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeFound) }
func testGetSizeNotFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -347,6 +439,7 @@ func TestGetSizeNotFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeNotFound
func testDeleteNotFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -365,6 +458,7 @@ func TestDeleteNotFound(t *testing.T) { tryAllShardFuncs(t, testDeleteNotFound)
func testDeleteFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -394,6 +488,7 @@ func TestDeleteFound(t *testing.T) { tryAllShardFuncs(t, testDeleteFound) }
func testQuerySimple(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -434,6 +529,7 @@ func TestQuerySimple(t *testing.T) { tryAllShardFuncs(t, testQuerySimple) }
func testDiskUsage(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -555,6 +651,7 @@ func TestDiskUsageDoubleCount(t *testing.T) {
func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -628,6 +725,7 @@ func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) {
func testDiskUsageBatch(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -728,6 +826,7 @@ func TestDiskUsageBatch(t *testing.T) { tryAllShardFuncs(t, testDiskUsageBatch)
func testDiskUsageEstimation(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -812,6 +911,7 @@ func TestDiskUsageEstimation(t *testing.T) { tryAllShardFuncs(t, testDiskUsageEs
func testBatchPut(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -827,6 +927,7 @@ func TestBatchPut(t *testing.T) { tryAllShardFuncs(t, testBatchPut) }
func testBatchDelete(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -842,6 +943,7 @@ func TestBatchDelete(t *testing.T) { tryAllShardFuncs(t, testBatchDelete) }
func testClose(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
@@ -921,6 +1023,7 @@ func TestNonDatastoreDir(t *testing.T) {
func TestNoCluster(t *testing.T) {
tempdir, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, tempdir)

fs, err := flatfs.CreateOrOpen(tempdir, flatfs.NextToLast(1), false)
if err != nil {
@@ -1079,6 +1182,7 @@ func TestQueryLeak(t *testing.T) {
func TestSuite(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)

fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false)
if err != nil {