Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

properly handle partial sync responses #1487

Merged
merged 1 commit into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions chain/blocksync/blocksync_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
continue
}

if res.Status == 0 {
if res.Status == StatusOK || res.Status == StatusPartial {
resp, err := bs.processBlocksResponse(req, res)
if err != nil {
return nil, xerrors.Errorf("success response from peer failed to process: %w", err)
Expand All @@ -114,6 +114,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
bs.host.ConnManager().TagPeer(p, "bsync", 25)
return resp, nil
}

oerr = bs.processStatus(req, res)
if oerr != nil {
log.Warnf("BlockSync peer %s response was an error: %s", p.String(), oerr)
Expand Down Expand Up @@ -145,7 +146,7 @@ func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, tsk types.Tip

return bstsToFullTipSet(bts)
case 101: // Partial Response
return nil, xerrors.Errorf("partial responses are not handled")
return nil, xerrors.Errorf("partial responses are not handled for single tipset fetching")
case 201: // req.Start not found
return nil, fmt.Errorf("not found")
case 202: // Go Away
Expand Down Expand Up @@ -185,7 +186,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
req := &BlockSyncRequest{
Start: h.Cids(),
RequestLength: count,
Options: BSOptMessages | BSOptBlocks,
Options: BSOptMessages,
}

var err error
Expand All @@ -205,8 +206,8 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
}

if res.Status == StatusPartial {
log.Warn("dont yet handle partial responses")
continue
// TODO: track partial response sizes to ensure we don't overrequest too often
return res.Chain, nil
}

err = bs.processStatus(req, res)
Expand Down
28 changes: 19 additions & 9 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func zipTipSetAndMessages(bs cbor.IpldStore, ts *types.TipSet, allbmsgs []*types
}

if b.Messages != mrcid {
return nil, fmt.Errorf("messages didnt match message root in header")
return nil, fmt.Errorf("messages didnt match message root in header for ts %s", ts.Key())
}

fb := &types.FullBlock{
Expand Down Expand Up @@ -1133,25 +1133,35 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
batchSize = i
}

next := headers[i-batchSize]
bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(batchSize+1))
if err != nil {
return xerrors.Errorf("message processing failed: %w", err)
nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index

var bstout []*blocksync.BSTipSet
for len(bstout) < batchSize {
next := headers[nextI]

nreq := batchSize - len(bstout)
bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(nreq))
if err != nil {
return xerrors.Errorf("message processing failed: %w", err)
}

bstout = append(bstout, bstips...)
nextI += len(bstips)
}

for bsi := 0; bsi < len(bstips); bsi++ {
for bsi := 0; bsi < len(bstout); bsi++ {
// temp storage so we don't persist data we dont want to
ds := dstore.NewMapDatastore()
bs := bstore.NewBlockstore(ds)
blks := cbor.NewCborStore(bs)

this := headers[i-bsi]
bstip := bstips[len(bstips)-(bsi+1)]
bstip := bstout[len(bstout)-(bsi+1)]
fts, err := zipTipSetAndMessages(blks, this, bstip.BlsMessages, bstip.SecpkMessages, bstip.BlsMsgIncludes, bstip.SecpkMsgIncludes)
if err != nil {
log.Warnw("zipping failed", "error", err, "bsi", bsi, "i", i,
"height", this.Height(), "bstip-height", bstip.Blocks[0].Height,
"bstips", bstips, "next-height", i+batchSize)
"next-height", i+batchSize)
return xerrors.Errorf("message processing failed: %w", err)
}

Expand All @@ -1167,7 +1177,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
return xerrors.Errorf("message processing failed: %w", err)
}
}
i -= windowSize
i -= batchSize
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion lib/increadtimeout/incrt.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (crt *incrt) Read(buf []byte) (int, error) {

err := crt.rd.SetReadDeadline(start.Add(crt.wait))
if err != nil {
log.Warnf("unable to set daedline: %+v", err)
log.Warnf("unable to set deadline: %+v", err)
}

n, err := crt.rd.Read(buf)
Expand Down