Skip to content

Commit

Permalink
Add flow control to SYNC_STATE_SNAPSHOT
Browse files Browse the repository at this point in the history
This patch changes the peer snapshot retrieval
logic from a non-blocking channel write to a
blocking channel write with a timeout.  This allows
gRPC's own buffering mechanisms to apply back pressure
when sending large state snapshots.

Additionally, this change will suppress some of the
spammy log messages which can flood logs when the
correlation id gets out of sync.

https://jira.hyperledger.org/browse/FAB-380

Change-Id: Icc3d37f2d161f6ac0bae984ca43e2286a45fbb3d
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Sep 14, 2016
1 parent 7b2e488 commit 62d866d
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
vp0:
environment:
# The combination of the following two environment variables ensures that a state snapshot will be pulled
# and that the state snapshot buffer will be exhausted
- CORE_PEER_SYNC_STATE_SNAPSHOT_CHANNELSIZE=0
- CORE_STATETRANSFER_MAXDELTAS=1
vp1:
environment:
- CORE_PEER_SYNC_STATE_SNAPSHOT_CHANNELSIZE=0
- CORE_STATETRANSFER_MAXDELTAS=1
vp2:
environment:
- CORE_PEER_SYNC_STATE_SNAPSHOT_CHANNELSIZE=0
- CORE_STATETRANSFER_MAXDELTAS=1
vp3:
environment:
- CORE_PEER_SYNC_STATE_SNAPSHOT_CHANNELSIZE=0
- CORE_STATETRANSFER_MAXDELTAS=1
9 changes: 6 additions & 3 deletions bddtests/peer_basic.feature
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,11 @@ Feature: Network of Peers
| docker-compose-4-consensus-batch.yml | 60 |


#@doNotDecompose
#@wip
@issue_680
@fab380
Scenario Outline: chaincode example02 with 4 peers and 1 membersrvc, issue #680 (State transfer)

Given we compose "<ComposeFile>"
And I register with CA supplying username "binhn" and secret "7avZQLwcUe9q" on peers:
| vp0 |
Expand Down Expand Up @@ -579,8 +581,9 @@ Feature: Network of Peers


Examples: Consensus Options
| ComposeFile | WaitTime |
| docker-compose-4-consensus-batch.yml | 60 |
| ComposeFile | WaitTime |
| docker-compose-4-consensus-batch.yml | 60 |
| docker-compose-4-consensus-batch.yml docker-compose-4-consensus-batch-nosnapshotbuffer.yml | 60 |


@issue_724
Expand Down
24 changes: 19 additions & 5 deletions core/peer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
pb "github.com/hyperledger/fabric/protos"
)

const DefaultSyncSnapshotTimeout time.Duration = 60 * time.Second

// Handler peer handler implementation.
type Handler struct {
chatMutex sync.Mutex
Expand All @@ -43,6 +45,8 @@ type Handler struct {
snapshotRequestHandler *syncStateSnapshotRequestHandler
syncStateDeltasRequestHandler *syncStateDeltasHandler
syncBlocksRequestHandler *syncBlocksRequestHandler
syncSnapshotTimeout time.Duration
lastIgnoredSnapshotCID *uint64
}

// NewPeerHandler returns a new Peer handler
Expand All @@ -56,6 +60,12 @@ func NewPeerHandler(coord MessageHandlerCoordinator, stream ChatStream, initiate
}
d.doneChan = make(chan struct{})

if dur := viper.GetDuration("peer.sync.state.snapshot.writeTimeout"); dur == 0 {
d.syncSnapshotTimeout = DefaultSyncSnapshotTimeout
} else {
d.syncSnapshotTimeout = dur
}

d.snapshotRequestHandler = newSyncStateSnapshotRequestHandler()
d.syncStateDeltasRequestHandler = newSyncStateDeltasHandler()
d.syncBlocksRequestHandler = newSyncBlocksRequestHandler()
Expand Down Expand Up @@ -494,22 +504,26 @@ func (d *Handler) beforeSyncStateSnapshot(e *fsm.Event) {
peerLogger.Errorf("Error sending syncStateSnapshot to channel: %v", x)
}
}()
// Use non-blocking send, will WARN and close channel if missed message.
// Use blocking send and timeout, will WARN and close channel if write times out
d.snapshotRequestHandler.Lock()
defer d.snapshotRequestHandler.Unlock()
timer := time.NewTimer(d.syncSnapshotTimeout)
// Make sure the correlationID matches
if d.snapshotRequestHandler.shouldHandle(syncStateSnapshot.Request.CorrelationId) {
select {
case d.snapshotRequestHandler.channel <- syncStateSnapshot:
default:
case <-timer.C:
// Was not able to write to the channel, in which case the Snapshot stream is incomplete, and must be discarded, closing the channel
// without sending the terminating message which would have had an empty byte slice.
peerLogger.Warningf("Did NOT send SyncStateSnapshot message to channel for correlationId = %d, sequence = %d, closing channel as the message has been discarded", syncStateSnapshot.Request.CorrelationId, syncStateSnapshot.Sequence)
peerLogger.Warningf("Did NOT send SyncStateSnapshot message to channel for correlationId = %d, sequence = %d because we timed out reading, closing channel as the message has been discarded", syncStateSnapshot.Request.CorrelationId, syncStateSnapshot.Sequence)
d.snapshotRequestHandler.reset()
}
} else {
//Ignore the message, does not match the current correlationId
peerLogger.Warningf("Ignoring SyncStateSnapshot message with correlationId = %d, sequence = %d, as current correlationId = %d", syncStateSnapshot.Request.CorrelationId, syncStateSnapshot.Sequence, d.snapshotRequestHandler.correlationID)
if d.lastIgnoredSnapshotCID == nil || *d.lastIgnoredSnapshotCID < syncStateSnapshot.Request.CorrelationId {
peerLogger.Warningf("Ignoring SyncStateSnapshot message with correlationId = %d, sequence = %d, as current correlationId = %d, future messages for this (and older ids) will be suppressed", syncStateSnapshot.Request.CorrelationId, syncStateSnapshot.Sequence, d.snapshotRequestHandler.correlationID)
d.lastIgnoredSnapshotCID = &syncStateSnapshot.Request.CorrelationId
//Ignore the message, does not match the current correlationId
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/peer/statetransfer/statetransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (sts *coordinatorImpl) syncBlocks(highBlock, lowBlock uint64, highHash []by

func (sts *coordinatorImpl) syncBlockchainToTarget(blockSyncReq *blockSyncReq) {

logger.Debugf("Processing a blockSyncReq to block %d", blockSyncReq.blockNumber)
logger.Debugf("Processing a blockSyncReq to block %d through %d", blockSyncReq.blockNumber, blockSyncReq.reportOnBlock)

blockchainSize := sts.stack.GetBlockchainSize()

Expand Down
8 changes: 6 additions & 2 deletions peer/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,13 @@ peer:
snapshot:
# Channel size for readonly syncStateSnapshot messages channel
# for receiving state deltas for snapshot from oppositie Peer Endpoints.
# NOTE: currently messages are not stored and forwarded, but
# rather lost if the channel write blocks.
# NOTE: when the channel is exhausted, the writes block for up to the
# writeTimeout specified below
channelSize: 50
# Write timeout for the syncStateSnapshot messages
# When the channel above is exhausted, messages block before being
# discarded for this amount of time
writeTimeout: 60s
deltas:
# Channel size for readonly syncStateDeltas messages channel for
# receiving state deltas for a syncBlockRange from oppositie
Expand Down

0 comments on commit 62d866d

Please sign in to comment.