Skip to content

Commit

Permalink
[FAB-5391]Prevent concurrent invokes launching cc cont
Browse files Browse the repository at this point in the history
This CR prevents concurrent invokes from launching a chaincode
container at the same time (e.g. during performance testing). The first
invoke should succeed (and launch the container) while subsequent
invokes should fail until the container has finished launching.

Note: This does not change any behavior as subsequent invokes should
have failed but it now sends a clear error message that the chaincode
container is already launching.

Change-Id: Ic1772a5f25dd0e4c34278e6e7bdb8507f16269c8
Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
  • Loading branch information
wlahti committed Jul 25, 2017
1 parent 7a480ce commit 2232d0e
Showing 1 changed file with 56 additions and 5 deletions.
61 changes: 56 additions & 5 deletions core/chaincode/chaincode_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ type runningChaincodes struct {
sync.RWMutex
// chaincode environment for each chaincode
chaincodeMap map[string]*chaincodeRTEnv

//mark the starting of launch of a chaincode so multiple requests
//do not attempt to start the chaincode at the same time
launchStarted map[string]bool
}

//GetChain returns the chaincode framework support object
Expand All @@ -119,14 +123,22 @@ func (chaincodeSupport *ChaincodeSupport) chaincodeHasBeenLaunched(chaincode str
return chrte, hasbeenlaunched
}

//call this under lock
func (chaincodeSupport *ChaincodeSupport) launchStarted(chaincode string) bool {
if _, launchStarted := chaincodeSupport.runningChaincodes.launchStarted[chaincode]; launchStarted {
return true
}
return false
}

// NewChaincodeSupport creates a new ChaincodeSupport instance
func NewChaincodeSupport(getCCEndpoint func() (*pb.PeerEndpoint, error), userrunsCC bool, ccstartuptimeout time.Duration) *ChaincodeSupport {
ccprovider.SetChaincodesPath(config.GetPath("peer.fileSystemPath") + string(filepath.Separator) + "chaincodes")

pnid := viper.GetString("peer.networkId")
pid := viper.GetString("peer.id")

theChaincodeSupport = &ChaincodeSupport{runningChaincodes: &runningChaincodes{chaincodeMap: make(map[string]*chaincodeRTEnv)}, peerNetworkID: pnid, peerID: pid}
theChaincodeSupport = &ChaincodeSupport{runningChaincodes: &runningChaincodes{chaincodeMap: make(map[string]*chaincodeRTEnv), launchStarted: make(map[string]bool)}, peerNetworkID: pnid, peerID: pid}

//initialize global chain

Expand Down Expand Up @@ -396,7 +408,8 @@ func (chaincodeSupport *ChaincodeSupport) getArgsAndEnv(cccid *ccprovider.CCCont
return args, envs, nil
}

// launchAndWaitForRegister will launch container if not already running. Use the targz to create the image if not found
//launchAndWaitForRegister will launch container if not already running. Use
//the targz to create the image if not found
func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.Context, cccid *ccprovider.CCContext, cds *pb.ChaincodeDeploymentSpec, cLang pb.ChaincodeSpec_Type, builder api.BuildSpecFactory) error {
canName := cccid.GetCanonicalName()
if canName == "" {
Expand All @@ -408,9 +421,37 @@ func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.
//multiple launch by failing
if _, hasBeenLaunched := chaincodeSupport.chaincodeHasBeenLaunched(canName); hasBeenLaunched {
chaincodeSupport.runningChaincodes.Unlock()
return fmt.Errorf("Error chaincode is being launched: %s", canName)
return fmt.Errorf("Error chaincode has been launched: %s", canName)
}

//prohibit multiple simultaneous invokes (for example while flooding the
//system with invokes as in a stress test scenario) from attempting to launch
//the chaincode. The first one wins. Others receive an error.
//NOTE - this transient behavior as the chaincode is being launched is nothing
//new. All invokes (except the one launching the CC) will fail in any case
//until the container is up and registered.
if chaincodeSupport.launchStarted(canName) {
chaincodeSupport.runningChaincodes.Unlock()
return fmt.Errorf("Error chaincode is already launching: %s", canName)
}

//Chaincode is not up and is not in the process of being launched. Setup flag
//for launching so we can proceed to do that undisturbed by other requests on
//this chaincode
chaincodeLogger.Debugf("chaincode %s is being launched", canName)
chaincodeSupport.runningChaincodes.launchStarted[canName] = true

//now that chaincode launch sequence is done (whether successful or not),
//unset launch flag as we get out of this function. If launch was not
//successful (handler was not created), next invoke will try again.
defer func() {
chaincodeSupport.runningChaincodes.Lock()
defer chaincodeSupport.runningChaincodes.Unlock()

delete(chaincodeSupport.runningChaincodes.launchStarted, canName)
chaincodeLogger.Debugf("chaincode %s launch seq completed", canName)
}()

chaincodeSupport.runningChaincodes.Unlock()

//launch the chaincode
Expand Down Expand Up @@ -535,8 +576,8 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid
if chrte, ok = chaincodeSupport.chaincodeHasBeenLaunched(canName); ok {
if !chrte.handler.registered {
chaincodeSupport.runningChaincodes.Unlock()
chaincodeLogger.Debugf("premature execution - chaincode (%s) is being launched", canName)
err = fmt.Errorf("premature execution - chaincode (%s) is being launched", canName)
chaincodeLogger.Debugf("premature execution - chaincode (%s) launched and waiting for registration", canName)
err = fmt.Errorf("premature execution - chaincode (%s) launched and waiting for registration", canName)
return cID, cMsg, err
}
if chrte.handler.isRunning() {
Expand All @@ -547,6 +588,16 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid
return cID, cMsg, nil
}
chaincodeLogger.Debugf("Container not in READY state(%s)...send init/ready", chrte.handler.FSM.Current())
} else {
//chaincode is not up... but is the launch process underway? this is
//strictly not necessary as the actual launch process will catch this
//(in launchAndWaitForRegister), just a bit of optimization for thundering
//herds
if chaincodeSupport.launchStarted(canName) {
chaincodeSupport.runningChaincodes.Unlock()
err = fmt.Errorf("premature execution - chaincode (%s) is being launched", canName)
return cID, cMsg, err
}
}
chaincodeSupport.runningChaincodes.Unlock()

Expand Down

0 comments on commit 2232d0e

Please sign in to comment.