diff --git a/pkg/localstore/gc.go b/pkg/localstore/gc.go
index 34618dae671..0773ac31dc4 100644
--- a/pkg/localstore/gc.go
+++ b/pkg/localstore/gc.go
@@ -184,6 +184,10 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
 	if err != nil {
 		return 0, false, err
 	}
+	err = db.pushIndex.DeleteInBatch(batch, item)
+	if err != nil {
+		return 0, false, err
+	}
 	err = db.pullIndex.DeleteInBatch(batch, item)
 	if err != nil {
 		return 0, false, err
diff --git a/pkg/localstore/migration.go b/pkg/localstore/migration.go
index 280b22f811a..9b2ffd582ab 100644
--- a/pkg/localstore/migration.go
+++ b/pkg/localstore/migration.go
@@ -40,6 +40,7 @@ var schemaMigrations = []migration{
 	{schemaName: DBSchemaCode, fn: func(*DB) error { return nil }},
 	{schemaName: DBSchemaYuj, fn: migrateYuj},
 	{schemaName: DBSchemaBatchIndex, fn: migrateBatchIndex},
+	{schemaName: DBSchemaDeadPush, fn: migrateDeadPush},
 }
 
 func (db *DB) migrate(schemaName string) error {
diff --git a/pkg/localstore/migration_dead_push.go b/pkg/localstore/migration_dead_push.go
new file mode 100644
index 00000000000..6fcf31f35cb
--- /dev/null
+++ b/pkg/localstore/migration_dead_push.go
@@ -0,0 +1,115 @@
+// Copyright 2021 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package localstore
+
+import (
+	"encoding/binary"
+	"fmt"
+	"time"
+
+	"github.com/ethersphere/bee/pkg/postage"
+	"github.com/ethersphere/bee/pkg/shed"
+	"github.com/syndtr/goleveldb/leveldb"
+)
+
+// DBSchemaDeadPush is the bee schema identifier for dead-push.
+const DBSchemaDeadPush = "dead-push"
+
+// migrateDeadPush cleans up dangling push index entries that make the pusher stop pushing entries
+func migrateDeadPush(db *DB) error {
+	start := time.Now()
+	db.logger.Debug("removing dangling entries from push index")
+	batch := new(leveldb.Batch)
+	count := 0
+	headerSize := 16 + postage.StampSize
+	retrievalDataIndex, err := db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Data", shed.IndexFuncs{
+		EncodeKey: func(fields shed.Item) (key []byte, err error) {
+			return fields.Address, nil
+		},
+		DecodeKey: func(key []byte) (e shed.Item, err error) {
+			e.Address = key
+			return e, nil
+		},
+		EncodeValue: func(fields shed.Item) (value []byte, err error) {
+			b := make([]byte, headerSize)
+			binary.BigEndian.PutUint64(b[:8], fields.BinID)
+			binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
+			stamp, err := postage.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary()
+			if err != nil {
+				return nil, err
+			}
+			copy(b[16:], stamp)
+			value = append(b, fields.Data...)
+			return value, nil
+		},
+		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+			e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
+			e.BinID = binary.BigEndian.Uint64(value[:8])
+			stamp := new(postage.Stamp)
+			if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil {
+				return e, err
+			}
+			e.BatchID = stamp.BatchID()
+			e.Index = stamp.Index()
+			e.Timestamp = stamp.Timestamp()
+			e.Sig = stamp.Sig()
+			e.Data = value[headerSize:]
+			return e, nil
+		},
+	})
+	if err != nil {
+		return err
+	}
+	pushIndex, err := db.shed.NewIndex("StoreTimestamp|Hash->Tags", shed.IndexFuncs{
+		EncodeKey: func(fields shed.Item) (key []byte, err error) {
+			key = make([]byte, 40)
+			binary.BigEndian.PutUint64(key[:8], uint64(fields.StoreTimestamp))
+			copy(key[8:], fields.Address)
+			return key, nil
+		},
+		DecodeKey: func(key []byte) (e shed.Item, err error) {
+			e.Address = key[8:]
+			e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[:8]))
+			return e, nil
+		},
+		EncodeValue: func(fields shed.Item) (value []byte, err error) {
+			tag := make([]byte, 4)
+			binary.BigEndian.PutUint32(tag, fields.Tag)
+			return tag, nil
+		},
+		DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+			if len(value) == 4 { // only values with tag should be decoded
+				e.Tag = binary.BigEndian.Uint32(value)
+			}
+			return e, nil
+		},
+	})
+	if err != nil {
+		return err
+	}
+	err = pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
+		has, err := retrievalDataIndex.Has(item)
+		if err != nil {
+			return true, err
+		}
+		if !has {
+			if err = pushIndex.DeleteInBatch(batch, item); err != nil {
+				return true, err
+			}
+			count++
+		}
+		return false, nil
+	}, nil)
+	if err != nil {
+		return fmt.Errorf("iterate index: %w", err)
+	}
+	db.logger.Debugf("found %d entries to remove. trying to flush...", count)
+	err = db.shed.WriteBatch(batch)
+	if err != nil {
+		return fmt.Errorf("write batch: %w", err)
+	}
+	db.logger.Debugf("done cleaning index. took %s", time.Since(start))
+	return nil
+}
diff --git a/pkg/localstore/mode_set.go b/pkg/localstore/mode_set.go
index f6ba20ec375..8b6a26c4861 100644
--- a/pkg/localstore/mode_set.go
+++ b/pkg/localstore/mode_set.go
@@ -227,6 +227,10 @@ func (db *DB) setRemove(batch *leveldb.Batch, item shed.Item, check bool) (gcSiz
 	if err != nil {
 		return 0, err
 	}
+	err = db.pushIndex.DeleteInBatch(batch, item)
+	if err != nil {
+		return 0, err
+	}
 	err = db.pullIndex.DeleteInBatch(batch, item)
 	if err != nil {
 		return 0, err
diff --git a/pkg/localstore/schema.go b/pkg/localstore/schema.go
index 07cf82cb4aa..8355eb74ae4 100644
--- a/pkg/localstore/schema.go
+++ b/pkg/localstore/schema.go
@@ -21,4 +21,4 @@ const DBSchemaCode = "code"
 
 // DBSchemaCurrent represents the DB schema we want to use.
 // The actual/current DB schema might differ until migrations are run.
-var DBSchemaCurrent = DBSchemaBatchIndex
+var DBSchemaCurrent = DBSchemaDeadPush