localstore: reduce critical section size on gc (#1435)
acud authored Mar 19, 2021
1 parent 1de167e commit b90bd7c
Showing 9 changed files with 270 additions and 19 deletions.
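In broad strokes, this change splits a garbage collection run into two phases: candidates are collected from the gc index without holding the batch lock, and the lock is re-acquired only to drop candidates that were written in the meantime (the dirty addresses) and to commit the deletion batch. The sketch below is a simplified, hypothetical illustration of that locking pattern; a sync.Map stands in for the leveldb indexes, and none of the names come from the real localstore code.

package main

import (
    "fmt"
    "sync"
)

// gcSketch mimics the two-phase shape of the change: candidates are
// collected without holding the mutex, and the mutex is re-acquired
// only to skip entries written in the meantime and to delete the rest.
type gcSketch struct {
    store sync.Map // address -> data, safe for concurrent iteration

    mu        sync.Mutex // guards the two fields below
    gcRunning bool
    dirty     map[string]struct{} // addresses written while gc runs
}

func (g *gcSketch) put(addr, data string) {
    g.mu.Lock()
    if g.gcRunning {
        g.dirty[addr] = struct{}{}
    }
    g.mu.Unlock()
    g.store.Store(addr, data)
}

func (g *gcSketch) collectGarbage(limit int) (evicted int) {
    // tell writers to start recording dirty addresses
    g.mu.Lock()
    g.gcRunning = true
    g.dirty = make(map[string]struct{})
    g.mu.Unlock()
    defer func() {
        g.mu.Lock()
        g.gcRunning = false
        g.dirty = nil
        g.mu.Unlock()
    }()

    // phase 1: collect candidates, no lock held
    var candidates []string
    g.store.Range(func(k, _ any) bool {
        candidates = append(candidates, k.(string))
        return len(candidates) < limit
    })

    // phase 2: short critical section: drop dirty candidates, evict the rest
    g.mu.Lock()
    defer g.mu.Unlock()
    for _, addr := range candidates {
        if _, ok := g.dirty[addr]; ok {
            continue // written during collection; keep it
        }
        g.store.Delete(addr)
        evicted++
    }
    return evicted
}

func main() {
    g := &gcSketch{}
    for _, a := range []string{"a", "b", "c"} {
        g.put(a, "data-"+a)
    }
    fmt.Println("evicted:", g.collectGarbage(2))
}

The real change follows the same shape: the gcIndex iteration in gc.go gathers candidates first, the batch lock is then taken only to filter out db.dirtyAddresses and write the batch, and the mode_get.go, mode_put.go and mode_set.go changes further down are the writer side that records those dirty addresses.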
82 changes: 63 additions & 19 deletions pkg/localstore/gc.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

@@ -85,13 +86,20 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
}
totalTimeMetric(db.metrics.TotalTimeCollectGarbage, start)
}(time.Now())

batch := new(leveldb.Batch)
target := db.gcTarget()

// protect database from changing indexes and gcSize
// tell the localstore to start logging dirty addresses
db.batchMu.Lock()
defer db.batchMu.Unlock()
db.gcRunning = true
db.batchMu.Unlock()

defer func() {
db.batchMu.Lock()
db.gcRunning = false
db.dirtyAddresses = nil
db.batchMu.Unlock()
}()

// run through the recently pinned chunks and
// remove them from the gcIndex before iterating through gcIndex
@@ -109,6 +117,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
done = true
first := true
start := time.Now()
candidates := make([]shed.Item, 0)
err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
if first {
totalTimeMetric(db.metrics.TotalTimeGCFirstItem, start)
@@ -118,39 +127,69 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
return true, nil
}

candidates = append(candidates, item)

collectedCount++
if collectedCount >= gcBatchSize {
// batch size limit reached,
// another gc run is needed
done = false
return true, nil
}
return false, nil
}, nil)
if err != nil {
return 0, false, err
}
db.metrics.GCCollectedCounter.Add(float64(collectedCount))
if testHookGCIteratorDone != nil {
testHookGCIteratorDone()
}

// protect database from changing indexes and gcSize
db.batchMu.Lock()
defer totalTimeMetric(db.metrics.TotalTimeGCLock, time.Now())
defer db.batchMu.Unlock()

// refresh gcSize value, since it might have
// changed in the meanwhile
gcSize, err = db.gcSize.Get()
if err != nil {
return 0, false, err
}

// get rid of dirty entries
for _, item := range candidates {
if swarm.NewAddress(item.Address).MemberOf(db.dirtyAddresses) {
collectedCount--
if gcSize-collectedCount > target {
done = false
}
continue
}

db.metrics.GCStoreTimeStamps.Set(float64(item.StoreTimestamp))
db.metrics.GCStoreAccessTimeStamps.Set(float64(item.AccessTimestamp))

// delete from retrieve, pull, gc
err = db.retrievalDataIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
return 0, false, err
}
err = db.retrievalAccessIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
return 0, false, err
}
err = db.pullIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
return 0, false, err
}
err = db.gcIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
return 0, false, err
}
collectedCount++
if collectedCount >= gcBatchSize {
// batch size limit reached,
// another gc run is needed
done = false
return true, nil
}
return false, nil
}, nil)
if err != nil {
return 0, false, err
}
db.metrics.GCCollectedCounter.Add(float64(collectedCount))
db.metrics.GCCommittedCounter.Add(float64(collectedCount))
db.gcSize.PutInBatch(batch, gcSize-collectedCount)

err = db.shed.WriteBatch(batch)
@@ -286,3 +325,8 @@ func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
// information when a garbage collection run is done
// and how many items it removed.
var testHookCollectGarbage func(collectedCount uint64)

// testHookGCIteratorDone is a hook which is called
// when the GC is done collecting candidate items for
// eviction.
var testHookGCIteratorDone func()
144 changes: 144 additions & 0 deletions pkg/localstore/gc_test.go
@@ -23,6 +23,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"sync"
"testing"
"time"

@@ -716,3 +717,146 @@ func addRandomChunks(t *testing.T, count int, db *DB, pin bool) []swarm.Chunk {
}
return chunks
}

// TestGC_NoEvictDirty checks that the garbage collection
// does not evict chunks that are marked as dirty while the gc
// is running.
func TestGC_NoEvictDirty(t *testing.T) {
// lower the maximal number of chunks in a single
// gc batch to ensure multiple batches.
defer func(s uint64) { gcBatchSize = s }(gcBatchSize)
gcBatchSize = 2

chunkCount := 15

db := newTestDB(t, &Options{
Capacity: 10,
})

closed := db.close

testHookCollectGarbageChan := make(chan uint64)
t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) {
select {
case testHookCollectGarbageChan <- collectedCount:
case <-closed:
}
}))

dirtyChan := make(chan struct{})
incomingChan := make(chan struct{})
t.Cleanup(setTestHookGCIteratorDone(func() {
incomingChan <- struct{}{}
<-dirtyChan
}))
addrs := make([]swarm.Address, 0)
mtx := new(sync.Mutex)
online := make(chan struct{})
go func() {
close(online) // make sure this is scheduled, otherwise test might flake
i := 0
for range incomingChan {
// request a chunk so that its gc index entry is updated,
// resulting in its removal from this gc round. but don't
// do this for all chunks!
if i < 2 {
mtx.Lock()
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
mtx.Unlock()
if err != nil {
t.Error(err)
}
i++
// sleep so that the async update to the gc index
// happens and dirtyAddresses gets updated
time.Sleep(100 * time.Millisecond)
}
dirtyChan <- struct{}{}
}

}()
<-online
// upload random chunks
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()

_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}

err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
mtx.Lock()
addrs = append(addrs, ch.Address())
mtx.Unlock()
}

gcTarget := db.gcTarget()
for {
select {
case <-testHookCollectGarbageChan:
case <-time.After(10 * time.Second):
t.Error("collect garbage timeout")
}
gcSize, err := db.gcSize.Get()
if err != nil {
t.Fatal(err)
}
if gcSize == gcTarget {
break
}
}

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))

t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))

t.Run("gc size", newIndexGCSizeTest(db))

// the first two chunks were made dirty and should survive; the third chunk should be gone
t.Run("get the first two chunks, third is gone", func(t *testing.T) {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[0])
if err != nil {
t.Error("got error but expected none")
}
_, err = db.Get(context.Background(), storage.ModeGetRequest, addrs[1])
if err != nil {
t.Error("got error but expected none")
}
_, err = db.Get(context.Background(), storage.ModeGetRequest, addrs[2])
if !errors.Is(err, storage.ErrNotFound) {
t.Errorf("expected err not found but got %v", err)
}
})

t.Run("only later inserted chunks should be removed", func(t *testing.T) {
for i := 2; i < (chunkCount - int(gcTarget)); i++ {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
if !errors.Is(err, storage.ErrNotFound) {
t.Errorf("got error %v, want %v", err, storage.ErrNotFound)
}
}
})

// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
})

}

// setTestHookGCIteratorDone sets testHookGCIteratorDone and
// returns a function that will reset it to the
// value before the change.
func setTestHookGCIteratorDone(h func()) (reset func()) {
current := testHookGCIteratorDone
reset = func() { testHookGCIteratorDone = current }
testHookGCIteratorDone = h
return reset
}
9 changes: 9 additions & 0 deletions pkg/localstore/localstore.go
@@ -109,6 +109,15 @@ type DB struct {

batchMu sync.Mutex

// gcRunning is true while GC is running. it is
// used to avoid touching dirty gc index entries
// while garbage collecting.
gcRunning bool

// dirtyAddresses are marked while gc is running
// in order to avoid the removal of dirty entries.
dirtyAddresses []swarm.Address

// this channel is closed when close function is called
// to terminate other goroutines
close chan struct{}
14 changes: 14 additions & 0 deletions pkg/localstore/metrics.go
@@ -10,6 +10,7 @@ import (
)

type metrics struct {
TotalTimeGCLock prometheus.Counter
TotalTimeGCFirstItem prometheus.Counter
TotalTimeCollectGarbage prometheus.Counter
TotalTimeGCExclude prometheus.Counter
@@ -26,6 +27,7 @@ type metrics struct {
GCCounter prometheus.Counter
GCErrorCounter prometheus.Counter
GCCollectedCounter prometheus.Counter
GCCommittedCounter prometheus.Counter
GCExcludeCounter prometheus.Counter
GCExcludeError prometheus.Counter
GCExcludeWriteBatchError prometheus.Counter
@@ -63,6 +65,12 @@ func newMetrics() metrics {
subsystem := "localstore"

return metrics{
TotalTimeGCLock: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "gc_lock_time",
Help: "Total time under lock in gc.",
}),
TotalTimeGCFirstItem: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
@@ -153,6 +161,12 @@ func newMetrics() metrics {
Name: "gc_collected_count",
Help: "Number of times the GC_COLLECTED operation is done.",
}),
GCCommittedCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "gc_committed_count",
Help: "Number of gc items to commit.",
}),
GCExcludeCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
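The new gc_lock_time counter is driven by the deferred totalTimeMetric call placed right after the lock is taken in gc.go above. The standalone sketch below shows that measurement pattern; addElapsed is a hypothetical stand-in for the repo's totalTimeMetric helper, and reporting seconds is an assumption of this sketch, not necessarily what the real helper does.

package main

import (
    "sync"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// addElapsed accumulates the time since start into a counter.
// Reporting seconds is an assumption of this sketch.
func addElapsed(c prometheus.Counter, start time.Time) {
    c.Add(time.Since(start).Seconds())
}

var (
    mu sync.Mutex

    gcLockTime = prometheus.NewCounter(prometheus.CounterOpts{
        Namespace: "bee",
        Subsystem: "localstore",
        Name:      "gc_lock_time",
        Help:      "Total time under lock in gc.",
    })
)

func lockedWork() {
    mu.Lock()
    // time.Now() is evaluated here, at lock acquisition; the deferred
    // calls then run in reverse order on return: the mutex is released
    // first, the elapsed time is recorded second.
    defer addElapsed(gcLockTime, time.Now())
    defer mu.Unlock()

    time.Sleep(10 * time.Millisecond) // stand-in for the locked gc work
}

func main() {
    prometheus.MustRegister(gcLockTime)
    lockedWork()
}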
3 changes: 3 additions & 0 deletions pkg/localstore/mode_get.go
@@ -125,6 +125,9 @@ func (db *DB) updateGCItems(items ...shed.Item) {
func (db *DB) updateGC(item shed.Item) (err error) {
db.batchMu.Lock()
defer db.batchMu.Unlock()
if db.gcRunning {
db.dirtyAddresses = append(db.dirtyAddresses, swarm.NewAddress(item.Address))
}

batch := new(leveldb.Batch)

5 changes: 5 additions & 0 deletions pkg/localstore/mode_put.go
@@ -55,6 +55,11 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
if db.gcRunning {
for _, ch := range chs {
db.dirtyAddresses = append(db.dirtyAddresses, ch.Address())
}
}

batch := new(leveldb.Batch)

3 changes: 3 additions & 0 deletions pkg/localstore/mode_set.go
@@ -50,6 +50,9 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
if db.gcRunning {
db.dirtyAddresses = append(db.dirtyAddresses, addrs...)
}

batch := new(leveldb.Batch)

11 changes: 11 additions & 0 deletions pkg/swarm/swarm.go
@@ -77,6 +77,17 @@ func (a Address) Equal(b Address) bool {
return bytes.Equal(a.b, b.b)
}

// MemberOf returns true if the address is a member of the
// provided set.
func (a Address) MemberOf(addrs []Address) bool {
for _, v := range addrs {
if v.Equal(a) {
return true
}
}
return false
}

// IsZero returns true if the Address is not set to any value.
func (a Address) IsZero() bool {
return a.Equal(ZeroAddress)
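A small usage sketch of the new MemberOf helper; the addresses are made up for illustration.

package main

import (
    "fmt"

    "github.com/ethersphere/bee/pkg/swarm"
)

func main() {
    dirty := []swarm.Address{
        swarm.NewAddress([]byte{0x01}),
        swarm.NewAddress([]byte{0x02}),
    }

    candidate := swarm.NewAddress([]byte{0x02})
    other := swarm.NewAddress([]byte{0x03})

    fmt.Println(candidate.MemberOf(dirty)) // true
    fmt.Println(other.MemberOf(dirty))     // false
}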