diff --git a/chain/sync.go b/chain/sync.go index be6c3595c63..7562e233d59 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -17,6 +17,7 @@ import ( "github.com/Gurpartap/async" "github.com/hashicorp/go-multierror" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log/v2" @@ -374,21 +375,28 @@ func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) bool { return syncer.InformNewHead(from, fts) } -func copyBlockstore(from, to bstore.Blockstore) error { - cids, err := from.AllKeysChan(context.TODO()) +func copyBlockstore(ctx context.Context, from, to bstore.Blockstore) error { + ctx, span := trace.StartSpan(ctx, "copyBlockstore") + defer span.End() + + cids, err := from.AllKeysChan(ctx) if err != nil { return err } + // TODO: should probably expose better methods on the blockstore for this operation + var blks []blocks.Block for c := range cids { b, err := from.Get(c) if err != nil { return err } - if err := to.Put(b); err != nil { - return err - } + blks = append(blks, b) + } + + if err := to.PutMany(blks); err != nil { + return err } return nil @@ -1515,11 +1523,11 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS return err } - if err := persistMessages(bs, bstip); err != nil { + if err := persistMessages(ctx, bs, bstip); err != nil { return err } - if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil { + if err := copyBlockstore(ctx, bs, syncer.store.Blockstore()); err != nil { return xerrors.Errorf("message processing failed: %w", err) } } @@ -1596,7 +1604,10 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet return batch, nil } -func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error { +func persistMessages(ctx context.Context, bs bstore.Blockstore, bst *exchange.CompactedMessages) error { + _, span := trace.StartSpan(ctx, "persistMessages") + defer span.End() + for _, m := range bst.Bls { //log.Infof("putting BLS message: %s", m.Cid()) if _, err := store.PutMessage(bs, m); err != nil { diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 35ece7c5706..3bafbe0900f 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -546,7 +546,7 @@ func (vm *VM) Flush(ctx context.Context) (cid.Cid, error) { return cid.Undef, xerrors.Errorf("flushing vm: %w", err) } - if err := Copy(from, to, root); err != nil { + if err := Copy(ctx, from, to, root); err != nil { return cid.Undef, xerrors.Errorf("copying tree: %w", err) } @@ -600,9 +600,18 @@ func linksForObj(blk block.Block, cb func(cid.Cid)) error { } } -func Copy(from, to blockstore.Blockstore, root cid.Cid) error { +func Copy(ctx context.Context, from, to blockstore.Blockstore, root cid.Cid) error { + ctx, span := trace.StartSpan(ctx, "vm.Copy") // nolint + defer span.End() + + var numBlocks int + var totalCopySize int + var batch []block.Block batchCp := func(blk block.Block) error { + numBlocks++ + totalCopySize += len(blk.RawData()) + batch = append(batch, blk) if len(batch) > 100 { if err := to.PutMany(batch); err != nil { @@ -623,6 +632,11 @@ func Copy(from, to blockstore.Blockstore, root cid.Cid) error { } } + span.AddAttributes( + trace.Int64Attribute("numBlocks", int64(numBlocks)), + trace.Int64Attribute("copySize", int64(totalCopySize)), + ) + return nil }