Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avro: Add name strategy option for schema registry #3936

Closed
wants to merge 10 commits into from
21 changes: 19 additions & 2 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,30 @@ func newAvroEventBatchEncoderBuilder(credential *security.Credential, opts map[s
return nil, cerror.ErrPrepareAvroFailed.GenWithStack(`Avro protocol requires parameter "registry"`)
}

topic, ok := opts["topic"]
if !ok {
topic = ""
}

nameStrategy, ok := opts["nameStrategy"]
dveeden marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
dveeden marked this conversation as resolved.
Show resolved Hide resolved
switch nameStrategy {
case "topic":
nameStrategy = "topic"
case "legacy", "":
nameStrategy = "legacy"
default:
return nil, fmt.Errorf("invalid name strategy %s", nameStrategy)
}
}

ctx := context.Background()
keySchemaManager, err := NewAvroSchemaManager(ctx, credential, registryURI, keySchemaSuffix)
keySchemaManager, err := NewAvroSchemaManager(ctx, credential, registryURI, keySchemaSuffix, topic, nameStrategy)
if err != nil {
return nil, errors.Trace(err)
}

valueSchemaManager, err := NewAvroSchemaManager(ctx, credential, registryURI, valueSchemaSuffix)
valueSchemaManager, err := NewAvroSchemaManager(ctx, credential, registryURI, valueSchemaSuffix, topic, nameStrategy)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
15 changes: 14 additions & 1 deletion cdc/sink/codec/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
type AvroSchemaManager struct {
registryURL string
subjectSuffix string
topic string
nameStrategy string

credential *security.Credential

Expand Down Expand Up @@ -73,7 +75,7 @@ type lookupResponse struct {

// NewAvroSchemaManager creates a new AvroSchemaManager
func NewAvroSchemaManager(
ctx context.Context, credential *security.Credential, registryURL string, subjectSuffix string,
ctx context.Context, credential *security.Credential, registryURL string, subjectSuffix string, topic string, nameStrategy string,
) (*AvroSchemaManager, error) {
registryURL = strings.TrimRight(registryURL, "/")
// Test connectivity to the Schema Registry
Expand Down Expand Up @@ -108,6 +110,8 @@ func NewAvroSchemaManager(
registryURL: registryURL,
cache: make(map[string]*schemaCacheEntry, 1),
subjectSuffix: subjectSuffix,
topic: topic,
nameStrategy: nameStrategy,
credential: credential,
}, nil
}
Expand Down Expand Up @@ -385,6 +389,12 @@ func httpRetry(ctx context.Context, credential *security.Credential, r *http.Req
if resp.StatusCode >= 200 && resp.StatusCode < 300 || (resp.StatusCode == 404 && allow404) {
break
}

// Avoid retrying "HTTP 409 Conflict"
if resp.StatusCode == 409 {
return nil, errors.New("HTTP server returned status code 409, indicating conflict")
}

log.Warn("HTTP server returned with error", zap.Int("status", resp.StatusCode))
_ = resp.Body.Close()

Expand All @@ -404,5 +414,8 @@ func httpRetry(ctx context.Context, credential *security.Credential, r *http.Req

func (m *AvroSchemaManager) tableNameToSchemaSubject(tableName model.TableName) string {
// We should guarantee unique names for subjects
if m.nameStrategy == "topic" {
return m.topic + m.subjectSuffix
}
return tableName.Schema + "_" + tableName.Table + m.subjectSuffix
}
1 change: 1 addition & 0 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
if topic == "" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}
opts["topic"] = topic
dveeden marked this conversation as resolved.
Show resolved Hide resolved

sProducer, err := kafka.NewKafkaSaramaProducer(ctx, topic, producerConfig, errCh)
if err != nil {
Expand Down