From 251e1d5c60e61ca5666da74094ab1417e50c39e9 Mon Sep 17 00:00:00 2001 From: Brett Logan Date: Wed, 30 Jan 2019 19:03:28 -0500 Subject: [PATCH] [FABC-799] Create Postgres Runner Created a Postgres Runner using ifrit and implementing the official Docker sdk, dropping support for fsouza. Change-Id: I677664386a571357b4d3880647fcf044d69d2b4e Signed-off-by: Brett Logan --- integration/runner/defaults.go | 23 ++ integration/runner/namer.go | 26 ++ integration/runner/postgres.go | 305 ++++++++++++++++++ scripts/run_unit_tests | 2 +- vendor/github.com/docker/docker/pkg/README.md | 11 + .../docker/docker/pkg/stdcopy/stdcopy.go | 190 +++++++++++ vendor/vendor.json | 12 + 7 files changed, 568 insertions(+), 1 deletion(-) create mode 100644 integration/runner/defaults.go create mode 100644 integration/runner/namer.go create mode 100644 integration/runner/postgres.go create mode 100644 vendor/github.com/docker/docker/pkg/README.md create mode 100644 vendor/github.com/docker/docker/pkg/stdcopy/stdcopy.go diff --git a/integration/runner/defaults.go b/integration/runner/defaults.go new file mode 100644 index 000000000..27b3bf744 --- /dev/null +++ b/integration/runner/defaults.go @@ -0,0 +1,23 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package runner + +import ( + "time" +) + +// DefaultStartTimeout is the timeout period for starting a container +const DefaultStartTimeout = 30 * time.Second + +// DefaultShutdownTimeout is the timeout period for stopping a container +const DefaultShutdownTimeout = 10 * time.Second + +// DefaultNamer is the default naming function. +var DefaultNamer NameFunc = UniqueName + +// A NameFunc is used to generate container names. +type NameFunc func() string diff --git a/integration/runner/namer.go b/integration/runner/namer.go new file mode 100644 index 000000000..975578a7e --- /dev/null +++ b/integration/runner/namer.go @@ -0,0 +1,26 @@ +package runner + +/* +Copyright IBM Corp. All Rights Reserved. 
+ +SPDX-License-Identifier: Apache-2.0 +*/ + +import ( + "crypto/rand" + "encoding/base32" + "fmt" + "io" + "strings" +) + +// UniqueName generates a random string for a Docker containers name +func UniqueName() string { + rname := make([]byte, 16) + _, err := io.ReadFull(rand.Reader, rname) + if err != nil { + panic(fmt.Sprintf("Error generating random name: %s", err)) + } + name := base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(rname) + return strings.ToLower(name) +} diff --git a/integration/runner/postgres.go b/integration/runner/postgres.go new file mode 100644 index 000000000..13b2aa181 --- /dev/null +++ b/integration/runner/postgres.go @@ -0,0 +1,305 @@ +package runner + +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +import ( + "context" + "fmt" + "io" + "net" + "os" + "strconv" + "sync" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + docker "github.com/docker/docker/client" + "github.com/docker/docker/pkg/stdcopy" + "github.com/docker/go-connections/nat" + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" //Driver passed to the sqlx package + "github.com/pkg/errors" + "github.com/tedsuo/ifrit" +) + +// PostgresDBDefaultImage is used if none is specified +const PostgresDBDefaultImage = "postgres:9.6" + +// PostgresDB defines a containerized Postgres Server +type PostgresDB struct { + Client *docker.Client + Image string + HostIP string + HostPort int + Name string + ContainerPort int + StartTimeout time.Duration + ShutdownTimeout time.Duration + + ErrorStream io.Writer + OutputStream io.Writer + + containerID string + hostAddress string + containerAddress string + address string + + mutex sync.Mutex + stopped bool +} + +// Run is called by the ifrit runner to start a process +func (c *PostgresDB) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error { + if c.Image == "" { + c.Image = PostgresDBDefaultImage + } + + if c.Name == "" { + c.Name = 
DefaultNamer()
+	}
+
+	if c.HostIP == "" {
+		c.HostIP = "127.0.0.1"
+	}
+
+	if c.StartTimeout == 0 {
+		c.StartTimeout = DefaultStartTimeout
+	}
+
+	if c.ShutdownTimeout == 0 {
+		c.ShutdownTimeout = DefaultShutdownTimeout
+	}
+
+	if c.ContainerPort == 0 {
+		c.ContainerPort = 5432
+	}
+
+	port, err := nat.NewPort("tcp", strconv.Itoa(c.ContainerPort))
+	if err != nil {
+		return err
+	}
+
+	if c.Client == nil {
+		client, err := docker.NewClientWithOpts(docker.FromEnv)
+		if err != nil {
+			return err
+		}
+		client.NegotiateAPIVersion(context.Background())
+		c.Client = client
+	}
+
+	hostConfig := &container.HostConfig{
+		AutoRemove: true,
+		PortBindings: nat.PortMap{
+			port: []nat.PortBinding{
+				{
+					HostIP:   c.HostIP,
+					HostPort: strconv.Itoa(c.HostPort),
+				},
+			},
+		},
+	}
+	containerConfig := &container.Config{
+		Image: c.Image,
+	}
+
+	containerResp, err := c.Client.ContainerCreate(context.Background(), containerConfig, hostConfig, nil, c.Name)
+	if err != nil {
+		return err
+	}
+	c.containerID = containerResp.ID
+
+	err = c.Client.ContainerStart(context.Background(), c.containerID, types.ContainerStartOptions{})
+	if err != nil {
+		return err
+	}
+	defer c.Stop()
+
+	response, err := c.Client.ContainerInspect(context.Background(), c.containerID)
+	if err != nil {
+		return err
+	}
+
+	if c.HostPort == 0 {
+		port, err := strconv.Atoi(response.NetworkSettings.Ports[port][0].HostPort)
+		if err != nil {
+			return err
+		}
+		c.HostPort = port
+	}
+
+	c.hostAddress = net.JoinHostPort(
+		response.NetworkSettings.Ports[port][0].HostIP,
+		response.NetworkSettings.Ports[port][0].HostPort,
+	)
+	c.containerAddress = net.JoinHostPort(
+		response.NetworkSettings.IPAddress,
+		port.Port(),
+	)
+
+	streamCtx, streamCancel := context.WithCancel(context.Background())
+	defer streamCancel()
+	go c.streamLogs(streamCtx)
+
+	containerExit := c.wait()
+	ctx, cancel := context.WithTimeout(context.Background(), c.StartTimeout)
+	defer cancel()
+
+	select {
+	case <-ctx.Done():
+		return errors.Wrapf(ctx.Err(), "database in container %s did not start", c.containerID)
+	case <-containerExit:
+		return errors.New("container exited before ready")
+	case <-c.ready(ctx, c.hostAddress):
+		c.address = c.hostAddress
+	case <-c.ready(ctx, c.containerAddress):
+		c.address = c.containerAddress
+	}
+
+	cancel()
+	close(ready)
+
+	for {
+		select {
+		case err := <-containerExit:
+			return err
+		case <-sigCh:
+			err := c.Stop()
+			if err != nil {
+				return err
+			}
+			return nil
+		}
+	}
+}
+
+func (c *PostgresDB) endpointReady(ctx context.Context, addr string) bool {
+	dataSource := fmt.Sprintf("postgres://postgres@%s/postgres?sslmode=disable", addr)
+	db, err := sqlx.Open("postgres", dataSource)
+	if err != nil {
+		return false
+	}
+	defer db.Close()
+	_, err = db.Conn(ctx)
+	if err != nil {
+		return false
+	}
+
+	return true
+}
+
+func (c *PostgresDB) ready(ctx context.Context, addr string) <-chan struct{} {
+	readyCh := make(chan struct{})
+	go func() {
+		ticker := time.NewTicker(100 * time.Millisecond)
+		defer ticker.Stop()
+		for {
+			if c.endpointReady(ctx, addr) {
+				close(readyCh)
+				return
+			}
+			select {
+			case <-ticker.C:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return readyCh
+}
+
+func (c *PostgresDB) wait() <-chan error {
+	exitCh := make(chan error, 1)
+	go func() {
+		exitCode, errCh := c.Client.ContainerWait(context.Background(), c.containerID, container.WaitConditionNotRunning)
+		select {
+		case exit := <-exitCode:
+			if exit.StatusCode != 0 {
+				err := fmt.Errorf("postgres: process exited with %d", exit.StatusCode)
+				exitCh <- err
+			} else {
+				exitCh <- nil
+			}
+		case err := <-errCh:
+			exitCh <- err
+		}
+	}()
+
+	return exitCh
+}
+
+func (c *PostgresDB) streamLogs(ctx context.Context) {
+	if c.ErrorStream == nil && c.OutputStream == nil {
+		return
+	}
+
+	logOptions := types.ContainerLogsOptions{
+		Follow:     true,
+		ShowStderr: c.ErrorStream != nil,
+		ShowStdout: c.OutputStream != nil,
+	}
+
+	out, err := c.Client.ContainerLogs(ctx, c.containerID, logOptions)
+	if err != nil {
+		fmt.Fprintf(c.ErrorStream, "log stream ended with error: %s", err)
+		return
+	}
+	stdcopy.StdCopy(c.OutputStream, c.ErrorStream, out)
+}
+
+// Address returns the address successfully used by the readiness check.
+func (c *PostgresDB) Address() string {
+	return c.address
+}
+
+// HostAddress returns the host address where this PostgresDB instance is available.
+func (c *PostgresDB) HostAddress() string {
+	return c.hostAddress
+}
+
+// ContainerAddress returns the container address where this PostgresDB instance
+// is available.
+func (c *PostgresDB) ContainerAddress() string {
+	return c.containerAddress
+}
+
+// ContainerID returns the container ID of this PostgresDB
+func (c *PostgresDB) ContainerID() string {
+	return c.containerID
+}
+
+// Start starts the PostgresDB container using an ifrit runner
+func (c *PostgresDB) Start() error {
+	p := ifrit.Invoke(c)
+
+	select {
+	case <-p.Ready():
+		return nil
+	case err := <-p.Wait():
+		return err
+	}
+}
+
+// Stop stops and removes the PostgresDB container
+func (c *PostgresDB) Stop() error {
+	c.mutex.Lock()
+	if c.stopped {
+		c.mutex.Unlock()
+		return errors.Errorf("container %s already stopped", c.containerID)
+	}
+	c.stopped = true
+	c.mutex.Unlock()
+
+	err := c.Client.ContainerStop(context.Background(), c.containerID, &c.ShutdownTimeout)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/scripts/run_unit_tests b/scripts/run_unit_tests
index fd4d638c1..87afdc9fc 100755
--- a/scripts/run_unit_tests
+++ b/scripts/run_unit_tests
@@ -16,7 +16,7 @@ go get github.com/AlekSi/gocov-xml
 # Skipping /lib/common package as there is only one file that contains request/response structs used by both client and server. It needs to be removed from exclude package list
 # when code is added to this package
 # Skipping credential package as there is only one file that contains Credential interface definition.
It needs to be removed from exclude package list when code is added to this package -PKGS=`go list github.com/hyperledger/fabric-ca/... | grep -Ev '/vendor/|/api|/dbutil|/ldap|/mocks|/test/fabric-ca-load-tester|/integration|/fabric-ca-client$|/credential$|/lib/common$|/metrics'` +PKGS=`go list github.com/hyperledger/fabric-ca/... | grep -Ev '/vendor/|/api|/dbutil|integration|/ldap|/mocks|/test/fabric-ca-load-tester|/fabric-ca-client$|/credential$|/lib/common$|/metrics'` gocov test -timeout 15m $PKGS | gocov-xml > coverage.xml diff --git a/vendor/github.com/docker/docker/pkg/README.md b/vendor/github.com/docker/docker/pkg/README.md new file mode 100644 index 000000000..755cd9683 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/README.md @@ -0,0 +1,11 @@ +pkg/ is a collection of utility packages used by the Moby project without being specific to its internals. + +Utility packages are kept separate from the moby core codebase to keep it as small and concise as possible. +If some utilities grow larger and their APIs stabilize, they may be moved to their own repository under the +Moby organization, to facilitate re-use by other projects. However that is not the priority. + +The directory `pkg` is named after the same directory in the camlistore project. Since Brad is a core +Go maintainer, we thought it made sense to copy his methods for organizing Go code :) Thanks Brad! + +Because utility packages are small and neatly separated from the rest of the codebase, they are a good +place to start for aspiring maintainers and contributors. Get in touch if you want to help maintain them! 
diff --git a/vendor/github.com/docker/docker/pkg/stdcopy/stdcopy.go b/vendor/github.com/docker/docker/pkg/stdcopy/stdcopy.go new file mode 100644 index 000000000..8f6e0a737 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/stdcopy/stdcopy.go @@ -0,0 +1,190 @@ +package stdcopy // import "github.com/docker/docker/pkg/stdcopy" + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "sync" +) + +// StdType is the type of standard stream +// a writer can multiplex to. +type StdType byte + +const ( + // Stdin represents standard input stream type. + Stdin StdType = iota + // Stdout represents standard output stream type. + Stdout + // Stderr represents standard error steam type. + Stderr + // Systemerr represents errors originating from the system that make it + // into the multiplexed stream. + Systemerr + + stdWriterPrefixLen = 8 + stdWriterFdIndex = 0 + stdWriterSizeIndex = 4 + + startingBufLen = 32*1024 + stdWriterPrefixLen + 1 +) + +var bufPool = &sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }} + +// stdWriter is wrapper of io.Writer with extra customized info. +type stdWriter struct { + io.Writer + prefix byte +} + +// Write sends the buffer to the underneath writer. +// It inserts the prefix header before the buffer, +// so stdcopy.StdCopy knows where to multiplex the output. +// It makes stdWriter to implement io.Writer. +func (w *stdWriter) Write(p []byte) (n int, err error) { + if w == nil || w.Writer == nil { + return 0, errors.New("Writer not instantiated") + } + if p == nil { + return 0, nil + } + + header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix} + binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p))) + buf := bufPool.Get().(*bytes.Buffer) + buf.Write(header[:]) + buf.Write(p) + + n, err = w.Writer.Write(buf.Bytes()) + n -= stdWriterPrefixLen + if n < 0 { + n = 0 + } + + buf.Reset() + bufPool.Put(buf) + return +} + +// NewStdWriter instantiates a new Writer. 
+// Everything written to it will be encapsulated using a custom format, +// and written to the underlying `w` stream. +// This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection. +// `t` indicates the id of the stream to encapsulate. +// It can be stdcopy.Stdin, stdcopy.Stdout, stdcopy.Stderr. +func NewStdWriter(w io.Writer, t StdType) io.Writer { + return &stdWriter{ + Writer: w, + prefix: byte(t), + } +} + +// StdCopy is a modified version of io.Copy. +// +// StdCopy will demultiplex `src`, assuming that it contains two streams, +// previously multiplexed together using a StdWriter instance. +// As it reads from `src`, StdCopy will write to `dstout` and `dsterr`. +// +// StdCopy will read until it hits EOF on `src`. It will then return a nil error. +// In other words: if `err` is non nil, it indicates a real underlying error. +// +// `written` will hold the total number of bytes written to `dstout` and `dsterr`. +func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error) { + var ( + buf = make([]byte, startingBufLen) + bufLen = len(buf) + nr, nw int + er, ew error + out io.Writer + frameSize int + ) + + for { + // Make sure we have at least a full header + for nr < stdWriterPrefixLen { + var nr2 int + nr2, er = src.Read(buf[nr:]) + nr += nr2 + if er == io.EOF { + if nr < stdWriterPrefixLen { + return written, nil + } + break + } + if er != nil { + return 0, er + } + } + + stream := StdType(buf[stdWriterFdIndex]) + // Check the first byte to know where to write + switch stream { + case Stdin: + fallthrough + case Stdout: + // Write on stdout + out = dstout + case Stderr: + // Write on stderr + out = dsterr + case Systemerr: + // If we're on Systemerr, we won't write anywhere. 
+ // NB: if this code changes later, make sure you don't try to write + // to outstream if Systemerr is the stream + out = nil + default: + return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex]) + } + + // Retrieve the size of the frame + frameSize = int(binary.BigEndian.Uint32(buf[stdWriterSizeIndex : stdWriterSizeIndex+4])) + + // Check if the buffer is big enough to read the frame. + // Extend it if necessary. + if frameSize+stdWriterPrefixLen > bufLen { + buf = append(buf, make([]byte, frameSize+stdWriterPrefixLen-bufLen+1)...) + bufLen = len(buf) + } + + // While the amount of bytes read is less than the size of the frame + header, we keep reading + for nr < frameSize+stdWriterPrefixLen { + var nr2 int + nr2, er = src.Read(buf[nr:]) + nr += nr2 + if er == io.EOF { + if nr < frameSize+stdWriterPrefixLen { + return written, nil + } + break + } + if er != nil { + return 0, er + } + } + + // we might have an error from the source mixed up in our multiplexed + // stream. if we do, return it. 
+ if stream == Systemerr { + return written, fmt.Errorf("error from daemon in stream: %s", string(buf[stdWriterPrefixLen:frameSize+stdWriterPrefixLen])) + } + + // Write the retrieved frame (without header) + nw, ew = out.Write(buf[stdWriterPrefixLen : frameSize+stdWriterPrefixLen]) + if ew != nil { + return 0, ew + } + + // If the frame has not been fully written: error + if nw != frameSize { + return 0, io.ErrShortWrite + } + written += int64(nw) + + // Move the rest of the buffer to the beginning + copy(buf, buf[frameSize+stdWriterPrefixLen:]) + // Move the index + nr -= frameSize + stdWriterPrefixLen + } +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 6f1b31979..bccbdc5ca 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -318,6 +318,18 @@ "revision": "0d9dc3f4b5f78995d67a6889172de56497eeff80", "revisionTime": "2019-01-28T21:06:06Z" }, + { + "checksumSHA1": "LLacoaahjY+gDR2XDiGEsHS+VYE=", + "path": "github.com/docker/docker/pkg", + "revision": "393838ca5ec5bc4ddf2cc9a8051d36adfd31e6a1", + "revisionTime": "2019-01-31T15:59:11Z" + }, + { + "checksumSHA1": "w0waeTRJ1sFygI0dZXH6l9E1c60=", + "path": "github.com/docker/docker/pkg/stdcopy", + "revision": "393838ca5ec5bc4ddf2cc9a8051d36adfd31e6a1", + "revisionTime": "2019-01-31T15:59:11Z" + }, { "checksumSHA1": "1IPGX6/BnX7QN4DjbBk0UafTB2U=", "path": "github.com/docker/go-connections/nat",