Skip to content

Commit

Permalink
Add fetch_all_schemas_on_connect field to schema_registry input
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Todor <todormihai@gmail.com>
  • Loading branch information
mihaitodor committed Sep 25, 2024
1 parent feee6c0 commit ee05f61
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 61 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ All notable changes to this project will be documented in this file.
- Field `metadata_max_age` added to the `kafka_migrator` input. (@mihaitodor)
- New experimental `cypher` output. (@rockwotj)
- New experimental `couchbase` output. (@rockwotj)
- Field `fetch_all_schemas_on_connect` added to the `schema_registry` input. (@mihaitodor)

### Fixed

Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/schema_registry.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ input:
url: "" # No default (required)
include_deleted: false
subject_filter: ""
fetch_all_schemas_on_connect: true
tls:
enabled: false
skip_cert_verify: false
Expand Down Expand Up @@ -148,6 +149,15 @@ Include only subjects which match the regular expression filter. All subjects ar
*Default*: `""`
=== `fetch_all_schemas_on_connect`
Fetch all schemas on connect. Set this to `true` when schema references are used.
*Type*: `bool`
*Default*: `true`
=== `tls`
Custom TLS settings can be used to override system defaults.
Expand Down
77 changes: 65 additions & 12 deletions internal/impl/kafka/enterprise/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ func TestSchemaRegistryIntegration(t *testing.T) {
name string
schema string
includeSoftDeletedSubjects bool
extraSubject string
subjectFilter string
schemaWithReference string
}{
{
name: "roundtrip",
Expand All @@ -312,7 +314,15 @@ func TestSchemaRegistryIntegration(t *testing.T) {
{
name: "roundtrip with subject filter",
schema: `{"name":"foo", "type": "string"}`,
subjectFilter: `\w+-\w+-\w+-\w+-\w+`,
extraSubject: "foobar",
subjectFilter: `^\w+-\w+-\w+-\w+-\w+$`,
},
{
name: "roundtrip with schema references",
schema: `{"name":"foo", "type": "string"}`,
// A UUID which always gets picked first when querying the `/subjects` endpoint.
extraSubject: "ffffffff-ffff-ffff-ffff-ffffffffffff",
schemaWithReference: `{"name":"bar", "type": "record", "fields":[{"name":"data", "type": "foo"}]}`,
},
}

Expand All @@ -339,20 +349,27 @@ func TestSchemaRegistryIntegration(t *testing.T) {
u4, err := uuid.NewV4()
require.NoError(t, err)
subject := u4.String()
extraSubject := "foobar"

t.Cleanup(func() {
cleanupSubject(sourcePort, subject, false)
cleanupSubject(sourcePort, subject, true)
cleanupSubject(sinkPort, subject, false)
cleanupSubject(sinkPort, subject, true)

if test.extraSubject != "" {
cleanupSubject(sourcePort, test.extraSubject, false)
cleanupSubject(sourcePort, test.extraSubject, true)
cleanupSubject(sinkPort, test.extraSubject, false)
cleanupSubject(sinkPort, test.extraSubject, true)
}
})

postContentType := "application/vnd.schemaregistry.v1+json"
type payload struct {
Subject string `json:"subject,omitempty"`
Version int `json:"version,omitempty"`
Schema string `json:"schema"`
Subject string `json:"subject,omitempty"`
Version int `json:"version,omitempty"`
Schema string `json:"schema"`
References []map[string]any `json:"references,omitempty"`
}
body, err := json.Marshal(payload{Schema: test.schema})
require.NoError(t, err)
Expand All @@ -363,11 +380,13 @@ func TestSchemaRegistryIntegration(t *testing.T) {
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusOK, resp.StatusCode)

if test.subjectFilter != "" {
resp, err = http.DefaultClient.Post(fmt.Sprintf("http://localhost:%s/subjects/%s/versions", sourcePort, extraSubject), postContentType, bytes.NewReader(body))
resp, err = http.DefaultClient.Post(fmt.Sprintf("http://localhost:%s/subjects/%s/versions", sourcePort, test.extraSubject), postContentType, bytes.NewReader(body))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusOK, resp.StatusCode)
}

if test.includeSoftDeletedSubjects {
Expand All @@ -376,6 +395,19 @@ func TestSchemaRegistryIntegration(t *testing.T) {
resp, err = http.DefaultClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusOK, resp.StatusCode)
}

if test.schemaWithReference != "" {
body, err := json.Marshal(payload{Schema: test.schemaWithReference, References: []map[string]any{{"name": "foo", "subject": subject, "version": 1}}})
require.NoError(t, err)
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://localhost:%s/subjects/%s/versions", sourcePort, test.extraSubject), bytes.NewReader(body))
require.NoError(t, err)
req.Header.Set("Content-Type", postContentType)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusOK, resp.StatusCode)
}

streamBuilder := service.NewStreamBuilder()
Expand All @@ -385,11 +417,16 @@ input:
url: http://localhost:%s
include_deleted: %t
subject_filter: %s
fetch_all_schemas_on_connect: %t
output:
schema_registry:
url: http://localhost:%s
subject: ${! @schema_registry_subject }
`, sourcePort, test.includeSoftDeletedSubjects, test.subjectFilter, sinkPort)))
fallback:
- schema_registry:
url: http://localhost:%s
subject: ${! @schema_registry_subject }
max_in_flight: 1
# Don't retry the same message multiple times so we do fail if schemas with references are sent in the wrong order
- drop: {}
`, sourcePort, test.includeSoftDeletedSubjects, test.subjectFilter, test.schemaWithReference != "", sinkPort)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

stream, err := streamBuilder.Build()
Expand All @@ -398,7 +435,8 @@ output:
ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
defer done()

require.NoError(t, stream.Run(ctx))
err = stream.Run(ctx)
require.NoError(t, err)

resp, err = http.DefaultClient.Get(fmt.Sprintf("http://localhost:%s/subjects", sinkPort))
require.NoError(t, err)
Expand All @@ -408,7 +446,7 @@ output:
require.Equal(t, http.StatusOK, resp.StatusCode)
if test.subjectFilter != "" {
assert.Contains(t, string(body), subject)
assert.NotContains(t, string(body), extraSubject)
assert.NotContains(t, string(body), test.extraSubject)
}

resp, err = http.DefaultClient.Get(fmt.Sprintf("http://localhost:%s/subjects/%s/versions/1", sinkPort, subject))
Expand All @@ -423,6 +461,21 @@ output:
assert.Equal(t, subject, p.Subject)
assert.Equal(t, 1, p.Version)
assert.JSONEq(t, test.schema, p.Schema)

if test.schemaWithReference != "" {
resp, err = http.DefaultClient.Get(fmt.Sprintf("http://localhost:%s/subjects/%s/versions/1", sinkPort, test.extraSubject))
require.NoError(t, err)
body, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusOK, resp.StatusCode)

var p payload
require.NoError(t, json.Unmarshal(body, &p))
assert.Equal(t, test.extraSubject, p.Subject)
assert.Equal(t, 1, p.Version)
assert.JSONEq(t, test.schemaWithReference, p.Schema)
}
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,33 @@ fields:
mapping: |
#!blobl
let redpandaMigratorOffsets = this.redpanda_migrator.with("seed_brokers", "consumer_group", "client_id", "rack_id", "max_message_bytes", "broker_write_max_bytes", "tls", "sasl")
let redpanda_migrator = this.redpanda_migrator.assign(
if ["topic", "key", "partition", "partitioner", "timestamp"].any(f -> this.redpanda_migrator.keys().contains(f)) {
root = throw("The topic, key, partition, partitioner and timestamp fields of the redpanda_migrator output must be left empty")
}
let rpMigratorMaxInFlight = this.redpanda_migrator.max_in_flight.or(1)
let redpandaMigrator = this.redpanda_migrator.assign(
{
"topic": "${! metadata(\"kafka_topic\").or(throw(\"missing kafka_topic metadata\")) }",
"key": "${! metadata(\"kafka_key\") }",
"partition": "${! metadata(\"kafka_partition\").or(throw(\"missing kafka_partition metadata\")) }",
"partitioner": "manual",
"timestamp": "${! metadata(\"kafka_timestamp_unix\").or(timestamp_unix()) }"
"timestamp": "${! metadata(\"kafka_timestamp_unix\").or(timestamp_unix()) }",
"max_in_flight": $rpMigratorMaxInFlight
}
)
let redpandaMigratorOffsets = this.redpanda_migrator.with("seed_brokers", "consumer_group", "client_id", "rack_id", "max_message_bytes", "broker_write_max_bytes", "tls", "sasl")
if this.schema_registry.keys().contains("subject") {
root = throw("The subject field of the schema_registry output must not be set")
}
let schema_registry = if this.schema_registry.length() > 0 { this.schema_registry.assign({"subject": "${! @schema_registry_subject }"}) }
if ["topic", "key", "partition", "partitioner", "timestamp"].any(f -> this.redpanda_migrator.keys().contains(f)) {
root = throw("The topic, key, partition, partitioner and timestamp fields of the redpanda_migrator output must be left empty")
let srMaxInFlight = this.schema_registry.max_in_flight.or(1)
let schemaRegistry = if this.schema_registry.length() > 0 {
this.schema_registry.assign({
"subject": "${! @schema_registry_subject }",
"max_in_flight": $srMaxInFlight
})
}
root = if this.redpanda_migrator.length() == 0 {
Expand Down Expand Up @@ -95,7 +104,7 @@ mapping: |
Subject '${! @schema_registry_subject }' version ${! @schema_registry_version } already has schema: ${! content() }
- output:
reject: ${! @fallback_error }
""".format($redpanda_migrator.string(), $redpandaMigratorOffsets.string(), $schema_registry.string()).parse_yaml()
""".format($redpandaMigrator.string(), $redpandaMigratorOffsets.string(), $schemaRegistry.string()).parse_yaml()
} else {
"""
switch:
Expand All @@ -120,7 +129,7 @@ mapping: |
- log:
message: |
Dropping message: ${! content() } / ${! metadata() }
""".format($redpanda_migrator.string(), $redpandaMigratorOffsets.string()).parse_yaml()
""".format($redpandaMigrator.string(), $redpandaMigratorOffsets.string()).parse_yaml()
}
tests:
Expand All @@ -131,6 +140,7 @@ tests:
max_in_flight: 1
schema_registry:
url: http://localhost:8081
max_in_flight: 1

expected:
switch:
Expand Down Expand Up @@ -169,6 +179,7 @@ tests:
- schema_registry:
subject: ${! @schema_registry_subject }
url: http://localhost:8081
max_in_flight: 1
- switch:
cases:
- check: '@fallback_error == "request returned status: 422"'
Expand Down
Loading

0 comments on commit ee05f61

Please sign in to comment.