Skip to content

Commit

Permalink
fix(compose): container initialisation
Browse files Browse the repository at this point in the history
Fix compose to fully initialise the containers it returns. This ensures
that running things like checks for running behave as expected.

Extracts the functionality to connect to reaper into a helper method so
its consistent across uses.

Fix data race in daemonHost function converting it to a method to make
use of encapsulation.

Fix container and network requests so they use sessionID from labels if
available so that user specified values are respected.

Export the functionality to create a container from a ContainerList
response via provider.ContainerFromType.

Enforce no bare returns instead of no named returns as that was the
original intention.

Fixes #2667
  • Loading branch information
stevenh committed Oct 22, 2024
1 parent e945309 commit 30ba8c3
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 99 deletions.
4 changes: 3 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ linters:
- gofumpt
- misspell
- nolintlint
- nonamedreturns
- nakedret
- testifylint
- thelper

linters-settings:
nakedret:
max-func-lines: 0
errorlint:
# Check whether fmt.Errorf uses the %w verb for formatting errors.
# See the https://github.com/polyfloyd/go-errorlint for caveats.
Expand Down
9 changes: 9 additions & 0 deletions container.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ type ContainerRequest struct {
LogConsumerCfg *LogConsumerConfig // define the configuration for the log producer and its log consumers to follow the logs
}

// sessionID returns the session ID for the container request.
func (c *ContainerRequest) sessionID() string {
if sessionID := c.Labels[core.LabelSessionID]; sessionID != "" {
return sessionID
}

return core.SessionID()
}

// containerOptions functional options for a container
type containerOptions struct {
ImageName string
Expand Down
160 changes: 91 additions & 69 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -873,6 +873,32 @@ func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error {
return errCh
}

// connectReaper connects the reaper to the container if it is needed.
func (c *DockerContainer) connectReaper(ctx context.Context) error {
if c.provider.config.RyukDisabled || isReaperImage(c.Image) {
// Reaper is disabled or we are the reaper container.
return nil
}

reaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, c.provider.host), core.SessionID(), c.provider)
if err != nil {
return fmt.Errorf("reaper: %w", err)
}

if c.terminationSignal, err = reaper.Connect(); err != nil {
return fmt.Errorf("reaper connect: %w", err)
}

return nil
}

// cleanupTermSignal triggers the termination signal if it was created and an error occurred.
func (c *DockerContainer) cleanupTermSignal(err error) {
if c.terminationSignal != nil && err != nil {
c.terminationSignal <- true
}
}

// DockerNetwork represents a network started using Docker
type DockerNetwork struct {
ID string // Network ID from Docker
Expand Down Expand Up @@ -906,6 +932,7 @@ type DockerProvider struct {
host string
hostCache string
config config.Config
mtx sync.Mutex
}

// Client gets the docker client used by the provider
Expand Down Expand Up @@ -1021,28 +1048,6 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
req.Labels = make(map[string]string)
}

var termSignal chan bool
// the reaper does not need to start a reaper for itself
isReaperContainer := strings.HasSuffix(imageName, config.ReaperDefaultImage)
if !p.config.RyukDisabled && !isReaperContainer {
r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p)
if err != nil {
return nil, fmt.Errorf("reaper: %w", err)
}

termSignal, err := r.Connect()
if err != nil {
return nil, fmt.Errorf("reaper connect: %w", err)
}

// Cleanup on error.
defer func() {
if err != nil {
termSignal <- true
}
}()
}

if err = req.Validate(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1106,7 +1111,7 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
}
}

if !isReaperContainer {
if !isReaperImage(imageName) {
// Add the labels that identify this as a testcontainers container and
// allow the reaper to terminate it if requested.
AddGenericLabels(req.Labels)
Expand Down Expand Up @@ -1184,26 +1189,35 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
}
}

c := &DockerContainer{
ID: resp.ID,
WaitingFor: req.WaitingFor,
Image: imageName,
imageWasBuilt: req.ShouldBuildImage(),
keepBuiltImage: req.ShouldKeepBuiltImage(),
sessionID: core.SessionID(),
exposedPorts: req.ExposedPorts,
provider: p,
terminationSignal: termSignal,
logger: p.Logger,
lifecycleHooks: req.LifecycleHooks,
// This should match the fields set in ContainerFromDockerResponse.
ctr := &DockerContainer{
ID: resp.ID,
WaitingFor: req.WaitingFor,
Image: imageName,
imageWasBuilt: req.ShouldBuildImage(),
keepBuiltImage: req.ShouldKeepBuiltImage(),
sessionID: req.sessionID(),
exposedPorts: req.ExposedPorts,
provider: p,
logger: p.Logger,
lifecycleHooks: req.LifecycleHooks,
}

err = c.createdHook(ctx)
if err != nil {
return nil, err
if err = ctr.connectReaper(ctx); err != nil {
return ctr, err // No wrap as it would stutter.
}

return c, nil
// Wrapped so the returned error is passed to the cleanup function.
defer func(ctr *DockerContainer) {
ctr.cleanupTermSignal(err)
}(ctr)

if err = ctr.createdHook(ctx); err != nil {
// Return the container to allow caller to clean up.
return ctr, fmt.Errorf("created hook: %w", err)
}

return ctr, nil
}

func (p *DockerProvider) findContainerByName(ctx context.Context, name string) (*types.Container, error) {
Expand All @@ -1215,7 +1229,7 @@ func (p *DockerProvider) findContainerByName(ctx context.Context, name string) (
filter := filters.NewArgs(filters.Arg("name", fmt.Sprintf("^%s$", name)))
containers, err := p.client.ContainerList(ctx, container.ListOptions{Filters: filter})
if err != nil {
return nil, err
return nil, fmt.Errorf("container list: %w", err)
}
defer p.Close()

Expand Down Expand Up @@ -1270,7 +1284,7 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain
}
}

sessionID := core.SessionID()
sessionID := req.sessionID()

var termSignal chan bool
if !p.config.RyukDisabled {
Expand Down Expand Up @@ -1411,10 +1425,13 @@ func (p *DockerProvider) Config() TestcontainersConfig {
// Warning: this is based on your Docker host setting. Will fail if using an SSH tunnel
// You can use the "TESTCONTAINERS_HOST_OVERRIDE" env variable to set this yourself
func (p *DockerProvider) DaemonHost(ctx context.Context) (string, error) {
return daemonHost(ctx, p)
p.mtx.Lock()
defer p.mtx.Unlock()

return p.daemonHostLocked(ctx)
}

func daemonHost(ctx context.Context, p *DockerProvider) (string, error) {
func (p *DockerProvider) daemonHostLocked(ctx context.Context) (string, error) {
if p.hostCache != "" {
return p.hostCache, nil
}
Expand Down Expand Up @@ -1482,7 +1499,7 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest)
IPAM: req.IPAM,
}

sessionID := core.SessionID()
sessionID := req.sessionID()

var termSignal chan bool
if !p.config.RyukDisabled {
Expand Down Expand Up @@ -1602,45 +1619,50 @@ func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APICl
return reaperNetwork, nil
}

// containerFromDockerResponse builds a Docker container struct from the response of the Docker API
func containerFromDockerResponse(ctx context.Context, response types.Container) (*DockerContainer, error) {
provider, err := NewDockerProvider()
if err != nil {
return nil, err
// ContainerFromType builds a Docker container struct from the response of the Docker API
func (p *DockerProvider) ContainerFromType(ctx context.Context, response types.Container) (ctr *DockerContainer, err error) {
exposedPorts := make([]string, len(response.Ports))
for i, port := range response.Ports {
exposedPorts[i] = fmt.Sprintf("%d/%s", port.PublicPort, port.Type)
}

// This should match the fields set in CreateContainer.
ctr = &DockerContainer{
ID: response.ID,
Image: response.Image,
imageWasBuilt: false,
sessionID: response.Labels[core.LabelSessionID],
isRunning: response.State == "running",
exposedPorts: exposedPorts,
provider: p,
logger: p.Logger,
lifecycleHooks: []ContainerLifecycleHooks{
DefaultLoggingHook(p.Logger),
},
}

ctr := DockerContainer{}

ctr.ID = response.ID
ctr.WaitingFor = nil
ctr.Image = response.Image
ctr.imageWasBuilt = false

ctr.logger = provider.Logger
ctr.lifecycleHooks = []ContainerLifecycleHooks{
DefaultLoggingHook(ctr.logger),
if err = ctr.connectReaper(ctx); err != nil {
return nil, err
}
ctr.provider = provider

ctr.sessionID = core.SessionID()
ctr.consumers = []LogConsumer{}
ctr.isRunning = response.State == "running"

// the termination signal should be obtained from the reaper
ctr.terminationSignal = nil
// Wrapped so the returned error is passed to the cleanup function.
defer func(ctr *DockerContainer) {
ctr.cleanupTermSignal(err)
}(ctr)

// populate the raw representation of the container
jsonRaw, err := ctr.inspectRawContainer(ctx)
if err != nil {
return nil, fmt.Errorf("inspect raw container: %w", err)
// Return the container to allow caller to clean up.
return ctr, fmt.Errorf("inspect raw container: %w", err)
}

// the health status of the container, if any
if health := jsonRaw.State.Health; health != nil {
ctr.healthStatus = health.Status
}

return &ctr, nil
return ctr, nil
}

// ListImages list images from the provider. If an image has multiple Tags, each tag is reported
Expand Down
6 changes: 5 additions & 1 deletion generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,14 @@ func TestGenericReusableContainerInSubprocess(t *testing.T) {
require.NoError(t, err)
require.Len(t, ctrs, 1)

nginxC, err := containerFromDockerResponse(context.Background(), ctrs[0])
provider, err := NewDockerProvider()
require.NoError(t, err)

provider.SetClient(cli)

nginxC, err := provider.ContainerFromType(context.Background(), ctrs[0])
CleanupContainer(t, nginxC)
require.NoError(t, err)
}

func createReuseContainerInSubprocess(t *testing.T) string {
Expand Down
11 changes: 10 additions & 1 deletion modules/compose/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,27 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) {
return nil, fmt.Errorf("initialize docker client: %w", err)
}

provider, err := testcontainers.NewDockerProvider(testcontainers.WithLogger(composeOptions.Logger))
if err != nil {
return nil, fmt.Errorf("new docker provider: %w", err)
}

dockerClient := dockerCli.Client()
provider.SetClient(dockerClient)

composeAPI := &dockerCompose{
name: composeOptions.Identifier,
configs: composeOptions.Paths,
temporaryConfigs: composeOptions.temporaryPaths,
logger: composeOptions.Logger,
projectProfiles: composeOptions.Profiles,
composeService: compose.NewComposeService(dockerCli),
dockerClient: dockerCli.Client(),
dockerClient: dockerClient,
waitStrategies: make(map[string]wait.Strategy),
containers: make(map[string]*testcontainers.DockerContainer),
networks: make(map[string]*testcontainers.DockerNetwork),
sessionID: testcontainers.SessionID(),
provider: provider,
}

return composeAPI, nil
Expand Down
31 changes: 7 additions & 24 deletions modules/compose/compose_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ type dockerCompose struct {

// sessionID is used to identify the reaper session
sessionID string

// provider is used to docker operations.
provider *testcontainers.DockerProvider
}

func (d *dockerCompose) ServiceContainer(ctx context.Context, svcName string) (*testcontainers.DockerContainer, error) {
Expand Down Expand Up @@ -325,17 +328,12 @@ func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) (err erro
return err
}

provider, err := testcontainers.NewDockerProvider(testcontainers.WithLogger(d.logger))
if err != nil {
return fmt.Errorf("new docker provider: %w", err)
}

var termSignals []chan bool
var reaper *testcontainers.Reaper
if !provider.Config().Config.RyukDisabled {
if !d.provider.Config().Config.RyukDisabled {
// NewReaper is deprecated: we need to find a way to create the reaper for compose
// bypassing the deprecation.
reaper, err = testcontainers.NewReaper(ctx, testcontainers.SessionID(), provider, "")
reaper, err = testcontainers.NewReaper(ctx, testcontainers.SessionID(), d.provider, "")
if err != nil {
return fmt.Errorf("create reaper: %w", err)
}
Expand Down Expand Up @@ -492,26 +490,11 @@ func (d *dockerCompose) lookupContainer(ctx context.Context, svcName string) (*t
return nil, fmt.Errorf("no container found for service name %s", svcName)
}

containerInstance := containers[0]
// TODO: Fix as this is only setting a subset of the fields
// and the container is not fully initialized, for example
// the isRunning flag is not set.
// See: https://github.com/testcontainers/testcontainers-go/issues/2667
ctr := &testcontainers.DockerContainer{
ID: containerInstance.ID,
Image: containerInstance.Image,
}
ctr.SetLogger(d.logger)

dockerProvider, err := testcontainers.NewDockerProvider(testcontainers.WithLogger(d.logger))
ctr, err := d.provider.ContainerFromType(ctx, containers[0])
if err != nil {
return nil, fmt.Errorf("new docker provider: %w", err)
return nil, fmt.Errorf("container from type: %w", err)
}

dockerProvider.SetClient(d.dockerClient)

ctr.SetProvider(dockerProvider)

d.containersLock.Lock()
defer d.containersLock.Unlock()
d.containers[svcName] = ctr
Expand Down
Loading

0 comments on commit 30ba8c3

Please sign in to comment.