Merge branch 'main' into etcd-module
* main:
  chore: update dockercfg module (testcontainers#2801)
  fix: template for code generation (testcontainers#2800)
  fix: update module path (testcontainers#2797)
  fix: container logging deadlocks (testcontainers#2791)
mdelapenya committed Sep 26, 2024
2 parents 72ee43b + 1bf8e2b commit 1d14664
Showing 103 changed files with 320 additions and 299 deletions.
261 changes: 137 additions & 124 deletions docker.go
@@ -5,7 +5,6 @@ import (
"bufio"
"context"
"encoding/base64"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
@@ -17,7 +16,6 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
@@ -30,6 +28,7 @@ import (
"github.com/docker/docker/client"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/go-connections/nat"
"github.com/moby/term"
specs "github.com/opencontainers/image-spec/specs-go/v1"
@@ -48,11 +47,21 @@ const (
Podman = "podman"
ReaperDefault = "reaper_default" // Default network name when bridge is not available
packagePath = "github.com/testcontainers/testcontainers-go"

logStoppedForOutOfSyncMessage = "Stopping log consumer: Headers out of sync"
)

var createContainerFailDueToNameConflictRegex = regexp.MustCompile("Conflict. The container name .* is already in use by container .*")
var (
// createContainerFailDueToNameConflictRegex is a regular expression that matches the "container name is already in use" error.
createContainerFailDueToNameConflictRegex = regexp.MustCompile("Conflict. The container name .* is already in use by container .*")

// minLogProductionTimeout is the minimum log production timeout.
minLogProductionTimeout = time.Duration(5 * time.Second)

// maxLogProductionTimeout is the maximum log production timeout.
maxLogProductionTimeout = time.Duration(60 * time.Second)

// errLogProductionStop is the cause for stopping log production.
errLogProductionStop = errors.New("log production stopped")
)

// DockerContainer represents a container started using Docker
type DockerContainer struct {
@@ -65,23 +74,19 @@ type DockerContainer struct {
isRunning bool
imageWasBuilt bool
// keepBuiltImage makes Terminate not remove the image if imageWasBuilt.
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer
logProductionError chan error
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer

// TODO: Remove locking and wait group once the deprecated StartLogProducer and
// StopLogProducer have been removed and hence logging can only be started and
// stopped once.

// logProductionWaitGroup is used to signal when the log production has stopped.
// This allows stopLogProduction to safely set logProductionStop to nil.
// See simplification in https://go.dev/play/p/x0pOElF2Vjf
logProductionWaitGroup sync.WaitGroup

logProductionStop chan struct{}
// logProductionCancel is used to signal the log production to stop.
logProductionCancel context.CancelCauseFunc
logProductionCtx context.Context

logProductionTimeout *time.Duration
logger Logging
@@ -263,7 +268,6 @@ func (c *DockerContainer) Stop(ctx context.Context, timeout *time.Duration) error {
// without exposing the ability to fully initialize the container state.
// See: https://github.com/testcontainers/testcontainers-go/issues/2667
// TODO: Add a check for isRunning when the above issue is resolved.

err := c.stoppingHook(ctx)
if err != nil {
return fmt.Errorf("stopping hook: %w", err)
@@ -310,7 +314,7 @@ func (c *DockerContainer) Terminate(ctx context.Context) error {
}

select {
// close reaper if it was created
// Close reaper connection if it was attached.
case c.terminationSignal <- true:
default:
}
@@ -690,6 +694,29 @@ func (c *DockerContainer) copyToContainer(ctx context.Context, fileContent func(
return nil
}

// logConsumerWriter is an io.Writer that forwards each write to a set of LogConsumers.
type logConsumerWriter struct {
log Log
consumers []LogConsumer
}

// newLogConsumerWriter creates a new logConsumerWriter for logType that sends messages to all consumers.
func newLogConsumerWriter(logType string, consumers []LogConsumer) *logConsumerWriter {
return &logConsumerWriter{
log: Log{LogType: logType},
consumers: consumers,
}
}

// Write writes p to all consumers.
func (lw logConsumerWriter) Write(p []byte) (int, error) {
lw.log.Content = p
for _, consumer := range lw.consumers {
consumer.Accept(lw.log)
}
return len(p), nil
}
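
For context, a minimal sketch of how this adapter is meant to be used: stdcopy demultiplexes the raw Docker log stream into the two writers, which fan each chunk out to every registered consumer. The consumer implementation and the rawLogStream reader here are hypothetical illustrations, not part of this diff:

	// printConsumer is a trivial LogConsumer that prints each chunk it receives.
	type printConsumer struct{}

	func (pc *printConsumer) Accept(l Log) {
		fmt.Printf("[%s] %s", l.LogType, l.Content)
	}

	// Wiring: one writer per stream, both feeding the same consumer.
	consumers := []LogConsumer{&printConsumer{}}
	stdout := newLogConsumerWriter(StdoutLog, consumers)
	stderr := newLogConsumerWriter(StderrLog, consumers)
	_, _ = stdcopy.StdCopy(stdout, stderr, rawLogStream) // rawLogStream: an assumed io.Reader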

type LogProductionOption func(*DockerContainer)

// WithLogProductionTimeout is a functional option that sets the timeout for the log production.
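
A hedged caller-side sketch of the option, using the deprecated StartLogProducer entry point shown below (the ctx and container value ctr are assumed):

	// Request a 10s timeout for each log-reading loop; values outside the
	// 5s-60s window are clamped, as the validation switch below shows.
	if err := ctr.StartLogProducer(ctx, testcontainers.WithLogProductionTimeout(10*time.Second)); err != nil {
		log.Printf("starting log producer: %v", err)
	}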
@@ -707,124 +734,94 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProductionOption) error {

// startLogProduction will start a concurrent process that will continuously read logs
// from the container and will send them to each added LogConsumer.
//
// Default log production timeout is 5s. It is used to set the context timeout
// which means that each log-reading loop will last at least the specified timeout
// and that it cannot be cancelled earlier.
// which means that each log-reading loop will last up to the specified timeout.
//
// Use functional option WithLogProductionTimeout() to override default timeout. If it's
// lower than 5s or greater than 60s it will be set to 5s or 60s respectively.
func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogProductionOption) error {
c.logProductionStop = make(chan struct{}, 1) // buffered channel to avoid blocking
c.logProductionWaitGroup.Add(1)

for _, opt := range opts {
opt(c)
}

minLogProductionTimeout := time.Duration(5 * time.Second)
maxLogProductionTimeout := time.Duration(60 * time.Second)

if c.logProductionTimeout == nil {
// Validate the log production timeout.
switch {
case c.logProductionTimeout == nil:
c.logProductionTimeout = &minLogProductionTimeout
}

if *c.logProductionTimeout < minLogProductionTimeout {
case *c.logProductionTimeout < minLogProductionTimeout:
c.logProductionTimeout = &minLogProductionTimeout
}

if *c.logProductionTimeout > maxLogProductionTimeout {
case *c.logProductionTimeout > maxLogProductionTimeout:
c.logProductionTimeout = &maxLogProductionTimeout
}
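// Illustration (not part of this diff): a caller passing
// WithLogProductionTimeout(2*time.Second) ends up with the 5s floor, and
// one passing WithLogProductionTimeout(2*time.Minute) is capped at 60s.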

c.logProductionError = make(chan error, 1)
// Setup the log writers.
stdout := newLogConsumerWriter(StdoutLog, c.consumers)
stderr := newLogConsumerWriter(StderrLog, c.consumers)

// Setup the log production context which will be used to stop the log production.
c.logProductionCtx, c.logProductionCancel = context.WithCancelCause(ctx)

go func() {
defer func() {
close(c.logProductionError)
c.logProductionWaitGroup.Done()
}()

since := ""
// if the socket is closed we will make additional logs request with updated Since timestamp
BEGIN:
options := container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
Since: since,
}
err := c.logProducer(stdout, stderr)
// Set context cancel cause, if not already set.
c.logProductionCancel(err)
}()

ctx, cancel := context.WithTimeout(ctx, *c.logProductionTimeout)
return nil
}

// logProducer reads logs from the container and writes them to stdout and stderr until either:
// - logProductionCtx is done
// - A fatal error occurs
// - No more logs are available
func (c *DockerContainer) logProducer(stdout, stderr io.Writer) error {
// Clean up idle client connections.
defer c.provider.Close()

// Setup the log options, start from the beginning.
options := container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
}

for {
timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
defer cancel()

r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
c.logProductionError <- err
return
err := c.copyLogs(timeoutCtx, stdout, stderr, options)
switch {
case err == nil:
// No more logs available.
return nil
case c.logProductionCtx.Err() != nil:
// Log production was stopped or caller context is done.
return nil
case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
// Timeout or client connection closed, retry.
default:
// Unexpected error, retry.
Logger.Printf("Unexpected error reading logs: %v", err)
}
defer c.provider.Close()

for {
select {
case <-c.logProductionStop:
c.logProductionError <- r.Close()
return
default:
}
h := make([]byte, 8)
_, err := io.ReadFull(r, h)
if err != nil {
switch {
case err == io.EOF:
// No more logs coming
case errors.Is(err, net.ErrClosed):
now := time.Now()
since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
goto BEGIN
case errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled):
// Probably safe to continue here
continue
default:
_, _ = fmt.Fprintf(os.Stderr, "container log error: %+v. %s", err, logStoppedForOutOfSyncMessage)
// if we would continue here, the next header-read will result into random data...
}
return
}

count := binary.BigEndian.Uint32(h[4:])
if count == 0 {
continue
}
logType := h[0]
if logType > 2 {
_, _ = fmt.Fprintf(os.Stderr, "received invalid log type: %d", logType)
// sometimes docker returns logType = 3 which is an undocumented log type, so treat it as stdout
logType = 1
}
// Retry from the last log received.
now := time.Now()
options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
}
}
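
As an aside, the Since value uses Docker's seconds.nanoseconds timestamp form; a minimal standalone sketch of the formatting (the instant chosen is illustrative):

	now := time.Date(2024, 9, 26, 12, 0, 0, 500000000, time.UTC)
	since := fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
	// since == "1727352000.500000000"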

// a map of the log type --> int representation in the header, notice the first is blank, this is stdin, but the go docker client doesn't allow following that in logs
logTypes := []string{"", StdoutLog, StderrLog}
// copyLogs copies logs from the container to stdout and stderr.
func (c *DockerContainer) copyLogs(ctx context.Context, stdout, stderr io.Writer, options container.LogsOptions) error {
rc, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
return fmt.Errorf("container logs: %w", err)
}
defer rc.Close()

b := make([]byte, count)
_, err = io.ReadFull(r, b)
if err != nil {
// TODO: add-logger: use logger to log out this error
_, _ = fmt.Fprintf(os.Stderr, "error occurred reading log with known length %s", err.Error())
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// Probably safe to continue here
continue
}
// we can not continue here as the next read most likely will not be the next header
_, _ = fmt.Fprintln(os.Stderr, logStoppedForOutOfSyncMessage)
return
}
for _, c := range c.consumers {
c.Accept(Log{
LogType: logTypes[logType],
Content: b,
})
}
}
}()
if _, err = stdcopy.StdCopy(stdout, stderr, rc); err != nil {
return fmt.Errorf("stdcopy: %w", err)
}

return nil
}
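
For reference, stdcopy.StdCopy demultiplexes the same 8-byte frame header the removed hand-rolled loop parsed manually: the stream type lives in byte 0 and a big-endian payload length in bytes 4-7. A minimal sketch with a fabricated single-frame input:

	var stdoutBuf, stderrBuf bytes.Buffer
	// One frame: stream=1 (stdout), length=5, payload "hello".
	frame := append([]byte{1, 0, 0, 0, 0, 0, 0, 5}, []byte("hello")...)
	if _, err := stdcopy.StdCopy(&stdoutBuf, &stderrBuf, bytes.NewReader(frame)); err != nil {
		log.Fatal(err)
	}
	fmt.Print(stdoutBuf.String()) // hello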
@@ -837,18 +834,25 @@ func (c *DockerContainer) StopLogProducer() error {
// stopLogProduction will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) stopLogProduction() error {
// signal the log production to stop
c.logProductionStop <- struct{}{}
if c.logProductionCancel == nil {
return nil
}

c.logProductionWaitGroup.Wait()
// Signal the log production to stop.
c.logProductionCancel(errLogProductionStop)

if err := <-c.logProductionError; err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// Returning context errors is not useful for the consumer.
if err := context.Cause(c.logProductionCtx); err != nil {
switch {
case errors.Is(err, errLogProductionStop):
// Log production was stopped.
return nil
case errors.Is(err, context.DeadlineExceeded),
errors.Is(err, context.Canceled):
// Parent context is done.
return nil
default:
return err
}

return err
}

return nil
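
The stop path leans on the cancel-cause plumbing added in Go 1.20; a minimal standalone sketch of the pattern:

	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errLogProductionStop) // record why production stopped
	fmt.Println(context.Cause(ctx))                                  // log production stopped
	fmt.Println(errors.Is(context.Cause(ctx), errLogProductionStop)) // true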
@@ -857,7 +861,16 @@ func (c *DockerContainer) stopLogProduction() error {
// GetLogProductionErrorChannel exposes the only way for the consumer
// to listen to errors and react to them.
func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error {
return c.logProductionError
if c.logProductionCtx == nil {
return nil
}

errCh := make(chan error, 1)
go func() {
<-c.logProductionCtx.Done()
errCh <- context.Cause(c.logProductionCtx)
}()
return errCh
}
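
A hedged consumer-side sketch of this channel (the container value ctr is assumed; note the returned channel is nil, and a receive on it would block forever, if log production was never started):

	if ch := ctr.GetLogProductionErrorChannel(); ch != nil {
		go func() {
			if err := <-ch; err != nil {
				log.Printf("log production ended: %v", err)
			}
		}()
	}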

// DockerNetwork represents a network started using Docker