Implement Redpanda Data Transform Processor #2698
Conversation
Force-pushed c847d2e to a75d84e
WithEnv("REDPANDA_INPUT_TOPIC", "benthos")
for i := 0; i < 8; i += 1 {
	cfg = cfg.WithEnv(fmt.Sprintf("REDPANDA_OUTPUT_TOPIC_%d", i), fmt.Sprintf("output_%d", i))
I just created dummy values here - I'm not sure if there should be anything special
	_ = r.mod.Close(ctx)
	drainChannel(r.hostChan)
}
return r.outputBatch, r.procErr
I noticed the other module called SetError on each individual message and never returned an error here - is that a better practice? I'm assuming this will stop the pipeline while the service.Message.SetError way does not?
I think it's covered in the docs: https://pkg.go.dev/github.com/redpanda-data/benthos/v4@v4.31.0/public/service#BatchProcessor
// Process a batch of messages into one or more resulting batches, or return
// an error if the entire batch could not be processed. If zero messages are
// returned and the error is nil then all messages are filtered.
//
// The provided MessageBatch should NOT be modified, in order to return a
// mutated batch a copy of the slice should be created instead.
//
// When an error is returned all of the input messages will continue down
// the pipeline but will be marked with the error with *message.SetError,
// and metrics and logs will be emitted.
//
// In order to add errors to individual messages of the batch for downstream
// handling use *message.SetError(err) and return it in the resulting batch
// with a nil error.
//
// The Message types returned MUST be derived from the provided messages,
// and CANNOT be custom instantiations of Message. In order to copy the
// provided messages use the Copy method.
Sometimes I find it useful to set a breakpoint here and follow the call stack. There's some logic here: https://github.com/redpanda-data/benthos/blob/d940a3e6243d9c6e615fa57f99bc90f55d360c22/public/service/processor.go#L112-L115 and then here: https://github.com/redpanda-data/benthos/blob/c209d989cf506fcb25c5ac0b0aa1ca8e90f54c09/internal/component/processor/auto_observed.go#L223 which shows what's going on. If you return an error, the returned batch is discarded in the airGapBatchProcessor, and then the AutoObservedBatched processor takes the original input batch, marks all the messages in it with the received error, and returns that batch. Not sure what you mean by stopping the pipeline, but you can filter out messages by not including them in the returned batch (while also returning a nil error).
Ah, thank you for the detailed explanation. I believe applying an error to the entire batch is the correct approach, since the ABI works and checkpoints per batch.
go func() {
	_, err := start.Call(context.Background())
	if !engine.mod.IsClosed() {
		_ = engine.mod.Close(context.Background())
	}
	if err == nil {
		err = sys.NewExitError(0)
	}
	engine.procErr = err
	close(engine.hostChan)
}()
The engine running in the background is a necessary step with how our ABI is structured, but does make things a little more complicated in terms of a clean shutdown.
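The shutdown ordering can be modeled with plain channels. This is a simplified stdlib-only sketch, not the real engine: the names `hostChan`, `procErr`, and `drainChannel` mirror the snippet above, and the key invariant is that the goroutine records its exit error before closing `hostChan`, so the host can treat the closed channel as "engine finished" and safely drain it:

```go
package main

import (
	"errors"
	"fmt"
)

// engine is a simplified model of the background wasm engine above.
type engine struct {
	hostChan chan string
	procErr  error
}

func (e *engine) run() {
	go func() {
		e.hostChan <- "leftover-batch"
		// Record the exit error first (stand-in for sys.NewExitError(0)),
		// then close hostChan as the final step; the close establishes a
		// happens-before edge, so the host may read procErr afterwards.
		e.procErr = errors.New("exit status 0")
		close(e.hostChan)
	}()
}

// drainChannel consumes any pending values so no goroutine is left
// blocked on a send after shutdown.
func drainChannel(ch chan string) {
	for v := range ch {
		fmt.Println("drained:", v)
	}
}

func main() {
	e := &engine{hostChan: make(chan string, 1)}
	e.run()
	drainChannel(e.hostChan) // returns once the engine has closed hostChan
	fmt.Println("engine error:", e.procErr)
}
```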
Thank you for all the work you put into this @rockwotj! I don't have enough experience with wasm to confidently review the implementation. I left some non-critical comments which may be worth addressing, and some questions on stuff that I didn't understand. I'll defer to @Jeffail for extra eyes on this.
resources/testdata/redpanda_data_transforms/uppercase/transform.yaml
case <-ctx.Done():
	return noActiveTransform
}
if !m.Memory().WriteUint32Le(recordCount, uint32(len(r.inputBatch))) {
Could m.Memory() ever be nil? (Same in the rest of the functions.)
Ah good catch - I will validate that in newDataTransformProcessor :)
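In wazero, `Module.Memory()` does return nil when the guest exports no linear memory, so a one-time check at construction is enough for the later host calls to assume a non-nil memory. A stdlib-only sketch of that validation — the `memory` and `module` interfaces here are local stand-ins for wazero's `api.Memory` and `api.Module`, and `checkModuleMemory` is a hypothetical helper name:

```go
package main

import (
	"errors"
	"fmt"
)

// memory and module are local stand-ins for wazero's api.Memory and
// api.Module interfaces.
type memory interface {
	WriteUint32Le(offset, v uint32) bool
}

type module interface {
	Memory() memory
}

// checkModuleMemory is the kind of one-time validation suggested for
// newDataTransformProcessor: fail construction if the guest exports no
// memory, so every later host function can skip the nil check.
func checkModuleMemory(m module) error {
	if m.Memory() == nil {
		return errors.New("wasm module does not export a memory")
	}
	return nil
}

// noMemModule models a guest module without an exported memory.
type noMemModule struct{}

func (noMemModule) Memory() memory { return nil }

func main() {
	fmt.Println(checkModuleMemory(noMemModule{}))
}
```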
Force-pushed dc66a13 to 23d35ce
wasmPageSize = 64 * humanize.KiByte
dtpDefaultMaxMemory = 100 * humanize.MiByte
Nice 🥇
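Since wasm linear memory is sized in fixed 64 KiB pages (per the WebAssembly spec), a byte cap like the default above has to be converted into a page count when configuring the runtime — for example, wazero's `RuntimeConfig.WithMemoryLimitPages` takes pages, not bytes. A quick sketch of the arithmetic, with the `humanize` constants inlined:

```go
package main

import "fmt"

const (
	wasmPageSize        = 64 * 1024         // 64 KiB, fixed by the WebAssembly spec
	dtpDefaultMaxMemory = 100 * 1024 * 1024 // 100 MiB, the default cap above
)

func main() {
	// Convert the byte cap into 64 KiB pages for the runtime config.
	pages := dtpDefaultMaxMemory / wasmPageSize
	fmt.Println(pages) // 1600
}
```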
Thanks a lot Tyler! I think that covers all the stuff I was able to spot. I'll defer to Ash for any extra feedback.
PS: I left one other small nit on the tests.
func TestDataTransformProcessorSerial(t *testing.T) {
	wasm, err := os.ReadFile("./uppercase.wasm")
	if os.IsNotExist(err) {
		t.Skip("skipping as wasm example not compiled, run build.sh to remedy")
I guess I'd rather fail the tests if this file is missing instead of a silent skip, now that we can go generate it.
I don't feel strongly. Ash do you have opinions? Happy to change them to fail. I'll update the message here either way 😄
I've refactored the tests slightly so that we actually build the WASM on the fly. The advantage is that we don't end up with the file sitting stagnant in the repo after the tests run, which can be annoying for some repo activities when it's not automatically cleaned up. It has the disadvantage of slowing the tests down by about a second, but unless this dramatically changes CI test times (I doubt it'll change it) then I'm happy as is.
SGTM - thank you!
This adds a processor for Redpanda Data Transforms as a Redpanda Connect processor. The advantage of this processor over the existing wasm processor is that Data Transforms has an existing SDK and developer experience through `rpk transform init && rpk transform build`. We implement the ABI contract for data transforms using Wazero, which fairly closely follows the implementation within the broker. From an implementation perspective we also have a single ABI contract no matter which guest language is being used (Go/Rust/JavaScript/etc). We support users specifying how to map Kafka terms from their existing messages, using similar methods to the kafka input/output plugins that already exist within Redpanda. For testing we check in an uppercase transform to generate a binary using `rpk`.
Awesome, thanks @rockwotj!
Note that this PR does not implement the schema registry support within data transforms. That will come in another PR.