From 2232d0ec348c81004e36ebbb1e1ab4d61f24b2cc Mon Sep 17 00:00:00 2001 From: Will Lahti Date: Mon, 24 Jul 2017 16:43:57 -0400 Subject: [PATCH] [FAB-5391]Prevent concurrent invokes launching cc cont This CR prevents concurrent invokes from launching a chaincode container at the same time (e.g. during performance testing). The first invoke should succeed (and launch the container) while subsequent invokes should fail until the container has finished launching. Note: This does not change any behavior as subsequent invokes should have failed but it now sends a clear error message that the chaincode container is already launching. Change-Id: Ic1772a5f25dd0e4c34278e6e7bdb8507f16269c8 Signed-off-by: Will Lahti --- core/chaincode/chaincode_support.go | 61 ++++++++++++++++++++++++++--- 1 file changed, 56 insertions(+), 5 deletions(-) diff --git a/core/chaincode/chaincode_support.go b/core/chaincode/chaincode_support.go index a7689276a6a..11b9ead817b 100644 --- a/core/chaincode/chaincode_support.go +++ b/core/chaincode/chaincode_support.go @@ -95,6 +95,10 @@ type runningChaincodes struct { sync.RWMutex // chaincode environment for each chaincode chaincodeMap map[string]*chaincodeRTEnv + + //mark the starting of launch of a chaincode so multiple requests + //do not attempt to start the chaincode at the same time + launchStarted map[string]bool } //GetChain returns the chaincode framework support object @@ -119,6 +123,14 @@ func (chaincodeSupport *ChaincodeSupport) chaincodeHasBeenLaunched(chaincode str return chrte, hasbeenlaunched } +//call this under lock +func (chaincodeSupport *ChaincodeSupport) launchStarted(chaincode string) bool { + if _, launchStarted := chaincodeSupport.runningChaincodes.launchStarted[chaincode]; launchStarted { + return true + } + return false +} + // NewChaincodeSupport creates a new ChaincodeSupport instance func NewChaincodeSupport(getCCEndpoint func() (*pb.PeerEndpoint, error), userrunsCC bool, ccstartuptimeout time.Duration) *ChaincodeSupport { ccprovider.SetChaincodesPath(config.GetPath("peer.fileSystemPath") + string(filepath.Separator) + "chaincodes") @@ -126,7 +138,7 @@ func NewChaincodeSupport(getCCEndpoint func() (*pb.PeerEndpoint, error), userrun pnid := viper.GetString("peer.networkId") pid := viper.GetString("peer.id") - theChaincodeSupport = &ChaincodeSupport{runningChaincodes: &runningChaincodes{chaincodeMap: make(map[string]*chaincodeRTEnv)}, peerNetworkID: pnid, peerID: pid} + theChaincodeSupport = &ChaincodeSupport{runningChaincodes: &runningChaincodes{chaincodeMap: make(map[string]*chaincodeRTEnv), launchStarted: make(map[string]bool)}, peerNetworkID: pnid, peerID: pid} //initialize global chain @@ -396,7 +408,8 @@ func (chaincodeSupport *ChaincodeSupport) getArgsAndEnv(cccid *ccprovider.CCCont return args, envs, nil } -// launchAndWaitForRegister will launch container if not already running. Use the targz to create the image if not found +//launchAndWaitForRegister will launch container if not already running. Use +//the targz to create the image if not found func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.Context, cccid *ccprovider.CCContext, cds *pb.ChaincodeDeploymentSpec, cLang pb.ChaincodeSpec_Type, builder api.BuildSpecFactory) error { canName := cccid.GetCanonicalName() if canName == "" { @@ -408,9 +421,37 @@ func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context. //multiple launch by failing if _, hasBeenLaunched := chaincodeSupport.chaincodeHasBeenLaunched(canName); hasBeenLaunched { chaincodeSupport.runningChaincodes.Unlock() - return fmt.Errorf("Error chaincode is being launched: %s", canName) + return fmt.Errorf("Error chaincode has been launched: %s", canName) } + //prohibit multiple simultaneous invokes (for example while flooding the + //system with invokes as in a stress test scenario) from attempting to launch + //the chaincode. The first one wins. Others receive an error. + //NOTE - this transient behavior as the chaincode is being launched is nothing + //new. All invokes (except the one launching the CC) will fail in any case + //until the container is up and registered. + if chaincodeSupport.launchStarted(canName) { + chaincodeSupport.runningChaincodes.Unlock() + return fmt.Errorf("Error chaincode is already launching: %s", canName) + } + + //Chaincode is not up and is not in the process of being launched. Setup flag + //for launching so we can proceed to do that undisturbed by other requests on + //this chaincode + chaincodeLogger.Debugf("chaincode %s is being launched", canName) + chaincodeSupport.runningChaincodes.launchStarted[canName] = true + + //now that chaincode launch sequence is done (whether successful or not), + //unset launch flag as we get out of this function. If launch was not + //successful (handler was not created), next invoke will try again. + defer func() { + chaincodeSupport.runningChaincodes.Lock() + defer chaincodeSupport.runningChaincodes.Unlock() + + delete(chaincodeSupport.runningChaincodes.launchStarted, canName) + chaincodeLogger.Debugf("chaincode %s launch seq completed", canName) + }() + chaincodeSupport.runningChaincodes.Unlock() //launch the chaincode @@ -535,8 +576,8 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid if chrte, ok = chaincodeSupport.chaincodeHasBeenLaunched(canName); ok { if !chrte.handler.registered { chaincodeSupport.runningChaincodes.Unlock() - chaincodeLogger.Debugf("premature execution - chaincode (%s) is being launched", canName) - err = fmt.Errorf("premature execution - chaincode (%s) is being launched", canName) + chaincodeLogger.Debugf("premature execution - chaincode (%s) launched and waiting for registration", canName) + err = fmt.Errorf("premature execution - chaincode (%s) launched and waiting for registration", canName) return cID, cMsg, err } if chrte.handler.isRunning() { @@ -547,6 +588,16 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid return cID, cMsg, nil } chaincodeLogger.Debugf("Container not in READY state(%s)...send init/ready", chrte.handler.FSM.Current()) + } else { + //chaincode is not up... but is the launch process underway? this is + //strictly not necessary as the actual launch process will catch this + //(in launchAndWaitForRegister), just a bit of optimization for thundering + //herds + if chaincodeSupport.launchStarted(canName) { + chaincodeSupport.runningChaincodes.Unlock() + err = fmt.Errorf("premature execution - chaincode (%s) is being launched", canName) + return cID, cMsg, err + } } chaincodeSupport.runningChaincodes.Unlock()