diff --git a/Makefile b/Makefile index 00f4c96155..232a691f92 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,7 @@ $(PATHINSTBIN)/%: $(SOURCE_FILES) @go build $(GO_FLAGS) -tags "$(TAGS)" -ldflags "$(LD_FLAGS) $(VER_FLAGS)" -o $@ ./cmd/$* $(APPS): %: $(PATHINSTBIN)/% + go generate ./internal/impl/redpanda # TOOLS = redpanda-docs TODO # tools: $(TOOLS) diff --git a/docs/modules/components/pages/processors/redpanda_data_transform.adoc b/docs/modules/components/pages/processors/redpanda_data_transform.adoc new file mode 100644 index 0000000000..858c81865d --- /dev/null +++ b/docs/modules/components/pages/processors/redpanda_data_transform.adoc @@ -0,0 +1,241 @@ += redpanda_data_transform +:type: processor +:status: experimental +:categories: ["Redpanda"] + + + +//// + THIS FILE IS AUTOGENERATED! + + To make changes, edit the corresponding source file under: + + https://github.com/redpanda-data/connect/tree/main/internal/impl/. + + And: + + https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl +//// + + +component_type_dropdown::[] + + +Executes a Redpanda Data Transform as a processor + +Introduced in version 0.1.0. + + +[tabs] +====== +Common:: ++ +-- + +```yml +# Common config fields, showing default values +label: "" +redpanda_data_transform: + module_path: "" # No default (required) + input_key: "" # No default (optional) + output_key: "" # No default (optional) + input_headers: + include_prefixes: [] + include_patterns: [] + output_metadata: + include_prefixes: [] + include_patterns: [] +``` + +-- +Advanced:: ++ +-- + +```yml +# All config fields, showing default values +label: "" +redpanda_data_transform: + module_path: "" # No default (required) + input_key: "" # No default (optional) + output_key: "" # No default (optional) + input_headers: + include_prefixes: [] + include_patterns: [] + output_metadata: + include_prefixes: [] + include_patterns: [] + timestamp: ${! 
timestamp_unix() } # No default (optional) + timeout: 10s + max_memory_pages: 1600 +``` + +-- +====== + +This processor executes a Redpanda Data Transform WebAssembly module, calling OnRecordWritten for each message being processed. + + +== Fields + +=== `module_path` + +The path of the target WASM module to execute. + + +*Type*: `string` + + +=== `input_key` + +An optional key to populate for each message. +This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. + + +*Type*: `string` + + +=== `output_key` + +An optional name of metadata for an output message key. + + +*Type*: `string` + + +=== `input_headers` + +Determine which (if any) metadata values should be added to messages as headers. + + +*Type*: `object` + + +=== `input_headers.include_prefixes` + +Provide a list of explicit metadata key prefixes to match against. + + +*Type*: `array` + +*Default*: `[]` + +```yml +# Examples + +include_prefixes: + - foo_ + - bar_ + +include_prefixes: + - kafka_ + +include_prefixes: + - content- +``` + +=== `input_headers.include_patterns` + +Provide a list of explicit metadata key regular expression (re2) patterns to match against. + + +*Type*: `array` + +*Default*: `[]` + +```yml +# Examples + +include_patterns: + - .* + +include_patterns: + - _timestamp_unix$ +``` + +=== `output_metadata` + +Determine which (if any) message headers should be added to the output as metadata. + + +*Type*: `object` + + +=== `output_metadata.include_prefixes` + +Provide a list of explicit metadata key prefixes to match against. + + +*Type*: `array` + +*Default*: `[]` + +```yml +# Examples + +include_prefixes: + - foo_ + - bar_ + +include_prefixes: + - kafka_ + +include_prefixes: + - content- +``` + +=== `output_metadata.include_patterns` + +Provide a list of explicit metadata key regular expression (re2) patterns to match against. 
+ + +*Type*: `array` + +*Default*: `[]` + +```yml +# Examples + +include_patterns: + - .* + +include_patterns: + - _timestamp_unix$ +``` + +=== `timestamp` + +An optional timestamp to set for each message. When left empty, the current timestamp is used. +This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions]. + + +*Type*: `string` + + +```yml +# Examples + +timestamp: ${! timestamp_unix() } + +timestamp: ${! metadata("kafka_timestamp_unix") } +``` + +=== `timeout` + +The maximum period of time for a message to be processed + + +*Type*: `string` + +*Default*: `"10s"` + +=== `max_memory_pages` + +The maximum amount of wasm memory pages (64KiB) that an individual wasm module instance can use + + +*Type*: `int` + +*Default*: `1600` + + diff --git a/internal/impl/redpanda/.gitignore b/internal/impl/redpanda/.gitignore new file mode 100644 index 0000000000..917660a348 --- /dev/null +++ b/internal/impl/redpanda/.gitignore @@ -0,0 +1 @@ +*.wasm \ No newline at end of file diff --git a/internal/impl/redpanda/functions.go b/internal/impl/redpanda/functions.go new file mode 100644 index 0000000000..5f2e3b4454 --- /dev/null +++ b/internal/impl/redpanda/functions.go @@ -0,0 +1,156 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package redpanda + +import ( + "context" + + "github.com/tetratelabs/wazero/api" +) + +const ( + noActiveTransform = int32(-1) + invalidBuffer = int32(-2) +) + +var transformHostFunctions = map[string]func(r *dataTransformEngine) any{} + +func registerModuleRunnerFunction(name string, ctor func(r *dataTransformEngine) any) struct{} { + transformHostFunctions[name] = ctor + return struct{}{} +} + +var _ = registerModuleRunnerFunction("check_abi_version_1", func(r *dataTransformEngine) any { + return func(ctx context.Context, m api.Module) { + // Placeholder for ABI compatibility check + } +}) +var _ = registerModuleRunnerFunction("check_abi_version_2", func(r *dataTransformEngine) any { + return func(ctx context.Context, m api.Module) { + // Placeholder for ABI compatibility check + } +}) + +var _ = registerModuleRunnerFunction("read_batch_header", func(r *dataTransformEngine) any { + return func( + ctx context.Context, + m api.Module, + baseOffset, + recordCount, + partitionLeaderEpoch, + attributes, + lastOffsetDelta, + baseTimestamp, + maxTimestamp, + producerId, + producerEpoch, + baseSequence uint32) int32 { + // Notify the host we're done processing a batch. + r.hostChan <- nil + // Wait for new batch to be submitted for processing. + select { + case _, ok := <-r.guestChan: + if !ok { + return noActiveTransform + } + case <-ctx.Done(): + return noActiveTransform + } + if !m.Memory().WriteUint32Le(recordCount, uint32(len(r.inputBatch))) { + return invalidBuffer + } + longest := 0 + for _, msg := range r.inputBatch { + longest = max(longest, msg.maxSize()) + } + // We should write dummy values in the other fields, but they are + // currently unused by SDKs. 
+ return int32(longest) + } +}) + +var _ = registerModuleRunnerFunction("read_next_record", func(r *dataTransformEngine) any { + return func(ctx context.Context, m api.Module, attributes, timestamp, offset, dataPtr, dataLen uint32) int32 { + if r.targetIndex >= len(r.inputBatch) { + return noActiveTransform + } + mem := m.Memory() + msg := r.inputBatch[r.targetIndex] + if !mem.WriteByte(attributes, 0) { + return invalidBuffer + } + if !mem.WriteUint64Le(timestamp, uint64(msg.timestamp)) { + return invalidBuffer + } + if !mem.WriteUint64Le(timestamp, uint64(msg.offset)) { + return invalidBuffer + } + data, ok := mem.Read(dataPtr, dataLen) + if !ok { + return invalidBuffer + } + n := msg.serialize(data) + if n < 0 { + return invalidBuffer + } + r.targetIndex += 1 + return int32(n) + } +}) + +var _ = registerModuleRunnerFunction("write_record", func(r *dataTransformEngine) any { + return func(ctx context.Context, m api.Module, dataPtr, dataLen uint32) int32 { + buf, ok := m.Memory().Read(dataPtr, dataLen) + if !ok { + return invalidBuffer + } + var tmsg transformMessage + _, err := tmsg.deserialize(buf) + if err != nil { + return invalidBuffer + } + smsg, err := r.convertTransformMessage(tmsg) + if err != nil { + return invalidBuffer + } + r.outputBatch = append(r.outputBatch, smsg) + return int32(len(buf)) + } +}) + +var _ = registerModuleRunnerFunction("write_record_with_options", func(r *dataTransformEngine) any { + return func(ctx context.Context, m api.Module, dataPtr, dataLen, optsPtr, optsLen uint32) int32 { + dataBuf, ok := m.Memory().Read(dataPtr, dataLen) + if !ok { + return invalidBuffer + } + var tmsg transformMessage + _, err := tmsg.deserialize(dataBuf) + if err != nil { + return invalidBuffer + } + optsBuf, ok := m.Memory().Read(dataPtr, dataLen) + if !ok { + return invalidBuffer + } + var opts transformWriteOptions + _, err = opts.deserialize(optsBuf) + if err != nil { + return invalidBuffer + } + tmsg.outputTopic = &opts.topic + return 
int32(len(dataBuf)) + } +}) diff --git a/internal/impl/redpanda/processor_data_transform.go b/internal/impl/redpanda/processor_data_transform.go new file mode 100644 index 0000000000..2d5824a80d --- /dev/null +++ b/internal/impl/redpanda/processor_data_transform.go @@ -0,0 +1,441 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package redpanda + +import ( + "context" + "errors" + "fmt" + "io" + "strconv" + "sync" + "time" + + "github.com/dustin/go-humanize" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + "github.com/tetratelabs/wazero/sys" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +const ( + dtpFieldModulePath = "module_path" + dtpFieldInputKey = "input_key" + dtpFieldOutputKey = "output_key" + dtpFieldInputHeaders = "input_headers" + dtpFieldOutputMetadata = "output_metadata" + dtpFieldTimestamp = "timestamp" + dtpFieldTimeout = "timeout" + dtpFieldMaxMemoryPages = "max_memory_pages" + wasmPageSize = 64 * humanize.KiByte + dtpDefaultMaxMemory = 100 * humanize.MiByte +) + +func dataTransformProcessorConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + Categories("Utility"). + Summary("Executes a Redpanda Data Transform as a processor"). + Description(` +This processor executes a Redpanda Data Transform WebAssembly module, calling OnRecordWritten for each message being processed. +`). 
+ Field(service.NewStringField(dtpFieldModulePath). + Description("The path of the target WASM module to execute.")). + Field(service.NewInterpolatedStringField(dtpFieldInputKey). + Description("An optional key to populate for each message.").Optional()). + Field(service.NewStringField(dtpFieldOutputKey). + Description("An optional name of metadata for an output message key.").Optional()). + Field(service.NewMetadataFilterField(dtpFieldInputHeaders). + Description("Determine which (if any) metadata values should be added to messages as headers."). + Optional()). + Field(service.NewMetadataFilterField(dtpFieldOutputMetadata). + Description("Determine which (if any) message headers should be added to the output as metadata."). + Optional()). + Field(service.NewInterpolatedStringField(dtpFieldTimestamp). + Description("An optional timestamp to set for each message. When left empty, the current timestamp is used."). + Example(`${! timestamp_unix() }`). + Example(`${! metadata("kafka_timestamp_unix") }`). + Optional(). + Advanced()). + Field(service.NewDurationField(dtpFieldTimeout). + Description("The maximum period of time for a message to be processed"). + Default("10s"). + Advanced()). + Field(service.NewIntField(dtpFieldMaxMemoryPages). + Description("The maximum amount of wasm memory pages (64KiB) that an individual wasm module instance can use"). + Default(dtpDefaultMaxMemory / wasmPageSize). + Advanced()). 
+ Version("0.1.0") +} + +func init() { + err := service.RegisterBatchProcessor( + "redpanda_data_transform", dataTransformProcessorConfig(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchProcessor, error) { + return newDataTransformProcessorFromConfig(conf, mgr) + }) + if err != nil { + panic(err) + } +} + +//------------------------------------------------------------------------------ + +type dataTransformConfig struct { + inputKey *service.InterpolatedString + outputKeyField *string + timestamp *service.InterpolatedString + inputMetadata *service.MetadataFilter + outputMetadata *service.MetadataFilter + + timeout time.Duration + maxMemoryPages int +} + +//------------------------------------------------------------------------------ + +type dataTransformEnginePool struct { + log *service.Logger + wasmBinary wazero.CompiledModule + runtimeConfig wazero.RuntimeConfig + modulePool sync.Pool + cfg dataTransformConfig +} + +func newDataTransformProcessorFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*dataTransformEnginePool, error) { + pathStr, err := conf.FieldString(dtpFieldModulePath) + if err != nil { + return nil, err + } + + file, err := mgr.FS().Open(pathStr) + if err != nil { + return nil, err + } + fileBytes, err := io.ReadAll(file) + if err != nil { + return nil, err + } + + var cfg dataTransformConfig + + if conf.Contains(dtpFieldInputKey) { + inputKey, err := conf.FieldInterpolatedString(dtpFieldInputKey) + if err != nil { + return nil, err + } + cfg.inputKey = inputKey + } + + if conf.Contains(dtpFieldOutputKey) { + inputKey, err := conf.FieldString(dtpFieldOutputKey) + if err != nil { + return nil, err + } + cfg.outputKeyField = &inputKey + } + + if conf.Contains(dtpFieldInputHeaders) { + inputMetadata, err := conf.FieldMetadataFilter(dtpFieldInputHeaders) + if err != nil { + return nil, err + } + cfg.inputMetadata = inputMetadata + } + + if conf.Contains(dtpFieldOutputMetadata) { + outputMetadata, err := 
conf.FieldMetadataFilter(dtpFieldOutputMetadata) + if err != nil { + return nil, err + } + cfg.outputMetadata = outputMetadata + } + + if conf.Contains(dtpFieldTimestamp) { + ts, err := conf.FieldInterpolatedString(dtpFieldTimestamp) + if err != nil { + return nil, err + } + cfg.timestamp = ts + } + + timeout, err := conf.FieldDuration(dtpFieldTimestamp) + if err != nil { + return nil, err + } + cfg.timeout = timeout + + maxMemoryPages, err := conf.FieldInt(dtpFieldMaxMemoryPages) + if err != nil { + return nil, err + } + cfg.maxMemoryPages = maxMemoryPages + + return newDataTransformProcessor(fileBytes, cfg, mgr) +} + +func newDataTransformProcessor(wasmBinary []byte, cfg dataTransformConfig, mgr *service.Resources) (*dataTransformEnginePool, error) { + ctx := context.Background() + runtimeCfg := wazero.NewRuntimeConfig(). + WithCloseOnContextDone(true). + WithCompilationCache(wazero.NewCompilationCache()). + WithMemoryLimitPages(uint32(cfg.maxMemoryPages)) + r := wazero.NewRuntimeWithConfig(ctx, runtimeCfg) + cm, err := r.CompileModule(ctx, wasmBinary) + if err != nil { + // Still cleanup but ignore errors as it would mask the compilation failure + _ = r.Close(ctx) + return nil, err + } + err = r.Close(ctx) + if err != nil { + return nil, err + } + // TODO: Validate more ABI contract than just memory + _, ok := cm.ExportedMemories()["memory"] + if !ok { + return nil, errors.New("missing exported Wasm memory") + } + proc := &dataTransformEnginePool{ + log: mgr.Logger(), + modulePool: sync.Pool{}, + runtimeConfig: runtimeCfg, + wasmBinary: cm, + cfg: cfg, + } + // Ensure we can create at least one module runner. 
+ modRunner, err := proc.newModule() + if err != nil { + return nil, err + } + + proc.modulePool.Put(modRunner) + return proc, nil +} + +func (p *dataTransformEnginePool) newModule() (engine *dataTransformEngine, err error) { + ctx := context.Background() + r := wazero.NewRuntimeWithConfig(ctx, p.runtimeConfig) + engine = &dataTransformEngine{ + log: p.log, + cfg: &p.cfg, + runtime: r, + hostChan: make(chan any), + guestChan: make(chan any), + procErr: nil, + } + defer func() { + if err != nil { + engine.runtime.Close(context.Background()) + } + }() + + builder := r.NewHostModuleBuilder("redpanda_transform") + for name, ctor := range transformHostFunctions { + builder = builder.NewFunctionBuilder().WithFunc(ctor(engine)).Export(name) + } + if _, err = builder.Instantiate(ctx); err != nil { + return + } + + if _, err = wasi_snapshot_preview1.Instantiate(ctx, r); err != nil { + return + } + cfg := wazero.NewModuleConfig(). + WithStartFunctions(). + WithArgs("transform"). + WithName("transform"). 
+ WithEnv("REDPANDA_INPUT_TOPIC", "benthos") + for i := 0; i < 8; i += 1 { + cfg = cfg.WithEnv(fmt.Sprintf("REDPANDA_OUTPUT_TOPIC_%d", i), fmt.Sprintf("output_%d", i)) + } + if engine.mod, err = r.InstantiateModule(ctx, p.wasmBinary, cfg); err != nil { + return + } + start := engine.mod.ExportedFunction("_start") + if start == nil { + err = errors.New("_start function is required") + engine.mod.Close(ctx) + return + } + go func() { + _, err := start.Call(context.Background()) + if !engine.mod.IsClosed() { + _ = engine.mod.Close(context.Background()) + } + if err == nil { + err = sys.NewExitError(0) + } + engine.procErr = err + close(engine.hostChan) + }() + + // Wait for the engine to start + select { + case <-engine.hostChan: + case <-time.After(p.cfg.timeout): + _ = engine.mod.Close(ctx) + drainChannel(engine.hostChan) // Wait for goroutine to exit + } + return engine, engine.procErr +} + +func (p *dataTransformEnginePool) ProcessBatch(ctx context.Context, batch service.MessageBatch) ([]service.MessageBatch, error) { + var modRunner *dataTransformEngine + var err error + if modRunnerPtr := p.modulePool.Get(); modRunnerPtr != nil { + modRunner = modRunnerPtr.(*dataTransformEngine) + } else { + if modRunner, err = p.newModule(); err != nil { + return nil, err + } + } + + res, err := modRunner.Run(ctx, batch) + if err != nil { + _ = modRunner.Close(ctx) + return nil, err + } + p.modulePool.Put(modRunner) + return []service.MessageBatch{res}, nil +} + +func (p *dataTransformEnginePool) Close(ctx context.Context) error { + for { + mr := p.modulePool.Get() + if mr == nil { + return p.wasmBinary.Close(ctx) + } + if err := mr.(*dataTransformEngine).Close(ctx); err != nil { + return err + } + } +} + +//------------------------------------------------------------------------------ + +type dataTransformEngine struct { + log *service.Logger + cfg *dataTransformConfig + + runtime wazero.Runtime + mod api.Module + + inputBatch []transformMessage + outputBatch 
service.MessageBatch + targetIndex int + + procErr error + hostChan chan any + guestChan chan any +} + +func (r *dataTransformEngine) newTransformMessage(message *service.Message) (tmsg transformMessage, err error) { + tmsg.value, err = message.AsBytes() + if err != nil { + return + } + if r.cfg.inputKey != nil { + if tmsg.key, err = r.cfg.inputKey.TryBytes(message); err != nil { + return + } + } + if r.cfg.timestamp != nil { + var tsStr string + if tsStr, err = r.cfg.timestamp.TryString(message); err != nil { + err = fmt.Errorf("timestamp interpolation error: %w", err) + return + } + if tmsg.timestamp, err = strconv.ParseInt(tsStr, 10, 64); err != nil { + err = fmt.Errorf("failed to parse timestamp: %w", err) + return + } + } else { + tmsg.timestamp = time.Now().UnixMilli() + } + err = r.cfg.inputMetadata.Walk(message, func(key string, value string) error { + tmsg.headers = append(tmsg.headers, transformHeader{key, []byte(value)}) + return nil + }) + return +} + +func (r *dataTransformEngine) convertTransformMessage(message transformMessage) (*service.Message, error) { + msg := service.NewMessage(message.value) + if r.cfg.outputMetadata != nil { + for _, hdr := range message.headers { + if r.cfg.outputMetadata.Match(hdr.key) { + msg.MetaSetMut(hdr.key, hdr.value) + } + } + } + if r.cfg.outputKeyField != nil { + msg.MetaSetMut(*r.cfg.outputKeyField, message.key) + } + if message.outputTopic != nil { + msg.MetaSetMut("data_transform_output_topic", *message.outputTopic) + } + return msg, nil +} + +func (r *dataTransformEngine) reset() { + r.inputBatch = nil + r.targetIndex = 0 + r.outputBatch = nil +} + +func (r *dataTransformEngine) Run(ctx context.Context, batch service.MessageBatch) (service.MessageBatch, error) { + if r.procErr != nil { + return nil, r.procErr + } + defer r.reset() + r.inputBatch = make([]transformMessage, len(batch)) + r.targetIndex = 0 + for i, msg := range batch { + tm, err := r.newTransformMessage(msg) + if err != nil { + return nil, err + } 
+ r.inputBatch[i] = tm + } + // Notify the guest that it has data to process + r.guestChan <- nil + // Wait for the guest to process everything + select { + case <-r.hostChan: + case <-time.After(r.cfg.timeout): + _ = r.mod.Close(ctx) + drainChannel(r.hostChan) + } + return r.outputBatch, r.procErr +} + +func (r *dataTransformEngine) Close(ctx context.Context) error { + close(r.guestChan) + _ = r.mod.Close(ctx) + drainChannel(r.hostChan) // Wait for goroutine to exit + err := r.runtime.Close(ctx) + return err +} + +func drainChannel(ch <-chan any) { + for { + _, ok := <-ch + if !ok { + break + } + } +} diff --git a/internal/impl/redpanda/processor_data_transform_test.go b/internal/impl/redpanda/processor_data_transform_test.go new file mode 100644 index 0000000000..e7e250e37b --- /dev/null +++ b/internal/impl/redpanda/processor_data_transform_test.go @@ -0,0 +1,183 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package redpanda + +//go:generate env GOOS=wasip1 GOARCH=wasm go build -C ../../../resources/testdata/redpanda_data_transforms/uppercase -o $PWD/uppercase.wasm + +import ( + "context" + "fmt" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/redpanda-data/benthos/v4/public/service" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func defaultConfig() dataTransformConfig { + var cfg dataTransformConfig + cfg.maxMemoryPages = 1000 + cfg.timeout = time.Second + return cfg +} + +func TestDataTransformProcessorSerial(t *testing.T) { + wasm, err := os.ReadFile("./uppercase.wasm") + if os.IsNotExist(err) { + t.Skip("skipping as wasm example not compiled, run go generate to remedy") + } + require.NoError(t, err) + + proc, err := newDataTransformProcessor(wasm, defaultConfig(), service.MockResources()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, proc.Close(context.Background())) + }) + + for i := 0; i < 1000; i++ { + inMsg := service.NewMessage([]byte(`hello world`)) + outBatches, err := proc.ProcessBatch(context.Background(), service.MessageBatch{inMsg}) + require.NoError(t, err) + + require.Len(t, outBatches, 1) + require.Len(t, outBatches[0], 1) + resBytes, err := outBatches[0][0].AsBytes() + require.NoError(t, err) + + assert.Equal(t, "HELLO WORLD", string(resBytes)) + } +} + +func TestDataTransformProcessorInitTimeout(t *testing.T) { + wasm, err := os.ReadFile("./uppercase.wasm") + if os.IsNotExist(err) { + t.Skip("skipping as wasm example not compiled, run go generate to remedy") + } + require.NoError(t, err) + cfg := defaultConfig() + cfg.timeout = time.Nanosecond + _, err = newDataTransformProcessor(wasm, cfg, service.MockResources()) + require.Error(t, err) +} + +func TestDataTransformProcessorOutOfMemory(t *testing.T) { + wasm, err := os.ReadFile("./uppercase.wasm") + if os.IsNotExist(err) { + t.Skip("skipping as wasm example not compiled, run go generate to remedy") + } + 
require.NoError(t, err) + cfg := defaultConfig() + cfg.maxMemoryPages = 1 + _, err = newDataTransformProcessor(wasm, cfg, service.MockResources()) + require.Error(t, err) +} + +func TestDataTransformProcessorKeys(t *testing.T) { + wasm, err := os.ReadFile("./uppercase.wasm") + if os.IsNotExist(err) { + t.Skip("skipping as wasm example not compiled, run go generate to remedy") + } + require.NoError(t, err) + cfg := defaultConfig() + cfg.inputKey, err = service.NewInterpolatedString(`${! metadata("example_input_key") }`) + require.NoError(t, err) + var outputKeyField = "example_output_key" + cfg.outputKeyField = &outputKeyField + proc, err := newDataTransformProcessor(wasm, cfg, service.MockResources()) + require.NoError(t, err) + inMsg := service.NewMessage([]byte(`hello world`)) + inMsg.MetaSetMut("example_input_key", "foobar") + outBatches, err := proc.ProcessBatch(context.Background(), service.MessageBatch{inMsg}) + require.NoError(t, err) + require.Len(t, outBatches, 1) + require.Len(t, outBatches[0], 1) + outKey, ok := outBatches[0][0].MetaGetMut(outputKeyField) + assert.True(t, ok) + assert.Equal(t, []byte("foobar"), outKey) +} + +func TestDataTransformProcessorParallel(t *testing.T) { + wasm, err := os.ReadFile("./uppercase.wasm") + if os.IsNotExist(err) { + t.Skip("skipping as wasm example not compiled, run go generate to remedy") + } + require.NoError(t, err) + + proc, err := newDataTransformProcessor(wasm, defaultConfig(), service.MockResources()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, proc.Close(context.Background())) + }) + + tStarted := time.Now() + var wg sync.WaitGroup + for j := 0; j < 10; j++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + + iters := 0 + for time.Since(tStarted) < (time.Millisecond * 500) { + iters++ + exp := fmt.Sprintf("hello world %v:%v", id, iters) + inMsg := service.NewMessage([]byte(exp)) + outBatches, err := proc.ProcessBatch(context.Background(), service.MessageBatch{inMsg}) + 
require.NoError(t, err) + + require.Len(t, outBatches, 1) + require.Len(t, outBatches[0], 1) + resBytes, err := outBatches[0][0].AsBytes() + require.NoError(t, err) + assert.Equal(t, strings.ToUpper(exp), string(resBytes)) + } + }(j) + } + wg.Wait() +} + +func BenchmarkRedpandaDataTransforms(b *testing.B) { + wasm, err := os.ReadFile("./uppercase.wasm") + if os.IsNotExist(err) { + b.Skip("skipping as wasm example not compiled, run go generate to remedy") + } + require.NoError(b, err) + + proc, err := newDataTransformProcessor(wasm, defaultConfig(), service.MockResources()) + require.NoError(b, err) + b.Cleanup(func() { + require.NoError(b, proc.Close(context.Background())) + }) + + b.ResetTimer() + b.ReportAllocs() + + inMsg := service.NewMessage([]byte(`hello world`)) + + for i := 0; i < b.N; i++ { + outBatches, err := proc.ProcessBatch(context.Background(), service.MessageBatch{inMsg.Copy()}) + require.NoError(b, err) + + require.Len(b, outBatches, 1) + require.Len(b, outBatches[0], 1) + + _, err = outBatches[0][0].AsBytes() + require.NoError(b, err) + } +} diff --git a/internal/impl/redpanda/serde.go b/internal/impl/redpanda/serde.go new file mode 100644 index 0000000000..2d1e3f75ec --- /dev/null +++ b/internal/impl/redpanda/serde.go @@ -0,0 +1,239 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// transformHeader is a single key/value header attached to a transform record.
type transformHeader struct {
	key   string
	value []byte
}

// deserialize decodes a header (sized key followed by sized value) from the
// front of output and returns the number of bytes consumed. The value is
// copied, so the header does not alias output.
func (h *transformHeader) deserialize(output []byte) (n int, err error) {
	var amt int
	h.key, amt, err = readSizedString(output)
	if err != nil {
		return
	}
	n = amt
	h.value, amt, err = readSizedCopy(output[n:])
	n += amt
	return
}

// serialize encodes the header into output and returns the number of bytes
// written, or a negative number when output is too small.
func (h *transformHeader) serialize(output []byte) int {
	nk := writeSizedString(h.key, output)
	if nk < 0 {
		return nk
	}
	nv := writeSized(h.value, output[nk:])
	if nv < 0 {
		return nv
	}
	return nk + nv
}

// maxSize returns an upper bound on the serialized size of the header.
func (h *transformHeader) maxSize() int {
	return sizedLenString(h.key) + sizedLen(h.value)
}

//------------------------------------------------------------------------------

// transformMessage is the record exchanged with the guest module. Only key,
// value and headers are part of the serialized form; the remaining fields are
// carried alongside it.
type transformMessage struct {
	timestamp   int64
	offset      int64
	key         []byte
	value       []byte
	headers     []transformHeader
	outputTopic *string
}

// deserialize decodes key, value, header count and headers from the front of
// output and returns the number of bytes consumed. Decoded byte slices are
// copies. A non-positive header count yields no headers.
func (m *transformMessage) deserialize(output []byte) (n int, err error) {
	var amt int
	m.key, amt, err = readSizedCopy(output)
	if err != nil {
		return
	}
	n = amt
	m.value, amt, err = readSizedCopy(output[n:])
	n += amt
	if err != nil {
		return
	}
	var numHeaders int
	numHeaders, amt, err = readNum(output[n:])
	if err != nil {
		return
	}
	n += amt
	for i := 0; i < numHeaders; i++ {
		var h transformHeader
		amt, err = h.deserialize(output[n:])
		if err != nil {
			return
		}
		n += amt
		m.headers = append(m.headers, h)
	}
	return
}

// maxSize returns an upper bound on the serialized size of the message. A
// buffer of this size is always accepted by serialize.
func (m *transformMessage) maxSize() int {
	total := sizedLen(m.key)
	total += sizedLen(m.value)
	total += binary.MaxVarintLen64
	for i := range m.headers {
		total += m.headers[i].maxSize()
	}
	return total
}

// serialize encodes the message into output and returns the number of bytes
// written, or a negative number when output is too small.
func (m *transformMessage) serialize(output []byte) int {
	var total int
	n := writeSized(m.key, output)
	if n < 0 {
		return n
	}
	total += n
	n = writeSized(m.value, output[total:])
	if n < 0 {
		return n
	}
	total += n
	n = writeNum(len(m.headers), output[total:])
	if n < 0 {
		return n
	}
	total += n
	for i := range m.headers {
		n := m.headers[i].serialize(output[total:])
		if n < 0 {
			return n
		}
		total += n
	}
	return total
}

//------------------------------------------------------------------------------

// transformWriteOptions carries the optional settings a guest can attach to a
// written record.
type transformWriteOptions struct {
	topic string
}

// outputTopicKey tags the output topic entry in serialized write options.
const outputTopicKey = 0x01

// deserialize decodes write options from output and returns the number of
// bytes consumed. An empty payload is valid and leaves the options untouched;
// a payload that does not start with outputTopicKey is rejected.
func (o *transformWriteOptions) deserialize(output []byte) (int, error) {
	if len(output) == 0 {
		return 0, nil
	}
	if output[0] != outputTopicKey {
		return 0, errInvalidData
	}
	topic, n, err := readSizedString(output[1:])
	if err != nil {
		return 0, err
	}
	o.topic = topic
	return n + 1, nil
}

//------------------------------------------------------------------------------

// writeNum writes n as a signed varint at the start of out and returns the
// number of bytes written, or -1 when out has fewer than
// binary.MaxVarintLen64 bytes available.
func writeNum(n int, out []byte) int {
	if len(out) < binary.MaxVarintLen64 {
		return -1
	}
	return binary.PutVarint(out, int64(n))
}

// writeSized writes a signed varint length followed by the bytes of b and
// returns the number of bytes written. A nil slice is encoded as length -1
// with no payload. It returns -1 when out has fewer than
// binary.MaxVarintLen64 bytes available or cannot hold the payload.
func writeSized(b []byte, out []byte) int {
	if len(out) < binary.MaxVarintLen64 {
		return -1
	}
	if b == nil {
		return binary.PutVarint(out, -1)
	}
	n := binary.PutVarint(out, int64(len(b)))
	if len(out) < len(b)+n {
		return -1
	}
	n += copy(out[n:], b)
	return n
}

// writeSizedString is writeSized for a string, without copying it first. An
// empty string is always encoded as length 0, never as the nil marker.
func writeSizedString(s string, out []byte) int {
	if s == "" {
		// unsafe.StringData is unspecified for the empty string and may be
		// nil, which writeSized would encode as -1.
		return writeSized([]byte{}, out)
	}
	return writeSized(unsafe.Slice(unsafe.StringData(s), len(s)), out)
}

// sizedLen returns an upper bound on the encoded size of b.
func sizedLen(b []byte) int {
	return binary.MaxVarintLen64 + len(b)
}

// sizedLenString returns an upper bound on the encoded size of b.
func sizedLenString(b string) int {
	return binary.MaxVarintLen64 + len(b)
}

var errInvalidData = errors.New("unable to decode payload from Redpanda Data Transform")

// readNum reads a signed varint from the start of b and returns the value and
// the number of bytes consumed.
func readNum(b []byte) (int, int, error) {
	n, amt := binary.Varint(b)
	if amt <= 0 {
		return 0, 0, errInvalidData
	}
	return int(n), amt, nil
}

// readSized reads a length-prefixed byte string from the start of b. It
// returns a view into b (not a copy) and the number of bytes consumed. A
// negative length decodes to a nil slice.
func readSized(b []byte) ([]byte, int, error) {
	v, num := binary.Varint(b)
	if num <= 0 {
		return nil, 0, errInvalidData
	}
	if v < 0 {
		return nil, num, nil
	}
	b = b[num:]
	// Compare as int64 so a huge length cannot wrap when narrowed to int on a
	// 32-bit platform.
	if v > int64(len(b)) {
		return nil, 0, errInvalidData
	}
	return b[:v], num + int(v), nil
}

// readSizedCopy is readSized returning a copy that does not alias b.
func readSizedCopy(b []byte) ([]byte, int, error) {
	view, amt, err := readSized(b)
	if err != nil {
		return view, amt, err
	}
	if view == nil {
		return nil, amt, nil
	}
	return slices.Clone(view), amt, nil
}

// readSizedString is readSized returning a string. A nil payload decodes to
// the empty string.
func readSizedString(b []byte) (string, int, error) {
	s, amt, err := readSized(b)
	if err != nil {
		return "", amt, err
	}
	if s == nil {
		return "", amt, nil
	}
	return string(s), amt, nil
}
+ +package redpanda + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestStringSerde(t *testing.T) { + out := make([]byte, 1024) + n := writeSizedString("foo", out) + s, amt, err := readSizedString(out[:n]) + require.NoError(t, err) + require.Equal(t, "foo", s) + require.Equal(t, n, amt) +} + +func TestMessageSerde(t *testing.T) { + m := transformMessage{ + key: []byte("abc"), + value: []byte("123"), + headers: []transformHeader{ + {key: "foo", value: []byte("bar")}, + }, + } + out := make([]byte, m.maxSize()) + n := m.serialize(out) + require.LessOrEqual(t, n, m.maxSize()) + var read transformMessage + amt, err := read.deserialize(out[:n]) + require.NoError(t, err) + require.Equal(t, n, amt) +} diff --git a/public/components/all/package.go b/public/components/all/package.go index 6074e577de..c5d2f338c0 100644 --- a/public/components/all/package.go +++ b/public/components/all/package.go @@ -57,6 +57,7 @@ import ( _ "github.com/redpanda-data/connect/v4/public/components/pure/extended" _ "github.com/redpanda-data/connect/v4/public/components/pusher" _ "github.com/redpanda-data/connect/v4/public/components/redis" + _ "github.com/redpanda-data/connect/v4/public/components/redpanda" _ "github.com/redpanda-data/connect/v4/public/components/sentry" _ "github.com/redpanda-data/connect/v4/public/components/sftp" _ "github.com/redpanda-data/connect/v4/public/components/snowflake" diff --git a/public/components/redpanda/package.go b/public/components/redpanda/package.go new file mode 100644 index 0000000000..0ddcfb86e2 --- /dev/null +++ b/public/components/redpanda/package.go @@ -0,0 +1,20 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wasm + +import ( + // Bring in the internal plugin definitions. + _ "github.com/redpanda-data/connect/v4/internal/impl/redpanda" +) diff --git a/resources/testdata/redpanda_data_transforms/uppercase/.gitignore b/resources/testdata/redpanda_data_transforms/uppercase/.gitignore new file mode 100644 index 0000000000..19e1bced9a --- /dev/null +++ b/resources/testdata/redpanda_data_transforms/uppercase/.gitignore @@ -0,0 +1 @@ +*.wasm diff --git a/resources/testdata/redpanda_data_transforms/uppercase/README.md b/resources/testdata/redpanda_data_transforms/uppercase/README.md new file mode 100644 index 0000000000..d0d233ff94 --- /dev/null +++ b/resources/testdata/redpanda_data_transforms/uppercase/README.md @@ -0,0 +1,15 @@ +# Redpanda Golang WASM Transform + +To get started you first need to have at least Go 1.22 installed. + +You can get started by modifying the transform.go file +with your logic. + +Once you're ready to test out your transform live you need to: + +1. Make sure you have a container running via rpk container start +1. Run rpk transform build +1. Create your topics via rpk topic create +1. Run rpk transform deploy +1. Then use rpk topic produce and rpk topic consume + to see your transformation live! 
diff --git a/resources/testdata/redpanda_data_transforms/uppercase/go.mod b/resources/testdata/redpanda_data_transforms/uppercase/go.mod new file mode 100644 index 0000000000..b44456a37d --- /dev/null +++ b/resources/testdata/redpanda_data_transforms/uppercase/go.mod @@ -0,0 +1,5 @@ +module uppercase + +go 1.22 + +require github.com/redpanda-data/redpanda/src/transform-sdk/go/transform v1.0.2 diff --git a/resources/testdata/redpanda_data_transforms/uppercase/go.sum b/resources/testdata/redpanda_data_transforms/uppercase/go.sum new file mode 100644 index 0000000000..146798fc65 --- /dev/null +++ b/resources/testdata/redpanda_data_transforms/uppercase/go.sum @@ -0,0 +1,2 @@ +github.com/redpanda-data/redpanda/src/transform-sdk/go/transform v1.0.2 h1:34F42buBTGuK1uaXKky1PdxAZzqMh6kQE1ojCLf/hWw= +github.com/redpanda-data/redpanda/src/transform-sdk/go/transform v1.0.2/go.mod h1:QGgiwwf/BIsD1b7EiyQ/Apzw+RLSpasRDdpOCiefQFQ= diff --git a/resources/testdata/redpanda_data_transforms/uppercase/transform.go b/resources/testdata/redpanda_data_transforms/uppercase/transform.go new file mode 100644 index 0000000000..030a76bcf2 --- /dev/null +++ b/resources/testdata/redpanda_data_transforms/uppercase/transform.go @@ -0,0 +1,18 @@ +package main + +import ( + "bytes" + + "github.com/redpanda-data/redpanda/src/transform-sdk/go/transform" +) + +func main() { + transform.OnRecordWritten(makeUppercase) +} + +func makeUppercase(e transform.WriteEvent, w transform.RecordWriter) error { + return w.Write(transform.Record{ + Key: e.Record().Key, + Value: bytes.ToUpper(e.Record().Value), + }) +}