-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support extract-key kafka message transform #1183
Conversation
@@ -495,7 +495,7 @@ protected final void injectPayloadModel( | |||
.name("catalog0") | |||
.schema() | |||
.version("latest") | |||
.subject(subject) | |||
.subject(message.payload.name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be consistent with the others, either subject
everywhere or message.payload.name
everywhere.
The value of subject
is passed as part of the URL to access schema registry.
/subjects/{0}/versions/{1}
So this should probably be based on the channel address topic with -value
suffix, rather than relying on any naming convention alignment between message payload name and channel address (kafka topic).
Note: given that channels (kafka topics) can have more than one message type in the general case, seems like we actually want to potentially use topic-record name strategy instead of topic name strategy to differentiate the schemas for different channel (topic) messages.
In that case, it would seem to make sense to base the record part of the topic-record name strategy on the message name within the channel messages.
KafkaTopicTransformsConfigBuilder<KafkaTopicTransformsConfig> topicBuilder = KafkaTopicTransformsConfig.builder(); | ||
|
||
String extractKey = object.containsKey(EXTRACT_KEY_NAME) | ||
? object.getString(EXTRACT_KEY_NAME) | ||
: null; | ||
topicBuilder.extractKey(extractKey); | ||
|
||
JsonObject headers = object.containsKey(EXTRACT_HEADERS_NAME) ? object.getJsonObject(EXTRACT_HEADERS_NAME) : null; | ||
|
||
if (headers != null) | ||
{ | ||
for (Map.Entry<String, JsonValue> entry : headers.entrySet()) | ||
{ | ||
JsonString jsonString = (JsonString) entry.getValue(); | ||
topicBuilder.extractHeader(entry.getKey(), jsonString.getString()); | ||
} | ||
} | ||
|
||
return topicBuilder.build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaTopicTransformsConfigBuilder<KafkaTopicTransformsConfig> topicBuilder = KafkaTopicTransformsConfig.builder(); | |
String extractKey = object.containsKey(EXTRACT_KEY_NAME) | |
? object.getString(EXTRACT_KEY_NAME) | |
: null; | |
topicBuilder.extractKey(extractKey); | |
JsonObject headers = object.containsKey(EXTRACT_HEADERS_NAME) ? object.getJsonObject(EXTRACT_HEADERS_NAME) : null; | |
if (headers != null) | |
{ | |
for (Map.Entry<String, JsonValue> entry : headers.entrySet()) | |
{ | |
JsonString jsonString = (JsonString) entry.getValue(); | |
topicBuilder.extractHeader(entry.getKey(), jsonString.getString()); | |
} | |
} | |
return topicBuilder.build(); | |
KafkaTopicTransformsConfigBuilder<KafkaTopicTransformsConfig> transforms = KafkaTopicTransformsConfig.builder(); | |
if (object.containsKey(EXTRACT_KEY_NAME)) | |
{ | |
String extractKey = object.getString(EXTRACT_KEY_NAME); | |
transforms.extractKey(extractKey); | |
} | |
if (object.containsKey(EXTRACT_HEADERS_NAME)) | |
{ | |
JsonObject headers = object.getJsonObject(EXTRACT_HEADERS_NAME); | |
for (Map.Entry<String, JsonValue> entry : headers.entrySet()) | |
{ | |
String headerName = entry.getKey(); | |
JsonString headerValue = (JsonString) entry.getValue(); | |
transforms.extractHeader(headerName, headerValue.getString()); | |
} | |
} | |
return transforms.build(); |
"{" + | ||
"\"correlation-id\": \"${message.value.correlationId}\"" + | ||
"}" + | ||
"]" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Include extract-key
here please.
.header("correlation-id", "${message.value.correlationId}") | ||
.transforms() | ||
.extractHeader("correlation-id", "${message.value.correlationId}") | ||
.build() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Include extract-key
here please.
@@ -339,18 +339,37 @@ | |||
"enum": [ "none", "json_patch" ], | |||
"deprecated": true | |||
}, | |||
"headers": | |||
"transforms": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure you have caught up to latest develop, I think this change is already present there.
@@ -75,7 +75,7 @@ read zilla:data.ext ${kafka:matchDataEx() | |||
.fetch() | |||
.partition(0, 2, 8) | |||
.header("header1", "value1") | |||
.header("correlation-id", "12345") | |||
.header("correlation-id", "1234") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, what prompted this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The script we are using has 4 bytes key1
and the TestConverter was returning 12345
which has 5 bytes and it was causing testware issue. I just changed TestConverter to return 1234 to match number of bytes in server script.
private final ModelConfigAdapter converter = new ModelConfigAdapter(); | ||
private final KafkaTopicTransformsConfigAdapter transformsConverter = new KafkaTopicTransformsConfigAdapter(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private final ModelConfigAdapter converter = new ModelConfigAdapter(); | |
private final KafkaTopicTransformsConfigAdapter transformsConverter = new KafkaTopicTransformsConfigAdapter(); | |
private final ModelConfigAdapter model = new ModelConfigAdapter(); | |
private final KafkaTopicTransformsConfigAdapter transforms = new KafkaTopicTransformsConfigAdapter(); |
These are not converters (means something else for us too) plus probably no need to include the type in the name.
@@ -360,14 +364,14 @@ public void writeEntry( | |||
ConverterHandler convertKey, | |||
ConverterHandler convertValue, | |||
boolean verbose, | |||
List<KafkaTopicHeaderType> headerTypes) | |||
KafkaTopicTransformsConfig transforms) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is assuming we can specify either extract-key
or extract-headers
only once in the transforms
array, right?
|
||
convertKey.extracted(transforms.extractKey, writeKey); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is assuming that extract-key
is always processed after extract-key
, which might be fine for now if we cannot extract the key from a header yet, but probably not sufficient for the general case later where these transforms are expected to execute in the order defined, the output of the previous feeding into the input of the next.
private AsyncapiChannelResolver channels; | ||
private AsyncapiMessageResolver messages; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private AsyncapiChannelResolver channels; | |
private AsyncapiMessageResolver messages; | |
private final AsyncapiChannelResolver channels; | |
private final AsyncapiMessageResolver messages; |
Description
Support extract-key kafka message transform.
Fixes #1176