localstore: reduce critical section size on gc #1435

Merged 5 commits on Mar 19, 2021
82 changes: 63 additions & 19 deletions pkg/localstore/gc.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

@@ -85,13 +86,20 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
}
totalTimeMetric(db.metrics.TotalTimeCollectGarbage, start)
}(time.Now())

batch := new(leveldb.Batch)
target := db.gcTarget()

// protect database from changing indexes and gcSize
// tell the localstore to start logging dirty addresses
db.batchMu.Lock()
defer db.batchMu.Unlock()
db.gcRunning = true
db.batchMu.Unlock()

defer func() {
db.batchMu.Lock()
db.gcRunning = false
db.dirtyAddresses = nil
db.batchMu.Unlock()
}()

// run through the recently pinned chunks and
// remove them from the gcIndex before iterating through gcIndex
@@ -109,6 +117,7 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
done = true
first := true
start := time.Now()
candidates := make([]shed.Item, 0)
Member: The candidates slice should be preallocated at package level with gcBatchSize length.

Member Author: I agree this will save on allocations, but then the cleanup would be ugly: to actually save the allocation we'd have to iterate over every item in the slice and reset it in order to get rid of entries from the previous iteration, otherwise we'd keep dangling shed.Items in memory.

Member Author: cc @janos

Member: :) Just remember how many you filled in; no need to go through the whole slice.

Member: It could work nicely; just make sure that every time before the slice is resliced to zero length, it is iterated up to its length and every element is set to the zero Item. The zero Item has all slices nil (important for memory usage) and integers at 0 (not important for memory usage). That way no dangling Items will remain. Basically this calls for a type []shed.Item with a reset method and a contains method, which would replace swarm.Address.MemberOf as per @zelig's other comment, which I also agree with. Such a container naturally suggests a new type, which also leaves room for future improvements if the data structure changes, as suggested in the description.

Member Author: So... I'm not sure I understand, but is this a blocker?

Member Author (@acud, Mar 17, 2021), replying to "just remember how many you filled in; no need to go through the whole slice":

This is incorrect: after the last gc run, once the gc target has been met, dangling shed.Items would be left in the slice. We therefore have to iterate over the whole slice every time we finish GCing (either in the last iteration, or on every iteration) so that those items are no longer referenced and become eligible for garbage collection.

Member: @acud For the first iteration it is also fine as it is, for me.
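For illustration, a minimal sketch of the preallocate-and-reset idea discussed in this thread (not part of this PR; gcCandidates and resetGCCandidates are hypothetical names, while gcBatchSize and shed.Item already exist in the package):

    package localstore

    import "github.com/ethersphere/bee/pkg/shed"

    // gcCandidates is a package-level buffer, preallocated once with
    // gcBatchSize capacity so that collectGarbage does not allocate a new
    // slice on every run.
    var gcCandidates = make([]shed.Item, 0, gcBatchSize)

    // resetGCCandidates sets every element filled in the previous round back
    // to the zero shed.Item (releasing the byte slices it holds) and reslices
    // the buffer to zero length for the next round.
    func resetGCCandidates() {
        for i := range gcCandidates {
            gcCandidates[i] = shed.Item{}
        }
        gcCandidates = gcCandidates[:0]
    }

collectGarbage would then append into gcCandidates instead of a freshly allocated slice, and call resetGCCandidates once the batch has been written (or discarded), so no shed.Items stay referenced between runs.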

err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
if first {
totalTimeMetric(db.metrics.TotalTimeGCFirstItem, start)
@@ -118,39 +127,69 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
return true, nil
}

candidates = append(candidates, item)

collectedCount++
if collectedCount >= gcBatchSize {
// batch size limit reached,
// another gc run is needed
done = false
return true, nil
}
return false, nil
}, nil)
if err != nil {
return 0, false, err
}
db.metrics.GCCollectedCounter.Add(float64(collectedCount))
if testHookGCIteratorDone != nil {
testHookGCIteratorDone()
}

// protect database from changing indexes and gcSize
db.batchMu.Lock()
defer totalTimeMetric(db.metrics.TotalTimeGCLock, time.Now())
defer db.batchMu.Unlock()

// refresh gcSize value, since it might have
// changed in the meanwhile
gcSize, err = db.gcSize.Get()
if err != nil {
return 0, false, err
}

// get rid of dirty entries
for _, item := range candidates {
if swarm.NewAddress(item.Address).MemberOf(db.dirtyAddresses) {
Member: The other way round; the container is usually the receiver struct.

Member Author: But then we'd need to introduce a new type for []swarm.Address, which is something I'd like to avoid.
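For reference, a minimal sketch of the container-as-receiver variant suggested above (not part of this PR; the Addresses type and its Contains method are hypothetical):

    package swarm

    // Addresses is a named slice type that makes the container the receiver,
    // instead of adding a MemberOf method to Address.
    type Addresses []Address

    // Contains reports whether addr is present in the set.
    func (s Addresses) Contains(addr Address) bool {
        for _, v := range s {
            if v.Equal(addr) {
                return true
            }
        }
        return false
    }

With db.dirtyAddresses declared as swarm.Addresses, the check above would read db.dirtyAddresses.Contains(swarm.NewAddress(item.Address)).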

collectedCount--
if gcSize-collectedCount > target {
done = false
}
continue
}

db.metrics.GCStoreTimeStamps.Set(float64(item.StoreTimestamp))
db.metrics.GCStoreAccessTimeStamps.Set(float64(item.AccessTimestamp))

// delete from retrieve, pull, gc
err = db.retrievalDataIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
return 0, false, err
}
err = db.retrievalAccessIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
return 0, false, err
}
err = db.pullIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
return 0, false, err
}
err = db.gcIndex.DeleteInBatch(batch, item)
if err != nil {
return true, nil
return 0, false, err
}
collectedCount++
if collectedCount >= gcBatchSize {
// batch size limit reached,
// another gc run is needed
done = false
return true, nil
}
return false, nil
}, nil)
if err != nil {
return 0, false, err
}
db.metrics.GCCollectedCounter.Add(float64(collectedCount))
db.metrics.GCCommittedCounter.Add(float64(collectedCount))
db.gcSize.PutInBatch(batch, gcSize-collectedCount)

err = db.shed.WriteBatch(batch)
@@ -286,3 +325,8 @@ func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) {
// information when a garbage collection run is done
// and how many items it removed.
var testHookCollectGarbage func(collectedCount uint64)

// testHookGCIteratorDone is a hook which is called
// when the GC is done collecting candidate items for
// eviction.
var testHookGCIteratorDone func()
144 changes: 144 additions & 0 deletions pkg/localstore/gc_test.go
@@ -23,6 +23,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"sync"
"testing"
"time"

@@ -716,3 +717,146 @@ func addRandomChunks(t *testing.T, count int, db *DB, pin bool) []swarm.Chunk {
}
return chunks
}

// TestGC_NoEvictDirty checks that the garbage collection
// does not evict chunks that are marked as dirty while the gc
// is running.
func TestGC_NoEvictDirty(t *testing.T) {
// lower the maximal number of chunks in a single
// gc batch to ensure multiple batches.
defer func(s uint64) { gcBatchSize = s }(gcBatchSize)
gcBatchSize = 2

chunkCount := 15

db := newTestDB(t, &Options{
Capacity: 10,
})

closed := db.close

testHookCollectGarbageChan := make(chan uint64)
t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) {
select {
case testHookCollectGarbageChan <- collectedCount:
case <-closed:
}
}))

dirtyChan := make(chan struct{})
incomingChan := make(chan struct{})
t.Cleanup(setTestHookGCIteratorDone(func() {
incomingChan <- struct{}{}
<-dirtyChan
}))
addrs := make([]swarm.Address, 0)
mtx := new(sync.Mutex)
online := make(chan struct{})
go func() {
Member: Why does this need a separate goroutine? You're waiting for it to terminate anyway.

Member Author: I'm waiting for it to be scheduled; there's no defer call on the close, so otherwise we might get test flakes.

close(online) // make sure this is scheduled, otherwise test might flake
i := 0
for range incomingChan {
// set a chunk to be updated in gc, resulting
// in a removal from the gc round. but don't do this
// for all chunks!
if i < 2 {
mtx.Lock()
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
mtx.Unlock()
if err != nil {
t.Error(err)
}
i++
// we sleep so that the async update to gc index
// happens and that the dirtyAddresses get updated
time.Sleep(100 * time.Millisecond)
Member: Why sleep here?

Member Author: Because Gets update the gc index, and that happens in a different goroutine. We sleep to guarantee that the update happens and that dirtyAddresses gets updated.

}
dirtyChan <- struct{}{}
}

}()
<-online
// upload random chunks
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()

_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}

err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
if err != nil {
t.Fatal(err)
}
mtx.Lock()
addrs = append(addrs, ch.Address())
mtx.Unlock()
}
Member, commenting on lines +780 to +795:
    chunks := generateTestRandomChunks(chunkCount)
    _, err := db.Put(context.Background(), storage.ModePutSync, chunks...)
    if err != nil {
        t.Fatal(err)
    }

Member Author: But then we'd still need a for loop to get the individual chunk addresses so that we could Set them and add them to the addrs slice.


gcTarget := db.gcTarget()
for {
select {
case <-testHookCollectGarbageChan:
case <-time.After(10 * time.Second):
t.Error("collect garbage timeout")
}
gcSize, err := db.gcSize.Get()
if err != nil {
t.Fatal(err)
}
if gcSize == gcTarget {
break
}
}

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))

t.Run("gc index count", newItemsCountTest(db.gcIndex, int(gcTarget)))

t.Run("gc size", newIndexGCSizeTest(db))

// the first two chunks were accessed above, so they are dirty and must not be evicted; the third should be gone
t.Run("get the first two chunks, third is gone", func(t *testing.T) {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[0])
if err != nil {
t.Error("got error but expected none")
}
_, err = db.Get(context.Background(), storage.ModeGetRequest, addrs[1])
if err != nil {
t.Error("got error but expected none")
}
_, err = db.Get(context.Background(), storage.ModeGetRequest, addrs[2])
if !errors.Is(err, storage.ErrNotFound) {
t.Errorf("expected err not found but got %v", err)
}
})

t.Run("only later inserted chunks should be removed", func(t *testing.T) {
for i := 2; i < (chunkCount - int(gcTarget)); i++ {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[i])
if !errors.Is(err, storage.ErrNotFound) {
t.Errorf("got error %v, want %v", err, storage.ErrNotFound)
}
}
})

// last synced chunk should not be removed
t.Run("get most recent synced chunk", func(t *testing.T) {
_, err := db.Get(context.Background(), storage.ModeGetRequest, addrs[len(addrs)-1])
if err != nil {
t.Fatal(err)
}
})

}

// setTestHookGCIteratorDone sets testHookGCIteratorDone and
// returns a function that will reset it to the
// value before the change.
func setTestHookGCIteratorDone(h func()) (reset func()) {
current := testHookGCIteratorDone
reset = func() { testHookGCIteratorDone = current }
testHookGCIteratorDone = h
return reset
}
9 changes: 9 additions & 0 deletions pkg/localstore/localstore.go
@@ -109,6 +109,15 @@ type DB struct {

batchMu sync.Mutex

// gcRunning is true while GC is running. It is
// used to avoid touching dirty gc index entries
// while garbage collecting.
gcRunning bool

// dirtyAddresses are marked while gc is running
// in order to avoid the removal of dirty entries.
dirtyAddresses []swarm.Address

// this channel is closed when close function is called
// to terminate other goroutines
close chan struct{}
14 changes: 14 additions & 0 deletions pkg/localstore/metrics.go
@@ -10,6 +10,7 @@ import (
)

type metrics struct {
TotalTimeGCLock prometheus.Counter
TotalTimeGCFirstItem prometheus.Counter
TotalTimeCollectGarbage prometheus.Counter
TotalTimeGCExclude prometheus.Counter
@@ -26,6 +27,7 @@ type metrics struct {
GCCounter prometheus.Counter
GCErrorCounter prometheus.Counter
GCCollectedCounter prometheus.Counter
GCCommittedCounter prometheus.Counter
GCExcludeCounter prometheus.Counter
GCExcludeError prometheus.Counter
GCExcludeWriteBatchError prometheus.Counter
@@ -63,6 +65,12 @@ func newMetrics() metrics {
subsystem := "localstore"

return metrics{
TotalTimeGCLock: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "gc_lock_time",
Help: "Total time under lock in gc.",
}),
TotalTimeGCFirstItem: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
@@ -153,6 +161,12 @@
Name: "gc_collected_count",
Help: "Number of times the GC_COLLECTED operation is done.",
}),
GCCommittedCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "gc_committed_count",
Help: "Number of gc items to commit.",
}),
GCExcludeCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
3 changes: 3 additions & 0 deletions pkg/localstore/mode_get.go
@@ -125,6 +125,9 @@ func (db *DB) updateGCItems(items ...shed.Item) {
func (db *DB) updateGC(item shed.Item) (err error) {
db.batchMu.Lock()
defer db.batchMu.Unlock()
if db.gcRunning {
db.dirtyAddresses = append(db.dirtyAddresses, swarm.NewAddress(item.Address))
}

batch := new(leveldb.Batch)

5 changes: 5 additions & 0 deletions pkg/localstore/mode_put.go
@@ -55,6 +55,11 @@ func (db *DB) put(mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, err e
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
if db.gcRunning {
for _, ch := range chs {
db.dirtyAddresses = append(db.dirtyAddresses, ch.Address())
}
}

batch := new(leveldb.Batch)

3 changes: 3 additions & 0 deletions pkg/localstore/mode_set.go
@@ -50,6 +50,9 @@ func (db *DB) set(mode storage.ModeSet, addrs ...swarm.Address) (err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
if db.gcRunning {
db.dirtyAddresses = append(db.dirtyAddresses, addrs...)
}

batch := new(leveldb.Batch)

11 changes: 11 additions & 0 deletions pkg/swarm/swarm.go
@@ -77,6 +77,17 @@ func (a Address) Equal(b Address) bool {
return bytes.Equal(a.b, b.b)
}

// MemberOf returns true if the address is a member of the
// provided set.
func (a Address) MemberOf(addrs []Address) bool {
for _, v := range addrs {
if v.Equal(a) {
return true
}
}
return false
}

// IsZero returns true if the Address is not set to any value.
func (a Address) IsZero() bool {
return a.Equal(ZeroAddress)