Kafka Connect Avro Support Part 1: AvroSchemaManager #687

Merged on Jul 6, 2020 (15 commits)

Conversation

liuzix
Contributor

@liuzix liuzix commented Jun 23, 2020

What problem does this PR solve?

What is changed and how it works?

Check List

Tests

  • Unit test

Code changes

  • Has exported function/method change

Release note

This is the first in a series of PRs that add support for writing Avro-serialized data to Kafka; see #660 for background. After research and discussion, we dropped the idea of syncing the Registry schema ID through etcd.

@CLAassistant

CLAassistant commented Jun 23, 2020

CLA assistant check
All committers have signed the CLA.

@amyangfei amyangfei added the component/sink Sink component. label Jun 24, 2020
@liuzix liuzix marked this pull request as ready for review June 24, 2020 10:06
uri := m.registryURL + "/subjects/" + url.QueryEscape(tableNameToSchemaSubject(tableName)) + "/versions/latest"
log.Debug("Querying for latest schema", zap.String("uri", uri))

req, err := http.NewRequest("GET", uri, nil)
Member

Please leave a TODO about TLS support.
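
A minimal sketch of how the lookup request could carry the requested TODO. The helper names `latestSchemaURI` and `newLookupRequest` are hypothetical and not taken from the PR; the subject string is passed in directly because `tableNameToSchemaSubject` is not shown in the excerpt.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// latestSchemaURI builds the Registry endpoint for the newest version of a
// subject. Hypothetical helper; the excerpt above builds the same path inline.
func latestSchemaURI(registryURL, subject string) string {
	return registryURL + "/subjects/" + url.QueryEscape(subject) + "/versions/latest"
}

// newLookupRequest creates the GET request used to query the latest schema.
func newLookupRequest(registryURL, subject string) (*http.Request, error) {
	// TODO: support TLS (custom CA, client certificates) when talking to the Registry.
	return http.NewRequest(http.MethodGet, latestSchemaURI(registryURL, subject), nil)
}

func main() {
	req, err := newLookupRequest("http://127.0.0.1:8081", "test.person")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())
}
```

The registry address and subject in `main` are placeholder values chosen for illustration.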

zap.Int("status", resp.StatusCode),
zap.String("uri", uri),
zap.ByteString("requestBody", payload),
zap.ByteString("responseBody", body))
Member

Does this contain sensitive data, e.g. passwords or user data?

Contributor Author

If the user configures a registry URL that itself contains sensitive data, e.g. http://username:password@example.com, then it is possible. Other data are strictly about the schema, so they should not contain user data.
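
One possible way to address the credentials case, sketched under the assumption that the URI is parsed before it is logged. `redactRegistryURL` is a hypothetical helper, not part of this PR; it relies on `(*url.URL).Redacted` from the standard library (Go 1.15 and later), which masks only the password and leaves the username visible.

```go
package main

import (
	"fmt"
	"net/url"
)

// redactRegistryURL returns the URL with any password masked, so that it can
// be written to logs. Hypothetical helper, not part of the PR.
func redactRegistryURL(raw string) string {
	u, err := url.Parse(raw)
	if err != nil {
		// Do not echo an unparseable URL; it may still carry credentials.
		return "<invalid registry URL>"
	}
	return u.Redacted()
}

func main() {
	// prints http://username:xxxxx@example.com
	fmt.Println(redactRegistryURL("http://username:password@example.com"))
}
```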

}

if jsonResp.ID == 0 {
return errors.New(fmt.Sprintf("Illegal schema ID returned from Registry %d", jsonResp.ID))
Member

Suggested change
- return errors.New(fmt.Sprintf("Illegal schema ID returned from Registry %d", jsonResp.ID))
+ return errors.Errorf("Illegal schema ID returned from Registry %d", jsonResp.ID)
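
For readers without the project's error package at hand: the standard library `errors` package has no `Errorf`, so the suggestion presumably refers to a pkg/errors-style package imported by the file. The sketch below shows the same one-call formatting with the standard library's `fmt.Errorf` as a stand-in; `checkSchemaID` is a hypothetical name.

```go
package main

import "fmt"

// checkSchemaID rejects the zero value, which the excerpt above treats as an
// illegal schema ID. fmt.Errorf stands in for the suggested errors.Errorf.
func checkSchemaID(id int) error {
	if id == 0 {
		return fmt.Errorf("illegal schema ID returned from Registry: %d", id)
	}
	return nil
}

func main() {
	fmt.Println(checkSchemaID(0))
	fmt.Println(checkSchemaID(7))
}
```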

log.Info("Registered schema successfully",
zap.Int("id", jsonResp.ID),
zap.String("uri", uri),
zap.ByteString("body", body))
Member

ByteString is not readable; it produces something like "\u00FF". What about zap.String?

Contributor Author

ByteString is readable when it contains only printable characters, and per my understanding, a JSON body should not contain any unprintable characters. If it does, then something suspicious is definitely happening, so some log output is necessary.

},
{
"type": [
"null",
Member

Suggested change
"null",
"null",

Please avoid mixing tabs and spaces.

var regexRemoveSpaces = regexp.MustCompile(`\s`)

// Register the latest schema for a table to the Registry, by passing in a Codec
func (m *AvroSchemaManager) Register(tableName model.TableName, codec *goavro.Codec) error {
Member

Is there a Go package that handles schema registration?
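
For context, a hand-rolled registration amounts to roughly the following, assuming the Confluent Schema Registry REST API (`POST /subjects/{subject}/versions` with a JSON body holding the schema as a string, answered by a JSON object with an `id` field). This is a sketch, not the PR's code: `newRegisterRequest`, `parseRegisterResponse`, and the two struct types are hypothetical names, and sending the request is left out.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/url"
)

// registerRequest and registerResponse mirror the JSON bodies assumed for the
// Registry's "register schema" call.
type registerRequest struct {
	Schema string `json:"schema"`
}

type registerResponse struct {
	ID int `json:"id"`
}

// newRegisterRequest builds the POST request that registers a schema under a subject.
func newRegisterRequest(registryURL, subject, schema string) (*http.Request, error) {
	payload, err := json.Marshal(registerRequest{Schema: schema})
	if err != nil {
		return nil, err
	}
	uri := registryURL + "/subjects/" + url.QueryEscape(subject) + "/versions"
	req, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/vnd.schemaregistry.v1+json")
	return req, nil
}

// parseRegisterResponse extracts the schema ID and rejects the zero value,
// matching the check in the excerpt earlier in this conversation.
func parseRegisterResponse(body []byte) (int, error) {
	var resp registerResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return 0, err
	}
	if resp.ID == 0 {
		return 0, errors.New("illegal schema ID returned from Registry: 0")
	}
	return resp.ID, nil
}

func main() {
	req, err := newRegisterRequest("http://127.0.0.1:8081", "test.person", `{"type":"string"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())

	id, err := parseRegisterResponse([]byte(`{"id": 42}`))
	fmt.Println(id, err)
}
```

The registry address, subject, and schema in `main` are placeholder values.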

@amyangfei amyangfei added this to the v4.0.3 milestone Jul 1, 2020
@amyangfei amyangfei added the status/ptal Could you please take a look? label Jul 1, 2020
Contributor

@zier-one zier-one left a comment

LGTM

@amyangfei
Contributor

Please fix the data race in the unit test.

@amyangfei
Contributor

/run-integration-tests

Contributor

@amyangfei amyangfei left a comment

LGTM

@amyangfei amyangfei added LGT2 and removed status/ptal Could you please take a look? labels Jul 6, 2020
@codecov-commenter

Codecov Report

Merging #687 into master will not change coverage.
The diff coverage is n/a.

@@             Coverage Diff             @@
##             master       #687   +/-   ##
===========================================
  Coverage   32.3914%   32.3914%           
===========================================
  Files            91         91           
  Lines          9555       9555           
===========================================
  Hits           3095       3095           
  Misses         6196       6196           
  Partials        264        264           
