Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for WASM Transforms to Redpanda Module #2170

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/redpanda/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func ExampleRunContainer() {
redpandaContainer, err := redpanda.RunContainer(ctx,
redpanda.WithEnableSASL(),
redpanda.WithEnableKafkaAuthorization(),
redpanda.WithEnableWasmTransform(),
redpanda.WithNewServiceAccount("superuser-1", "test"),
redpanda.WithNewServiceAccount("superuser-2", "test"),
redpanda.WithNewServiceAccount("no-superuser", "test"),
Expand Down
4 changes: 2 additions & 2 deletions modules/redpanda/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/testcontainers/testcontainers-go v0.27.0
github.com/twmb/franz-go v1.15.4
github.com/twmb/franz-go/pkg/kadm v1.10.0
golang.org/x/mod v0.14.0
)

require (
Expand Down Expand Up @@ -58,9 +59,8 @@ require (
go.opentelemetry.io/otel/trace v1.19.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/tools v0.10.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions modules/redpanda/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8=
Expand Down Expand Up @@ -137,6 +139,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down Expand Up @@ -168,6 +172,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg=
golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM=
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
4 changes: 4 additions & 0 deletions modules/redpanda/mounts/bootstrap.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ superusers:
kafka_enable_authorization: true
{{- end }}

{{- if .EnableWasmTransform }}
data_transforms_enabled: true
{{- end }}

{{- if .AutoCreateTopics }}
auto_create_topics_enabled: true
{{- end }}
11 changes: 11 additions & 0 deletions modules/redpanda/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type options struct {
// or "http_basic" for HTTP basic authentication.
SchemaRegistryAuthenticationMethod string

// EnableWasmTransform is a flag to enable wasm transform.
EnableWasmTransform bool

// ServiceAccounts is a map of username (key) to password (value) of users
// that shall be created, so that you can use these to authenticate against
// Redpanda (either for the Kafka API or Schema Registry HTTP access).
Expand Down Expand Up @@ -97,6 +100,14 @@ func WithEnableKafkaAuthorization() Option {
}
}

// WithEnableWasmTransform enables wasm transform.
// Should not be used with RP versions before 23.3
func WithEnableWasmTransform() Option {
return func(o *options) {
o.EnableWasmTransform = true
}
}

// WithEnableSchemaRegistryHTTPBasicAuth enables HTTP basic authentication for
// Schema Registry.
func WithEnableSchemaRegistryHTTPBasicAuth() Option {
Expand Down
31 changes: 30 additions & 1 deletion modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"text/template"
"time"

"github.com/docker/go-connections/nat"
"golang.org/x/mod/semver"

"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
Expand Down Expand Up @@ -62,7 +64,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
// Some (e.g. Image) may be overridden by providing an option argument to this function.
req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "docker.redpanda.com/redpandadata/redpanda:v23.1.7",
Image: "docker.redpanda.com/redpandadata/redpanda:v23.3.3",
User: "root:root",
// Files: Will be added later after we've rendered our YAML templates.
ExposedPorts: []string{
Expand Down Expand Up @@ -92,6 +94,11 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
opt.Customize(&req)
}

// 2.1. If the image is not at least v23.3, disable wasm transform
if !isAtLeastVersion(req.ContainerRequest.Image, "23.3") {
settings.EnableWasmTransform = false
}

// 3. Create temporary entrypoint file. We need a custom entrypoint that waits
// until the actual Redpanda node config is mounted. Once the redpanda config is
// mounted we will call the original entrypoint with the same parameters.
Expand Down Expand Up @@ -267,6 +274,7 @@ func renderBootstrapConfig(settings options) ([]byte, error) {
Superusers: settings.Superusers,
KafkaAPIEnableAuthorization: settings.KafkaEnableAuthorization,
AutoCreateTopics: settings.AutoCreateTopics,
EnableWasmTransform: settings.EnableWasmTransform,
}

tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl)
Expand Down Expand Up @@ -340,6 +348,7 @@ type redpandaBootstrapConfigTplParams struct {
Superusers []string
KafkaAPIEnableAuthorization bool
AutoCreateTopics bool
EnableWasmTransform bool
}

type redpandaConfigTplParams struct {
Expand All @@ -366,3 +375,23 @@ type listener struct {
Port int
AuthenticationMethod string
}

// isAtLeastVersion returns true if the base image (without tag) is in a version or above
func isAtLeastVersion(image, major string) bool {
parts := strings.Split(image, ":")
version := parts[len(parts)-1]

if version == "latest" {
return true
}

if !strings.HasPrefix(version, "v") {
version = fmt.Sprintf("v%s", version)
}

if semver.IsValid(version) {
return semver.Compare(version, fmt.Sprintf("v%s", major)) >= 0 // version >= v8.x
}

return false
}
162 changes: 162 additions & 0 deletions modules/redpanda/redpanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,119 @@ func TestRedpandaWithAuthentication(t *testing.T) {
container, err := RunContainer(ctx,
WithEnableSASL(),
WithEnableKafkaAuthorization(),
WithEnableWasmTransform(),
WithNewServiceAccount("superuser-1", "test"),
WithNewServiceAccount("superuser-2", "test"),
WithNewServiceAccount("no-superuser", "test"),
WithSuperusers("superuser-1", "superuser-2"),
WithEnableSchemaRegistryHTTPBasicAuth(),
)
require.NoError(t, err)
// }

// Clean up the container after the test is complete
t.Cleanup(func() {
if err := container.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

// kafkaSeedBroker {
seedBroker, err := container.KafkaSeedBroker(ctx)
// }
require.NoError(t, err)

// Test successful authentication & authorization with all created superusers
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
serviceAccounts := map[string]string{
"superuser-1": "test",
"superuser-2": "test",
}

for user, password := range serviceAccounts {
kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(seedBroker),
kgo.SASL(scram.Auth{
User: user,
Pass: password,
}.AsSha256Mechanism()),
)
require.NoError(t, err)

kafkaAdmCl := kadm.NewClient(kafkaCl)
_, err = kafkaAdmCl.CreateTopic(ctx, 1, 1, nil, fmt.Sprintf("test-%v", user))
require.NoError(t, err)
kafkaCl.Close()
}

// Test successful authentication, but failed authorization with a non-superuser account
{
kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(seedBroker),
kgo.SASL(scram.Auth{
User: "no-superuser",
Pass: "test",
}.AsSha256Mechanism()),
)
require.NoError(t, err)

kafkaAdmCl := kadm.NewClient(kafkaCl)
_, err = kafkaAdmCl.CreateTopic(ctx, 1, 1, nil, "test-2")
require.Error(t, err)
require.ErrorContains(t, err, "TOPIC_AUTHORIZATION_FAILED")
kafkaCl.Close()
}

// Test failed authentication
{
kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(seedBroker),
kgo.SASL(scram.Auth{
User: "wrong",
Pass: "wrong",
}.AsSha256Mechanism()),
)
require.NoError(t, err)

kafkaAdmCl := kadm.NewClient(kafkaCl)
_, err = kafkaAdmCl.Metadata(ctx)
require.Error(t, err)
require.ErrorContains(t, err, "SASL_AUTHENTICATION_FAILED")
}

// Test Schema Registry API
httpCl := &http.Client{Timeout: 5 * time.Second}
// schemaRegistryAddress {
schemaRegistryURL, err := container.SchemaRegistryAddress(ctx)
// }
require.NoError(t, err)

// Failed authentication
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/subjects", schemaRegistryURL), nil)
require.NoError(t, err)
resp, err := httpCl.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusUnauthorized, resp.StatusCode)
resp.Body.Close()

// Successful authentication
for user, password := range serviceAccounts {
req.SetBasicAuth(user, password)
resp, err = httpCl.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
resp.Body.Close()
}
}

func TestRedpandaWithOldVersionAndWasm(t *testing.T) {
ctx := context.Background()
// redpandaCreateContainer {
// this would fail to start if we weren't ignoring wasm transforms for older versions
container, err := RunContainer(ctx,
testcontainers.WithImage("redpandadata/redpanda:v23.2.18"),
WithEnableSASL(),
WithEnableKafkaAuthorization(),
WithEnableWasmTransform(),
WithNewServiceAccount("superuser-1", "test"),
WithNewServiceAccount("superuser-2", "test"),
WithNewServiceAccount("no-superuser", "test"),
Expand Down Expand Up @@ -512,3 +625,52 @@ D4ZNvyXf/6E27Ibu6v2p/vs=
-----END TESTING KEY-----`))

func testingKey(s string) string { return strings.ReplaceAll(s, "TESTING KEY", "PRIVATE KEY") }

func Test_isAtLeastVersion(t *testing.T) {
type args struct {
image string
major string
}
tests := []struct {
name string
args args
want bool
}{
{
name: "v21.5.6",
args: args{
image: "redpandadata/redpanda:v21.5.6",
major: "23.3",
},
want: false,
},
{
name: "v23.3.3",
args: args{
image: "redpandadata/redpanda:v23.3.3",
major: "23.3",
},
want: true,
},
{
name: "v23.3.3-rc1",
args: args{
image: "redpandadata/redpanda:v23.3.3-rc1",
major: "23.3",
},
want: true,
},
{
name: "v21.3.3-rc1",
args: args{
image: "redpandadata/redpanda:v21.3.3-rc1",
major: "23.3",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, isAtLeastVersion(tt.args.image, tt.args.major), "isAtLeastVersion(%v, %v)", tt.args.image, tt.args.major)
})
}
}
Loading