Skip to content

Commit

Permalink
Add fetch_in_order field to schema_registry input
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Todor <todormihai@gmail.com>
  • Loading branch information
mihaitodor committed Sep 26, 2024
1 parent 735128b commit 141ec00
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 207 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ All notable changes to this project will be documented in this file.
- Field `metadata_max_age` added to the `kafka_migrator` input. (@mihaitodor)
- New experimental `cypher` output. (@rockwotj)
- New experimental `couchbase` output. (@rockwotj)
- Field `fetch_in_order` added to the `schema_registry` input. (@mihaitodor)

### Fixed

Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/schema_registry.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ input:
url: "" # No default (required)
include_deleted: false
subject_filter: ""
fetch_in_order: true
tls:
enabled: false
skip_cert_verify: false
Expand Down Expand Up @@ -148,6 +149,15 @@ Include only subjects which match the regular expression filter. All subjects ar
*Default*: `""`
=== `fetch_in_order`
Fetch all schemas on connect and sort them by ID. Should be set to `true` when schema references are used.
*Type*: `bool`
*Default*: `true`
=== `tls`
Custom TLS settings can be used to override system defaults.
Expand Down
121 changes: 107 additions & 14 deletions internal/impl/confluent/sr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,26 +97,26 @@ type SchemaReference struct {
func (c *Client) GetSchemaByID(ctx context.Context, id int) (resPayload SchemaInfo, err error) {
var resCode int
var resBody []byte
if resCode, resBody, err = c.doRequest(ctx, "GET", fmt.Sprintf("/schemas/ids/%v", id)); err != nil {
err = fmt.Errorf("request failed for schema '%v': %v", id, err)
if resCode, resBody, err = c.doRequest(ctx, http.MethodGet, fmt.Sprintf("/schemas/ids/%d", id), nil); err != nil {
err = fmt.Errorf("request failed for schema '%d': %s", id, err)
c.mgr.Logger().Errorf(err.Error())
return
}

if resCode == http.StatusNotFound {
err = fmt.Errorf("schema '%v' not found by registry", id)
err = fmt.Errorf("schema '%d' not found by registry", id)
c.mgr.Logger().Errorf(err.Error())
return
}

if len(resBody) == 0 {
c.mgr.Logger().Errorf("request for schema '%v' returned an empty body", id)
c.mgr.Logger().Errorf("request for schema '%d' returned an empty body", id)
err = errors.New("schema request returned an empty body")
return
}

if err = json.Unmarshal(resBody, &resPayload); err != nil {
c.mgr.Logger().Errorf("failed to parse response for schema '%v': %v", id, err)
c.mgr.Logger().Errorf("failed to parse response for schema '%d': %s", id, err)
return
}
return
Expand All @@ -126,38 +126,121 @@ func (c *Client) GetSchemaByID(ctx context.Context, id int) (resPayload SchemaIn
func (c *Client) GetSchemaBySubjectAndVersion(ctx context.Context, subject string, version *int) (resPayload SchemaInfo, err error) {
var path string
if version != nil {
path = fmt.Sprintf("/subjects/%s/versions/%v", url.PathEscape(subject), *version)
path = fmt.Sprintf("/subjects/%s/versions/%d", url.PathEscape(subject), *version)
} else {
path = fmt.Sprintf("/subjects/%s/versions/latest", url.PathEscape(subject))
}

var resCode int
var resBody []byte
if resCode, resBody, err = c.doRequest(ctx, "GET", path); err != nil {
err = fmt.Errorf("request failed for schema subject '%v': %v", subject, err)
if resCode, resBody, err = c.doRequest(ctx, http.MethodGet, path, nil); err != nil {
err = fmt.Errorf("request failed for schema subject %q: %s", subject, err)
c.mgr.Logger().Errorf(err.Error())
return
}

if resCode == http.StatusNotFound {
err = fmt.Errorf("schema subject '%v' not found by registry", subject)
err = fmt.Errorf("schema subject %q not found by registry", subject)
c.mgr.Logger().Errorf(err.Error())
return
}

if len(resBody) == 0 {
c.mgr.Logger().Errorf("request for schema subject '%v' returned an empty body", subject)
c.mgr.Logger().Errorf("request for schema subject %q returned an empty body", subject)
err = errors.New("schema request returned an empty body")
return
}

if err = json.Unmarshal(resBody, &resPayload); err != nil {
c.mgr.Logger().Errorf("failed to parse response for schema subject '%v': %v", subject, err)
c.mgr.Logger().Errorf("failed to parse response for schema subject %q: %s", subject, err)
return
}
return
}

// GetMode returns the mode of the Schema Registry instance.
//
// It issues a GET request against the `/mode` endpoint and decodes the `Mode`
// field from the JSON response body. An error is returned when the request
// fails, when the registry replies with a status other than 200 OK, or when
// the response body cannot be decoded. Underlying errors are wrapped so that
// callers can inspect them with errors.Is / errors.As.
func (c *Client) GetMode(ctx context.Context) (string, error) {
	resCode, body, err := c.doRequest(ctx, http.MethodGet, "/mode", nil)
	if err != nil {
		return "", fmt.Errorf("request failed: %w", err)
	}

	if resCode != http.StatusOK {
		return "", fmt.Errorf("request returned status: %d", resCode)
	}

	// The field carries no JSON tag; encoding/json falls back to a
	// case-insensitive match on the field name.
	var payload struct {
		Mode string
	}
	if err := json.Unmarshal(body, &payload); err != nil {
		return "", fmt.Errorf("failed to unmarshal response: %w", err)
	}

	return payload.Mode, nil
}

// GetSubjects returns the registered subjects.
//
// It issues a GET request against the `/subjects` endpoint and decodes the
// response body as a JSON array of strings. An error is returned when the
// request fails, when the registry replies with a status other than 200 OK,
// or when the response body cannot be decoded. Underlying errors are wrapped
// so that callers can inspect them with errors.Is / errors.As.
func (c *Client) GetSubjects(ctx context.Context) ([]string, error) {
	resCode, body, err := c.doRequest(ctx, http.MethodGet, "/subjects", nil)
	if err != nil {
		return nil, fmt.Errorf("request failed: %w", err)
	}

	if resCode != http.StatusOK {
		return nil, fmt.Errorf("request returned status: %d", resCode)
	}

	var subjects []string
	if err := json.Unmarshal(body, &subjects); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
	}

	return subjects, nil
}

// GetVersionsForSubject returns the versions for a given subject.
//
// The subject is path-escaped before being placed in the
// `/subjects/{subject}/versions` request path. The response body is decoded
// as a JSON array of integers. An error is returned when the request fails,
// when the registry replies with a status other than 200 OK, or when the
// response body cannot be decoded. Underlying errors are wrapped so that
// callers can inspect them with errors.Is / errors.As.
func (c *Client) GetVersionsForSubject(ctx context.Context, subject string) ([]int, error) {
	path := fmt.Sprintf("/subjects/%s/versions", url.PathEscape(subject))

	resCode, body, err := c.doRequest(ctx, http.MethodGet, path, nil)
	if err != nil {
		return nil, fmt.Errorf("request failed: %w", err)
	}

	if resCode != http.StatusOK {
		return nil, fmt.Errorf("request returned status: %d", resCode)
	}

	var versions []int
	if err := json.Unmarshal(body, &versions); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
	}

	return versions, nil
}

// CreateSchema creates a new schema for the given subject.
//
// The subject is path-escaped before being placed in the
// `/subjects/{subject}/versions` request path, and data is sent as the body
// of a POST request. The response body is discarded. An error is returned
// when the request fails or when the registry replies with a status other
// than 200 OK. The underlying request error is wrapped so that callers can
// inspect it with errors.Is / errors.As.
func (c *Client) CreateSchema(ctx context.Context, subject string, data []byte) error {
	path := fmt.Sprintf("/subjects/%s/versions", url.PathEscape(subject))

	resCode, _, err := c.doRequest(ctx, http.MethodPost, path, data)
	if err != nil {
		return fmt.Errorf("request failed: %w", err)
	}

	if resCode != http.StatusOK {
		return fmt.Errorf("request returned status: %d", resCode)
	}

	return nil
}

type refWalkFn func(ctx context.Context, name string, info SchemaInfo) error

// WalkReferences goes through the provided schema info and for each reference
Expand Down Expand Up @@ -193,7 +276,7 @@ func (c *Client) walkReferencesTracked(ctx context.Context, seen map[string]int,
return nil
}

func (c *Client) doRequest(ctx context.Context, verb, reqPath string) (resCode int, resBody []byte, err error) {
func (c *Client) doRequest(ctx context.Context, verb, reqPath string, body []byte) (resCode int, resBody []byte, err error) {
reqURL := *c.SchemaRegistryBaseURL
if reqURL.Path, err = url.JoinPath(reqURL.Path, reqPath); err != nil {
return
Expand All @@ -208,11 +291,21 @@ func (c *Client) doRequest(ctx context.Context, verb, reqPath string) (resCode i
}
}

var bodyReader io.Reader
if len(body) > 0 {
bodyReader = bytes.NewReader(body)
} else {
bodyReader = http.NoBody
}
var req *http.Request
if req, err = http.NewRequestWithContext(ctx, verb, reqURLString, http.NoBody); err != nil {
if req, err = http.NewRequestWithContext(ctx, verb, reqURLString, bodyReader); err != nil {
return
}
req.Header.Add("Accept", "application/vnd.schemaregistry.v1+json")
headerKey := "Accept"
if verb == http.MethodPost {
headerKey = "Content-Type"
}
req.Header.Add(headerKey, "application/vnd.schemaregistry.v1+json")
if err = c.requestSigner(c.mgr.FS(), req); err != nil {
return
}
Expand Down
78 changes: 66 additions & 12 deletions internal/impl/kafka/enterprise/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ func TestSchemaRegistryIntegration(t *testing.T) {
name string
schema string
includeSoftDeletedSubjects bool
extraSubject string
subjectFilter string
schemaWithReference string
}{
{
name: "roundtrip",
Expand All @@ -312,7 +314,15 @@ func TestSchemaRegistryIntegration(t *testing.T) {
{
name: "roundtrip with subject filter",
schema: `{"name":"foo", "type": "string"}`,
subjectFilter: `\w+-\w+-\w+-\w+-\w+`,
extraSubject: "foobar",
subjectFilter: `^\w+-\w+-\w+-\w+-\w+$`,
},
{
name: "roundtrip with schema references",
schema: `{"name":"foo", "type": "string"}`,
// A UUID which always gets picked first when querying the `/subjects` endpoint.
extraSubject: "ffffffff-ffff-ffff-ffff-ffffffffffff",
schemaWithReference: `{"name":"bar", "type": "record", "fields":[{"name":"data", "type": "foo"}]}`,
},
}

Expand All @@ -339,20 +349,27 @@ func TestSchemaRegistryIntegration(t *testing.T) {
u4, err := uuid.NewV4()
require.NoError(t, err)
subject := u4.String()
extraSubject := "foobar"

t.Cleanup(func() {
cleanupSubject(sourcePort, subject, false)
cleanupSubject(sourcePort, subject, true)
cleanupSubject(sinkPort, subject, false)
cleanupSubject(sinkPort, subject, true)

if test.extraSubject != "" {
cleanupSubject(sourcePort, test.extraSubject, false)
cleanupSubject(sourcePort, test.extraSubject, true)
cleanupSubject(sinkPort, test.extraSubject, false)
cleanupSubject(sinkPort, test.extraSubject, true)
}
})

postContentType := "application/vnd.schemaregistry.v1+json"
type payload struct {
Subject string `json:"subject,omitempty"`
Version int `json:"version,omitempty"`
Schema string `json:"schema"`
Subject string `json:"subject,omitempty"`
Version int `json:"version,omitempty"`
Schema string `json:"schema"`
References []map[string]any `json:"references,omitempty"`
}
body, err := json.Marshal(payload{Schema: test.schema})
require.NoError(t, err)
Expand All @@ -363,11 +380,13 @@ func TestSchemaRegistryIntegration(t *testing.T) {
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusOK, resp.StatusCode)

if test.subjectFilter != "" {
resp, err = http.DefaultClient.Post(fmt.Sprintf("http://localhost:%s/subjects/%s/versions", sourcePort, extraSubject), postContentType, bytes.NewReader(body))
resp, err = http.DefaultClient.Post(fmt.Sprintf("http://localhost:%s/subjects/%s/versions", sourcePort, test.extraSubject), postContentType, bytes.NewReader(body))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusOK, resp.StatusCode)
}

if test.includeSoftDeletedSubjects {
Expand All @@ -376,6 +395,19 @@ func TestSchemaRegistryIntegration(t *testing.T) {
resp, err = http.DefaultClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusOK, resp.StatusCode)
}

if test.schemaWithReference != "" {
body, err := json.Marshal(payload{Schema: test.schemaWithReference, References: []map[string]any{{"name": "foo", "subject": subject, "version": 1}}})
require.NoError(t, err)
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://localhost:%s/subjects/%s/versions", sourcePort, test.extraSubject), bytes.NewReader(body))
require.NoError(t, err)
req.Header.Set("Content-Type", postContentType)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusOK, resp.StatusCode)
}

streamBuilder := service.NewStreamBuilder()
Expand All @@ -385,11 +417,17 @@ input:
url: http://localhost:%s
include_deleted: %t
subject_filter: %s
fetch_in_order: %t
output:
schema_registry:
url: http://localhost:%s
subject: ${! @schema_registry_subject }
`, sourcePort, test.includeSoftDeletedSubjects, test.subjectFilter, sinkPort)))
fallback:
- schema_registry:
url: http://localhost:%s
subject: ${! @schema_registry_subject }
# Preserve schema order.
max_in_flight: 1
# Don't retry the same message multiple times so we do fail if schemas with references are sent in the wrong order
- drop: {}
`, sourcePort, test.includeSoftDeletedSubjects, test.subjectFilter, test.schemaWithReference != "", sinkPort)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

stream, err := streamBuilder.Build()
Expand All @@ -398,7 +436,8 @@ output:
ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
defer done()

require.NoError(t, stream.Run(ctx))
err = stream.Run(ctx)
require.NoError(t, err)

resp, err = http.DefaultClient.Get(fmt.Sprintf("http://localhost:%s/subjects", sinkPort))
require.NoError(t, err)
Expand All @@ -408,7 +447,7 @@ output:
require.Equal(t, http.StatusOK, resp.StatusCode)
if test.subjectFilter != "" {
assert.Contains(t, string(body), subject)
assert.NotContains(t, string(body), extraSubject)
assert.NotContains(t, string(body), test.extraSubject)
}

resp, err = http.DefaultClient.Get(fmt.Sprintf("http://localhost:%s/subjects/%s/versions/1", sinkPort, subject))
Expand All @@ -423,6 +462,21 @@ output:
assert.Equal(t, subject, p.Subject)
assert.Equal(t, 1, p.Version)
assert.JSONEq(t, test.schema, p.Schema)

if test.schemaWithReference != "" {
resp, err = http.DefaultClient.Get(fmt.Sprintf("http://localhost:%s/subjects/%s/versions/1", sinkPort, test.extraSubject))
require.NoError(t, err)
body, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusOK, resp.StatusCode)

var p payload
require.NoError(t, json.Unmarshal(body, &p))
assert.Equal(t, test.extraSubject, p.Subject)
assert.Equal(t, 1, p.Version)
assert.JSONEq(t, test.schemaWithReference, p.Schema)
}
})
}
}
Expand Down
Loading

0 comments on commit 141ec00

Please sign in to comment.