Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(redpanda): wait for #2794

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 41 additions & 30 deletions modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*Container, error) {
tmpDir, err := os.MkdirTemp("", "redpanda")
if err != nil {
return nil, fmt.Errorf("failed to create directory: %w", err)
return nil, fmt.Errorf("create temporary directory: %w", err)
}
defer os.RemoveAll(tmpDir)

Expand Down Expand Up @@ -121,24 +121,24 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
// the Kafka API.
entrypointPath := filepath.Join(tmpDir, entrypointFile)
if err := os.WriteFile(entrypointPath, entrypoint, 0o700); err != nil {
return nil, fmt.Errorf("failed to create entrypoint file: %w", err)
return nil, fmt.Errorf("write entrypoint file: %w", err)
}

// 4. Register extra kafka listeners if provided, network aliases will be
// set
if err := registerListeners(settings, req); err != nil {
return nil, fmt.Errorf("failed to register listeners: %w", err)
return nil, fmt.Errorf("register listeners: %w", err)
}

// Bootstrap config file contains cluster configurations which will only be considered
// the very first time you start a cluster.
bootstrapConfigPath := filepath.Join(tmpDir, bootstrapConfigFile)
bootstrapConfig, err := renderBootstrapConfig(settings)
if err != nil {
return nil, fmt.Errorf("failed to create bootstrap config file: %w", err)
return nil, err
}
if err := os.WriteFile(bootstrapConfigPath, bootstrapConfig, 0o600); err != nil {
return nil, fmt.Errorf("failed to create bootstrap config file: %w", err)
return nil, fmt.Errorf("write bootstrap config: %w", err)
}

req.Files = append(req.Files,
Expand All @@ -158,11 +158,11 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
if settings.EnableTLS {
certPath := filepath.Join(tmpDir, certFile)
if err := os.WriteFile(certPath, settings.cert, 0o600); err != nil {
return nil, fmt.Errorf("failed to create certificate file: %w", err)
return nil, fmt.Errorf("write certificate file: %w", err)
}
keyPath := filepath.Join(tmpDir, keyFile)
if err := os.WriteFile(keyPath, settings.key, 0o600); err != nil {
return nil, fmt.Errorf("failed to create key file: %w", err)
return nil, fmt.Errorf("write key file: %w", err)
}

req.Files = append(req.Files,
Expand Down Expand Up @@ -192,34 +192,54 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
// the Redpanda config with the advertised Kafka address.
hostIP, err := ctr.Host(ctx)
if err != nil {
return c, fmt.Errorf("failed to get container host: %w", err)
return c, fmt.Errorf("host: %w", err)
}

kafkaPort, err := ctr.MappedPort(ctx, nat.Port(defaultKafkaAPIPort))
if err != nil {
return c, fmt.Errorf("failed to get mapped Kafka port: %w", err)
return c, fmt.Errorf("mapped kafka port: %w", err)
}

// 7. Render redpanda.yaml config and mount it.
nodeConfig, err := renderNodeConfig(settings, hostIP, kafkaPort.Int())
if err != nil {
return c, fmt.Errorf("failed to render node config: %w", err)
return c, err
}

err = ctr.CopyToContainer(ctx, nodeConfig, filepath.Join(redpandaDir, "redpanda.yaml"), 600)
err = ctr.CopyToContainer(ctx, nodeConfig, filepath.Join(redpandaDir, "redpanda.yaml"), 0o600)
if err != nil {
return c, fmt.Errorf("failed to copy redpanda.yaml into container: %w", err)
return c, fmt.Errorf("copy to container: %w", err)
}

// 8. Wait until Redpanda is ready to serve requests.
waitHTTP := wait.ForHTTP(defaultAdminAPIPort).
WithStatusCodeMatcher(func(status int) bool {
// Redpanda's admin API returns 404 for requests to "/".
return status == http.StatusNotFound
stevenh marked this conversation as resolved.
Show resolved Hide resolved
})

var tlsConfig *tls.Config
if settings.EnableTLS {
cert, err := tls.X509KeyPair(settings.cert, settings.key)
if err != nil {
return c, fmt.Errorf("create admin cert: %w", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(settings.cert)
tlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
waitHTTP = waitHTTP.WithTLS(true, tlsConfig)
}
err = wait.ForAll(
wait.ForListeningPort(defaultKafkaAPIPort),
wait.ForListeningPort(defaultAdminAPIPort),
waitHTTP,
wait.ForListeningPort(defaultSchemaRegistryPort),
wait.ForLog("Successfully started Redpanda!"),
).WaitUntilReady(ctx, ctr)
if err != nil {
return c, fmt.Errorf("failed to wait for Redpanda readiness: %w", err)
return c, fmt.Errorf("wait for readiness: %w", err)
}

c.urlScheme = "http"
Expand All @@ -231,34 +251,25 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
if len(settings.ServiceAccounts) > 0 {
adminAPIPort, err := ctr.MappedPort(ctx, nat.Port(defaultAdminAPIPort))
if err != nil {
return c, fmt.Errorf("failed to get mapped Admin API port: %w", err)
return c, fmt.Errorf("mapped admin port: %w", err)
}

adminAPIUrl := fmt.Sprintf("%s://%v:%d", c.urlScheme, hostIP, adminAPIPort.Int())
adminCl := NewAdminAPIClient(adminAPIUrl)
if settings.EnableTLS {
cert, err := tls.X509KeyPair(settings.cert, settings.key)
if err != nil {
return c, fmt.Errorf("failed to create admin client with cert: %w", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(settings.cert)
adminCl = adminCl.WithHTTPClient(&http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
ForceAttemptHTTP2: true,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
},
TLSClientConfig: tlsConfig,
},
})
}

for username, password := range settings.ServiceAccounts {
if err := adminCl.CreateUser(ctx, username, password); err != nil {
return c, fmt.Errorf("failed to create service account with username %q: %w", username, err)
return c, fmt.Errorf("create user %q: %w", username, err)
}
}
}
Expand Down Expand Up @@ -299,12 +310,12 @@ func renderBootstrapConfig(settings options) ([]byte, error) {

tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl)
if err != nil {
return nil, fmt.Errorf("failed to parse redpanda config file template: %w", err)
return nil, fmt.Errorf("parse bootstrap template: %w", err)
}

var bootstrapConfig bytes.Buffer
if err := tpl.Execute(&bootstrapConfig, bootstrapTplParams); err != nil {
return nil, fmt.Errorf("failed to render redpanda bootstrap config template: %w", err)
return nil, fmt.Errorf("render bootstrap template: %w", err)
}

return bootstrapConfig.Bytes(), nil
Expand Down Expand Up @@ -353,12 +364,12 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int)

ncTpl, err := template.New("redpanda.yaml").Parse(nodeConfigTpl)
if err != nil {
return nil, fmt.Errorf("failed to parse redpanda config file template: %w", err)
return nil, fmt.Errorf("parse node config template: %w", err)
}

var redpandaYaml bytes.Buffer
if err := ncTpl.Execute(&redpandaYaml, tplParams); err != nil {
return nil, fmt.Errorf("failed to render redpanda node config template: %w", err)
return nil, fmt.Errorf("render node config template: %w", err)
}

return redpandaYaml.Bytes(), nil
Expand Down