diff --git a/flatfs.go b/flatfs.go index efaffa0..6441911 100644 --- a/flatfs.go +++ b/flatfs.go @@ -394,7 +394,7 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error { var err error for i := 1; i <= putMaxRetries; i++ { - err = fs.doWriteOp(&op{ + _, err = fs.doWriteOp(&op{ typ: opPut, key: key, v: value, @@ -451,16 +451,18 @@ func isTooManyFDError(err error) bool { // we assume that the first succeeding operation // on that key was the last one to happen after // all successful others. -func (fs *Datastore) doWriteOp(oper *op) error { +// +// done is true if we actually performed the operation, false if we skipped or +// failed. +func (fs *Datastore) doWriteOp(oper *op) (done bool, err error) { keyStr := oper.key.String() opRes := fs.opMap.Begin(keyStr) if opRes == nil { // nothing to do, a concurrent op succeeded - return nil + return false, nil } // Do the operation - var err error for i := 0; i < 6; i++ { err = fs.doOp(oper) @@ -474,7 +476,7 @@ func (fs *Datastore) doWriteOp(oper *op) error { // waiting on this result to succeed. Otherwise, they will // retry. opRes.Finish(err == nil) - return err + return err == nil, err } func (fs *Datastore) doPut(key datastore.Key, val []byte) error { @@ -623,12 +625,13 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error { // move files to their proper places for fi, op := range files { - err := fs.doWriteOp(op) + done, err := fs.doWriteOp(op) if err != nil { return err + } else if done { + // signify removed + ops[fi] = 2 } - // signify removed - ops[fi] = 2 } // now sync the dirs for those files @@ -733,11 +736,12 @@ func (fs *Datastore) Delete(key datastore.Key) error { return ErrClosed } - return fs.doWriteOp(&op{ + _, err := fs.doWriteOp(&op{ typ: opDelete, key: key, v: nil, }) + return err } // This function always runs within an opLock for the given diff --git a/flatfs_test.go b/flatfs_test.go index 9d490f6..9ede51a 100644 --- a/flatfs_test.go +++ b/flatfs_test.go @@ -23,6 +23,26 @@ import ( "github.com/ipfs/go-ds-flatfs" ) +func checkTemp(t *testing.T, dir string) { + tempDir, err := os.Open(filepath.Join(dir, ".temp")) + if err != nil { + t.Errorf("failed to open temp dir: %s", err) + return + } + + names, err := tempDir.Readdirnames(-1) + tempDir.Close() + + if err != nil { + t.Errorf("failed to read temp dir: %s", err) + return + } + + for _, name := range names { + t.Errorf("found leftover temporary file: %s", name) + } +} + func tempdir(t testing.TB) (path string, cleanup func()) { path, err := ioutil.TempDir("", "test-datastore-flatfs-") if err != nil { @@ -45,9 +65,73 @@ func tryAllShardFuncs(t *testing.T, testFunc func(mkShardFunc, *testing.T)) { type mkShardFunc func(int) *flatfs.ShardIdV1 +func testBatch(dirFunc mkShardFunc, t *testing.T) { + temp, cleanup := tempdir(t) + defer cleanup() + defer checkTemp(t, temp) + + fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) + if err != nil { + t.Fatalf("New fail: %v\n", err) + } + defer fs.Close() + + batches := make([]datastore.Batch, 9) + for i := range batches { + batch, err := fs.Batch() + if err != nil { + t.Fatal(err) + } + + batches[i] = batch + + err = batch.Put(datastore.NewKey("QUUX"), []byte("foo")) + if err != nil { + t.Fatal(err) + } + err = batch.Put(datastore.NewKey(fmt.Sprintf("Q%dX", i)), []byte(fmt.Sprintf("bar%d", i))) + if err != nil { + t.Fatal(err) + } + } + + var wg sync.WaitGroup + wg.Add(len(batches)) + for _, batch := range batches { + batch := batch + go func() { + defer wg.Done() + err := batch.Commit() + if err != nil { + t.Error(err) + } + }() + } + + check := func(key, expected string) { + actual, err := fs.Get(datastore.NewKey(key)) + if err != nil { + t.Fatalf("get for key %s, error: %s", key, err) + } + if string(actual) != expected { + t.Fatalf("for key %s, expected %s, got %s", key, expected, string(actual)) + } + } + + wg.Wait() + + check("QUUX", "foo") + for i := range batches { + check(fmt.Sprintf("Q%dX", i), fmt.Sprintf("bar%d", i)) + } +} + +func TestBatch(t *testing.T) { tryAllShardFuncs(t, testBatch) } + func testPut(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -71,6 +155,7 @@ func TestPut(t *testing.T) { tryAllShardFuncs(t, testPut) } func testGet(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -103,6 +188,7 @@ func TestGet(t *testing.T) { tryAllShardFuncs(t, testGet) } func testPutOverwrite(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -138,6 +224,7 @@ func TestPutOverwrite(t *testing.T) { tryAllShardFuncs(t, testPutOverwrite) } func testGetNotFoundError(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -162,6 +249,7 @@ type params struct { func testStorage(p *params, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) target := p.dir + string(os.PathSeparator) + p.key + ".data" fs, err := flatfs.CreateOrOpen(temp, p.shard, false) @@ -256,6 +344,7 @@ func TestStorage(t *testing.T) { func testHasNotFound(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -277,6 +366,7 @@ func TestHasNotFound(t *testing.T) { tryAllShardFuncs(t, testHasNotFound) } func testHasFound(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -303,6 +393,7 @@ func TestHasFound(t *testing.T) { tryAllShardFuncs(t, testHasFound) } func testGetSizeFound(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -321,6 +412,7 @@ func TestGetSizeFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeFound) } func testGetSizeNotFound(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -347,6 +439,7 @@ func TestGetSizeNotFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeNotFound func testDeleteNotFound(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -365,6 +458,7 @@ func TestDeleteNotFound(t *testing.T) { tryAllShardFuncs(t, testDeleteNotFound) func testDeleteFound(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -394,6 +488,7 @@ func TestDeleteFound(t *testing.T) { tryAllShardFuncs(t, testDeleteFound) } func testQuerySimple(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -434,6 +529,7 @@ func TestQuerySimple(t *testing.T) { tryAllShardFuncs(t, testQuerySimple) } func testDiskUsage(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -555,6 +651,7 @@ func TestDiskUsageDoubleCount(t *testing.T) { func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -628,6 +725,7 @@ func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) { func testDiskUsageBatch(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -728,6 +826,7 @@ func TestDiskUsageBatch(t *testing.T) { tryAllShardFuncs(t, testDiskUsageBatch) func testDiskUsageEstimation(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -812,6 +911,7 @@ func TestDiskUsageEstimation(t *testing.T) { tryAllShardFuncs(t, testDiskUsageEs func testBatchPut(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -827,6 +927,7 @@ func TestBatchPut(t *testing.T) { tryAllShardFuncs(t, testBatchPut) } func testBatchDelete(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -842,6 +943,7 @@ func TestBatchDelete(t *testing.T) { tryAllShardFuncs(t, testBatchDelete) } func testClose(dirFunc mkShardFunc, t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) if err != nil { @@ -921,6 +1023,7 @@ func TestNonDatastoreDir(t *testing.T) { func TestNoCluster(t *testing.T) { tempdir, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, tempdir) fs, err := flatfs.CreateOrOpen(tempdir, flatfs.NextToLast(1), false) if err != nil { @@ -1079,6 +1182,7 @@ func TestQueryLeak(t *testing.T) { func TestSuite(t *testing.T) { temp, cleanup := tempdir(t) defer cleanup() + defer checkTemp(t, temp) fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false) if err != nil {