Commit

fix: push index dangling item
chore: pr comments
acud committed Sep 22, 2021
1 parent 356759e commit 49e929a
Showing 5 changed files with 125 additions and 1 deletion.
4 changes: 4 additions & 0 deletions pkg/localstore/gc.go
@@ -184,6 +184,10 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
	if err != nil {
		return 0, false, err
	}
+	err = db.pushIndex.DeleteInBatch(batch, item)
+	if err != nil {
+		return 0, false, err
+	}
	err = db.pullIndex.DeleteInBatch(batch, item)
	if err != nil {
		return 0, false, err

1 change: 1 addition & 0 deletions pkg/localstore/migration.go
@@ -40,6 +40,7 @@ var schemaMigrations = []migration{
	{schemaName: DBSchemaCode, fn: func(*DB) error { return nil }},
	{schemaName: DBSchemaYuj, fn: migrateYuj},
	{schemaName: DBSchemaBatchIndex, fn: migrateBatchIndex},
+	{schemaName: DBSchemaDeadPush, fn: migrateDeadPush},
}

func (db *DB) migrate(schemaName string) error {
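Registering the new migration is just a matter of appending an entry to the ordered schemaMigrations list above; the migrate function whose signature is shown here (its body is not part of this diff) is what applies it. A reasonable reading of this pattern is that migrate runs every migration listed after the schema name currently stored in the DB. The sketch below is a minimal, self-contained approximation of that mechanism, not bee's actual implementation: the db struct, the migrate helper, and the inline migration functions are placeholders for illustration only.

```go
package main

import "fmt"

// db stands in for localstore.DB; only the stored schema name matters here.
type db struct{ schemaName string }

type migration struct {
	schemaName string
	fn         func(*db) error
}

// migrate runs every migration registered after the DB's current schema,
// advancing the stored schema name as each one succeeds.
func migrate(d *db, migrations []migration) error {
	start := -1
	for i, m := range migrations {
		if m.schemaName == d.schemaName {
			start = i
			break
		}
	}
	if start < 0 {
		return fmt.Errorf("unknown schema name %q", d.schemaName)
	}
	for _, m := range migrations[start+1:] {
		if err := m.fn(d); err != nil {
			return fmt.Errorf("migration %q: %w", m.schemaName, err)
		}
		d.schemaName = m.schemaName
	}
	return nil
}

func main() {
	d := &db{schemaName: "batch-index"}
	migrations := []migration{
		{schemaName: "batch-index", fn: func(*db) error { return nil }},
		{schemaName: "dead-push", fn: func(*db) error {
			fmt.Println("running dead-push cleanup")
			return nil
		}},
	}
	if err := migrate(d, migrations); err != nil {
		fmt.Println("migrate failed:", err)
		return
	}
	fmt.Println("schema is now", d.schemaName)
}
```

Together with the DBSchemaCurrent bump in schema.go further down, this is what should make a node still on the batch-index schema run migrateDeadPush once on its next startup.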
115 changes: 115 additions & 0 deletions pkg/localstore/migration_dead_push.go
@@ -0,0 +1,115 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package localstore

import (
	"encoding/binary"
	"fmt"
	"time"

	"github.com/ethersphere/bee/pkg/postage"
	"github.com/ethersphere/bee/pkg/shed"
	"github.com/syndtr/goleveldb/leveldb"
)

// DBSchemaDeadPush is the bee schema identifier for dead-push.
const DBSchemaDeadPush = "dead-push"

// migrateDeadPush cleans up dangling push index entries that cause the pusher to stop pushing chunks.
func migrateDeadPush(db *DB) error {
	start := time.Now()
	db.logger.Debug("removing dangling entries from push index")
	batch := new(leveldb.Batch)
	count := 0
	headerSize := 16 + postage.StampSize
	retrievalDataIndex, err := db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Data", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			return fields.Address, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			b := make([]byte, headerSize)
			binary.BigEndian.PutUint64(b[:8], fields.BinID)
			binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
			stamp, err := postage.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary()
			if err != nil {
				return nil, err
			}
			copy(b[16:], stamp)
			value = append(b, fields.Data...)
			return value, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
			e.BinID = binary.BigEndian.Uint64(value[:8])
			stamp := new(postage.Stamp)
			if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil {
				return e, err
			}
			e.BatchID = stamp.BatchID()
			e.Index = stamp.Index()
			e.Timestamp = stamp.Timestamp()
			e.Sig = stamp.Sig()
			e.Data = value[headerSize:]
			return e, nil
		},
	})
	if err != nil {
		return err
	}
	pushIndex, err := db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{
		EncodeKey: func(fields shed.Item) (key []byte, err error) {
			key = make([]byte, 40)
			binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp))
			copy(key[8:], fields.Address)
			return key, nil
		},
		DecodeKey: func(key []byte) (e shed.Item, err error) {
			e.Address = key[8:]
			e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
			return e, nil
		},
		EncodeValue: func(fields shed.Item) (value []byte, err error) {
			tag := make([]byte, 4)
			binary.BigEndian.PutUint32(tag, fields.Tag)
			return tag, nil
		},
		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
			if len(value) == 4 { // only values with tag should be decoded
				e.Tag = binary.BigEndian.Uint32(value)
			}
			return e, nil
		},
	})
	if err != nil {
		return err
	}
	err = pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
		has, err := retrievalDataIndex.Has(item)
		if err != nil {
			return true, err
		}
		if !has {
			if err = pushIndex.DeleteInBatch(batch, item); err != nil {
				return true, err
			}
			count++
		}
		return false, nil
	}, nil)
	if err != nil {
		return fmt.Errorf("iterate index: %w", err)
	}
	db.logger.Debugf("found %d entries to remove. trying to flush...", count)
	err = db.shed.WriteBatch(batch)
	if err != nil {
		return fmt.Errorf("write batch: %w", err)
	}
	db.logger.Debugf("done cleaning index. took %s", time.Since(start))
	return nil
}

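The cross-index check above (retrievalDataIndex.Has(item) for every push index entry) works because both index descriptors operate on the same shed.Item: the push index key carries the chunk address after an 8-byte store timestamp, and the retrieval data index is keyed by that address alone. The snippet below is a small, self-contained illustration of that 40-byte push index key layout; encodePushKey and decodePushKey are hypothetical helpers written only for this example, not bee APIs, and the 32-byte address is a stand-in value.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encodePushKey builds a push-index-style key: an 8-byte big-endian store
// timestamp followed by the chunk address.
func encodePushKey(storeTimestamp int64, address []byte) []byte {
	key := make([]byte, 8+len(address))
	binary.BigEndian.PutUint64(key[:8], uint64(storeTimestamp))
	copy(key[8:], address)
	return key
}

// decodePushKey is the inverse: split the key back into timestamp and address.
func decodePushKey(key []byte) (storeTimestamp int64, address []byte) {
	return int64(binary.BigEndian.Uint64(key[:8])), key[8:]
}

func main() {
	addr := bytes.Repeat([]byte{0xab}, 32) // stand-in for a 32-byte chunk address
	key := encodePushKey(1632300000, addr)
	ts, got := decodePushKey(key)
	fmt.Printf("key length: %d, timestamp: %d, address matches: %v\n",
		len(key), ts, bytes.Equal(got, addr))
}
```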
4 changes: 4 additions & 0 deletions pkg/localstore/mode_set.go
@@ -227,6 +227,10 @@ func (db *DB) setRemove(batch *leveldb.Batch, item shed.Item, check bool) (gcSiz
	if err != nil {
		return 0, err
	}
+	err = db.pushIndex.DeleteInBatch(batch, item)
+	if err != nil {
+		return 0, err
+	}
	err = db.pullIndex.DeleteInBatch(batch, item)
	if err != nil {
		return 0, err

2 changes: 1 addition & 1 deletion pkg/localstore/schema.go
@@ -21,4 +21,4 @@ const DBSchemaCode = "code"

// DBSchemaCurrent represents the DB schema we want to use.
// The actual/current DB schema might differ until migrations are run.
-var DBSchemaCurrent = DBSchemaBatchIndex
+var DBSchemaCurrent = DBSchemaDeadPush
