Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce garbage in blockstore #4406

Merged
merged 6 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions chain/sub/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,21 @@ import (
"github.com/filecoin-project/lotus/lib/bufbstore"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/impl/client"
)

var log = logging.Logger("sub")

var ErrSoftFailure = errors.New("soft validation failure")
var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power")

var msgCidPrefix = cid.Prefix{
Version: 1,
Codec: cid.DagCBOR,
MhType: client.DefaultHashFunction,
MhLength: 32,
}

func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
// Timeout after (block time + propagation delay). This is useless at
// this point.
Expand Down Expand Up @@ -168,6 +176,9 @@ func fetchCids(

cidIndex := make(map[cid.Cid]int)
for i, c := range cids {
if c.Prefix() != msgCidPrefix {
return fmt.Errorf("invalid msg CID: %s", c)
}
cidIndex[c] = i
}
if len(cids) != len(cidIndex) {
Expand Down
80 changes: 44 additions & 36 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,49 +323,46 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
return xerrors.Errorf("block %s has too many messages (%d)", fblk.Header.Cid(), msgc)
}

// Collect the CIDs of both types of messages separately: BLS and Secpk.
var bcids, scids []cid.Cid
for _, m := range fblk.BlsMessages {
bcids = append(bcids, m.Cid())
}

for _, m := range fblk.SecpkMessages {
scids = append(scids, m.Cid())
}

// TODO: IMPORTANT(GARBAGE). These message puts and the msgmeta
// computation need to go into the 'temporary' side of the blockstore when
// we implement that
blockstore := syncer.store.Blockstore()

bs := cbor.NewCborStore(blockstore)
// We use a temporary bstore here to avoid writing intermediate pieces
// into the blockstore.
blockstore := bstore.NewTemporary()
cst := cbor.NewCborStore(blockstore)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the comments on 326-328 be removed? I believe this addresses them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they're still valid? That is, at this point we know we have a "sane" block, but it may not be otherwise valid.

But I don't know the original intention of that comment.


// Compute the root CID of the combined message trie.
smroot, err := computeMsgMeta(bs, bcids, scids)
if err != nil {
return xerrors.Errorf("validating msgmeta, compute failed: %w", err)
}

// Check that the message trie root matches with what's in the block.
if fblk.Header.Messages != smroot {
return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot)
}
var bcids, scids []cid.Cid

for _, m := range fblk.BlsMessages {
_, err := store.PutMessage(blockstore, m)
c, err := store.PutMessage(blockstore, m)
if err != nil {
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
}
bcids = append(bcids, c)
}

for _, m := range fblk.SecpkMessages {
_, err := store.PutMessage(blockstore, m)
c, err := store.PutMessage(blockstore, m)
if err != nil {
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
}
scids = append(scids, c)
}

return nil
// Compute the root CID of the combined message trie.
smroot, err := computeMsgMeta(cst, bcids, scids)
if err != nil {
return xerrors.Errorf("validating msgmeta, compute failed: %w", err)
}

// Check that the message trie root matches with what's in the block.
if fblk.Header.Messages != smroot {
return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot)
}

// Finally, flush.
return vm.Copy(context.TODO(), blockstore, syncer.store.Blockstore(), smroot)
}

func (syncer *Syncer) LocalPeer() peer.ID {
Expand Down Expand Up @@ -1064,8 +1061,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
return err
}

cst := cbor.NewCborStore(syncer.store.Blockstore())
st, err := state.LoadStateTree(cst, stateroot)
st, err := state.LoadStateTree(syncer.store.Store(ctx), stateroot)
if err != nil {
return xerrors.Errorf("failed to load base state tree: %w", err)
}
Expand Down Expand Up @@ -1111,21 +1107,28 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
return nil
}

store := adt0.WrapStore(ctx, cst)
// Validate message arrays in a temporary blockstore.
tmpbs := bstore.NewTemporary()
tmpstore := adt0.WrapStore(ctx, cbor.NewCborStore(tmpbs))

bmArr := adt0.MakeEmptyArray(store)
bmArr := adt0.MakeEmptyArray(tmpstore)
for i, m := range b.BlsMessages {
if err := checkMsg(m); err != nil {
return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err)
}

c := cbg.CborCid(m.Cid())
if err := bmArr.Set(uint64(i), &c); err != nil {
c, err := store.PutMessage(tmpbs, m)
if err != nil {
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
}

k := cbg.CborCid(c)
if err := bmArr.Set(uint64(i), &k); err != nil {
return xerrors.Errorf("failed to put bls message at index %d: %w", i, err)
}
}

smArr := adt0.MakeEmptyArray(store)
smArr := adt0.MakeEmptyArray(tmpstore)
for i, m := range b.SecpkMessages {
if err := checkMsg(m); err != nil {
return xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err)
Expand All @@ -1142,8 +1145,12 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
return xerrors.Errorf("secpk message %s has invalid signature: %w", m.Cid(), err)
}

c := cbg.CborCid(m.Cid())
if err := smArr.Set(uint64(i), &c); err != nil {
c, err := store.PutMessage(tmpbs, m)
if err != nil {
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
}
k := cbg.CborCid(c)
if err := smArr.Set(uint64(i), &k); err != nil {
return xerrors.Errorf("failed to put secpk message at index %d: %w", i, err)
}
}
Expand All @@ -1158,7 +1165,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
return err
}

mrcid, err := cst.Put(ctx, &types.MsgMeta{
mrcid, err := tmpstore.Put(ctx, &types.MsgMeta{
BlsMessages: bmroot,
SecpkMessages: smroot,
})
Expand All @@ -1170,7 +1177,8 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
return fmt.Errorf("messages didnt match message root in header")
}

return nil
// Finally, flush.
return vm.Copy(ctx, tmpbs, syncer.store.Blockstore(), mrcid)
}

func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig *crypto.Signature, msgs []cid.Cid, pubks [][]byte) error {
Expand Down
9 changes: 4 additions & 5 deletions lib/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@ import (
"context"

ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"

blockstore "github.com/ipfs/go-ipfs-blockstore"
)

// NewTemporary returns a temporary blockstore.
func NewTemporary() blockstore.Blockstore {
return NewBlockstore(ds.NewMapDatastore())
func NewTemporary() MemStore {
return make(MemStore)
}

// NewTemporarySync returns a thread-safe temporary blockstore.
func NewTemporarySync() blockstore.Blockstore {
return NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
func NewTemporarySync() *SyncStore {
return &SyncStore{bs: make(MemStore)}
}

// WrapIDStore wraps the underlying blockstore in an "identity" blockstore.
Expand Down
80 changes: 80 additions & 0 deletions lib/blockstore/memstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package blockstore

import (
"context"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

type MemStore map[cid.Cid]blocks.Block

func (m MemStore) DeleteBlock(k cid.Cid) error {
delete(m, k)
return nil
}
func (m MemStore) Has(k cid.Cid) (bool, error) {
_, ok := m[k]
return ok, nil
}
func (m MemStore) Get(k cid.Cid) (blocks.Block, error) {
b, ok := m[k]
if !ok {
return nil, blockstore.ErrNotFound
}
return b, nil
}

// GetSize returns the CIDs mapped BlockSize
func (m MemStore) GetSize(k cid.Cid) (int, error) {
b, ok := m[k]
if !ok {
return 0, blockstore.ErrNotFound
}
return len(b.RawData()), nil
}

// Put puts a given block to the underlying datastore
func (m MemStore) Put(b blocks.Block) error {
// Convert to a basic block for safety, but try to reuse the existing
// block if it's already a basic block.
k := b.Cid()
if _, ok := b.(*blocks.BasicBlock); !ok {
// If we already have the block, abort.
if _, ok := m[k]; ok {
return nil
}
// the error is only for debugging.
b, _ = blocks.NewBlockWithCid(b.RawData(), b.Cid())
}
m[b.Cid()] = b
return nil
}

// PutMany puts a slice of blocks at the same time using batching
// capabilities of the underlying datastore whenever possible.
func (m MemStore) PutMany(bs []blocks.Block) error {
for _, b := range bs {
_ = m.Put(b) // can't fail
}
return nil
}

// AllKeysChan returns a channel from which
// the CIDs in the Blockstore can be read. It should respect
// the given context, closing the channel if it becomes Done.
func (m MemStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
ch := make(chan cid.Cid, len(m))
for k := range m {
ch <- k
}
close(ch)
return ch, nil
}

// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
func (m MemStore) HashOnRead(enabled bool) {
// no-op
}
68 changes: 68 additions & 0 deletions lib/blockstore/syncstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package blockstore

import (
"context"
"sync"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)

type SyncStore struct {
mu sync.RWMutex
bs MemStore // specifically use a memStore to save indirection overhead.
}

func (m *SyncStore) DeleteBlock(k cid.Cid) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.DeleteBlock(k)
}
func (m *SyncStore) Has(k cid.Cid) (bool, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.Has(k)
}
func (m *SyncStore) Get(k cid.Cid) (blocks.Block, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.Get(k)
}

// GetSize returns the CIDs mapped BlockSize
func (m *SyncStore) GetSize(k cid.Cid) (int, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.GetSize(k)
}

// Put puts a given block to the underlying datastore
func (m *SyncStore) Put(b blocks.Block) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.Put(b)
}

// PutMany puts a slice of blocks at the same time using batching
// capabilities of the underlying datastore whenever possible.
func (m *SyncStore) PutMany(bs []blocks.Block) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.PutMany(bs)
}

// AllKeysChan returns a channel from which
// the CIDs in the Blockstore can be read. It should respect
// the given context, closing the channel if it becomes Done.
func (m *SyncStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
m.mu.RLock()
defer m.mu.RUnlock()
// this blockstore implementation doesn't do any async work.
return m.bs.AllKeysChan(ctx)
}

// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
func (m *SyncStore) HashOnRead(enabled bool) {
// noop
}
4 changes: 3 additions & 1 deletion lib/bufbstore/buf_bstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ type BufferedBS struct {
}

func NewBufferedBstore(base bstore.Blockstore) *BufferedBS {
buf := bstore.NewTemporary()
var buf bstore.Blockstore
if os.Getenv("LOTUS_DISABLE_VM_BUF") == "iknowitsabadidea" {
log.Warn("VM BLOCKSTORE BUFFERING IS DISABLED")
buf = base
} else {
buf = bstore.NewTemporary()
}

return &BufferedBS{
Expand Down
Loading