diff --git a/modules/redpanda/examples_test.go b/modules/redpanda/examples_test.go index e4d2b0a47c..21913f95ff 100644 --- a/modules/redpanda/examples_test.go +++ b/modules/redpanda/examples_test.go @@ -15,6 +15,7 @@ func ExampleRunContainer() { redpandaContainer, err := redpanda.RunContainer(ctx, redpanda.WithEnableSASL(), redpanda.WithEnableKafkaAuthorization(), + redpanda.WithEnableWasmTransform(), redpanda.WithNewServiceAccount("superuser-1", "test"), redpanda.WithNewServiceAccount("superuser-2", "test"), redpanda.WithNewServiceAccount("no-superuser", "test"), diff --git a/modules/redpanda/go.mod b/modules/redpanda/go.mod index 5f504c12c8..abfd703af8 100644 --- a/modules/redpanda/go.mod +++ b/modules/redpanda/go.mod @@ -8,6 +8,7 @@ require ( github.com/testcontainers/testcontainers-go v0.27.0 github.com/twmb/franz-go v1.15.4 github.com/twmb/franz-go/pkg/kadm v1.10.0 + golang.org/x/mod v0.14.0 ) require ( @@ -58,9 +59,8 @@ require ( go.opentelemetry.io/otel/trace v1.19.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect - golang.org/x/mod v0.11.0 // indirect golang.org/x/sys v0.16.0 // indirect - golang.org/x/tools v0.10.0 // indirect + golang.org/x/tools v0.13.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect google.golang.org/grpc v1.58.3 // indirect google.golang.org/protobuf v1.31.0 // indirect diff --git a/modules/redpanda/go.sum b/modules/redpanda/go.sum index 107aacb7a8..a459120595 100644 --- a/modules/redpanda/go.sum +++ b/modules/redpanda/go.sum @@ -3,6 +3,8 @@ dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= +github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8= @@ -137,6 +139,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -168,6 +172,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg= golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM= +golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= +golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/modules/redpanda/mounts/bootstrap.yaml.tpl b/modules/redpanda/mounts/bootstrap.yaml.tpl index 78c7053a27..643236bbef 100644 --- a/modules/redpanda/mounts/bootstrap.yaml.tpl +++ b/modules/redpanda/mounts/bootstrap.yaml.tpl @@ -15,6 +15,10 @@ superusers: kafka_enable_authorization: true {{- end }} +{{- if .EnableWasmTransform }} +data_transforms_enabled: true +{{- end }} + {{- if .AutoCreateTopics }} auto_create_topics_enabled: true {{- end }} diff --git a/modules/redpanda/options.go b/modules/redpanda/options.go index 1d4afcf8af..4340be30d8 100644 --- a/modules/redpanda/options.go +++ b/modules/redpanda/options.go @@ -22,6 +22,9 @@ type options struct { // or "http_basic" for HTTP basic authentication. SchemaRegistryAuthenticationMethod string + // EnableWasmTransform is a flag to enable wasm transform. + EnableWasmTransform bool + // ServiceAccounts is a map of username (key) to password (value) of users // that shall be created, so that you can use these to authenticate against // Redpanda (either for the Kafka API or Schema Registry HTTP access). @@ -97,6 +100,14 @@ func WithEnableKafkaAuthorization() Option { } } +// WithEnableWasmTransform enables wasm transform. +// Should not be used with RP versions before 23.3 +func WithEnableWasmTransform() Option { + return func(o *options) { + o.EnableWasmTransform = true + } +} + // WithEnableSchemaRegistryHTTPBasicAuth enables HTTP basic authentication for // Schema Registry. func WithEnableSchemaRegistryHTTPBasicAuth() Option { diff --git a/modules/redpanda/redpanda.go b/modules/redpanda/redpanda.go index 9160524226..6213809d4b 100644 --- a/modules/redpanda/redpanda.go +++ b/modules/redpanda/redpanda.go @@ -11,10 +11,12 @@ import ( "net/http" "os" "path/filepath" + "strings" "text/template" "time" "github.com/docker/go-connections/nat" + "golang.org/x/mod/semver" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" @@ -62,7 +64,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize // Some (e.g. Image) may be overridden by providing an option argument to this function. req := testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ - Image: "docker.redpanda.com/redpandadata/redpanda:v23.1.7", + Image: "docker.redpanda.com/redpandadata/redpanda:v23.3.3", User: "root:root", // Files: Will be added later after we've rendered our YAML templates. ExposedPorts: []string{ @@ -92,6 +94,11 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize opt.Customize(&req) } + // 2.1. If the image is not at least v23.3, disable wasm transform + if !isAtLeastVersion(req.ContainerRequest.Image, "23.3") { + settings.EnableWasmTransform = false + } + // 3. Create temporary entrypoint file. We need a custom entrypoint that waits // until the actual Redpanda node config is mounted. Once the redpanda config is // mounted we will call the original entrypoint with the same parameters. @@ -267,6 +274,7 @@ func renderBootstrapConfig(settings options) ([]byte, error) { Superusers: settings.Superusers, KafkaAPIEnableAuthorization: settings.KafkaEnableAuthorization, AutoCreateTopics: settings.AutoCreateTopics, + EnableWasmTransform: settings.EnableWasmTransform, } tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl) @@ -340,6 +348,7 @@ type redpandaBootstrapConfigTplParams struct { Superusers []string KafkaAPIEnableAuthorization bool AutoCreateTopics bool + EnableWasmTransform bool } type redpandaConfigTplParams struct { @@ -366,3 +375,23 @@ type listener struct { Port int AuthenticationMethod string } + +// isAtLeastVersion returns true if the base image (without tag) is in a version or above +func isAtLeastVersion(image, major string) bool { + parts := strings.Split(image, ":") + version := parts[len(parts)-1] + + if version == "latest" { + return true + } + + if !strings.HasPrefix(version, "v") { + version = fmt.Sprintf("v%s", version) + } + + if semver.IsValid(version) { + return semver.Compare(version, fmt.Sprintf("v%s", major)) >= 0 // version >= v8.x + } + + return false +} diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index 668038196d..0a28940779 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -85,6 +85,119 @@ func TestRedpandaWithAuthentication(t *testing.T) { container, err := RunContainer(ctx, WithEnableSASL(), WithEnableKafkaAuthorization(), + WithEnableWasmTransform(), + WithNewServiceAccount("superuser-1", "test"), + WithNewServiceAccount("superuser-2", "test"), + WithNewServiceAccount("no-superuser", "test"), + WithSuperusers("superuser-1", "superuser-2"), + WithEnableSchemaRegistryHTTPBasicAuth(), + ) + require.NoError(t, err) + // } + + // Clean up the container after the test is complete + t.Cleanup(func() { + if err := container.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate container: %s", err) + } + }) + + // kafkaSeedBroker { + seedBroker, err := container.KafkaSeedBroker(ctx) + // } + require.NoError(t, err) + + // Test successful authentication & authorization with all created superusers + serviceAccounts := map[string]string{ + "superuser-1": "test", + "superuser-2": "test", + } + + for user, password := range serviceAccounts { + kafkaCl, err := kgo.NewClient( + kgo.SeedBrokers(seedBroker), + kgo.SASL(scram.Auth{ + User: user, + Pass: password, + }.AsSha256Mechanism()), + ) + require.NoError(t, err) + + kafkaAdmCl := kadm.NewClient(kafkaCl) + _, err = kafkaAdmCl.CreateTopic(ctx, 1, 1, nil, fmt.Sprintf("test-%v", user)) + require.NoError(t, err) + kafkaCl.Close() + } + + // Test successful authentication, but failed authorization with a non-superuser account + { + kafkaCl, err := kgo.NewClient( + kgo.SeedBrokers(seedBroker), + kgo.SASL(scram.Auth{ + User: "no-superuser", + Pass: "test", + }.AsSha256Mechanism()), + ) + require.NoError(t, err) + + kafkaAdmCl := kadm.NewClient(kafkaCl) + _, err = kafkaAdmCl.CreateTopic(ctx, 1, 1, nil, "test-2") + require.Error(t, err) + require.ErrorContains(t, err, "TOPIC_AUTHORIZATION_FAILED") + kafkaCl.Close() + } + + // Test failed authentication + { + kafkaCl, err := kgo.NewClient( + kgo.SeedBrokers(seedBroker), + kgo.SASL(scram.Auth{ + User: "wrong", + Pass: "wrong", + }.AsSha256Mechanism()), + ) + require.NoError(t, err) + + kafkaAdmCl := kadm.NewClient(kafkaCl) + _, err = kafkaAdmCl.Metadata(ctx) + require.Error(t, err) + require.ErrorContains(t, err, "SASL_AUTHENTICATION_FAILED") + } + + // Test Schema Registry API + httpCl := &http.Client{Timeout: 5 * time.Second} + // schemaRegistryAddress { + schemaRegistryURL, err := container.SchemaRegistryAddress(ctx) + // } + require.NoError(t, err) + + // Failed authentication + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/subjects", schemaRegistryURL), nil) + require.NoError(t, err) + resp, err := httpCl.Do(req) + require.NoError(t, err) + assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) + resp.Body.Close() + + // Successful authentication + for user, password := range serviceAccounts { + req.SetBasicAuth(user, password) + resp, err = httpCl.Do(req) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + resp.Body.Close() + } +} + +func TestRedpandaWithOldVersionAndWasm(t *testing.T) { + ctx := context.Background() + // redpandaCreateContainer { + // this would fail to start if we weren't ignoring wasm transforms for older versions + container, err := RunContainer(ctx, + testcontainers.WithImage("redpandadata/redpanda:v23.2.18"), + WithEnableSASL(), + WithEnableKafkaAuthorization(), + WithEnableWasmTransform(), WithNewServiceAccount("superuser-1", "test"), WithNewServiceAccount("superuser-2", "test"), WithNewServiceAccount("no-superuser", "test"), @@ -512,3 +625,52 @@ D4ZNvyXf/6E27Ibu6v2p/vs= -----END TESTING KEY-----`)) func testingKey(s string) string { return strings.ReplaceAll(s, "TESTING KEY", "PRIVATE KEY") } + +func Test_isAtLeastVersion(t *testing.T) { + type args struct { + image string + major string + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "v21.5.6", + args: args{ + image: "redpandadata/redpanda:v21.5.6", + major: "23.3", + }, + want: false, + }, + { + name: "v23.3.3", + args: args{ + image: "redpandadata/redpanda:v23.3.3", + major: "23.3", + }, + want: true, + }, + { + name: "v23.3.3-rc1", + args: args{ + image: "redpandadata/redpanda:v23.3.3-rc1", + major: "23.3", + }, + want: true, + }, + { + name: "v21.3.3-rc1", + args: args{ + image: "redpandadata/redpanda:v21.3.3-rc1", + major: "23.3", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, isAtLeastVersion(tt.args.image, tt.args.major), "isAtLeastVersion(%v, %v)", tt.args.image, tt.args.major) + }) + } +}