[feature][functions]Create a built-in Function implementing the most common basic transformations #15903
import org.apache.pulsar.common.schema.SchemaType;

@Slf4j
public class CastStep implements TransformStep {
The purpose of this step is not clear to me.
What does this step do?
It changes the schema to another compatible one. The name is inspired by SMTs.
@Override
public void process(TransformContext transformContext) {
    Schema<?> keySchema = transformContext.getKeySchema();
    if (keySchema == null) {
What does it mean?
Are we assuming that the key is byte[] or String when we are not using a KeyValue schema?
It means that if we don't have a KeyValue, we don't do anything.
this.context = context;
this.outputTopic = context.getOutputTopic();
Schema<?> schema = currentRecord.getSchema();
//TODO: should we make a copy ?
No need to make a copy, but it may be better to wrap the map in Collections.unmodifiableMap() if we are not supposed to change its contents.
That's the thing. We want steps to be able to modify the properties. Wrapping them in an unmodifiable map is a good idea, as a step would then have to make the copy itself.
I changed the code to set the properties lazily, just before sending the message, if they were not set by one of the steps.
Note that PulsarRecord properties are already unmodifiable.
This can be changed when we implement a step that modifies the properties.
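The lazy approach described in this reply can be illustrated with a small, dependency-free sketch. `LazyPropertiesSketch` and its method names are hypothetical stand-ins, not the PR's `TransformContext`; the only assumptions taken from the thread are that the record properties are unmodifiable and that the properties are resolved just before the message is sent.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in for the properties handling discussed above. */
final class LazyPropertiesSketch {
    // Read-only view of the incoming record's properties.
    private final Map<String, String> recordProperties;
    // Stays null until a step explicitly sets new properties.
    private Map<String, String> properties;

    LazyPropertiesSketch(Map<String, String> recordProperties) {
        this.recordProperties = Collections.unmodifiableMap(recordProperties);
    }

    Map<String, String> getRecordProperties() {
        return recordProperties;
    }

    /** A step that wants to change properties copies them first, then sets the copy. */
    void setProperties(Map<String, String> properties) {
        this.properties = properties;
    }

    /** Resolved only when the message is about to be sent. */
    Map<String, String> propertiesForSend() {
        return properties != null ? properties : recordProperties;
    }
}
```

With this shape no copy is made unless a step actually needs one, and a step cannot mutate the record's own map by accident.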
private KeyValueEncodingType keyValueEncodingType;
private String key;
private Map<String, String> properties;
private String outputTopic;
final?
Steps should be able to modify the output topic, so that we can implement basic routing operations such as RegexRouter.
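To make the routing argument concrete, here is a minimal sketch of what a regex-based routing step could compute. `RegexRouteSketch` is a hypothetical name and is not part of this PR; a real step would read the current output topic from the context and write the result back, which is why the field cannot be final.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical routing helper: rewrites a topic name when it matches a regex. */
final class RegexRouteSketch {
    private final Pattern pattern;
    private final String replacement;

    RegexRouteSketch(String regex, String replacement) {
        this.pattern = Pattern.compile(regex);
        this.replacement = replacement;
    }

    /** Returns the rewritten topic, or the original one when the regex does not match the whole name. */
    String route(String outputTopic) {
        Matcher matcher = pattern.matcher(outputTopic);
        return matcher.matches() ? matcher.replaceFirst(replacement) : outputTopic;
    }
}
```

For example, `new RegexRouteSketch("(.*)-raw", "$1-clean").route("orders-raw")` yields `orders-clean`, while a topic that does not end in `-raw` is left unchanged.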
@Slf4j
@Data
public class TransformContext {
    private Context context;
final?
👍
done
TypedMessageBuilder<?> message = context.newOutputMessage(outputTopic, outputSchema)
        .properties(properties)
        .value(outputObject);
if (key != null) {
If we are using a KeyValue schema, we should not set the key this way.
Indeed
done
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;

@Slf4j
Please add Javadoc.
done. PTAL
if (keySchemaType == SchemaType.STRING) {
    outputSchema = Schema.STRING;
    outputKeyObject = outputKeyObject.toString();
}
else?
We should throw an IllegalStateException here if we do not support other types.
done
    transformContext.setKeySchema(outputSchema);
    transformContext.setKeyObject(outputKeyObject);
}
if (valueSchemaType == SchemaType.STRING) {
else?
We should throw an IllegalStateException here if we do not support other types.
done
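The behaviour requested in the two comments above, converting to STRING and failing loudly for anything else, can be sketched without any Pulsar dependency. `CastSketch` and its `TargetType` enum are hypothetical stand-ins for the step and for `SchemaType`, and the exact exception used in the final code may differ.

```java
/** Hypothetical stand-in for the cast logic; only STRING is supported. */
final class CastSketch {
    /** Illustrative subset of schema types, not the real SchemaType enum. */
    enum TargetType { STRING, INT32 }

    /** Converts the value to the target type, or fails instead of silently passing it through. */
    static Object cast(Object value, TargetType target) {
        if (target == TargetType.STRING) {
            return value.toString();
        }
        throw new IllegalStateException("Unsupported cast target: " + target);
    }
}
```

Failing in the `else` path keeps a misconfigured function from emitting records whose schema was never actually changed.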
    transformContext.setValueSchema(transformContext.getKeySchema());
    transformContext.setValueObject(transformContext.getKeyObject());
}
// TODO: can we avoid making the conversion to NATIVE_AVRO ?
Can you please clarify?
Well, theoretically we have the Schema.AVRO schema and the value of the wrapped key or value, and could use those instead of unwrapping the AVRO schema and rewrapping it in Schema.NATIVE_AVRO. But there's no clean way to give this information to the TransformContext, so I'll remove this comment for now.
* Reject invalid schema types in cast transfo
* Add javadoc
* Lazily set message properties
The PR had no activity for 30 days, mark with Stale label.
Moved to a dedicated repository:
Motivation
This PR implements PIP-173: Create a built-in Function implementing the most common basic transformations.
It provides a first set of basic functions:
* `cast`: modifies the key or value schema to a compatible target schema passed in the `schema-type` argument. This PR only enables the `STRING` schema-type. The `part` argument selects which part the transformation applies to, `key` or `value`. If `part` is null or absent, the transformation applies to both the key and the value.
* `drop-fields`: drops the fields given as a string list in the `fields` parameter. The `part` argument selects which part the transformation applies to, `key` or `value`. If `part` is null or absent, the transformation applies to both the key and the value. Currently only AVRO is supported.
* `merge-key-value`: merges the fields of KeyValue records where both the key and value are structured types of the same schema type. Currently only AVRO is supported.
* `unwrap-key-value`: if the record is a KeyValue, extracts the KeyValue's value and makes it the record value. If the `unwrapKey` parameter is present and set to `true`, extracts the KeyValue's key instead.

Modifications
* Add a `transforms` module in the multi-module `pulsar-function` project.
* Create classes `TransformFunction` and `TransformContext` and interface `TransformStep`, as described in PIP-173.
* Create implementations `CastStep`, `DropFieldsStep`, `MergeKeyValueStep`, `UnwrapKeyValueStep` with tests.
* Add logic to parse `userConfig` in `TransformFunction` and call the `TransformStep` implementations.

Verifying this change
This change added tests and can be verified as follows:
* Run tests from the `transforms` module

Does this pull request potentially affect one of the following parts:
no
If `yes` was chosen, please highlight the changes

Documentation
Check the box below or label this PR directly.
Need to update docs?
* `doc-required` (Your PR needs to update docs and you will update later)
* `doc-not-needed` (Please explain why)
* `doc` (Your PR contains doc changes)
* `doc-complete` (Docs have been already added)
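As a reading aid for the Motivation section, the semantics of `drop-fields`, `merge-key-value` and `unwrap-key-value` can be sketched on plain maps standing in for AVRO records. Everything here is hypothetical: `TransformSketch`, `Ctx` and `Step` are not the PR's `TransformFunction`, `TransformContext` and `TransformStep`, and the sketch assumes that value fields win over key fields on a name clash during a merge, which the description does not specify.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, map-based illustration of the transformation steps. */
final class TransformSketch {
    /** Mutable stand-in for the transform context; key is null when the record is not a KeyValue. */
    static final class Ctx {
        Map<String, Object> key;
        Map<String, Object> value;

        Ctx(Map<String, Object> key, Map<String, Object> value) {
            this.key = key == null ? null : new LinkedHashMap<>(key);
            this.value = new LinkedHashMap<>(value);
        }
    }

    /** Stand-in for the step interface: each step mutates the context in place. */
    interface Step {
        void process(Ctx ctx);
    }

    /** drop-fields: a null part means both key and value. */
    static Step dropFields(List<String> fields, String part) {
        return ctx -> {
            if (ctx.key != null && (part == null || part.equals("key"))) {
                ctx.key.keySet().removeAll(fields);
            }
            if (part == null || part.equals("value")) {
                ctx.value.keySet().removeAll(fields);
            }
        };
    }

    /** merge-key-value: assumption, value fields override key fields with the same name. */
    static Step mergeKeyValue() {
        return ctx -> {
            if (ctx.key == null) {
                return; // not a KeyValue, nothing to merge
            }
            Map<String, Object> merged = new LinkedHashMap<>(ctx.key);
            merged.putAll(ctx.value);
            ctx.value = merged;
        };
    }

    /** unwrap-key-value: keep the value, or the key when unwrapKey is true. */
    static Step unwrapKeyValue(boolean unwrapKey) {
        return ctx -> {
            if (ctx.key == null) {
                return; // not a KeyValue, nothing to unwrap
            }
            if (unwrapKey) {
                ctx.value = ctx.key;
            }
            ctx.key = null;
        };
    }

    /** Applies the configured steps in order, as a chain over one context. */
    static void run(List<Step> steps, Ctx ctx) {
        for (Step step : steps) {
            step.process(ctx);
        }
    }
}
```

Chaining `dropFields`, `mergeKeyValue` and `unwrapKeyValue(false)` over a keyed record leaves a single record value that holds the remaining key and value fields.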