Skip to content

Commit

Permalink
Ensure extract-key precedes extract-headers when both transforms are …
Browse files Browse the repository at this point in the history
…specified (#1247)
  • Loading branch information
jfallows authored Sep 13, 2024
1 parent 7171a25 commit 949cfca
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#
# Copyright 2021-2023 Aklivity Inc.
#
# Aklivity licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

---
name: test
catalogs:
test0:
type: test
options:
id: 1
schema: |
{
"type": "object",
"properties": {
"correlationId": {
"type": "integer"
},
"status": {
"type": "string"
}
},
"required": [
"correlationId",
"status"
]
}
bindings:
app0:
type: kafka
kind: cache_client
routes:
- exit: cache0
when:
- topic: test
cache0:
type: kafka
kind: cache_server
options:
topics:
- name: test
transforms:
- extract-key: ${message.key.id}
- extract-headers:
correlation-id: ${message.value.correlationId}
value:
model: test
capability: read
length: 12
catalog:
test0:
- id: 1
routes:
- exit: app1
when:
- topic: test

Original file line number Diff line number Diff line change
Expand Up @@ -314,119 +314,129 @@
"path": "/$defs/options/binding/kafka/cache_server_topics",
"value":
{
"topics":
"title": "Topics",
"type": "array",
"items":
{
"title": "Topics",
"type": "array",
"items":
"type": "object",
"additionalProperties": false,
"properties":
{
"type": "object",
"additionalProperties": false,
"properties":
"name":
{
"name":
{
"type": "string"
},
"defaultOffset":
{
"type": "string",
"enum": [ "live", "historical" ]
},
"deltaType":
{
"type": "string",
"enum": [ "none", "json_patch" ],
"deprecated": true
},
"transforms":
{
"title": "Transforms",
"type": "array",
"oneOf":
[
{
"items":
[
"type": "string"
},
"defaultOffset":
{
"type": "string",
"enum": [ "live", "historical" ]
},
"deltaType":
{
"type": "string",
"enum": [ "none", "json_patch" ],
"deprecated": true
},
"transforms":
{
"title": "Transforms",
"type": "array",
"oneOf":
[
{
"minItems": 2,
"maxItems": 2,
"items":
[
{
"type": "object",
"additionalProperties": false,
"properties":
{
"type": "object",
"additionalProperties": false,
"properties":
"extract-key":
{
"extract-headers":
"type": "string",
"additionalProperties": false
}
}
},
{
"type": "object",
"additionalProperties": false,
"properties":
{
"extract-headers":
{
"type": "object",
"patternProperties":
{
"type": "object",
"patternProperties":
"^[a-zA-Z]+[a-zA-Z0-9\\._\\-]*$":
{
"^[a-zA-Z]+[a-zA-Z0-9\\._\\-]*$":
{
"type": "string",
"pattern": "^\\$\\{message\\.value\\.([A-Za-z_][A-Za-z0-9_]*)\\}$"
}
},
"additionalProperties": false
"type": "string",
"pattern": "^\\$\\{message\\.value\\.([A-Za-z_][A-Za-z0-9_]*)\\}$"
}
},
"extract-key":
{
"type": "string",
"additionalProperties": false
}
"additionalProperties": false
}
}
]
},
{
"items":
[
}
],
},
{
"minItems": 1,
"maxItems": 1,
"items":
[
{
"type": "object",
"additionalProperties": false,
"properties":
{
"type": "object",
"additionalProperties": false,
"properties":
"extract-headers":
{
"extract-headers":
"type": "object",
"patternProperties":
{
"type": "object",
"patternProperties":
"^[a-zA-Z]+[a-zA-Z0-9\\._\\-]*$":
{
"^[a-zA-Z]+[a-zA-Z0-9\\._\\-]*$":
{
"type": "string",
"pattern": "^\\$\\{message\\.value\\.([A-Za-z_][A-Za-z0-9_]*)\\}$"
}
},
"additionalProperties": false
}
"type": "string",
"pattern": "^\\$\\{message\\.value\\.([A-Za-z_][A-Za-z0-9_]*)\\}$"
}
},
"additionalProperties": false
}
}
]
},
{
"items":
[
}
]
},
{
"minItems": 1,
"maxItems": 1,
"items":
[
{
"type": "object",
"additionalProperties": false,
"properties":
{
"type": "object",
"additionalProperties": false,
"properties":
"extract-key":
{
"extract-key":
{
"type": "string",
"additionalProperties": false
}
"type": "string",
"additionalProperties": false
}
}
]
}
]
},
"key":
{
"$ref": "#/$defs/converter"
},
"value":
{
"$ref": "#/$defs/converter"
}
}
]
}
]
},
"key":
{
"$ref": "#/$defs/converter"
},
"value":
{
"$ref": "#/$defs/converter"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,12 @@ public void shouldValidateCacheOptionsExtractHeaders()

assertThat(config, not(nullValue()));
}

@Test
public void shouldValidateCacheOptionsExtractKeyAndHeaders()
{
JsonObject config = schema.validate("cache.options.extract.key.and.headers.yaml");

assertThat(config, not(nullValue()));
}
}

0 comments on commit 949cfca

Please sign in to comment.