Skip to content

Commit

Permalink
Externally built and launched chaincodes cleanup on signal
Browse files Browse the repository at this point in the history
When a peer is terminated via SIGTERM or SIGINT, it should
ensure any externally built and launched chaincodes have
a chance to cleanup.

FAB-17155

Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
  • Loading branch information
wlahti authored and sykesm committed May 12, 2020
1 parent f8654e2 commit 33a864d
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 20 deletions.
25 changes: 25 additions & 0 deletions core/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package container
import (
"io"
"sync"
"time"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/chaincode/persistence"
Expand Down Expand Up @@ -152,3 +153,27 @@ func (r *Router) Stop(ccid string) error {
func (r *Router) Wait(ccid string) (int, error) {
return r.getInstance(ccid).Wait()
}

func (r *Router) Shutdown(timeout time.Duration) {
done := make(chan struct{})
go func() {
var wg sync.WaitGroup
wg.Add(len(r.containers))
for ccid := range r.containers {
go func(ccid string) {
if err := r.Stop(ccid); err != nil {
vmLogger.Warningf("failed to stop ccid: %s err: %s", ccid, err)
}
wg.Done()
}(ccid)
}
wg.Wait()
close(done)
}()

select {
case <-time.After(timeout):
vmLogger.Warning("timeout while stopping external chaincodes")
case <-done:
}
}
9 changes: 2 additions & 7 deletions core/container/externalbuilder/externalbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,23 +377,18 @@ func (b *Builder) Run(ccid, bldDir string, peerConnection *ccintf.PeerConnection

run := filepath.Join(b.Location, "bin", "run")
cmd := b.NewCommand(run, bldDir, launchDir)
sess, err := Start(b.Logger, cmd)
sess, err := Start(b.Logger, cmd, launchDir)
if err != nil {
os.RemoveAll(launchDir)
return nil, errors.Wrapf(err, "builder '%s' run failed to start", b.Name)
}

go func() {
defer os.RemoveAll(launchDir)
sess.Wait()
}()

return sess, nil
}

// runCommand runs a command and waits for it to complete.
func (b *Builder) runCommand(cmd *exec.Cmd) error {
sess, err := Start(b.Logger, cmd)
sess, err := Start(b.Logger, cmd, "")
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/container/externalbuilder/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ var _ = Describe("Instance", func() {
Describe("Stop", func() {
It("terminates the process", func() {
cmd := exec.Command("sleep", "90")
sess, err := externalbuilder.Start(logger, cmd)
sess, err := externalbuilder.Start(logger, cmd, "")
Expect(err).NotTo(HaveOccurred())
instance.Session = sess
instance.TermTimeout = time.Minute
Expand All @@ -275,7 +275,7 @@ var _ = Describe("Instance", func() {
Context("when the process doesn't respond to SIGTERM within TermTimeout", func() {
It("kills the process with malice", func() {
cmd := exec.Command("testdata/ignoreterm.sh")
sess, err := externalbuilder.Start(logger, cmd)
sess, err := externalbuilder.Start(logger, cmd, "")
Expect(err).NotTo(HaveOccurred())

instance.Session = sess
Expand Down
9 changes: 6 additions & 3 deletions core/container/externalbuilder/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ type Session struct {
exited chan struct{}
exitErr error
waitStatus syscall.WaitStatus
launchDir string
}

// Start will start the provided command and return a Session that can be used
// to await completion or signal the process.
//
// The provided logger is used log stderr from the running process.
func Start(logger *flogging.FabricLogger, cmd *exec.Cmd) (*Session, error) {
func Start(logger *flogging.FabricLogger, cmd *exec.Cmd, launchDir string) (*Session, error) {
logger = logger.With("command", filepath.Base(cmd.Path))

stderr, err := cmd.StderrPipe()
Expand All @@ -44,8 +45,9 @@ func Start(logger *flogging.FabricLogger, cmd *exec.Cmd) (*Session, error) {
}

sess := &Session{
command: cmd,
exited: make(chan struct{}),
command: cmd,
exited: make(chan struct{}),
launchDir: launchDir,
}
go sess.waitForExit(logger, stderr)

Expand All @@ -70,6 +72,7 @@ func (s *Session) waitForExit(logger *flogging.FabricLogger, stderr io.Reader) {
defer s.mutex.Unlock()
s.exitErr = err
s.waitStatus = s.command.ProcessState.Sys().(syscall.WaitStatus)
os.RemoveAll(s.launchDir)
close(s.exited)
}

Expand Down
10 changes: 5 additions & 5 deletions core/container/externalbuilder/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var _ = Describe("Session", func() {

It("starts commands and returns a session handle to wait on", func() {
cmd := exec.Command("true")
sess, err := externalbuilder.Start(logger, cmd)
sess, err := externalbuilder.Start(logger, cmd, "")
Expect(err).NotTo(HaveOccurred())
Expect(sess).NotTo(BeNil())

Expand All @@ -46,7 +46,7 @@ var _ = Describe("Session", func() {

It("captures stderr to the provided logger", func() {
cmd := exec.Command("sh", "-c", "echo 'this is a message to stderr' >&2")
sess, err := externalbuilder.Start(logger, cmd)
sess, err := externalbuilder.Start(logger, cmd, "")
Expect(err).NotTo(HaveOccurred())
err = sess.Wait()
Expect(err).NotTo(HaveOccurred())
Expand All @@ -60,7 +60,7 @@ var _ = Describe("Session", func() {
Expect(err).NotTo(HaveOccurred())
defer stdin.Close()

sess, err := externalbuilder.Start(logger, cmd)
sess, err := externalbuilder.Start(logger, cmd, "")
Expect(err).NotTo(HaveOccurred())

exitCh := make(chan error)
Expand All @@ -74,15 +74,15 @@ var _ = Describe("Session", func() {
When("start fails", func() {
It("returns an error", func() {
cmd := exec.Command("./this-is-not-a-command")
_, err := externalbuilder.Start(logger, cmd)
_, err := externalbuilder.Start(logger, cmd, "")
Expect(err).To(MatchError("fork/exec ./this-is-not-a-command: no such file or directory"))
})
})

When("the command fails", func() {
It("returns the exit error from the command", func() {
cmd := exec.Command("false")
sess, err := externalbuilder.Start(logger, cmd)
sess, err := externalbuilder.Start(logger, cmd, "")
Expect(err).NotTo(HaveOccurred())

err = sess.Wait()
Expand Down
41 changes: 40 additions & 1 deletion integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package e2e

import (
"bufio"
"context"
"crypto/sha256"
"crypto/tls"
Expand Down Expand Up @@ -81,7 +82,10 @@ var _ = Describe("EndToEnd", func() {
})

Describe("basic solo network with 2 orgs and no docker", func() {
var datagramReader *DatagramReader
var (
datagramReader *DatagramReader
runArtifactsFilePath string
)

BeforeEach(func() {
datagramReader = NewDatagramReader()
Expand All @@ -102,6 +106,13 @@ var _ = Describe("EndToEnd", func() {
BaseProfile: "TwoOrgsOrdererGenesis",
})

runArtifactsFilePath = filepath.Join(testDir, "run-artifacts.txt")
os.Setenv("RUN_ARTIFACTS_FILE", runArtifactsFilePath)
for i, e := range network.ExternalBuilders {
e.EnvironmentWhitelist = append(e.EnvironmentWhitelist, "RUN_ARTIFACTS_FILE")
network.ExternalBuilders[i] = e
}

network.GenerateConfigTree()
for _, peer := range network.PeersWithChannel("testchannel") {
core := network.ReadPeerConfig(peer)
Expand All @@ -119,6 +130,23 @@ var _ = Describe("EndToEnd", func() {
if datagramReader != nil {
datagramReader.Close()
}
if process != nil {
process.Signal(syscall.SIGTERM)
Eventually(process.Wait(), network.EventuallyTimeout).Should(Receive())
}
if runArtifactsFilePath != "" {
// ensure that the temporary directories generated by launched
// external chaincodes have been cleaned up
runArtifactsFile, err := os.Open(runArtifactsFilePath)
Expect(err).NotTo(HaveOccurred())
scanner := bufio.NewScanner(runArtifactsFile)
for scanner.Scan() {
launchFiles, err := ioutil.ReadDir(scanner.Text())
Expect(err).To(MatchError(fmt.Sprintf("open %s: no such file or directory", scanner.Text())))
Expect(launchFiles).To(BeEmpty())
}
runArtifactsFile.Close()
}
})

It("executes a basic solo network with 2 orgs and no docker", func() {
Expand Down Expand Up @@ -150,6 +178,17 @@ var _ = Describe("EndToEnd", func() {
By("deploying the chaincode")
nwo.DeployChaincode(network, "testchannel", orderer, chaincode)

By("ensuring external cc run artifacts exist after deploying")
runArtifactsFile, err := os.Open(runArtifactsFilePath)
Expect(err).NotTo(HaveOccurred())
scanner := bufio.NewScanner(runArtifactsFile)
for scanner.Scan() {
runArtifacts, err := ioutil.ReadDir(scanner.Text())
Expect(err).NotTo(HaveOccurred())
Expect(runArtifacts).NotTo(BeEmpty())
}
runArtifactsFile.Close()

By("getting the client peer by name")
peer := network.Peer("Org1", "peer0")

Expand Down
6 changes: 6 additions & 0 deletions integration/externalbuilders/binary/bin/run
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ fi
OUTPUT=$1
ARTIFACTS=$2

# when set, keep track of the artifacts directories to verify
# proper cleanup in integration test
if [ -n "${RUN_ARTIFACTS_FILE+1}" ]; then
echo $ARTIFACTS >> $RUN_ARTIFACTS_FILE
fi

# shellcheck disable=SC2155
export CORE_CHAINCODE_ID_NAME="$(jq -r .chaincode_id "$ARTIFACTS/chaincode.json")"
export CORE_PEER_TLS_ENABLED="true"
Expand Down
4 changes: 2 additions & 2 deletions internal/peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,8 +828,8 @@ func serve(args []string) error {
}

handleSignals(addPlatformSignals(map[os.Signal]func(){
syscall.SIGINT: func() { serve <- nil },
syscall.SIGTERM: func() { serve <- nil },
syscall.SIGINT: func() { containerRouter.Shutdown(5 * time.Second); serve <- nil },
syscall.SIGTERM: func() { containerRouter.Shutdown(5 * time.Second); serve <- nil },
}))

logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", coreConfig.PeerID, coreConfig.NetworkID, coreConfig.PeerAddress)
Expand Down

0 comments on commit 33a864d

Please sign in to comment.