From 0ce5b44bb4e3e293e60d30b2cb3b13fe614d278b Mon Sep 17 00:00:00 2001 From: Adam Bouqdib Date: Thu, 17 Aug 2023 23:12:50 +0100 Subject: [PATCH] feat(modules/redpanda): support tls --- modules/redpanda/mounts/redpanda.yaml.tpl | 20 ++++ modules/redpanda/options.go | 13 +++ modules/redpanda/redpanda.go | 76 ++++++------ modules/redpanda/redpanda_test.go | 134 +++++++++++++++++++++- 4 files changed, 199 insertions(+), 44 deletions(-) diff --git a/modules/redpanda/mounts/redpanda.yaml.tpl b/modules/redpanda/mounts/redpanda.yaml.tpl index 9c7922d75d..a19d21ce19 100644 --- a/modules/redpanda/mounts/redpanda.yaml.tpl +++ b/modules/redpanda/mounts/redpanda.yaml.tpl @@ -27,6 +27,18 @@ redpanda: name: internal port: 9093 +{{ if .EnableTLS }} + admin_api_tls: + - enabled: true + cert_file: /etc/redpanda/cert.pem + key_file: /etc/redpanda/key.pem + kafka_api_tls: + - name: external + enabled: true + cert_file: /etc/redpanda/cert.pem + key_file: /etc/redpanda/key.pem +{{ end }} + schema_registry: schema_registry_api: - address: "0.0.0.0" @@ -34,6 +46,14 @@ schema_registry: port: 8081 authentication_method: {{ .SchemaRegistry.AuthenticationMethod }} +{{ if .EnableTLS }} + schema_registry_api_tls: + - name: main + enabled: true + cert_file: /etc/redpanda/cert.pem + key_file: /etc/redpanda/key.pem +{{ end }} + schema_registry_client: brokers: - address: localhost diff --git a/modules/redpanda/options.go b/modules/redpanda/options.go index 29a32bdb9b..f939ddcaa2 100644 --- a/modules/redpanda/options.go +++ b/modules/redpanda/options.go @@ -26,6 +26,10 @@ type options struct { // AutoCreateTopics is a flag to allow topic auto creation. AutoCreateTopics bool + + // EnableTLS is a flag to enable TLS. + EnableTLS bool + cert, key []byte } func defaultOptions() options { @@ -36,6 +40,7 @@ func defaultOptions() options { SchemaRegistryAuthenticationMethod: "none", ServiceAccounts: make(map[string]string, 0), AutoCreateTopics: false, + EnableTLS: false, } } @@ -93,3 +98,11 @@ func WithAutoCreateTopics() Option { o.AutoCreateTopics = true } } + +func WithEnableTLS(cert, key []byte) Option { + return func(o *options) { + o.EnableTLS = true + o.cert = cert + o.key = key + } +} diff --git a/modules/redpanda/redpanda.go b/modules/redpanda/redpanda.go index cc373b42cd..0a5edb9b0c 100644 --- a/modules/redpanda/redpanda.go +++ b/modules/redpanda/redpanda.go @@ -30,12 +30,13 @@ var ( defaultSchemaRegistryPort = "8081/tcp" ) -// Container represents the Redpanda container type used in the module +// Container represents the Redpanda container type used in the module. type Container struct { testcontainers.Container + urlScheme string } -// RunContainer creates an instance of the Redpanda container type +// RunContainer creates an instance of the Redpanda container type. func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*Container, error) { // 1. Create container request. // Some (e.g. Image) may be overridden by providing an option argument to this function. @@ -87,26 +88,37 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to create bootstrap config file: %w", err) } - toBeMountedFiles := []testcontainers.ContainerFile{ - { + req.Files = append(req.Files, + testcontainers.ContainerFile{ HostFilePath: entrypointFile.Name(), ContainerFilePath: "/entrypoint-tc.sh", FileMode: 700, }, - { + testcontainers.ContainerFile{ HostFilePath: bootstrapConfigFile.Name(), ContainerFilePath: "/etc/redpanda/.bootstrap.yaml", - FileMode: 700, + FileMode: 600, }, - } - req.Files = append(req.Files, toBeMountedFiles...) + ) container, err := testcontainers.GenericContainer(ctx, req) if err != nil { return nil, err } - // 4. Get mapped port for the Kafka API, so that we can render and then mount + // 4. Create certificate and key for TLS connections. + if settings.EnableTLS { + err = container.CopyToContainer(ctx, settings.cert, "/etc/redpanda/cert.pem", 600) + if err != nil { + return nil, fmt.Errorf("failed to copy cert.pem into container: %w", err) + } + err = container.CopyToContainer(ctx, settings.key, "/etc/redpanda/key.pem", 600) + if err != nil { + return nil, fmt.Errorf("failed to copy key.pem into container: %w", err) + } + } + + // 5. Get mapped port for the Kafka API, so that we can render and then mount // the Redpanda config with the advertised Kafka address. hostIP, err := container.Host(ctx) if err != nil { @@ -118,7 +130,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to get mapped Kafka port: %w", err) } - // 5. Render redpanda.yaml config and mount it. + // 6. Render redpanda.yaml config and mount it. nodeConfig, err := renderNodeConfig(settings, hostIP, kafkaPort.Int()) if err != nil { return nil, fmt.Errorf("failed to render node config: %w", err) @@ -159,51 +171,31 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize } } - return &Container{Container: container}, nil + scheme := "http" + if settings.EnableTLS { + scheme += "s" + } + + return &Container{Container: container, urlScheme: scheme}, nil } // KafkaSeedBroker returns the seed broker that should be used for connecting // to the Kafka API with your Kafka client. It'll be returned in the format: // "host:port" - for example: "localhost:55687". func (c *Container) KafkaSeedBroker(ctx context.Context) (string, error) { - return c.getMappedHostPort(ctx, nat.Port(defaultKafkaAPIPort)) + return c.PortEndpoint(ctx, nat.Port(defaultKafkaAPIPort), "") } // AdminAPIAddress returns the address to the Redpanda Admin API. This // is an HTTP-based API and thus the returned format will be: http://host:port. func (c *Container) AdminAPIAddress(ctx context.Context) (string, error) { - hostPort, err := c.getMappedHostPort(ctx, nat.Port(defaultAdminAPIPort)) - if err != nil { - return "", err - } - return fmt.Sprintf("http://%v", hostPort), nil + return c.PortEndpoint(ctx, nat.Port(defaultAdminAPIPort), c.urlScheme) } // SchemaRegistryAddress returns the address to the schema registry API. This // is an HTTP-based API and thus the returned format will be: http://host:port. func (c *Container) SchemaRegistryAddress(ctx context.Context) (string, error) { - hostPort, err := c.getMappedHostPort(ctx, nat.Port(defaultSchemaRegistryPort)) - if err != nil { - return "", err - } - return fmt.Sprintf("http://%v", hostPort), nil -} - -// getMappedHostPort returns the mapped host and port a given nat.Port following -// this format: "host:port". The mapped port is the port that is accessible from -// the host system and is remapped to the given container port. -func (c *Container) getMappedHostPort(ctx context.Context, port nat.Port) (string, error) { - hostIP, err := c.Host(ctx) - if err != nil { - return "", fmt.Errorf("failed to get hostIP: %w", err) - } - - mappedPort, err := c.MappedPort(ctx, port) - if err != nil { - return "", fmt.Errorf("failed to get mapped port: %w", err) - } - - return fmt.Sprintf("%v:%d", hostIP, mappedPort.Int()), nil + return c.PortEndpoint(ctx, nat.Port(defaultSchemaRegistryPort), c.urlScheme) } // createEntrypointTmpFile returns a temporary file with the custom entrypoint @@ -247,14 +239,14 @@ func createBootstrapConfigFile(settings options) (*os.File, error) { return nil, err } - if err := os.WriteFile(bootstrapTmpFile.Name(), bootstrapConfig.Bytes(), 0o700); err != nil { + if err := os.WriteFile(bootstrapTmpFile.Name(), bootstrapConfig.Bytes(), 0o600); err != nil { return nil, err } return bootstrapTmpFile, nil } -// renderNodeConfig renders the redpanda.yaml node config and retuns it as +// renderNodeConfig renders the redpanda.yaml node config and returns it as // byte array. func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) ([]byte, error) { tplParams := redpandaConfigTplParams{ @@ -268,6 +260,7 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) SchemaRegistry: redpandaConfigTplParamsSchemaRegistry{ AuthenticationMethod: settings.SchemaRegistryAuthenticationMethod, }, + EnableTLS: settings.EnableTLS, } ncTpl, err := template.New("redpanda.yaml").Parse(nodeConfigTpl) @@ -293,6 +286,7 @@ type redpandaConfigTplParams struct { KafkaAPI redpandaConfigTplParamsKafkaAPI SchemaRegistry redpandaConfigTplParamsSchemaRegistry AutoCreateTopics bool + EnableTLS bool } type redpandaConfigTplParamsKafkaAPI struct { diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index af7a5244ef..a7d2bacfba 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -2,8 +2,11 @@ package redpanda import ( "context" + "crypto/tls" + "crypto/x509" "fmt" "net/http" + "strings" "testing" "time" @@ -47,7 +50,7 @@ func TestRedpanda(t *testing.T) { httpCl := &http.Client{Timeout: 5 * time.Second} schemaRegistryURL, err := container.SchemaRegistryAddress(ctx) require.NoError(t, err) - req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/subjects", schemaRegistryURL), nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/subjects", schemaRegistryURL), nil) require.NoError(t, err) resp, err := httpCl.Do(req) require.NoError(t, err) @@ -59,7 +62,7 @@ func TestRedpanda(t *testing.T) { adminAPIURL, err := container.AdminAPIAddress(ctx) require.NoError(t, err) // } - req, err = http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/v1/cluster/health_overview", adminAPIURL), nil) + req, err = http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/v1/cluster/health_overview", adminAPIURL), nil) require.NoError(t, err) resp, err = httpCl.Do(req) require.NoError(t, err) @@ -163,7 +166,7 @@ func TestRedpandaWithAuthentication(t *testing.T) { // } // Failed authentication - req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/subjects", schemaRegistryURL), nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/subjects", schemaRegistryURL), nil) require.NoError(t, err) resp, err := httpCl.Do(req) require.NoError(t, err) @@ -205,3 +208,128 @@ func TestRedpandaProduceWithAutoCreateTopics(t *testing.T) { results := kafkaCl.ProduceSync(ctx, &kgo.Record{Topic: "test", Value: []byte("test message")}) require.NoError(t, results.FirstErr()) } + +func TestRedpandaWithEnableTLS(t *testing.T) { + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + require.NoError(t, err, "failed to load key pair") + + ctx := context.Background() + + container, err := RunContainer(ctx, WithEnableTLS(localhostCert, localhostKey)) + require.NoError(t, err) + + t.Cleanup(func() { + if err := container.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate container: %s", err) + } + }) + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(localhostCert) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + + httpCl := &http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + ForceAttemptHTTP2: true, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: tlsConfig, + }, + } + + // Test Admin API + adminAPIURL, err := container.AdminAPIAddress(ctx) + require.NoError(t, err) + require.True(t, strings.HasPrefix(adminAPIURL, "https://"), "AdminAPIAddress should return https url") + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/v1/cluster/health_overview", adminAPIURL), nil) + require.NoError(t, err) + resp, err := httpCl.Do(req) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + resp.Body.Close() + + // Test Schema Registry API + schemaRegistryURL, err := container.SchemaRegistryAddress(ctx) + require.NoError(t, err) + require.True(t, strings.HasPrefix(adminAPIURL, "https://"), "SchemaRegistryAddress should return https url") + req, err = http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/subjects", schemaRegistryURL), nil) + require.NoError(t, err) + resp, err = httpCl.Do(req) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + resp.Body.Close() + + brokers, err := container.KafkaSeedBroker(ctx) + require.NoError(t, err) + + kafkaCl, err := kgo.NewClient( + kgo.SeedBrokers(brokers), + kgo.DialTLSConfig(tlsConfig), + ) + require.NoError(t, err) + defer kafkaCl.Close() + + // Test produce to unknown topic + results := kafkaCl.ProduceSync(ctx, &kgo.Record{Topic: "test", Value: []byte("test message")}) + require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition) +} + +// localhostCert is a PEM-encoded TLS cert with SAN IPs +// generated from src/crypto/tls: +// go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,localhost --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h +var localhostCert = []byte(`-----BEGIN CERTIFICATE----- +MIIDODCCAiCgAwIBAgIRAKMykg5qJSCb4L3WtcZznSQwDQYJKoZIhvcNAQELBQAw +EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2 +MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzCCASIwDQYJKoZIhvcNAQEBBQADggEP +ADCCAQoCggEBAPYcLIhqCsrmqvsY1gWqI1jx3Ytn5Qjfvlg3BPD/YeD4UVouBhgQ +NIIERFCmDUzu52pXYZeCouBIVDWqZKixQf3PyBzAqbFvX0pTsZrOnvjuoahzjEcl +x+CfkIp58mVaV/8v9TyBYCXNuHlI7Pndu/3U5d6npSg8+dTkwW3VZzZyHpsDW+a4 +ByW02NI58LoHzQPMRg9MFToL1qNQy4PFyADf2N/3/SYOkrbSrXA0jYqXE8yvQGYe +LWcoQ+4YkurSS1TgSNEKxrzGj8w4xRjEjRNsLVNWd8uxZkHwv6LXOn4s39ix3jN4 +7OJJHA8fJAWxAP4ThrpM1j5J+Rq1PD380u8CAwEAAaOBhjCBgzAOBgNVHQ8BAf8E +BAMCAqQwEwYDVR0lBAwwCgYIKwYBBQUHAwEwDwYDVR0TAQH/BAUwAwEB/zAdBgNV +HQ4EFgQU8gMBt2leRAnGgCQ6pgIYPHY35GAwLAYDVR0RBCUwI4IJbG9jYWxob3N0 +hwR/AAABhxAAAAAAAAAAAAAAAAAAAAABMA0GCSqGSIb3DQEBCwUAA4IBAQA5F6aw +6JJMsnCjxRGYXb252zqjxOxweawZ2je4UAGSsF27Phm1Bx6/2mzPpgIB0I7xNBFL +ljtqBG/FpH6qWpkkegljL8Z5soXiye/4r1G+V6hadm32/OLQCS//dyq7W1a2uVlS +KdFjoNqRW2PacVQLjnTbP2SJV5CnrJgCsSMXVoNnKdj5gr5ltNNAt9TAJ85iFa5d +rJla/XghtqEOzYtigKPF7EVqRRl4RmPu30hxwDZMT60ptFolfCEeXpDra5uonJMv +ElEbzK8ZzXmvWCj94RjPkGKZs8+SDM2qfKPk5ZW2xJxwqS3tkEkZlj1L+b7zYOlt +aJ65OWCXHLecrgdl +-----END CERTIFICATE-----`) + +// localhostKey is the private key for localhostCert. +var localhostKey = []byte(testingKey(`-----BEGIN TESTING KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQD2HCyIagrK5qr7 +GNYFqiNY8d2LZ+UI375YNwTw/2Hg+FFaLgYYEDSCBERQpg1M7udqV2GXgqLgSFQ1 +qmSosUH9z8gcwKmxb19KU7Gazp747qGoc4xHJcfgn5CKefJlWlf/L/U8gWAlzbh5 +SOz53bv91OXep6UoPPnU5MFt1Wc2ch6bA1vmuAcltNjSOfC6B80DzEYPTBU6C9aj +UMuDxcgA39jf9/0mDpK20q1wNI2KlxPMr0BmHi1nKEPuGJLq0ktU4EjRCsa8xo/M +OMUYxI0TbC1TVnfLsWZB8L+i1zp+LN/Ysd4zeOziSRwPHyQFsQD+E4a6TNY+Sfka +tTw9/NLvAgMBAAECggEBALKxAiSJ2gw4Lyzhe4PhZIjQE+uEI+etjKbAS/YvdwHB +SlAP2pzeJ0G/l1p3NnEFhUDQ8SrwzxHJclsEvNE+4otGsiUuPgd2tdlhqzKbkxFr +MjT8sH14EQgm0uu4Xyb30ayXRZgI16abF7X4HRfOxxAl5EElt+TfYQYSkd8Nc0Mz +bD7g0riSdOKVhNIkUTT1U7x8ClIgff6vbWztOVP4hGezqEKpO/8+JBkg2GLeH3lC +PyuHEb33Foxg7SX35M1a89EKC2p4ER6/nfg6wGYyIsn42gBk1JgQdg23x7c/0WOu +vcw1unNP2kCbnsCeZ6KPRRGXEjbpTqOTzAUOekOeOgECgYEA9/jwK2wrX2J3kJN7 +v6kmxazigXHCa7XmFMgTfdqiQfXfjdi/4u+4KAX04jWra3ZH4KT98ztPOGjb6KhM +hfMldsxON8S9IQPcbDyj+5R77KU4BG/JQBEOX1uzS9KjMVG5e9ZUpG5UnSoSOgyM +oN3DZto7C5ULO2U2MT8JaoGb53cCgYEA/hPNMsCXFairxKy0BCsvJFan93+GIdwM +YoAGLc4Oj67ES8TYC4h9Im5i81JYOjpY4aZeKdj8S+ozmbqqa/iJiAfOr37xOMuX +AQA2T8uhPXXNXA5s6T3LaIXtzL0NmRRZCtuyEGdCidIXub7Bz8LrfsMc+s/jv57f +4IPmW12PPkkCgYBpEdDqBT5nfzh8SRGhR1IHZlbfVE12CDACVDh2FkK0QjNETjgY +N0zHoKZ/hxAoS4jvNdnoyxOpKj0r2sv54enY6X6nALTGnXUzY4p0GhlcTzFqJ9eV +TuTRIPDaytidGCzIvStGNP2jTmVEtXaM3wphtUxZfwCwXRVWToh12Y8uxwKBgA1a +FQp5vHbS6lPnj344lr2eIC2NcgsNeUkj2S9HCNTcJkylB4Vzor/QdTq8NQ66Sjlx +eLlSQc/retK1UIdkBDY10tK+JQcLC+Btlm0TEmIccrJHv8lyCeJwR1LfDHvi6dr8 +OJtMEd8UP1Lvh1fXsnBy6G71xc4oFzPBOrXKcOChAoGACOgyYe47ZizScsUGjCC7 +xARTEolZhtqHKVd5s9oi95P0r7A1gcNx/9YW0rCT2ZD8BD9H++HTE2L+mh3R9zDn +jwDeW7wVZec+oyGdc9L+B1xU25O+88zNLxlRAX8nXJbHdgL83UclmC51GbXejloP +D4ZNvyXf/6E27Ibu6v2p/vs= +-----END TESTING KEY-----`)) + +func testingKey(s string) string { return strings.ReplaceAll(s, "TESTING KEY", "PRIVATE KEY") }