diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 68ee5e20c63..99ca2fc65ae 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -37,6 +37,7 @@ import ( "github.com/filecoin-project/lotus/lib/bufbstore" "github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/metrics" + "github.com/filecoin-project/lotus/node/impl/client" ) var log = logging.Logger("sub") @@ -44,6 +45,13 @@ var log = logging.Logger("sub") var ErrSoftFailure = errors.New("soft validation failure") var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power") +var msgCidPrefix = cid.Prefix{ + Version: 1, + Codec: cid.DagCBOR, + MhType: client.DefaultHashFunction, + MhLength: 32, +} + func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) { // Timeout after (block time + propagation delay). This is useless at // this point. @@ -168,6 +176,9 @@ func fetchCids( cidIndex := make(map[cid.Cid]int) for i, c := range cids { + if c.Prefix() != msgCidPrefix { + return fmt.Errorf("invalid msg CID: %s", c) + } cidIndex[c] = i } if len(cids) != len(cidIndex) { diff --git a/chain/sync.go b/chain/sync.go index dd83bf31984..97915d8df31 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -323,49 +323,46 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { return xerrors.Errorf("block %s has too many messages (%d)", fblk.Header.Cid(), msgc) } - // Collect the CIDs of both types of messages separately: BLS and Secpk. - var bcids, scids []cid.Cid - for _, m := range fblk.BlsMessages { - bcids = append(bcids, m.Cid()) - } - - for _, m := range fblk.SecpkMessages { - scids = append(scids, m.Cid()) - } - // TODO: IMPORTANT(GARBAGE). These message puts and the msgmeta // computation need to go into the 'temporary' side of the blockstore when // we implement that - blockstore := syncer.store.Blockstore() - bs := cbor.NewCborStore(blockstore) + // We use a temporary bstore here to avoid writing intermediate pieces + // into the blockstore. + blockstore := bstore.NewTemporary() + cst := cbor.NewCborStore(blockstore) - // Compute the root CID of the combined message trie. - smroot, err := computeMsgMeta(bs, bcids, scids) - if err != nil { - return xerrors.Errorf("validating msgmeta, compute failed: %w", err) - } - - // Check that the message trie root matches with what's in the block. - if fblk.Header.Messages != smroot { - return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot) - } + var bcids, scids []cid.Cid for _, m := range fblk.BlsMessages { - _, err := store.PutMessage(blockstore, m) + c, err := store.PutMessage(blockstore, m) if err != nil { return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err) } + bcids = append(bcids, c) } for _, m := range fblk.SecpkMessages { - _, err := store.PutMessage(blockstore, m) + c, err := store.PutMessage(blockstore, m) if err != nil { return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err) } + scids = append(scids, c) } - return nil + // Compute the root CID of the combined message trie. + smroot, err := computeMsgMeta(cst, bcids, scids) + if err != nil { + return xerrors.Errorf("validating msgmeta, compute failed: %w", err) + } + + // Check that the message trie root matches with what's in the block. + if fblk.Header.Messages != smroot { + return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot) + } + + // Finally, flush. + return vm.Copy(context.TODO(), blockstore, syncer.store.Blockstore(), smroot) } func (syncer *Syncer) LocalPeer() peer.ID { @@ -1064,8 +1061,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock return err } - cst := cbor.NewCborStore(syncer.store.Blockstore()) - st, err := state.LoadStateTree(cst, stateroot) + st, err := state.LoadStateTree(syncer.store.Store(ctx), stateroot) if err != nil { return xerrors.Errorf("failed to load base state tree: %w", err) } @@ -1111,21 +1107,28 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock return nil } - store := adt0.WrapStore(ctx, cst) + // Validate message arrays in a temporary blockstore. + tmpbs := bstore.NewTemporary() + tmpstore := adt0.WrapStore(ctx, cbor.NewCborStore(tmpbs)) - bmArr := adt0.MakeEmptyArray(store) + bmArr := adt0.MakeEmptyArray(tmpstore) for i, m := range b.BlsMessages { if err := checkMsg(m); err != nil { return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err) } - c := cbg.CborCid(m.Cid()) - if err := bmArr.Set(uint64(i), &c); err != nil { + c, err := store.PutMessage(tmpbs, m) + if err != nil { + return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err) + } + + k := cbg.CborCid(c) + if err := bmArr.Set(uint64(i), &k); err != nil { return xerrors.Errorf("failed to put bls message at index %d: %w", i, err) } } - smArr := adt0.MakeEmptyArray(store) + smArr := adt0.MakeEmptyArray(tmpstore) for i, m := range b.SecpkMessages { if err := checkMsg(m); err != nil { return xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err) @@ -1142,8 +1145,12 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock return xerrors.Errorf("secpk message %s has invalid signature: %w", m.Cid(), err) } - c := cbg.CborCid(m.Cid()) - if err := smArr.Set(uint64(i), &c); err != nil { + c, err := store.PutMessage(tmpbs, m) + if err != nil { + return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err) + } + k := cbg.CborCid(c) + if err := smArr.Set(uint64(i), &k); err != nil { return xerrors.Errorf("failed to put secpk message at index %d: %w", i, err) } } @@ -1158,7 +1165,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock return err } - mrcid, err := cst.Put(ctx, &types.MsgMeta{ + mrcid, err := tmpstore.Put(ctx, &types.MsgMeta{ BlsMessages: bmroot, SecpkMessages: smroot, }) @@ -1170,7 +1177,8 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock return fmt.Errorf("messages didnt match message root in header") } - return nil + // Finally, flush. + return vm.Copy(ctx, tmpbs, syncer.store.Blockstore(), mrcid) } func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig *crypto.Signature, msgs []cid.Cid, pubks [][]byte) error { diff --git a/lib/blockstore/blockstore.go b/lib/blockstore/blockstore.go index 9e74f437322..f274cd70867 100644 --- a/lib/blockstore/blockstore.go +++ b/lib/blockstore/blockstore.go @@ -18,19 +18,18 @@ import ( "context" ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" blockstore "github.com/ipfs/go-ipfs-blockstore" ) // NewTemporary returns a temporary blockstore. -func NewTemporary() blockstore.Blockstore { - return NewBlockstore(ds.NewMapDatastore()) +func NewTemporary() MemStore { + return make(MemStore) } // NewTemporarySync returns a thread-safe temporary blockstore. -func NewTemporarySync() blockstore.Blockstore { - return NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) +func NewTemporarySync() *SyncStore { + return &SyncStore{bs: make(MemStore)} } // WrapIDStore wraps the underlying blockstore in an "identity" blockstore. diff --git a/lib/blockstore/memstore.go b/lib/blockstore/memstore.go new file mode 100644 index 00000000000..9745d6f0395 --- /dev/null +++ b/lib/blockstore/memstore.go @@ -0,0 +1,80 @@ +package blockstore + +import ( + "context" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" +) + +type MemStore map[cid.Cid]blocks.Block + +func (m MemStore) DeleteBlock(k cid.Cid) error { + delete(m, k) + return nil +} +func (m MemStore) Has(k cid.Cid) (bool, error) { + _, ok := m[k] + return ok, nil +} +func (m MemStore) Get(k cid.Cid) (blocks.Block, error) { + b, ok := m[k] + if !ok { + return nil, blockstore.ErrNotFound + } + return b, nil +} + +// GetSize returns the CIDs mapped BlockSize +func (m MemStore) GetSize(k cid.Cid) (int, error) { + b, ok := m[k] + if !ok { + return 0, blockstore.ErrNotFound + } + return len(b.RawData()), nil +} + +// Put puts a given block to the underlying datastore +func (m MemStore) Put(b blocks.Block) error { + // Convert to a basic block for safety, but try to reuse the existing + // block if it's already a basic block. + k := b.Cid() + if _, ok := b.(*blocks.BasicBlock); !ok { + // If we already have the block, abort. + if _, ok := m[k]; ok { + return nil + } + // the error is only for debugging. + b, _ = blocks.NewBlockWithCid(b.RawData(), b.Cid()) + } + m[b.Cid()] = b + return nil +} + +// PutMany puts a slice of blocks at the same time using batching +// capabilities of the underlying datastore whenever possible. +func (m MemStore) PutMany(bs []blocks.Block) error { + for _, b := range bs { + _ = m.Put(b) // can't fail + } + return nil +} + +// AllKeysChan returns a channel from which +// the CIDs in the Blockstore can be read. It should respect +// the given context, closing the channel if it becomes Done. +func (m MemStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + ch := make(chan cid.Cid, len(m)) + for k := range m { + ch <- k + } + close(ch) + return ch, nil +} + +// HashOnRead specifies if every read block should be +// rehashed to make sure it matches its CID. +func (m MemStore) HashOnRead(enabled bool) { + // no-op +} diff --git a/lib/blockstore/syncstore.go b/lib/blockstore/syncstore.go new file mode 100644 index 00000000000..be9f6b5c40c --- /dev/null +++ b/lib/blockstore/syncstore.go @@ -0,0 +1,68 @@ +package blockstore + +import ( + "context" + "sync" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" +) + +type SyncStore struct { + mu sync.RWMutex + bs MemStore // specifically use a memStore to save indirection overhead. +} + +func (m *SyncStore) DeleteBlock(k cid.Cid) error { + m.mu.Lock() + defer m.mu.Unlock() + return m.bs.DeleteBlock(k) +} +func (m *SyncStore) Has(k cid.Cid) (bool, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.bs.Has(k) +} +func (m *SyncStore) Get(k cid.Cid) (blocks.Block, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.bs.Get(k) +} + +// GetSize returns the CIDs mapped BlockSize +func (m *SyncStore) GetSize(k cid.Cid) (int, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.bs.GetSize(k) +} + +// Put puts a given block to the underlying datastore +func (m *SyncStore) Put(b blocks.Block) error { + m.mu.Lock() + defer m.mu.Unlock() + return m.bs.Put(b) +} + +// PutMany puts a slice of blocks at the same time using batching +// capabilities of the underlying datastore whenever possible. +func (m *SyncStore) PutMany(bs []blocks.Block) error { + m.mu.Lock() + defer m.mu.Unlock() + return m.bs.PutMany(bs) +} + +// AllKeysChan returns a channel from which +// the CIDs in the Blockstore can be read. It should respect +// the given context, closing the channel if it becomes Done. +func (m *SyncStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + m.mu.RLock() + defer m.mu.RUnlock() + // this blockstore implementation doesn't do any async work. + return m.bs.AllKeysChan(ctx) +} + +// HashOnRead specifies if every read block should be +// rehashed to make sure it matches its CID. +func (m *SyncStore) HashOnRead(enabled bool) { + // noop +} diff --git a/lib/bufbstore/buf_bstore.go b/lib/bufbstore/buf_bstore.go index a766c2b5212..4ea746444fd 100644 --- a/lib/bufbstore/buf_bstore.go +++ b/lib/bufbstore/buf_bstore.go @@ -19,10 +19,12 @@ type BufferedBS struct { } func NewBufferedBstore(base bstore.Blockstore) *BufferedBS { - buf := bstore.NewTemporary() + var buf bstore.Blockstore if os.Getenv("LOTUS_DISABLE_VM_BUF") == "iknowitsabadidea" { log.Warn("VM BLOCKSTORE BUFFERING IS DISABLED") buf = base + } else { + buf = bstore.NewTemporary() } return &BufferedBS{ diff --git a/lib/timedbs/timedbs.go b/lib/timedbs/timedbs.go new file mode 100644 index 00000000000..bb03a59e9b6 --- /dev/null +++ b/lib/timedbs/timedbs.go @@ -0,0 +1,156 @@ +package timedbs + +import ( + "context" + "fmt" + "sync" + "time" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "go.uber.org/multierr" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/lib/blockstore" +) + +// TimedCacheBS is a blockstore that keeps blocks for at least the specified +// caching interval before discarding them. Garbage collection must be started +// and stopped by calling Start/Stop. +// +// Under the covers, it's implemented with an active and an inactive blockstore +// that are rotated every cache time interval. This means all blocks will be +// stored at most 2x the cache interval. +type TimedCacheBS struct { + mu sync.RWMutex + active, inactive blockstore.MemStore + + interval time.Duration + closeCh chan struct{} +} + +func NewTimedCacheBS(cacheTime time.Duration) *TimedCacheBS { + return &TimedCacheBS{ + active: blockstore.NewTemporary(), + inactive: blockstore.NewTemporary(), + interval: cacheTime, + } +} + +func (t *TimedCacheBS) Start(ctx context.Context) error { + t.mu.Lock() + defer t.mu.Unlock() + if t.closeCh != nil { + return fmt.Errorf("already started") + } + t.closeCh = make(chan struct{}) + go func() { + ticker := build.Clock.Ticker(t.interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + t.rotate() + case <-t.closeCh: + return + } + } + }() + return nil +} + +func (t *TimedCacheBS) Stop(ctx context.Context) error { + t.mu.Lock() + defer t.mu.Unlock() + if t.closeCh == nil { + return fmt.Errorf("not started started") + } + select { + case <-t.closeCh: + // already closed + default: + close(t.closeCh) + } + return nil +} + +func (t *TimedCacheBS) rotate() { + newBs := blockstore.NewTemporary() + + t.mu.Lock() + t.inactive, t.active = t.active, newBs + t.mu.Unlock() +} + +func (t *TimedCacheBS) Put(b blocks.Block) error { + // Don't check the inactive set here. We want to keep this block for at + // least one interval. + t.mu.Lock() + defer t.mu.Unlock() + return t.active.Put(b) +} + +func (t *TimedCacheBS) PutMany(bs []blocks.Block) error { + t.mu.Lock() + defer t.mu.Unlock() + return t.active.PutMany(bs) +} + +func (t *TimedCacheBS) Get(k cid.Cid) (blocks.Block, error) { + t.mu.RLock() + defer t.mu.RUnlock() + b, err := t.active.Get(k) + if err == blockstore.ErrNotFound { + b, err = t.inactive.Get(k) + } + return b, err +} + +func (t *TimedCacheBS) GetSize(k cid.Cid) (int, error) { + t.mu.RLock() + defer t.mu.RUnlock() + size, err := t.active.GetSize(k) + if err == blockstore.ErrNotFound { + size, err = t.inactive.GetSize(k) + } + return size, err +} + +func (t *TimedCacheBS) Has(k cid.Cid) (bool, error) { + t.mu.RLock() + defer t.mu.RUnlock() + if has, err := t.active.Has(k); err != nil { + return false, err + } else if has { + return true, nil + } + return t.inactive.Has(k) +} + +func (t *TimedCacheBS) HashOnRead(_ bool) { + // no-op +} + +func (t *TimedCacheBS) DeleteBlock(k cid.Cid) error { + t.mu.Lock() + defer t.mu.Unlock() + return multierr.Combine(t.active.DeleteBlock(k), t.inactive.DeleteBlock(k)) +} + +func (t *TimedCacheBS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + t.mu.RLock() + defer t.mu.RUnlock() + + ch := make(chan cid.Cid, len(t.active)+len(t.inactive)) + for c := range t.active { + ch <- c + } + for c := range t.inactive { + if _, ok := t.active[c]; ok { + continue + } + ch <- c + } + close(ch) + return ch, nil +} diff --git a/lib/timedbs/timedbs_test.go b/lib/timedbs/timedbs_test.go new file mode 100644 index 00000000000..75f298a1d61 --- /dev/null +++ b/lib/timedbs/timedbs_test.go @@ -0,0 +1,78 @@ +package timedbs_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/lib/timedbs" +) + +func TestTimedBSSimple(t *testing.T) { + tc := timedbs.NewTimedCacheBS(3 * time.Millisecond) + _ = tc.Start(context.Background()) + defer func() { + _ = tc.Stop(context.Background()) + }() + + b1 := blocks.NewBlock([]byte("foo")) + require.NoError(t, tc.Put(b1)) + + b2 := blocks.NewBlock([]byte("bar")) + require.NoError(t, tc.Put(b2)) + + b3 := blocks.NewBlock([]byte("baz")) + + b1out, err := tc.Get(b1.Cid()) + require.NoError(t, err) + require.Equal(t, b1.RawData(), b1out.RawData()) + + has, err := tc.Has(b1.Cid()) + require.NoError(t, err) + require.True(t, has) + + time.Sleep(4 * time.Millisecond) + + // We should still have everything. + has, err = tc.Has(b1.Cid()) + require.NoError(t, err) + require.True(t, has) + + has, err = tc.Has(b2.Cid()) + require.NoError(t, err) + require.True(t, has) + + // extend b2, add b3. + require.NoError(t, tc.Put(b2)) + require.NoError(t, tc.Put(b3)) + + // all keys once. + allKeys, err := tc.AllKeysChan(context.Background()) + var ks []cid.Cid + for k := range allKeys { + ks = append(ks, k) + } + require.NoError(t, err) + require.ElementsMatch(t, ks, []cid.Cid{b1.Cid(), b2.Cid(), b3.Cid()}) + + time.Sleep(4 * time.Millisecond) + + // should still have b2, and b3, but not b1 + + has, err = tc.Has(b1.Cid()) + require.NoError(t, err) + require.False(t, has) + + has, err = tc.Has(b2.Cid()) + require.NoError(t, err) + require.True(t, has) + + has, err = tc.Has(b3.Cid()) + require.NoError(t, err) + require.True(t, has) +} diff --git a/node/modules/chain.go b/node/modules/chain.go index f563b4cddb7..ce3e9f749f2 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "os" + "time" "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" @@ -30,6 +31,8 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/lib/blockstore" + "github.com/filecoin-project/lotus/lib/bufbstore" + "github.com/filecoin-project/lotus/lib/timedbs" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" @@ -41,8 +44,15 @@ func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt r bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain")) bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)} + // Write all incoming bitswap blocks into a temporary blockstore for two + // block times. If they validate, they'll be persisted later. + cache := timedbs.NewTimedCacheBS(2 * time.Duration(build.BlockDelaySecs) * time.Second) + lc.Append(fx.Hook{OnStop: cache.Stop, OnStart: cache.Start}) + + bitswapBs := bufbstore.NewTieredBstore(bs, cache) + // Use just exch.Close(), closing the context is not needed - exch := bitswap.New(mctx, bitswapNetwork, bs, bitswapOptions...) + exch := bitswap.New(mctx, bitswapNetwork, bitswapBs, bitswapOptions...) lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return exch.Close()