Skip to content

Commit

Permalink
Merge branch 'main' into etcd-module
Browse files Browse the repository at this point in the history
* main:
  chore: use a much smaller image for testing (testcontainers#2795)
  fix: parallel containers clean race (testcontainers#2790)
  fix(registry): wait for (testcontainers#2793)
  fix: container timeout test (testcontainers#2792)
  docs: document redpanda options (testcontainers#2789)
  • Loading branch information
mdelapenya committed Sep 24, 2024
2 parents dda3352 + 738e8fc commit db70ad3
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 77 deletions.
5 changes: 1 addition & 4 deletions docker_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func prepareLocalRegistryWithAuth(t *testing.T) string {
ContainerFilePath: "/data",
},
},
WaitingFor: wait.ForExposedPort(),
WaitingFor: wait.ForHTTP("/").WithPort("5000/tcp"),
}
// }

Expand All @@ -311,9 +311,6 @@ func prepareLocalRegistryWithAuth(t *testing.T) string {
removeImageFromLocalCache(t, addr+"/redis:5.0-alpine")
})

_, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

return addr
}

Expand Down
6 changes: 2 additions & 4 deletions docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,14 +658,12 @@ func TestContainerCreationTimesOutWithHttp(t *testing.T) {
ExposedPorts: []string{
nginxDefaultPort,
},
WaitingFor: wait.ForHTTP("/").WithStartupTimeout(1 * time.Second),
WaitingFor: wait.ForHTTP("/").WithStartupTimeout(time.Millisecond * 500),
},
Started: true,
})
CleanupContainer(t, nginxC)
if err == nil {
t.Error("Expected timeout")
}
require.Error(t, err, "expected timeout")
}

func TestContainerCreationWaitsForLogContextTimeout(t *testing.T) {
Expand Down
71 changes: 71 additions & 0 deletions docs/modules/redpanda.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ If you need to enable TLS use `WithTLS` with a valid PEM encoded certificate and

#### Additional Listener

- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.28.0"><span class="tc-version">:material-tag: v0.28.0</span></a>

There are scenarios where additional listeners are needed, for example if you
want to consume/from another container in the same network

Expand All @@ -79,12 +81,77 @@ Produce messages using the new registered listener
[Produce/consume via registered listener](../../modules/redpanda/redpanda_test.go) inside_block:withListenerExec
<!--/codeinclude-->

#### Adding Service Accounts

- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.20.0"><span class="tc-version">:material-tag: v0.20.0</span></a>

It's possible to add service accounts to the Redpanda container using the `WithNewServiceAccount` option, setting the service account name and its password.
E.g. `WithNewServiceAccount("service-account", "password")`.
#### Adding Super Users
- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.20.0"><span class="tc-version">:material-tag: v0.20.0</span></a>
When a super user is needed, you can use the `WithSuperusers` option, passing a variadic list of super users.
E.g. `WithSuperusers("superuser-1", "superuser-2")`.
#### Enabling SASL
- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.20.0"><span class="tc-version">:material-tag: v0.20.0</span></a>
The `WithEnableSASL()` option enables SASL scram sha authentication. By default, no authentication (plaintext) is used.
When setting an authentication method, make sure to add users as well and authorize them using the `WithSuperusers()` option.
#### WithEnableKafkaAuthorization
- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.20.0"><span class="tc-version">:material-tag: v0.20.0</span></a>
The `WithEnableKafkaAuthorization` enables authorization for connections on the Kafka API.
#### WithEnableWasmTransform
- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.28.0"><span class="tc-version">:material-tag: v0.28.0</span></a>
The `WithEnableWasmTransform` enables wasm transform.
!!!warning
Should not be used with RP versions before 23.3
#### WithEnableSchemaRegistryHTTPBasicAuth
- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.20.0"><span class="tc-version">:material-tag: v0.20.0</span></a>
The `WithEnableSchemaRegistryHTTPBasicAuth` enables HTTP basic authentication for the Schema Registry.
#### WithAutoCreateTopics
- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.22.0"><span class="tc-version">:material-tag: v0.22.0</span></a>
The `WithAutoCreateTopics` option enables the auto-creation of topics.
#### WithTLS
- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.24.0"><span class="tc-version">:material-tag: v0.24.0</span></a>
The `WithTLS` option enables TLS encryption. It requires a valid PEM encoded certificate and key, passed as byte slices.
E.g. `WithTLS([]byte(cert), []byte(key))`.
#### WithBootstrapConfig
- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.33.0"><span class="tc-version">:material-tag: v0.33.0</span></a>
`WithBootstrapConfig` adds an arbitrary config key-value pair to the Redpanda container. Per the name, this config will be interpolated into the generated bootstrap
config file, which is particularly useful for configs requiring a restart when otherwise applied to a running Redpanda instance.
E.g. `WithBootstrapConfig("config_key", config_value)`, where `config_value` is of type `any`.
### Container Methods
The Redpanda container exposes the following methods:
#### KafkaSeedBroker
- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.20.0"><span class="tc-version">:material-tag: v0.20.0</span></a>
KafkaSeedBroker returns the seed broker that should be used for connecting
to the Kafka API with your Kafka client. It'll be returned in the format:
"host:port" - for example: "localhost:55687".
Expand All @@ -95,6 +162,8 @@ to the Kafka API with your Kafka client. It'll be returned in the format:

#### SchemaRegistryAddress

- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.20.0"><span class="tc-version">:material-tag: v0.20.0</span></a>

SchemaRegistryAddress returns the address to the schema registry API. This
is an HTTP-based API and thus the returned format will be: http://host:port.

Expand All @@ -105,6 +174,8 @@ is an HTTP-based API and thus the returned format will be: http://host:port.

#### AdminAPIAddress

- Since testcontainers-go <a href="https://github.com/testcontainers/testcontainers-go/releases/tag/v0.20.0"><span class="tc-version">:material-tag: v0.20.0</span></a>

AdminAPIAddress returns the address to the Redpanda Admin API. This
is an HTTP-based API and thus the returned format will be: http://host:port.

Expand Down
7 changes: 3 additions & 4 deletions modules/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,9 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
// convenient for testing
"REGISTRY_STORAGE_DELETE_ENABLED": "true",
},
WaitingFor: wait.ForAll(
wait.ForExposedPort(),
wait.ForLog("listening on [::]:5000").WithStartupTimeout(10*time.Second),
),
WaitingFor: wait.ForHTTP("/").
WithPort(registryPort).
WithStartupTimeout(10 * time.Second),
}

genericContainerReq := testcontainers.GenericContainerRequest{
Expand Down
70 changes: 29 additions & 41 deletions parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package testcontainers

import (
"context"
"errors"
"fmt"
"sync"
)
Expand Down Expand Up @@ -32,24 +31,27 @@ func (gpe ParallelContainersError) Error() string {
return fmt.Sprintf("%v", gpe.Errors)
}

// parallelContainersResult represents result.
type parallelContainersResult struct {
ParallelContainersRequestError
Container Container
}

func parallelContainersRunner(
ctx context.Context,
requests <-chan GenericContainerRequest,
errorsCh chan<- ParallelContainersRequestError,
containers chan<- Container,
results chan<- parallelContainersResult,
wg *sync.WaitGroup,
) {
defer wg.Done()
for req := range requests {
c, err := GenericContainer(ctx, req)
res := parallelContainersResult{Container: c}
if err != nil {
errorsCh <- ParallelContainersRequestError{
Request: req,
Error: errors.Join(err, TerminateContainer(c)),
}
continue
res.Request = req
res.Error = err
}
containers <- c
results <- res
}
}

Expand All @@ -65,41 +67,26 @@ func ParallelContainers(ctx context.Context, reqs ParallelContainerRequest, opt
}

tasksChan := make(chan GenericContainerRequest, tasksChanSize)
errsChan := make(chan ParallelContainersRequestError)
resChan := make(chan Container)
waitRes := make(chan struct{})

containers := make([]Container, 0)
errors := make([]ParallelContainersRequestError, 0)
resultsChan := make(chan parallelContainersResult, tasksChanSize)
done := make(chan struct{})

wg := sync.WaitGroup{}
var wg sync.WaitGroup
wg.Add(tasksChanSize)

// run workers
for i := 0; i < tasksChanSize; i++ {
go parallelContainersRunner(ctx, tasksChan, errsChan, resChan, &wg)
go parallelContainersRunner(ctx, tasksChan, resultsChan, &wg)
}

var errs []ParallelContainersRequestError
containers := make([]Container, 0, len(reqs))
go func() {
for {
select {
case c, ok := <-resChan:
if !ok {
resChan = nil
} else {
containers = append(containers, c)
}
case e, ok := <-errsChan:
if !ok {
errsChan = nil
} else {
errors = append(errors, e)
}
}

if resChan == nil && errsChan == nil {
waitRes <- struct{}{}
break
defer close(done)
for res := range resultsChan {
if res.Error != nil {
errs = append(errs, res.ParallelContainersRequestError)
} else {
containers = append(containers, res.Container)
}
}
}()
Expand All @@ -108,14 +95,15 @@ func ParallelContainers(ctx context.Context, reqs ParallelContainerRequest, opt
tasksChan <- req
}
close(tasksChan)

wg.Wait()
close(resChan)
close(errsChan)

<-waitRes
close(resultsChan)

<-done

if len(errors) != 0 {
return containers, ParallelContainersError{Errors: errors}
if len(errs) != 0 {
return containers, ParallelContainersError{Errors: errs}
}

return containers, nil
Expand Down
29 changes: 10 additions & 19 deletions parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package testcontainers

import (
"context"
"errors"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -99,23 +98,18 @@ func TestParallelContainers(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
res, err := ParallelContainers(context.Background(), tc.reqs, ParallelContainersOptions{})
if err != nil {
require.NotZero(t, tc.expErrors)
var e ParallelContainersError
errors.As(err, &e)
if len(e.Errors) != tc.expErrors {
t.Fatalf("expected errors: %d, got: %d\n", tc.expErrors, len(e.Errors))
}
}

for _, c := range res {
c := c
CleanupContainer(t, c)
}

if len(res) != tc.resLen {
t.Fatalf("expected containers: %d, got: %d\n", tc.resLen, len(res))
if tc.expErrors != 0 {
require.Error(t, err)
var errs ParallelContainersError
require.ErrorAs(t, err, &errs)
require.Len(t, errs.Errors, tc.expErrors)
}

require.Len(t, res, tc.resLen)
})
}
}
Expand Down Expand Up @@ -157,11 +151,8 @@ func TestParallelContainersWithReuse(t *testing.T) {
ctx := context.Background()

res, err := ParallelContainers(ctx, parallelRequest, ParallelContainersOptions{})
if err != nil {
var e ParallelContainersError
errors.As(err, &e)
t.Fatalf("expected errors: %d, got: %d\n", 0, len(e.Errors))
for _, c := range res {
CleanupContainer(t, c)
}
// Container is reused, only terminate first container
CleanupContainer(t, res[0])
require.NoError(t, err)
}
11 changes: 6 additions & 5 deletions wait/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ import (
func ExampleExecStrategy() {
ctx := context.Background()
req := testcontainers.ContainerRequest{
Image: "localstack/localstack:latest",
WaitingFor: wait.ForExec([]string{"awslocal", "dynamodb", "list-tables"}),
Image: "alpine:latest",
Entrypoint: []string{"tail", "-f", "/dev/null"}, // needed for the container to stay alive
WaitingFor: wait.ForExec([]string{"ls", "/"}).WithStartupTimeout(1 * time.Second),
}

localstack, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ctr, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
defer func() {
if err := testcontainers.TerminateContainer(localstack); err != nil {
if err := testcontainers.TerminateContainer(ctr); err != nil {
log.Printf("failed to terminate container: %s", err)
}
}()
Expand All @@ -40,7 +41,7 @@ func ExampleExecStrategy() {
return
}

state, err := localstack.State(ctx)
state, err := ctr.State(ctx)
if err != nil {
log.Printf("failed to get container state: %s", err)
return
Expand Down

0 comments on commit db70ad3

Please sign in to comment.