Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: Shed Index and Uint64Field additions #18398

Merged
merged 3 commits into from
Jan 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion swarm/shed/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// more complex operations on storage data organized in fields and indexes.
//
// Only type which holds logical information about swarm storage chunks data
// and metadata is IndexItem. This part is not generalized mostly for
// and metadata is Item. This part is not generalized mostly for
// performance reasons.
package shed

Expand Down
40 changes: 20 additions & 20 deletions swarm/shed/example_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,20 @@ func New(path string) (s *Store, err error) {
}
// Index storing actual chunk address, data and store timestamp.
s.retrievalIndex, err = db.NewIndex("Address->StoreTimestamp|Data", shed.IndexFuncs{
EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.Address = key
return e, nil
},
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
EncodeValue: func(fields shed.Item) (value []byte, err error) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(fields.StoreTimestamp))
value = append(b, fields.Data...)
return value, nil
},
DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
e.Data = value[8:]
return e, nil
Expand All @@ -96,19 +96,19 @@ func New(path string) (s *Store, err error) {
// Index storing access timestamp for a particular address.
// It is needed in order to update gc index keys for iteration order.
s.accessIndex, err = db.NewIndex("Address->AccessTimestamp", shed.IndexFuncs{
EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.Address = key
return e, nil
},
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
EncodeValue: func(fields shed.Item) (value []byte, err error) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp))
return b, nil
},
DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.AccessTimestamp = int64(binary.BigEndian.Uint64(value))
return e, nil
},
Expand All @@ -118,23 +118,23 @@ func New(path string) (s *Store, err error) {
}
// Index with keys ordered by access timestamp for garbage collection prioritization.
s.gcIndex, err = db.NewIndex("AccessTimestamp|StoredTimestamp|Address->nil", shed.IndexFuncs{
EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
EncodeKey: func(fields shed.Item) (key []byte, err error) {
b := make([]byte, 16, 16+len(fields.Address))
binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp))
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
key = append(b, fields.Address...)
return key, nil
},
DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16]))
e.Address = key[16:]
return e, nil
},
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
EncodeValue: func(fields shed.Item) (value []byte, err error) {
return nil, nil
},
DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
return e, nil
},
})
Expand All @@ -146,7 +146,7 @@ func New(path string) (s *Store, err error) {

// Put stores the chunk and sets it store timestamp.
func (s *Store) Put(_ context.Context, ch storage.Chunk) (err error) {
return s.retrievalIndex.Put(shed.IndexItem{
return s.retrievalIndex.Put(shed.Item{
Address: ch.Address(),
Data: ch.Data(),
StoreTimestamp: time.Now().UTC().UnixNano(),
Expand All @@ -161,7 +161,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
batch := new(leveldb.Batch)

// Get the chunk data and storage timestamp.
item, err := s.retrievalIndex.Get(shed.IndexItem{
item, err := s.retrievalIndex.Get(shed.Item{
Address: addr,
})
if err != nil {
Expand All @@ -172,13 +172,13 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
}

// Get the chunk access timestamp.
accessItem, err := s.accessIndex.Get(shed.IndexItem{
accessItem, err := s.accessIndex.Get(shed.Item{
Address: addr,
})
switch err {
case nil:
// Remove gc index entry if access timestamp is found.
err = s.gcIndex.DeleteInBatch(batch, shed.IndexItem{
err = s.gcIndex.DeleteInBatch(batch, shed.Item{
Address: item.Address,
StoreTimestamp: accessItem.AccessTimestamp,
AccessTimestamp: item.StoreTimestamp,
Expand All @@ -197,7 +197,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
accessTimestamp := time.Now().UTC().UnixNano()

// Put new access timestamp in access index.
err = s.accessIndex.PutInBatch(batch, shed.IndexItem{
err = s.accessIndex.PutInBatch(batch, shed.Item{
Address: addr,
AccessTimestamp: accessTimestamp,
})
Expand All @@ -206,7 +206,7 @@ func (s *Store) Get(_ context.Context, addr storage.Address) (c storage.Chunk, e
}

// Put new access timestamp in gc index.
err = s.gcIndex.PutInBatch(batch, shed.IndexItem{
err = s.gcIndex.PutInBatch(batch, shed.Item{
Address: item.Address,
AccessTimestamp: accessTimestamp,
StoreTimestamp: item.StoreTimestamp,
Expand Down Expand Up @@ -244,7 +244,7 @@ func (s *Store) CollectGarbage() (err error) {
// New batch for a new cg round.
trash := new(leveldb.Batch)
// Iterate through all index items and break when needed.
err = s.gcIndex.IterateAll(func(item shed.IndexItem) (stop bool, err error) {
err = s.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
// Remove the chunk.
err = s.retrievalIndex.DeleteInBatch(trash, item)
if err != nil {
Expand All @@ -265,7 +265,7 @@ func (s *Store) CollectGarbage() (err error) {
return true, nil
}
return false, nil
})
}, nil)
if err != nil {
return err
}
Expand Down
38 changes: 38 additions & 0 deletions swarm/shed/field_uint64.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,44 @@ func (f Uint64Field) IncInBatch(batch *leveldb.Batch) (val uint64, err error) {
return val, nil
}

// Dec decrements a uint64 value in the database.
// This operation is not goroutine save.
// The field is protected from overflow to a negative value.
func (f Uint64Field) Dec() (val uint64, err error) {
val, err = f.Get()
if err != nil {
if err == leveldb.ErrNotFound {
val = 0
} else {
return 0, err
}
}
if val != 0 {
val--
}
return val, f.Put(val)
}

// DecInBatch decrements a uint64 value in the batch
// by retreiving a value from the database, not the same batch.
// This operation is not goroutine save.
// The field is protected from overflow to a negative value.
func (f Uint64Field) DecInBatch(batch *leveldb.Batch) (val uint64, err error) {
val, err = f.Get()
if err != nil {
if err == leveldb.ErrNotFound {
val = 0
} else {
return 0, err
}
}
if val != 0 {
val--
}
f.PutInBatch(batch, val)
return val, nil
}

// encode transforms uint64 to 8 byte long
// slice in big endian encoding.
func encodeUint64(val uint64) (b []byte) {
Expand Down
106 changes: 106 additions & 0 deletions swarm/shed/field_uint64_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,109 @@ func TestUint64Field_IncInBatch(t *testing.T) {
t.Errorf("got uint64 %v, want %v", got, want)
}
}

// TestUint64Field_Dec validates Dec operation
// of the Uint64Field.
func TestUint64Field_Dec(t *testing.T) {
db, cleanupFunc := newTestDB(t)
defer cleanupFunc()

counter, err := db.NewUint64Field("counter")
if err != nil {
t.Fatal(err)
}

// test overflow protection
var want uint64
got, err := counter.Dec()
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}

want = 32
err = counter.Put(want)
if err != nil {
t.Fatal(err)
}

want = 31
got, err = counter.Dec()
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}
}

// TestUint64Field_DecInBatch validates DecInBatch operation
// of the Uint64Field.
func TestUint64Field_DecInBatch(t *testing.T) {
db, cleanupFunc := newTestDB(t)
defer cleanupFunc()

counter, err := db.NewUint64Field("counter")
if err != nil {
t.Fatal(err)
}

batch := new(leveldb.Batch)
var want uint64
got, err := counter.DecInBatch(batch)
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}
err = db.WriteBatch(batch)
if err != nil {
t.Fatal(err)
}
got, err = counter.Get()
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}

batch2 := new(leveldb.Batch)
want = 42
counter.PutInBatch(batch2, want)
err = db.WriteBatch(batch2)
if err != nil {
t.Fatal(err)
}
got, err = counter.Get()
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}

batch3 := new(leveldb.Batch)
want = 41
got, err = counter.DecInBatch(batch3)
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}
err = db.WriteBatch(batch3)
if err != nil {
t.Fatal(err)
}
got, err = counter.Get()
if err != nil {
t.Fatal(err)
}
if got != want {
t.Errorf("got uint64 %v, want %v", got, want)
}
}
Loading