Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mq(ticdc): implement producer interfrace using kafka-go #8146

Merged
merged 13 commits into from
Feb 1, 2023
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,5 @@ replace github.com/benbjohnson/clock v1.3.0 => github.com/benbjohnson/clock v1.1

// copy from TiDB
replace go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac

replace github.com/segmentio/kafka-go v0.4.38 => github.com/3AceShowHand/kafka-go v0.0.0-20230109084533-cd584ac8c705
sdojjy marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ cloud.google.com/go/storage v1.21.0/go.mod h1:XmRlxkgPjlBONznT2dDUU/5XlpU2OjMnKu
cloud.google.com/go/storage v1.22.1 h1:F6IlQJZrZM++apn9V5/VfS3gbTUYg98PS3EMQAzqtfg=
cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/3AceShowHand/kafka-go v0.0.0-20230109084533-cd584ac8c705 h1:Z++RzhDGHRmCLXCfGtr0q9WvLL0qooLuIfdnquRAE68=
github.com/3AceShowHand/kafka-go v0.0.0-20230109084533-cd584ac8c705/go.mod h1:ikyuGon/60MN/vXFgykf7Zm8P5Be49gJU6vezwjnnhU=
github.com/AlekSi/gocov-xml v1.0.0/go.mod h1:J0qYeZ6tDg4oZubW9mAAgxlqw39PDfoEkzB3HXSbEuA=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 h1:KQgdWmEOmaJKxaUUZwHAYh12t+b+ZJf8q3friycK1kA=
Expand Down Expand Up @@ -1110,8 +1112,6 @@ github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZ
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/kafka-go v0.4.38 h1:iQdOBbUSdfuYlFpvjuALgj7N6DrdPA0HfB4AhREOdtg=
github.com/segmentio/kafka-go v0.4.38/go.mod h1:ikyuGon/60MN/vXFgykf7Zm8P5Be49gJU6vezwjnnhU=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shhdgit/testfixtures/v3 v3.6.2-0.20211219171712-c4f264d673d3/go.mod h1:Z0OLtuFJ7Y4yLsVijHK8uq95NjGFlYJy+I00ElAEtUQ=
Expand Down
14 changes: 13 additions & 1 deletion pkg/sink/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ import (
"go.uber.org/zap"
)

const (
// SASLTypePlaintext represents the plain mechanism
SASLTypePlaintext = "PLAIN"
// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
// SASLTypeGSSAPI represents the gssapi mechanism.
SASLTypeGSSAPI = "GSSAPI"
)

// Options stores user specified configurations
type Options struct {
BrokerEndpoints []string
Expand Down Expand Up @@ -340,7 +351,8 @@ var (
commonInvalidChar = regexp.MustCompile(`[\?:,"]`)
)

func newKafkaClientID(role, captureAddr string,
// NewKafkaClientID generates kafka client id
func NewKafkaClientID(role, captureAddr string,
changefeedID model.ChangeFeedID,
configuredClientID string,
) (clientID string, err error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestClientID(t *testing.T) {
},
}
for _, tc := range testCases {
id, err := newKafkaClientID(tc.role, tc.addr,
id, err := NewKafkaClientID(tc.role, tc.addr,
model.DefaultChangeFeedID(tc.changefeedID), tc.configuredID)
if tc.hasError {
require.Error(t, err)
Expand Down
10 changes: 5 additions & 5 deletions pkg/sink/kafka/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) {
captureAddr := contextutil.CaptureAddrFromCtx(ctx)
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)

config.ClientID, err = newKafkaClientID(role, captureAddr, changefeedID, o.ClientID)
config.ClientID, err = NewKafkaClientID(role, captureAddr, changefeedID, o.ClientID)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -139,19 +139,19 @@ func completeSaramaSASLConfig(config *sarama.Config, o *Options) {
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(o.SASL.SASLMechanism)
switch o.SASL.SASLMechanism {
case sarama.SASLTypeSCRAMSHA256, sarama.SASLTypeSCRAMSHA512, sarama.SASLTypePlaintext:
case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512, SASLTypePlaintext:
config.Net.SASL.User = o.SASL.SASLUser
config.Net.SASL.Password = o.SASL.SASLPassword
if strings.EqualFold(string(o.SASL.SASLMechanism), sarama.SASLTypeSCRAMSHA256) {
if strings.EqualFold(string(o.SASL.SASLMechanism), SASLTypeSCRAMSHA256) {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256}
}
} else if strings.EqualFold(string(o.SASL.SASLMechanism), sarama.SASLTypeSCRAMSHA512) {
} else if strings.EqualFold(string(o.SASL.SASLMechanism), SASLTypeSCRAMSHA512) {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512}
}
}
case sarama.SASLTypeGSSAPI:
case SASLTypeGSSAPI:
config.Net.SASL.GSSAPI.AuthType = int(o.SASL.GSSAPI.AuthType)
config.Net.SASL.GSSAPI.Username = o.SASL.GSSAPI.Username
config.Net.SASL.GSSAPI.ServiceName = o.SASL.GSSAPI.ServiceName
Expand Down
Loading