diff --git a/modules/redpanda/redpanda.go b/modules/redpanda/redpanda.go index 686d310d9f..c0936637d3 100644 --- a/modules/redpanda/redpanda.go +++ b/modules/redpanda/redpanda.go @@ -62,7 +62,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*Container, error) { tmpDir, err := os.MkdirTemp("", "redpanda") if err != nil { - return nil, fmt.Errorf("failed to create directory: %w", err) + return nil, fmt.Errorf("create temporary directory: %w", err) } defer os.RemoveAll(tmpDir) @@ -121,13 +121,13 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom // the Kafka API. entrypointPath := filepath.Join(tmpDir, entrypointFile) if err := os.WriteFile(entrypointPath, entrypoint, 0o700); err != nil { - return nil, fmt.Errorf("failed to create entrypoint file: %w", err) + return nil, fmt.Errorf("write entrypoint file: %w", err) } // 4. Register extra kafka listeners if provided, network aliases will be // set if err := registerListeners(settings, req); err != nil { - return nil, fmt.Errorf("failed to register listeners: %w", err) + return nil, fmt.Errorf("register listeners: %w", err) } // Bootstrap config file contains cluster configurations which will only be considered @@ -135,10 +135,10 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom bootstrapConfigPath := filepath.Join(tmpDir, bootstrapConfigFile) bootstrapConfig, err := renderBootstrapConfig(settings) if err != nil { - return nil, fmt.Errorf("failed to create bootstrap config file: %w", err) + return nil, err } if err := os.WriteFile(bootstrapConfigPath, bootstrapConfig, 0o600); err != nil { - return nil, fmt.Errorf("failed to create bootstrap config file: %w", err) + return nil, fmt.Errorf("write bootstrap config: %w", err) } req.Files = append(req.Files, @@ -158,11 +158,11 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom if settings.EnableTLS { certPath := filepath.Join(tmpDir, certFile) if err := os.WriteFile(certPath, settings.cert, 0o600); err != nil { - return nil, fmt.Errorf("failed to create certificate file: %w", err) + return nil, fmt.Errorf("write certificate file: %w", err) } keyPath := filepath.Join(tmpDir, keyFile) if err := os.WriteFile(keyPath, settings.key, 0o600); err != nil { - return nil, fmt.Errorf("failed to create key file: %w", err) + return nil, fmt.Errorf("write key file: %w", err) } req.Files = append(req.Files, @@ -192,34 +192,54 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom // the Redpanda config with the advertised Kafka address. hostIP, err := ctr.Host(ctx) if err != nil { - return c, fmt.Errorf("failed to get container host: %w", err) + return c, fmt.Errorf("host: %w", err) } kafkaPort, err := ctr.MappedPort(ctx, nat.Port(defaultKafkaAPIPort)) if err != nil { - return c, fmt.Errorf("failed to get mapped Kafka port: %w", err) + return c, fmt.Errorf("mapped kafka port: %w", err) } // 7. Render redpanda.yaml config and mount it. nodeConfig, err := renderNodeConfig(settings, hostIP, kafkaPort.Int()) if err != nil { - return c, fmt.Errorf("failed to render node config: %w", err) + return c, err } - err = ctr.CopyToContainer(ctx, nodeConfig, filepath.Join(redpandaDir, "redpanda.yaml"), 600) + err = ctr.CopyToContainer(ctx, nodeConfig, filepath.Join(redpandaDir, "redpanda.yaml"), 0o600) if err != nil { - return c, fmt.Errorf("failed to copy redpanda.yaml into container: %w", err) + return c, fmt.Errorf("copy to container: %w", err) } // 8. Wait until Redpanda is ready to serve requests. + waitHTTP := wait.ForHTTP(defaultAdminAPIPort). + WithStatusCodeMatcher(func(status int) bool { + // Redpanda's admin API returns 404 for requests to "/". + return status == http.StatusNotFound + }) + + var tlsConfig *tls.Config + if settings.EnableTLS { + cert, err := tls.X509KeyPair(settings.cert, settings.key) + if err != nil { + return c, fmt.Errorf("create admin cert: %w", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(settings.cert) + tlsConfig = &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + waitHTTP = waitHTTP.WithTLS(true, tlsConfig) + } err = wait.ForAll( wait.ForListeningPort(defaultKafkaAPIPort), - wait.ForListeningPort(defaultAdminAPIPort), + waitHTTP, wait.ForListeningPort(defaultSchemaRegistryPort), wait.ForLog("Successfully started Redpanda!"), ).WaitUntilReady(ctx, ctr) if err != nil { - return c, fmt.Errorf("failed to wait for Redpanda readiness: %w", err) + return c, fmt.Errorf("wait for readiness: %w", err) } c.urlScheme = "http" @@ -231,34 +251,25 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom if len(settings.ServiceAccounts) > 0 { adminAPIPort, err := ctr.MappedPort(ctx, nat.Port(defaultAdminAPIPort)) if err != nil { - return c, fmt.Errorf("failed to get mapped Admin API port: %w", err) + return c, fmt.Errorf("mapped admin port: %w", err) } adminAPIUrl := fmt.Sprintf("%s://%v:%d", c.urlScheme, hostIP, adminAPIPort.Int()) adminCl := NewAdminAPIClient(adminAPIUrl) if settings.EnableTLS { - cert, err := tls.X509KeyPair(settings.cert, settings.key) - if err != nil { - return c, fmt.Errorf("failed to create admin client with cert: %w", err) - } - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM(settings.cert) adminCl = adminCl.WithHTTPClient(&http.Client{ Timeout: 5 * time.Second, Transport: &http.Transport{ ForceAttemptHTTP2: true, TLSHandshakeTimeout: 10 * time.Second, - TLSClientConfig: &tls.Config{ - Certificates: []tls.Certificate{cert}, - RootCAs: caCertPool, - }, + TLSClientConfig: tlsConfig, }, }) } for username, password := range settings.ServiceAccounts { if err := adminCl.CreateUser(ctx, username, password); err != nil { - return c, fmt.Errorf("failed to create service account with username %q: %w", username, err) + return c, fmt.Errorf("create user %q: %w", username, err) } } } @@ -299,12 +310,12 @@ func renderBootstrapConfig(settings options) ([]byte, error) { tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl) if err != nil { - return nil, fmt.Errorf("failed to parse redpanda config file template: %w", err) + return nil, fmt.Errorf("parse bootstrap template: %w", err) } var bootstrapConfig bytes.Buffer if err := tpl.Execute(&bootstrapConfig, bootstrapTplParams); err != nil { - return nil, fmt.Errorf("failed to render redpanda bootstrap config template: %w", err) + return nil, fmt.Errorf("render bootstrap template: %w", err) } return bootstrapConfig.Bytes(), nil @@ -353,12 +364,12 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) ncTpl, err := template.New("redpanda.yaml").Parse(nodeConfigTpl) if err != nil { - return nil, fmt.Errorf("failed to parse redpanda config file template: %w", err) + return nil, fmt.Errorf("parse node config template: %w", err) } var redpandaYaml bytes.Buffer if err := ncTpl.Execute(&redpandaYaml, tplParams); err != nil { - return nil, fmt.Errorf("failed to render redpanda node config template: %w", err) + return nil, fmt.Errorf("render node config template: %w", err) } return redpandaYaml.Bytes(), nil