Merge branch 'main' into deprecate-mount-apis
* main:
  fix(reaper): fix race condition when reusing reapers (#1904)
mdelapenya committed Nov 8, 2023
2 parents 01add1d + fc966d5 commit 7dd4b4e
Showing 2 changed files with 121 additions and 13 deletions.
88 changes: 75 additions & 13 deletions reaper.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"math/rand"
 	"net"
+	"regexp"
 	"strings"
 	"sync"
 	"time"
@@ -50,6 +51,14 @@ func NewReaper(ctx context.Context, sessionID string, provider ReaperProvider, r
 	return reuseOrCreateReaper(ctx, sessionID, provider, WithImageName(reaperImageName))
 }
 
+// reaperContainerNameFromSessionID returns the container name that uniquely
+// identifies the container based on the session id.
+func reaperContainerNameFromSessionID(sessionID string) string {
+	// The session id is 64 characters, so we will not hit the limit of 128
+	// characters for container names.
+	return fmt.Sprintf("reaper_%s", sessionID)
+}
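The deterministic name is what turns container creation into a cross-process lock: the Docker daemon rejects a second container with the same name. Below is a minimal sketch of that behavior, not part of the commit, assuming a recent Docker Go SDK, a running daemon, and a locally available alpine image; the session name and image are made up for illustration.

```go
package main

import (
	"context"
	"fmt"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
)

func main() {
	ctx := context.Background()
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		panic(err)
	}

	// Stand-in for reaperContainerNameFromSessionID: one fixed name per session.
	name := "reaper_demo-session-id"
	cfg := &container.Config{Image: "alpine:3.18", Cmd: []string{"sleep", "60"}}

	// The first create succeeds and "wins" the race.
	if _, err := cli.ContainerCreate(ctx, cfg, nil, nil, nil, name); err != nil {
		panic(err)
	}

	// A second create with the same name is rejected by the daemon with a
	// "Conflict. The container name ... is already in use ..." error, which
	// is exactly the message the regex in this commit matches.
	_, err = cli.ContainerCreate(ctx, cfg, nil, nil, nil, name)
	fmt.Println(err)
}
```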

 // lookUpReaperContainer returns a DockerContainer type with the reaper container in the case
 // it's found in the running state, and including the labels for sessionID, reaper, and ryuk.
 // It will perform a retry with exponential backoff to allow for the container to be started and
@@ -67,7 +76,7 @@ func lookUpReaperContainer(ctx context.Context, sessionID string) (*DockerContai
 
 	// we want random intervals between 100ms and 500ms for concurrent executions
 	// to not be synchronized: it could be the case that multiple executions of this
-	// function happen at the same time (specially when called from a different test
+	// function happen at the same time (specifically when called from a different test
 	// process execution), and we want to avoid that they all try to find the reaper
 	// container at the same time.
 	exp.InitialInterval = time.Duration(rand.Intn(5)*100) * time.Millisecond
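For context, the snippet below sketches how such a jittered exponential backoff drives the lookup, assuming the github.com/cenkalti/backoff/v4 package; the findReaper helper is a hypothetical stand-in for a single container-list attempt.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// findReaper is a hypothetical stand-in for one attempt to list the reaper.
func findReaper(ctx context.Context) (string, error) {
	return "", errors.New("reaper not visible yet")
}

func main() {
	ctx := context.Background()

	exp := backoff.NewExponentialBackOff()
	// Randomize the first attempt (here: 100ms-500ms) so concurrent test
	// processes do not all query the Docker daemon at the same instant.
	exp.InitialInterval = time.Duration(100+rand.Intn(401)) * time.Millisecond
	exp.MaxElapsedTime = 5 * time.Second

	var containerID string
	err := backoff.Retry(func() error {
		var err error
		containerID, err = findReaper(ctx)
		return err // a non-nil error triggers another, longer wait
	}, backoff.WithContext(exp, ctx))
	fmt.Println(containerID, err)
}
```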
@@ -82,6 +91,7 @@ func lookUpReaperContainer(ctx context.Context, sessionID string) (*DockerContai
 		filters.Arg("label", fmt.Sprintf("%s=%s", testcontainersdocker.LabelSessionID, sessionID)),
 		filters.Arg("label", fmt.Sprintf("%s=%t", testcontainersdocker.LabelReaper, true)),
 		filters.Arg("label", fmt.Sprintf("%s=%t", testcontainersdocker.LabelRyuk, true)),
+		filters.Arg("name", reaperContainerNameFromSessionID(sessionID)),
 	}
 
 	resp, err := dockerClient.ContainerList(ctx, types.ContainerListOptions{
@@ -146,19 +156,11 @@ func reuseOrCreateReaper(ctx context.Context, sessionID string, provider ReaperP
 	reaperContainer, err := lookUpReaperContainer(context.Background(), sessionID)
 	if err == nil && reaperContainer != nil {
 		// The reaper container exists as a Docker container: re-use it
-		endpoint, err := reaperContainer.PortEndpoint(ctx, "8080", "")
+		Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", reaperContainer.ID)
+		reaperInstance, err = reuseReaperContainer(ctx, sessionID, provider, reaperContainer)
 		if err != nil {
 			return nil, err
 		}
 
-		Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", reaperContainer.ID)
-		reaperInstance = &Reaper{
-			Provider:  provider,
-			SessionID: sessionID,
-			Endpoint:  endpoint,
-			container: reaperContainer,
-		}
-
 		return reaperInstance, nil
 	}

@@ -182,8 +184,25 @@ func reuseOrCreateReaper(ctx context.Context, sessionID string, provider ReaperP
 	return reaperInstance, nil
 }
 
-// newReaper creates a Reaper with a sessionID to identify containers and a provider to use
-// Do not call this directly, use reuseOrCreateReaper instead
+var createContainerFailDueToNameConflictRegex = regexp.MustCompile("Conflict. The container name .* is already in use by container .*")
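As a sanity check, the regex can be exercised against the daemon's canonical conflict message. A sketch of such a test, which would live next to reaper.go in the same package; the container name and id below are made up:

```go
func TestCreateContainerFailDueToNameConflictRegex(t *testing.T) {
	// A representative daemon error message for a duplicate container name.
	msg := `Error response from daemon: Conflict. The container name "/reaper_abc123" is already in use by container "0123456789abcdef". You have to remove (or rename) that container to be able to reuse that name.`
	if !createContainerFailDueToNameConflictRegex.MatchString(msg) {
		t.Fatalf("expected daemon conflict message to match, got no match: %s", msg)
	}
}
```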

+// reuseReaperContainer constructs a Reaper from an already running reaper
+// DockerContainer.
+func reuseReaperContainer(ctx context.Context, sessionID string, provider ReaperProvider, reaperContainer *DockerContainer) (*Reaper, error) {
+	endpoint, err := reaperContainer.PortEndpoint(ctx, "8080", "")
+	if err != nil {
+		return nil, err
+	}
+	return &Reaper{
+		Provider:  provider,
+		SessionID: sessionID,
+		Endpoint:  endpoint,
+		container: reaperContainer,
+	}, nil
+}
+
+// newReaper creates a Reaper with a sessionID to identify containers and a
+// provider to use. Do not call this directly, use reuseOrCreateReaper instead.
 func newReaper(ctx context.Context, sessionID string, provider ReaperProvider, opts ...ContainerOption) (*Reaper, error) {
 	dockerHostMount := testcontainersdocker.ExtractDockerSocket(ctx)

@@ -208,6 +227,7 @@ func newReaper(ctx context.Context, sessionID string, provider ReaperProvider, o
 		Labels:        testcontainersdocker.DefaultLabels(sessionID),
 		Privileged:    tcConfig.RyukPrivileged,
 		WaitingFor:    wait.ForListeningPort(listeningPort),
+		Name:          reaperContainerNameFromSessionID(sessionID),
 		ReaperOptions: opts,
 		HostConfigModifier: func(hc *container.HostConfig) {
 			hc.AutoRemove = true
@@ -237,6 +257,48 @@ func newReaper(ctx context.Context, sessionID string, provider ReaperProvider, o
 
 	c, err := provider.RunContainer(ctx, req)
 	if err != nil {
+		// We need to check whether the error is caused by a container with the same name
+		// already existing due to race conditions. We manually match the error message
+		// as we do not have any error types to check against.
+		if createContainerFailDueToNameConflictRegex.MatchString(err.Error()) {
+			// Manually retrieve the already running reaper container. We need retries
+			// here because two race conditions might lead to errors: in most cases,
+			// there is a small delay between a container being created and it becoming
+			// visible in list requests, so creation might fail due to a name conflict
+			// while listing containers by that name still returns no results. In the
+			// other case, the container might simply have died in the meantime and
+			// therefore cannot be found.
+			const timeout = 5 * time.Second
+			const cooldown = 100 * time.Millisecond
+			start := time.Now()
+			var reaperContainer *DockerContainer
+			for time.Since(start) < timeout {
+				reaperContainer, err = lookUpReaperContainer(ctx, sessionID)
+				if err == nil && reaperContainer != nil {
+					break
+				}
+				select {
+				case <-ctx.Done():
+				case <-time.After(cooldown):
+				}
+			}
+			if err != nil {
+				return nil, fmt.Errorf("look up reaper container due to name conflict failed: %w", err)
+			}
+			// If the reaper container was not found, it most likely died in the
+			// meantime, since the previous error check rules out client errors.
+			// Because the reaper should only die after performing its clean-ups, we
+			// can fail here: the reaper timeout needs to be increased anyway.
+			if reaperContainer == nil {
+				return nil, fmt.Errorf("look up reaper container returned nil although creation failed due to name conflict")
+			}
+			Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", reaperContainer.ID)
+			reaper, err := reuseReaperContainer(ctx, sessionID, provider, reaperContainer)
+			if err != nil {
+				return nil, err
+			}
+			return reaper, nil
+		}
 		return nil, err
 	}
 	reaper.container = c
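The inline retry loop above could also be factored into a reusable helper. A sketch, not part of the commit, that additionally returns early when the context is cancelled; it reuses the lookUpReaperContainer and DockerContainer names from this file:

```go
// waitForReaperContainer polls lookUpReaperContainer until it returns a
// container, the timeout elapses, or ctx is cancelled.
func waitForReaperContainer(ctx context.Context, sessionID string, timeout, cooldown time.Duration) (*DockerContainer, error) {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		c, err := lookUpReaperContainer(ctx, sessionID)
		if err == nil && c != nil {
			return c, nil
		}
		select {
		case <-ctx.Done():
			// Stop polling as soon as the caller gives up.
			return nil, ctx.Err()
		case <-time.After(cooldown):
		}
	}
	return nil, fmt.Errorf("reaper container for session %s not found within %s", sessionID, timeout)
}
```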
46 changes: 46 additions & 0 deletions reaper_test.go
@@ -515,3 +515,49 @@ func TestReaper_reuseItFromOtherTestProgramUsingDocker(t *testing.T) {
 		terminateContainerOnEnd(t, ctx, reaper.container)
 	}
 }

+// TestReaper_ReuseRunning tests whether reusing the reaper works as expected
+// when using testcontainers from multiple packages concurrently. In this case,
+// global locks have no effect, because the Go toolchain runs tests of
+// different packages in separate processes. Therefore, this test deliberately
+// does not use the same lock-based logic. We expect reaper creation to still
+// succeed when a reaper is already running for the same session id, by
+// returning its container instance instead.
+func TestReaper_ReuseRunning(t *testing.T) {
+	const concurrency = 64
+
+	timeout, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	defer cancel()
+
+	sessionID := SessionID()
+
+	dockerProvider, err := NewDockerProvider()
+	require.NoError(t, err, "new docker provider should not fail")
+
+	obtainedReaperContainerIDs := make([]string, concurrency)
+	var wg sync.WaitGroup
+	for i := 0; i < concurrency; i++ {
+		i := i
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			reaperContainer, err := lookUpReaperContainer(timeout, sessionID)
+			if err == nil && reaperContainer != nil {
+				// Found.
+				obtainedReaperContainerIDs[i] = reaperContainer.GetContainerID()
+				return
+			}
+			// Not found -> create.
+			createdReaper, err := newReaper(timeout, sessionID, dockerProvider)
+			require.NoError(t, err, "new reaper should not fail")
+			obtainedReaperContainerIDs[i] = createdReaper.container.GetContainerID()
+		}()
+	}
+	wg.Wait()
+
+	// Assure that all calls returned the same container.
+	firstContainerID := obtainedReaperContainerIDs[0]
+	for i, containerID := range obtainedReaperContainerIDs {
+		assert.Equal(t, firstContainerID, containerID, "call %d should have returned same container id", i)
+	}
+}
