Skip to content

Commit

Permalink
Implement Redpanda Data Transform Processor
Browse files Browse the repository at this point in the history
This adds a processor for Redpanda Data Transforms as a Redpanda connect
processor. The advantage of this processor over the existing wasm
processor is that Data Transforms has an existing SDK and developer
experience through `rpk transform init && rpk transform build`.

We implement the ABI contract for data transforms using Wazero, which
fairly closely follows the implementation within the broker. From an
implementation prespective we also have a single ABI contract no matter
which guest language is being used (Go/Rust/JavaScript/etc).

We support users specifying how to map kafka terms from their existing
messages, using similar methods to the kafka input/output plugins that
already exist within Redpanda.

For testing we check in an uppercase transform to generate a binary
using `rpk`.
  • Loading branch information
rockwotj committed Jul 17, 2024
1 parent 37b2b01 commit bd6868f
Show file tree
Hide file tree
Showing 15 changed files with 1,371 additions and 0 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ $(PATHINSTBIN)/%: $(SOURCE_FILES)
@go build $(GO_FLAGS) -tags "$(TAGS)" -ldflags "$(LD_FLAGS) $(VER_FLAGS)" -o $@ ./cmd/$*

$(APPS): %: $(PATHINSTBIN)/%
go generate ./internal/impl/redpanda

# TOOLS = redpanda-docs TODO
# tools: $(TOOLS)
Expand Down
241 changes: 241 additions & 0 deletions docs/modules/components/pages/processors/redpanda_data_transform.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
= redpanda_data_transform
:type: processor
:status: experimental
:categories: ["Redpanda"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////
component_type_dropdown::[]
Executes a Redpanda Data Transform as a processor
Introduced in version 0.1.0.
[tabs]
======
Common::
+
--
```yml
# Common config fields, showing default values
label: ""
redpanda_data_transform:
module_path: "" # No default (required)
input_key: "" # No default (optional)
output_key: "" # No default (optional)
input_headers:
include_prefixes: []
include_patterns: []
output_metadata:
include_prefixes: []
include_patterns: []
```
--
Advanced::
+
--
```yml
# All config fields, showing default values
label: ""
redpanda_data_transform:
module_path: "" # No default (required)
input_key: "" # No default (optional)
output_key: "" # No default (optional)
input_headers:
include_prefixes: []
include_patterns: []
output_metadata:
include_prefixes: []
include_patterns: []
timestamp: ${! timestamp_unix() } # No default (optional)
timeout: 10s
max_memory_pages: 1600
```
--
======
This processor executes a Redpanda Data Transform WebAssembly module, calling OnRecordWritten for each message being processed.
== Fields
=== `module_path`
The path of the target WASM module to execute.
*Type*: `string`
=== `input_key`
An optional key to populate for each message.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
=== `output_key`
An optional name of metadata for an output message key.
*Type*: `string`
=== `input_headers`
Determine which (if any) metadata values should be added to messages as headers.
*Type*: `object`
=== `input_headers.include_prefixes`
Provide a list of explicit metadata key prefixes to match against.
*Type*: `array`
*Default*: `[]`
```yml
# Examples
include_prefixes:
- foo_
- bar_
include_prefixes:
- kafka_
include_prefixes:
- content-
```
=== `input_headers.include_patterns`
Provide a list of explicit metadata key regular expression (re2) patterns to match against.
*Type*: `array`
*Default*: `[]`
```yml
# Examples
include_patterns:
- .*
include_patterns:
- _timestamp_unix$
```
=== `output_metadata`
Determine which (if any) message headers should be added to the output as metadata.
*Type*: `object`
=== `output_metadata.include_prefixes`
Provide a list of explicit metadata key prefixes to match against.
*Type*: `array`
*Default*: `[]`
```yml
# Examples
include_prefixes:
- foo_
- bar_
include_prefixes:
- kafka_
include_prefixes:
- content-
```
=== `output_metadata.include_patterns`
Provide a list of explicit metadata key regular expression (re2) patterns to match against.
*Type*: `array`
*Default*: `[]`
```yml
# Examples
include_patterns:
- .*
include_patterns:
- _timestamp_unix$
```
=== `timestamp`
An optional timestamp to set for each message. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
```yml
# Examples
timestamp: ${! timestamp_unix() }
timestamp: ${! metadata("kafka_timestamp_unix") }
```
=== `timeout`
The maximum period of time for a message to be processed
*Type*: `string`
*Default*: `"10s"`
=== `max_memory_pages`
The maximum amount of wasm memory pages (64KiB) that an individual wasm module instance can use
*Type*: `int`
*Default*: `1600`
1 change: 1 addition & 0 deletions internal/impl/redpanda/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.wasm
Loading

0 comments on commit bd6868f

Please sign in to comment.