Skip to content

Commit

Permalink
sink(ticdc): remove pulsar sink (#6983)
Browse files Browse the repository at this point in the history
close #7087
  • Loading branch information
Rustin170506 authored Sep 16, 2022
1 parent 8438de5 commit de294a6
Show file tree
Hide file tree
Showing 19 changed files with 3 additions and 750 deletions.
40 changes: 0 additions & 40 deletions cdc/sink/mq/manager/pulsar_manager.go

This file was deleted.

48 changes: 0 additions & 48 deletions cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pingcap/tiflow/cdc/sink/mq/manager"
"github.com/pingcap/tiflow/cdc/sink/mq/producer"
"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
"github.com/pingcap/tiflow/cdc/sink/mq/producer/pulsar"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -472,50 +471,3 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
}
return sink, nil
}

// NewPulsarSink creates a new Pulsar mqSink.
func NewPulsarSink(ctx context.Context, sinkURI *url.URL,
replicaConfig *config.ReplicaConfig, errCh chan error,
) (*mqSink, error) {
log.Warn("Pulsar Sink is not recommended for production use.")
s := sinkURI.Query().Get(config.ProtocolKey)
if s != "" {
replicaConfig.Sink.Protocol = s
}

var protocol config.Protocol
if err := protocol.FromString(replicaConfig.Sink.Protocol); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

encoderConfig := common.NewConfig(protocol)
if err := encoderConfig.Apply(sinkURI, replicaConfig); err != nil {
return nil, errors.Trace(err)
}
// todo: set by pulsar producer's `max.message.bytes`
// encoderConfig = encoderConfig.WithMaxMessageBytes()
if err := encoderConfig.Validate(); err != nil {
return nil, errors.Trace(err)
}

producer, err := pulsar.NewProducer(sinkURI, errCh)
if err != nil {
return nil, errors.Trace(err)
}
fakeTopicManager := manager.NewPulsarTopicManager(
producer.GetPartitionNum(),
)
sink, err := newMqSink(
ctx,
fakeTopicManager,
producer,
"",
replicaConfig,
encoderConfig,
errCh,
)
if err != nil {
return nil, errors.Trace(err)
}
return sink, nil
}
32 changes: 0 additions & 32 deletions cdc/sink/mq/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/Shopify/sarama"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/open"
Expand Down Expand Up @@ -165,37 +164,6 @@ func TestKafkaSink(t *testing.T) {
}
}

func TestPulsarSinkEncoderConfig(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

err := failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/mq/producer/pulsar/MockPulsar",
"return(true)")
require.Nil(t, err)

uri := "pulsar://127.0.0.1:1234/kafka-test?" +
"max-message-bytes=4194304&max-batch-size=1&protocol=open-protocol"

sinkURI, err := url.Parse(uri)
require.Nil(t, err)
replicaConfig := config.GetDefaultReplicaConfig()
errCh := make(chan error, 1)

sink, err := NewPulsarSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)

encoder := sink.encoderBuilder.Build()
require.IsType(t, &open.BatchEncoder{}, encoder)
require.Equal(t, 1, encoder.(*open.BatchEncoder).MaxBatchSize)
require.Equal(t, 4194304, encoder.(*open.BatchEncoder).MaxMessageBytes)

// FIXME: mock pulsar client doesn't support close,
// so we can't call sink.Close() to close it.
// We will leak goroutine if we don't close it.
cancel()
sink.flushWorker.close()
sink.resolvedBuffer.Close()
}

func TestFlushRowChangedEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

Expand Down
32 changes: 0 additions & 32 deletions cdc/sink/mq/producer/pulsar/doc.go

This file was deleted.

191 changes: 0 additions & 191 deletions cdc/sink/mq/producer/pulsar/option.go

This file was deleted.

Loading

0 comments on commit de294a6

Please sign in to comment.