-
Notifications
You must be signed in to change notification settings - Fork 344
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: sharky free slot handling and recovery support (#2708)
- Loading branch information
Showing
7 changed files
with
822 additions
and
429 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
// Copyright 2021 The Swarm Authors. All rights reserved. | ||
// Use of this source code is governed by a BSD-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package sharky | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"path" | ||
|
||
"github.com/hashicorp/go-multierror" | ||
) | ||
|
||
// Recovery allows disaster recovery | ||
type Recovery struct { | ||
shards []*slots | ||
} | ||
|
||
func NewRecovery(dir string, shardCnt int, shardSize uint32, datasize int) (*Recovery, error) { | ||
shards := make([]*slots, shardCnt) | ||
for i := 0; i < shardCnt; i++ { | ||
file, err := os.OpenFile(path.Join(dir, fmt.Sprintf("shard_%03d", i)), os.O_RDWR|os.O_CREATE, 0666) | ||
if err != nil { | ||
return nil, err | ||
} | ||
fi, err := file.Stat() | ||
if err != nil { | ||
return nil, err | ||
} | ||
if err = file.Close(); err != nil { | ||
return nil, err | ||
} | ||
size := uint32(fi.Size() / int64(datasize)) | ||
ffile, err := os.OpenFile(path.Join(dir, fmt.Sprintf("free_%03d", i)), os.O_RDWR|os.O_CREATE, 0666) | ||
if err != nil { | ||
return nil, err | ||
} | ||
sl := newSlots(ffile, shardSize, nil) | ||
sl.data = make([]byte, size/8) | ||
shards[i] = sl | ||
} | ||
return &Recovery{shards}, nil | ||
} | ||
|
||
// Add marks a location as used (not free) | ||
func (r *Recovery) Add(loc Location) error { | ||
sh := r.shards[loc.Shard] | ||
l := len(sh.data) | ||
if diff := int(loc.Slot/8) - l; diff >= 0 { | ||
sh.extend(diff + 1) | ||
for i := 0; i <= diff; i++ { | ||
sh.data[l+i] = 0x0 | ||
} | ||
} | ||
sh.push(loc.Slot) | ||
return nil | ||
} | ||
|
||
// Save saves all free slots files of the recovery (without closing) | ||
func (r *Recovery) Save() (err error) { | ||
for _, sh := range r.shards { | ||
for i := range sh.data { | ||
sh.data[i] ^= 0xff | ||
} | ||
err = multierror.Append(err, sh.save()) | ||
} | ||
return err.(*multierror.Error).ErrorOrNil() | ||
} | ||
|
||
// Close closes data and free slots files of the recovery (without saving) | ||
func (r *Recovery) Close() (err error) { | ||
for _, sh := range r.shards { | ||
err = multierror.Append(err, sh.file.Close()) | ||
} | ||
return err.(*multierror.Error).ErrorOrNil() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
// Copyright 2021 The Swarm Authors. All rights reserved. | ||
// Use of this source code is governed by a BSD-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package sharky_test | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/binary" | ||
"errors" | ||
"math/rand" | ||
"testing" | ||
"time" | ||
|
||
"github.com/ethersphere/bee/pkg/sharky" | ||
) | ||
|
||
func TestRecovery(t *testing.T) { | ||
datasize := 4 | ||
shards := 8 | ||
shardSize := uint32(16) | ||
limit := shards * int(shardSize) | ||
|
||
dir := t.TempDir() | ||
ctx := context.Background() | ||
size := limit / 2 | ||
data := make([]byte, 4) | ||
locs := make([]sharky.Location, size) | ||
preserved := make(map[uint32]bool) | ||
|
||
func() { | ||
s, err := sharky.New(dir, shards, shardSize, datasize) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer func() { | ||
if err := s.Close(); err != nil { | ||
t.Fatal(err) | ||
} | ||
}() | ||
for i := range locs { | ||
binary.BigEndian.PutUint32(data, uint32(i)) | ||
loc, err := s.Write(ctx, data) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
locs[i] = loc | ||
} | ||
// extract locations to preserve / free in map | ||
indexes := make([]uint32, size) | ||
for i := range indexes { | ||
indexes[i] = uint32(i) | ||
} | ||
rest := indexes[:] | ||
for n := size; n > size/2; n-- { | ||
i := rand.Intn(n) | ||
preserved[rest[i]] = false | ||
rest = append(rest[:i], rest[i+1:]...) | ||
} | ||
if len(rest) != len(preserved) { | ||
t.Fatalf("incorrect set sizes: %d <> %d", len(rest), len(preserved)) | ||
} | ||
for _, i := range rest { | ||
preserved[i] = true | ||
} | ||
}() | ||
// recover based on preserved map | ||
func() { | ||
r, err := sharky.NewRecovery(dir, shards, shardSize, datasize) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer func() { | ||
if err := r.Close(); err != nil { | ||
t.Fatal(err) | ||
} | ||
}() | ||
for i, add := range preserved { | ||
if add { | ||
if err := r.Add(locs[i]); err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
} | ||
if err := r.Save(); err != nil { | ||
t.Fatal(err) | ||
} | ||
}() | ||
// check integrity of reecovered sharky | ||
func() { | ||
s, err := sharky.New(dir, shards, shardSize, datasize) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer func() { | ||
if err := s.Close(); err != nil { | ||
t.Fatal(err) | ||
} | ||
}() | ||
buf := make([]byte, datasize) | ||
t.Run("preserved are found", func(t *testing.T) { | ||
for i := range preserved { | ||
loc := locs[int(i)] | ||
if err := s.Read(ctx, loc, buf); err != nil { | ||
t.Fatal(err) | ||
} | ||
j := binary.BigEndian.Uint32(buf) | ||
if i != j { | ||
t.Fatalf("data not preserved at location %v: want %d; got %d", loc, i, j) | ||
} | ||
} | ||
}) | ||
var freelocs []sharky.Location | ||
payload := []byte{0xff} | ||
t.Run("correct number of free slots", func(t *testing.T) { | ||
cctx, cancel := context.WithTimeout(ctx, 800*time.Millisecond) | ||
defer cancel() | ||
for { | ||
loc, err := s.Write(cctx, payload) | ||
if err != nil { | ||
if errors.Is(err, context.DeadlineExceeded) { | ||
break | ||
} | ||
t.Fatal(err) | ||
} | ||
freelocs = append(freelocs, loc) | ||
} | ||
if len(freelocs) != limit-size/2 { | ||
t.Fatalf("incorrect number of free slots: wanted %d; got %d", limit-size/2, len(freelocs)) | ||
} | ||
}) | ||
t.Run("added locs are still preserved", func(t *testing.T) { | ||
for i, added := range preserved { | ||
if !added { | ||
continue | ||
} | ||
if err := s.Read(ctx, locs[int(i)], buf); err != nil { | ||
t.Fatal(err) | ||
} | ||
j := binary.BigEndian.Uint32(buf) | ||
if i != j { | ||
t.Fatalf("data not preserved at location %v: want %d; got %d", locs[int(j)], i, j) | ||
} | ||
} | ||
}) | ||
t.Run("not added preserved are overwritten", func(t *testing.T) { | ||
for i, added := range preserved { | ||
if added { | ||
continue | ||
} | ||
loc := locs[int(i)] | ||
loc.Length = 1 | ||
if err := s.Read(ctx, loc, buf); err != nil { | ||
t.Fatal(err) | ||
} | ||
data := buf[:len(payload)] | ||
if !bytes.Equal(data, payload) { | ||
t.Fatalf("incorrect data on freed location %v: want %x; got %x", loc, payload, data) | ||
} | ||
} | ||
}) | ||
t.Run("all other slots also overwritten", func(t *testing.T) { | ||
for _, loc := range freelocs { | ||
if err := s.Read(ctx, loc, buf); err != nil { | ||
t.Fatal(err) | ||
} | ||
data := buf[:len(payload)] | ||
if !bytes.Equal(data, payload) { | ||
t.Fatalf("incorrect data on freed location %v: want %x; got %x", loc, payload, data) | ||
} | ||
} | ||
}) | ||
}() | ||
} |
Oops, something went wrong.