Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel fetch for chain sync #3887

Merged
merged 11 commits into from
Sep 23, 2020
7 changes: 7 additions & 0 deletions chain/exchange/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"
"time"

"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -357,6 +358,12 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
}

defer func() {
// Note: this will become just stream.Close once we've completed the go-libp2p migration to
// go-libp2p-core 0.7.0
go helpers.FullClose(stream) //nolint:errcheck
}()

// Write request.
_ = stream.SetWriteDeadline(time.Now().Add(WriteReqDeadline))
if err := cborutil.WriteCborRPC(stream, req); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion chain/exchange/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
WriteReqDeadline = 5 * time.Second
ReadResDeadline = WriteReqDeadline
ReadResMinSpeed = 50 << 10
ShufflePeersPrefix = 5
ShufflePeersPrefix = 16
WriteResDeadline = 60 * time.Second
)

Expand Down
5 changes: 4 additions & 1 deletion chain/exchange/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/helpers"
inet "github.com/libp2p/go-libp2p-core/network"
)

Expand All @@ -39,7 +40,9 @@ func (s *server) HandleStream(stream inet.Stream) {
ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream")
defer span.End()

defer stream.Close() //nolint:errcheck
// Note: this will become just stream.Close once we've completed the go-libp2p migration to
// go-libp2p-core 0.7.0
defer helpers.FullClose(stream) //nolint:errcheck

var req Request
if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil {
Expand Down
133 changes: 80 additions & 53 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -63,20 +62,12 @@ var (
// where the Syncer publishes candidate chain heads to be synced.
LocalIncoming = "incoming"

log = logging.Logger("chain")
defaultMessageFetchWindowSize = 200
)
log = logging.Logger("chain")

func init() {
if s := os.Getenv("LOTUS_BSYNC_MSG_WINDOW"); s != "" {
val, err := strconv.Atoi(s)
if err != nil {
log.Errorf("failed to parse LOTUS_BSYNC_MSG_WINDOW: %s", err)
return
}
defaultMessageFetchWindowSize = val
}
}
concurrentSyncRequests = exchange.ShufflePeersPrefix
syncRequestBatchSize = 8
syncRequestRetries = 5
)

// Syncer is in charge of running the chain synchronization logic. As such, it
// is tasked with these functions, amongst others:
Expand Down Expand Up @@ -132,8 +123,6 @@ type Syncer struct {

verifier ffiwrapper.Verifier

windowSize int

tickerCtxCancel context.CancelFunc

checkptLk sync.Mutex
Expand Down Expand Up @@ -175,7 +164,6 @@ func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.C
receiptTracker: newBlockReceiptTracker(),
connmgr: connmgr,
verifier: verifier,
windowSize: defaultMessageFetchWindowSize,

incoming: pubsub.New(50),
}
Expand Down Expand Up @@ -1483,8 +1471,6 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS

span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers))))

windowSize := syncer.windowSize
mainLoop:
for i := len(headers) - 1; i >= 0; {
fts, err := syncer.store.TryFillTipSet(headers[i])
if err != nil {
Expand All @@ -1498,34 +1484,23 @@ mainLoop:
continue
}

batchSize := windowSize
batchSize := concurrentSyncRequests * syncRequestBatchSize
if i < batchSize {
batchSize = i
if i == 0 {
batchSize = 1
} else {
batchSize = i
}
}

nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index

ss.SetStage(api.StageFetchingMessages)
var bstout []*exchange.CompactedMessages
for len(bstout) < batchSize {
next := headers[nextI]

nreq := batchSize - len(bstout)
bstips, err := syncer.Exchange.GetChainMessages(ctx, next, uint64(nreq))
if err != nil {
// TODO check errors for temporary nature
if windowSize > 1 {
windowSize /= 2
log.Infof("error fetching messages: %s; reducing window size to %d and trying again", err, windowSize)
continue mainLoop
}
return xerrors.Errorf("message processing failed: %w", err)
}
startOffset := i + 1 - batchSize
bstout, batchErr := syncer.fetchMessages(ctx, headers[startOffset:startOffset+batchSize], startOffset)
ss.SetStage(api.StageMessages)

bstout = append(bstout, bstips...)
nextI += len(bstips)
if batchErr != nil {
return xerrors.Errorf("failed to fetch messages: %w", err)
}
ss.SetStage(api.StageMessages)

for bsi := 0; bsi < len(bstout); bsi++ {
// temp storage so we don't persist data we dont want to
Expand Down Expand Up @@ -1555,24 +1530,76 @@ mainLoop:
}
}

if i >= windowSize {
newWindowSize := windowSize + 10
if newWindowSize > int(exchange.MaxRequestLength) {
newWindowSize = int(exchange.MaxRequestLength)
i -= batchSize
}

return nil
}

func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet, startOffset int) ([]*exchange.CompactedMessages, error) {
batchSize := len(headers)
batch := make([]*exchange.CompactedMessages, batchSize)

var wg sync.WaitGroup
var mx sync.Mutex
var batchErr error

start := build.Clock.Now()

for j := 0; j < batchSize; j += syncRequestBatchSize {
wg.Add(1)
go func(j int) {
defer wg.Done()

nreq := syncRequestBatchSize
if j+nreq > batchSize {
nreq = batchSize - j
}
if newWindowSize > windowSize {
windowSize = newWindowSize
log.Infof("successfully fetched %d messages; increasing window size to %d", len(bstout), windowSize)

failed := false
for offset := 0; !failed && offset < nreq; {
nextI := j + offset
nextHeader := headers[nextI]

var requestErr error
var requestResult []*exchange.CompactedMessages
for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ {
if retry > 0 {
log.Infof("fetching messages at %d (retry %d)", startOffset+nextI, retry)
} else {
log.Infof("fetching messages at %d", startOffset+nextI)
}

result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset))
if err != nil {
requestErr = multierror.Append(requestErr, err)
} else {
requestResult = result
}
}

mx.Lock()
if requestResult != nil {
copy(batch[j+offset:], requestResult)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is concurrent reslice and copy into a slice safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be safe, I am just not sure -- hence the lock.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do you mean the copy operation on the reslice? That should be perfectly fine, we make a new slice and then we copy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I 100% missed the lock ;p

offset += len(requestResult)
} else {
log.Errorf("error fetching messages at %d: %s", nextI, requestErr)
batchErr = multierror.Append(batchErr, requestErr)
failed = true
}
mx.Unlock()
}
}
}(j)
}
wg.Wait()

i -= batchSize
if batchErr != nil {
return nil, batchErr
}

// remember our window size
syncer.windowSize = windowSize
log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, build.Clock.Since(start))

return nil
return batch, nil
}

func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error {
Expand Down