Skip to content

Commit

Permalink
feat(modules/redpanda): support tls
Browse files Browse the repository at this point in the history
  • Loading branch information
abemedia committed Aug 17, 2023
1 parent 71d58d8 commit 0ce5b44
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 44 deletions.
20 changes: 20 additions & 0 deletions modules/redpanda/mounts/redpanda.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,33 @@ redpanda:
name: internal
port: 9093

{{ if .EnableTLS }}
admin_api_tls:
- enabled: true
cert_file: /etc/redpanda/cert.pem
key_file: /etc/redpanda/key.pem
kafka_api_tls:
- name: external
enabled: true
cert_file: /etc/redpanda/cert.pem
key_file: /etc/redpanda/key.pem
{{ end }}

schema_registry:
schema_registry_api:
- address: "0.0.0.0"
name: main
port: 8081
authentication_method: {{ .SchemaRegistry.AuthenticationMethod }}

{{ if .EnableTLS }}
schema_registry_api_tls:
- name: main
enabled: true
cert_file: /etc/redpanda/cert.pem
key_file: /etc/redpanda/key.pem
{{ end }}

schema_registry_client:
brokers:
- address: localhost
Expand Down
13 changes: 13 additions & 0 deletions modules/redpanda/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type options struct {

// AutoCreateTopics is a flag to allow topic auto creation.
AutoCreateTopics bool

// EnableTLS is a flag to enable TLS.
EnableTLS bool
cert, key []byte
}

func defaultOptions() options {
Expand All @@ -36,6 +40,7 @@ func defaultOptions() options {
SchemaRegistryAuthenticationMethod: "none",
ServiceAccounts: make(map[string]string, 0),
AutoCreateTopics: false,
EnableTLS: false,
}
}

Expand Down Expand Up @@ -93,3 +98,11 @@ func WithAutoCreateTopics() Option {
o.AutoCreateTopics = true
}
}

func WithEnableTLS(cert, key []byte) Option {
return func(o *options) {
o.EnableTLS = true
o.cert = cert
o.key = key
}
}
76 changes: 35 additions & 41 deletions modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ var (
defaultSchemaRegistryPort = "8081/tcp"
)

// Container represents the Redpanda container type used in the module
// Container represents the Redpanda container type used in the module.
type Container struct {
testcontainers.Container
urlScheme string
}

// RunContainer creates an instance of the Redpanda container type
// RunContainer creates an instance of the Redpanda container type.
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*Container, error) {
// 1. Create container request.
// Some (e.g. Image) may be overridden by providing an option argument to this function.
Expand Down Expand Up @@ -87,26 +88,37 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, fmt.Errorf("failed to create bootstrap config file: %w", err)
}

toBeMountedFiles := []testcontainers.ContainerFile{
{
req.Files = append(req.Files,
testcontainers.ContainerFile{
HostFilePath: entrypointFile.Name(),
ContainerFilePath: "/entrypoint-tc.sh",
FileMode: 700,
},
{
testcontainers.ContainerFile{
HostFilePath: bootstrapConfigFile.Name(),
ContainerFilePath: "/etc/redpanda/.bootstrap.yaml",
FileMode: 700,
FileMode: 600,
},
}
req.Files = append(req.Files, toBeMountedFiles...)
)

container, err := testcontainers.GenericContainer(ctx, req)
if err != nil {
return nil, err
}

// 4. Get mapped port for the Kafka API, so that we can render and then mount
// 4. Create certificate and key for TLS connections.
if settings.EnableTLS {
err = container.CopyToContainer(ctx, settings.cert, "/etc/redpanda/cert.pem", 600)
if err != nil {
return nil, fmt.Errorf("failed to copy cert.pem into container: %w", err)
}
err = container.CopyToContainer(ctx, settings.key, "/etc/redpanda/key.pem", 600)
if err != nil {
return nil, fmt.Errorf("failed to copy key.pem into container: %w", err)
}
}

// 5. Get mapped port for the Kafka API, so that we can render and then mount
// the Redpanda config with the advertised Kafka address.
hostIP, err := container.Host(ctx)
if err != nil {
Expand All @@ -118,7 +130,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, fmt.Errorf("failed to get mapped Kafka port: %w", err)
}

// 5. Render redpanda.yaml config and mount it.
// 6. Render redpanda.yaml config and mount it.
nodeConfig, err := renderNodeConfig(settings, hostIP, kafkaPort.Int())
if err != nil {
return nil, fmt.Errorf("failed to render node config: %w", err)
Expand Down Expand Up @@ -159,51 +171,31 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
}
}

return &Container{Container: container}, nil
scheme := "http"
if settings.EnableTLS {
scheme += "s"
}

return &Container{Container: container, urlScheme: scheme}, nil
}

// KafkaSeedBroker returns the seed broker that should be used for connecting
// to the Kafka API with your Kafka client. It'll be returned in the format:
// "host:port" - for example: "localhost:55687".
func (c *Container) KafkaSeedBroker(ctx context.Context) (string, error) {
return c.getMappedHostPort(ctx, nat.Port(defaultKafkaAPIPort))
return c.PortEndpoint(ctx, nat.Port(defaultKafkaAPIPort), "")
}

// AdminAPIAddress returns the address to the Redpanda Admin API. This
// is an HTTP-based API and thus the returned format will be: http://host:port.
func (c *Container) AdminAPIAddress(ctx context.Context) (string, error) {
hostPort, err := c.getMappedHostPort(ctx, nat.Port(defaultAdminAPIPort))
if err != nil {
return "", err
}
return fmt.Sprintf("http://%v", hostPort), nil
return c.PortEndpoint(ctx, nat.Port(defaultAdminAPIPort), c.urlScheme)
}

// SchemaRegistryAddress returns the address to the schema registry API. This
// is an HTTP-based API and thus the returned format will be: http://host:port.
func (c *Container) SchemaRegistryAddress(ctx context.Context) (string, error) {
hostPort, err := c.getMappedHostPort(ctx, nat.Port(defaultSchemaRegistryPort))
if err != nil {
return "", err
}
return fmt.Sprintf("http://%v", hostPort), nil
}

// getMappedHostPort returns the mapped host and port a given nat.Port following
// this format: "host:port". The mapped port is the port that is accessible from
// the host system and is remapped to the given container port.
func (c *Container) getMappedHostPort(ctx context.Context, port nat.Port) (string, error) {
hostIP, err := c.Host(ctx)
if err != nil {
return "", fmt.Errorf("failed to get hostIP: %w", err)
}

mappedPort, err := c.MappedPort(ctx, port)
if err != nil {
return "", fmt.Errorf("failed to get mapped port: %w", err)
}

return fmt.Sprintf("%v:%d", hostIP, mappedPort.Int()), nil
return c.PortEndpoint(ctx, nat.Port(defaultSchemaRegistryPort), c.urlScheme)
}

// createEntrypointTmpFile returns a temporary file with the custom entrypoint
Expand Down Expand Up @@ -247,14 +239,14 @@ func createBootstrapConfigFile(settings options) (*os.File, error) {
return nil, err
}

if err := os.WriteFile(bootstrapTmpFile.Name(), bootstrapConfig.Bytes(), 0o700); err != nil {
if err := os.WriteFile(bootstrapTmpFile.Name(), bootstrapConfig.Bytes(), 0o600); err != nil {
return nil, err
}

return bootstrapTmpFile, nil
}

// renderNodeConfig renders the redpanda.yaml node config and retuns it as
// renderNodeConfig renders the redpanda.yaml node config and returns it as
// byte array.
func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) ([]byte, error) {
tplParams := redpandaConfigTplParams{
Expand All @@ -268,6 +260,7 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int)
SchemaRegistry: redpandaConfigTplParamsSchemaRegistry{
AuthenticationMethod: settings.SchemaRegistryAuthenticationMethod,
},
EnableTLS: settings.EnableTLS,
}

ncTpl, err := template.New("redpanda.yaml").Parse(nodeConfigTpl)
Expand All @@ -293,6 +286,7 @@ type redpandaConfigTplParams struct {
KafkaAPI redpandaConfigTplParamsKafkaAPI
SchemaRegistry redpandaConfigTplParamsSchemaRegistry
AutoCreateTopics bool
EnableTLS bool
}

type redpandaConfigTplParamsKafkaAPI struct {
Expand Down
134 changes: 131 additions & 3 deletions modules/redpanda/redpanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package redpanda

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net/http"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -47,7 +50,7 @@ func TestRedpanda(t *testing.T) {
httpCl := &http.Client{Timeout: 5 * time.Second}
schemaRegistryURL, err := container.SchemaRegistryAddress(ctx)
require.NoError(t, err)
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/subjects", schemaRegistryURL), nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/subjects", schemaRegistryURL), nil)
require.NoError(t, err)
resp, err := httpCl.Do(req)
require.NoError(t, err)
Expand All @@ -59,7 +62,7 @@ func TestRedpanda(t *testing.T) {
adminAPIURL, err := container.AdminAPIAddress(ctx)
require.NoError(t, err)
// }
req, err = http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/v1/cluster/health_overview", adminAPIURL), nil)
req, err = http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/v1/cluster/health_overview", adminAPIURL), nil)
require.NoError(t, err)
resp, err = httpCl.Do(req)
require.NoError(t, err)
Expand Down Expand Up @@ -163,7 +166,7 @@ func TestRedpandaWithAuthentication(t *testing.T) {
// }

// Failed authentication
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/subjects", schemaRegistryURL), nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/subjects", schemaRegistryURL), nil)
require.NoError(t, err)
resp, err := httpCl.Do(req)
require.NoError(t, err)
Expand Down Expand Up @@ -205,3 +208,128 @@ func TestRedpandaProduceWithAutoCreateTopics(t *testing.T) {
results := kafkaCl.ProduceSync(ctx, &kgo.Record{Topic: "test", Value: []byte("test message")})
require.NoError(t, results.FirstErr())
}

func TestRedpandaWithEnableTLS(t *testing.T) {
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
require.NoError(t, err, "failed to load key pair")

ctx := context.Background()

container, err := RunContainer(ctx, WithEnableTLS(localhostCert, localhostKey))
require.NoError(t, err)

t.Cleanup(func() {
if err := container.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(localhostCert)

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}

httpCl := &http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
ForceAttemptHTTP2: true,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig,
},
}

// Test Admin API
adminAPIURL, err := container.AdminAPIAddress(ctx)
require.NoError(t, err)
require.True(t, strings.HasPrefix(adminAPIURL, "https://"), "AdminAPIAddress should return https url")
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/v1/cluster/health_overview", adminAPIURL), nil)
require.NoError(t, err)
resp, err := httpCl.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
resp.Body.Close()

// Test Schema Registry API
schemaRegistryURL, err := container.SchemaRegistryAddress(ctx)
require.NoError(t, err)
require.True(t, strings.HasPrefix(adminAPIURL, "https://"), "SchemaRegistryAddress should return https url")
req, err = http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/subjects", schemaRegistryURL), nil)
require.NoError(t, err)
resp, err = httpCl.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
resp.Body.Close()

brokers, err := container.KafkaSeedBroker(ctx)
require.NoError(t, err)

kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(brokers),
kgo.DialTLSConfig(tlsConfig),
)
require.NoError(t, err)
defer kafkaCl.Close()

// Test produce to unknown topic
results := kafkaCl.ProduceSync(ctx, &kgo.Record{Topic: "test", Value: []byte("test message")})
require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition)
}

// localhostCert is a PEM-encoded TLS cert with SAN IPs
// generated from src/crypto/tls:
// go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,localhost --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
var localhostCert = []byte(`-----BEGIN CERTIFICATE-----
MIIDODCCAiCgAwIBAgIRAKMykg5qJSCb4L3WtcZznSQwDQYJKoZIhvcNAQELBQAw
EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2
MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzCCASIwDQYJKoZIhvcNAQEBBQADggEP
ADCCAQoCggEBAPYcLIhqCsrmqvsY1gWqI1jx3Ytn5Qjfvlg3BPD/YeD4UVouBhgQ
NIIERFCmDUzu52pXYZeCouBIVDWqZKixQf3PyBzAqbFvX0pTsZrOnvjuoahzjEcl
x+CfkIp58mVaV/8v9TyBYCXNuHlI7Pndu/3U5d6npSg8+dTkwW3VZzZyHpsDW+a4
ByW02NI58LoHzQPMRg9MFToL1qNQy4PFyADf2N/3/SYOkrbSrXA0jYqXE8yvQGYe
LWcoQ+4YkurSS1TgSNEKxrzGj8w4xRjEjRNsLVNWd8uxZkHwv6LXOn4s39ix3jN4
7OJJHA8fJAWxAP4ThrpM1j5J+Rq1PD380u8CAwEAAaOBhjCBgzAOBgNVHQ8BAf8E
BAMCAqQwEwYDVR0lBAwwCgYIKwYBBQUHAwEwDwYDVR0TAQH/BAUwAwEB/zAdBgNV
HQ4EFgQU8gMBt2leRAnGgCQ6pgIYPHY35GAwLAYDVR0RBCUwI4IJbG9jYWxob3N0
hwR/AAABhxAAAAAAAAAAAAAAAAAAAAABMA0GCSqGSIb3DQEBCwUAA4IBAQA5F6aw
6JJMsnCjxRGYXb252zqjxOxweawZ2je4UAGSsF27Phm1Bx6/2mzPpgIB0I7xNBFL
ljtqBG/FpH6qWpkkegljL8Z5soXiye/4r1G+V6hadm32/OLQCS//dyq7W1a2uVlS
KdFjoNqRW2PacVQLjnTbP2SJV5CnrJgCsSMXVoNnKdj5gr5ltNNAt9TAJ85iFa5d
rJla/XghtqEOzYtigKPF7EVqRRl4RmPu30hxwDZMT60ptFolfCEeXpDra5uonJMv
ElEbzK8ZzXmvWCj94RjPkGKZs8+SDM2qfKPk5ZW2xJxwqS3tkEkZlj1L+b7zYOlt
aJ65OWCXHLecrgdl
-----END CERTIFICATE-----`)

// localhostKey is the private key for localhostCert.
var localhostKey = []byte(testingKey(`-----BEGIN TESTING KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQD2HCyIagrK5qr7
GNYFqiNY8d2LZ+UI375YNwTw/2Hg+FFaLgYYEDSCBERQpg1M7udqV2GXgqLgSFQ1
qmSosUH9z8gcwKmxb19KU7Gazp747qGoc4xHJcfgn5CKefJlWlf/L/U8gWAlzbh5
SOz53bv91OXep6UoPPnU5MFt1Wc2ch6bA1vmuAcltNjSOfC6B80DzEYPTBU6C9aj
UMuDxcgA39jf9/0mDpK20q1wNI2KlxPMr0BmHi1nKEPuGJLq0ktU4EjRCsa8xo/M
OMUYxI0TbC1TVnfLsWZB8L+i1zp+LN/Ysd4zeOziSRwPHyQFsQD+E4a6TNY+Sfka
tTw9/NLvAgMBAAECggEBALKxAiSJ2gw4Lyzhe4PhZIjQE+uEI+etjKbAS/YvdwHB
SlAP2pzeJ0G/l1p3NnEFhUDQ8SrwzxHJclsEvNE+4otGsiUuPgd2tdlhqzKbkxFr
MjT8sH14EQgm0uu4Xyb30ayXRZgI16abF7X4HRfOxxAl5EElt+TfYQYSkd8Nc0Mz
bD7g0riSdOKVhNIkUTT1U7x8ClIgff6vbWztOVP4hGezqEKpO/8+JBkg2GLeH3lC
PyuHEb33Foxg7SX35M1a89EKC2p4ER6/nfg6wGYyIsn42gBk1JgQdg23x7c/0WOu
vcw1unNP2kCbnsCeZ6KPRRGXEjbpTqOTzAUOekOeOgECgYEA9/jwK2wrX2J3kJN7
v6kmxazigXHCa7XmFMgTfdqiQfXfjdi/4u+4KAX04jWra3ZH4KT98ztPOGjb6KhM
hfMldsxON8S9IQPcbDyj+5R77KU4BG/JQBEOX1uzS9KjMVG5e9ZUpG5UnSoSOgyM
oN3DZto7C5ULO2U2MT8JaoGb53cCgYEA/hPNMsCXFairxKy0BCsvJFan93+GIdwM
YoAGLc4Oj67ES8TYC4h9Im5i81JYOjpY4aZeKdj8S+ozmbqqa/iJiAfOr37xOMuX
AQA2T8uhPXXNXA5s6T3LaIXtzL0NmRRZCtuyEGdCidIXub7Bz8LrfsMc+s/jv57f
4IPmW12PPkkCgYBpEdDqBT5nfzh8SRGhR1IHZlbfVE12CDACVDh2FkK0QjNETjgY
N0zHoKZ/hxAoS4jvNdnoyxOpKj0r2sv54enY6X6nALTGnXUzY4p0GhlcTzFqJ9eV
TuTRIPDaytidGCzIvStGNP2jTmVEtXaM3wphtUxZfwCwXRVWToh12Y8uxwKBgA1a
FQp5vHbS6lPnj344lr2eIC2NcgsNeUkj2S9HCNTcJkylB4Vzor/QdTq8NQ66Sjlx
eLlSQc/retK1UIdkBDY10tK+JQcLC+Btlm0TEmIccrJHv8lyCeJwR1LfDHvi6dr8
OJtMEd8UP1Lvh1fXsnBy6G71xc4oFzPBOrXKcOChAoGACOgyYe47ZizScsUGjCC7
xARTEolZhtqHKVd5s9oi95P0r7A1gcNx/9YW0rCT2ZD8BD9H++HTE2L+mh3R9zDn
jwDeW7wVZec+oyGdc9L+B1xU25O+88zNLxlRAX8nXJbHdgL83UclmC51GbXejloP
D4ZNvyXf/6E27Ibu6v2p/vs=
-----END TESTING KEY-----`))

func testingKey(s string) string { return strings.ReplaceAll(s, "TESTING KEY", "PRIVATE KEY") }

0 comments on commit 0ce5b44

Please sign in to comment.