Skip to content

Commit

Permalink
pkg/notify: fix create receiver on closed notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Dec 9, 2020
1 parent befdc54 commit 59fc7cd
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 23 deletions.
24 changes: 20 additions & 4 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,19 @@ func newProcessor(
sinkEmittedResolvedNotifier := new(notify.Notifier)
localResolvedNotifier := new(notify.Notifier)
localCheckpointTsNotifier := new(notify.Notifier)
sinkEmittedResolvedReceiver, err := sinkEmittedResolvedNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
localResolvedReceiver, err := localResolvedNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
localCheckpointTsReceiver, err := localCheckpointTsNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}

p := &processor{
id: uuid.New().String(),
limitter: limitter,
Expand All @@ -218,14 +231,14 @@ func newProcessor(
output: make(chan *model.PolymorphicEvent, defaultOutputChanSize),

sinkEmittedResolvedNotifier: sinkEmittedResolvedNotifier,
sinkEmittedResolvedReceiver: sinkEmittedResolvedNotifier.NewReceiver(50 * time.Millisecond),
sinkEmittedResolvedReceiver: sinkEmittedResolvedReceiver,

localResolvedNotifier: localResolvedNotifier,
localResolvedReceiver: localResolvedNotifier.NewReceiver(50 * time.Millisecond),
localResolvedReceiver: localResolvedReceiver,

checkpointTs: checkpointTs,
localCheckpointTsNotifier: localCheckpointTsNotifier,
localCheckpointTsReceiver: localCheckpointTsNotifier.NewReceiver(50 * time.Millisecond),
localCheckpointTsReceiver: localCheckpointTsReceiver,

tables: make(map[int64]*tableInfo),
markTableIDs: make(map[int64]struct{}),
Expand Down Expand Up @@ -666,9 +679,12 @@ func (p *processor) globalStatusWorker(ctx context.Context) error {
lastResolvedTs uint64
watchKey = kv.GetEtcdKeyJob(p.changefeedID)
globalResolvedTsNotifier = new(notify.Notifier)
globalResolvedTsReceiver = globalResolvedTsNotifier.NewReceiver(1 * time.Second)
)
defer globalResolvedTsNotifier.Close()
globalResolvedTsReceiver, err := globalResolvedTsNotifier.NewReceiver(1 * time.Second)
if err != nil {
return err
}

updateStatus := func(changefeedStatus *model.ChangeFeedStatus) {
atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs)
Expand Down
5 changes: 4 additions & 1 deletion cdc/puller/entry_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ func (es *EntrySorter) Run(ctx context.Context) error {
}
}
})
receiver := es.resolvedNotifier.NewReceiver(1000 * time.Millisecond)
receiver, err := es.resolvedNotifier.NewReceiver(1000 * time.Millisecond)
if err != nil {
return err
}
defer es.resolvedNotifier.Close()
errg.Go(func() error {
var sorted []*model.PolymorphicEvent
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func newMqSink(
return ret
}

resolvedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
k := &mqSink{
mqProducer: mqProducer,
dispatcher: d,
Expand All @@ -137,7 +141,7 @@ func newMqSink(
partitionInput: partitionInput,
partitionResolvedTs: make([]uint64, partitionNum),
resolvedNotifier: notifier,
resolvedReceiver: notifier.NewReceiver(50 * time.Millisecond),
resolvedReceiver: resolvedReceiver,

statistics: NewStatistics(ctx, "MQ", opts),
}
Expand Down
19 changes: 15 additions & 4 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64
}

func (s *mysqlSink) flushRowChangedEvents(ctx context.Context) {
receiver := s.resolvedNotifier.NewReceiver(50 * time.Millisecond)
receiver, err := s.resolvedNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
log.Error("flush row changed events routine starts failed", zap.Error(err))
return
}
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -580,17 +584,23 @@ func newMySQLSink(

sink.execWaitNotifier = new(notify.Notifier)
sink.resolvedNotifier = new(notify.Notifier)
sink.createSinkWorkers(ctx)
err = sink.createSinkWorkers(ctx)
if err != nil {
return nil, err
}

go sink.flushRowChangedEvents(ctx)

return sink, nil
}

func (s *mysqlSink) createSinkWorkers(ctx context.Context) {
func (s *mysqlSink) createSinkWorkers(ctx context.Context) error {
s.workers = make([]*mysqlSinkWorker, s.params.workerCount)
for i := range s.workers {
receiver := s.execWaitNotifier.NewReceiver(defaultFlushInterval)
receiver, err := s.execWaitNotifier.NewReceiver(defaultFlushInterval)
if err != nil {
return err
}
worker := newMySQLSinkWorker(
s.params.maxTxnRow, i, s.metricBucketSizeCounters[i], receiver, s.execDMLs)
s.workers[i] = worker
Expand All @@ -604,6 +614,7 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) {
}
}()
}
return nil
}

func (s *mysqlSink) notifyAndWaitExec(ctx context.Context) {
Expand Down
7 changes: 3 additions & 4 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,11 @@ func (s MySQLSinkSuite) TestMysqlSinkWorker(c *check.C) {
cctx, cancel := context.WithCancel(ctx)
var outputRows [][]*model.RowChangedEvent
var outputReplicaIDs []uint64
receiver, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
w := newMySQLSinkWorker(tc.maxTxnRow, 1,
bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"),
notifier.NewReceiver(-1),
receiver,
func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error {
outputRows = append(outputRows, events)
outputReplicaIDs = append(outputReplicaIDs, replicaID)
Expand Down Expand Up @@ -1036,9 +1038,6 @@ func (s MySQLSinkSuite) TestAdjustSQLMode(c *check.C) {
sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, opts)
c.Assert(err, check.IsNil)

// TODO: remove this line after https://github.com/pingcap/ticdc/issues/1169 is fixed
time.Sleep(time.Millisecond * 500)

err = sink.Close()
c.Assert(err, check.IsNil)
}
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
}

notifier := new(notify.Notifier)
flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
k := &kafkaSaramaProducer{
asyncClient: asyncClient,
syncClient: syncClient,
Expand All @@ -352,7 +356,7 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
sent uint64
}, partitionNum),
flushedNotifier: notifier,
flushedReceiver: notifier.NewReceiver(50 * time.Millisecond),
flushedReceiver: flushedReceiver,
closeCh: make(chan struct{}),
failpointCh: make(chan error, 1),
}
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ var (
ErrLoadTimezone = errors.Normalize("load timezone", errors.RFCCodeText("CDC:ErrLoadTimezone"))
ErrURLFormatInvalid = errors.Normalize("url format is invalid", errors.RFCCodeText("CDC:ErrURLFormatInvalid"))
ErrIntersectNoOverlap = errors.Normalize("span doesn't overlap: %+v vs %+v", errors.RFCCodeText("CDC:ErrIntersectNoOverlap"))
ErrOperateOnClosedNotifier = errors.Normalize("operate on a closed notifier", errors.RFCCodeText("CDC:ErrOperateOnClosedNotifier"))

// encode/decode, data format and data integrity errors
ErrInvalidRecordKey = errors.Normalize("invalid record key - %q", errors.RFCCodeText("CDC:ErrInvalidRecordKey"))
Expand Down
13 changes: 11 additions & 2 deletions pkg/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package notify
import (
"sync"
"time"

"github.com/pingcap/ticdc/pkg/errors"
)

// Notifier provides a one-to-many notification mechanism
Expand All @@ -26,6 +28,7 @@ type Notifier struct {
}
maxIndex int
mu sync.RWMutex
closed bool
}

// Notify sends a signal to the Receivers
Expand Down Expand Up @@ -77,9 +80,12 @@ func (r *Receiver) signalTickLoop() {

// NewReceiver creates a receiver
// returns a channel to receive notifications and a function to close this receiver
func (n *Notifier) NewReceiver(tickTime time.Duration) *Receiver {
func (n *Notifier) NewReceiver(tickTime time.Duration) (*Receiver, error) {
n.mu.Lock()
defer n.mu.Unlock()
if n.closed {
return nil, errors.ErrOperateOnClosedNotifier.GenWithStackByArgs()
}
currentIndex := n.maxIndex
n.maxIndex++
receiverCh := make(chan struct{}, 1)
Expand All @@ -104,7 +110,7 @@ func (n *Notifier) NewReceiver(tickTime time.Duration) *Receiver {
rec *Receiver
index int
}{rec: rec, index: currentIndex})
return rec
return rec, nil
}

func (n *Notifier) remove(index int) {
Expand All @@ -123,6 +129,8 @@ func (n *Notifier) remove(index int) {
}

// Close closes the notify and stops all receiver in this notifier
// Note we must `Close` the notifier if we can't ensure each receiver of this
// notifier is called `Stop` in order to prevent goroutine leak.
func (n *Notifier) Close() {
n.mu.Lock()
defer n.mu.Unlock()
Expand All @@ -133,4 +141,5 @@ func (n *Notifier) Close() {
close(receiver.rec.closeCh)
}
n.receivers = nil
n.closed = true
}
28 changes: 22 additions & 6 deletions pkg/notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/check"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/util/testleak"
)

Expand All @@ -33,9 +34,12 @@ var _ = check.Suite(&notifySuite{})
func (s *notifySuite) TestNotifyHub(c *check.C) {
defer testleak.AfterTest(c)()
notifier := new(Notifier)
r1 := notifier.NewReceiver(-1)
r2 := notifier.NewReceiver(-1)
r3 := notifier.NewReceiver(-1)
r1, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
r2, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
r3, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
finishedCh := make(chan struct{})
go func() {
for i := 0; i < 5; i++ {
Expand All @@ -52,12 +56,14 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
r2.Stop()
r3.Stop()
c.Assert(len(notifier.receivers), check.Equals, 0)
r4 := notifier.NewReceiver(-1)
r4, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
<-r4.C
r4.Stop()

notifier2 := new(Notifier)
r5 := notifier2.NewReceiver(10 * time.Millisecond)
r5, err := notifier2.NewReceiver(10 * time.Millisecond)
c.Assert(err, check.IsNil)
<-r5.C
r5.Stop()
<-finishedCh // To make the leak checker happy
Expand All @@ -80,8 +86,10 @@ func (s *notifySuite) TestContinusStop(c *check.C) {
}()
n := 50
receivers := make([]*Receiver, n)
var err error
for i := 0; i < n; i++ {
receivers[i] = notifier.NewReceiver(10 * time.Millisecond)
receivers[i], err = notifier.NewReceiver(10 * time.Millisecond)
c.Assert(err, check.IsNil)
}
for i := 0; i < n; i++ {
i := i
Expand All @@ -100,3 +108,11 @@ func (s *notifySuite) TestContinusStop(c *check.C) {
}
<-ctx.Done()
}

func (s *notifySuite) TestNewReceiverWithClosedNotifier(c *check.C) {
defer testleak.AfterTest(c)()
notifier := new(Notifier)
notifier.Close()
_, err := notifier.NewReceiver(50 * time.Millisecond)
c.Assert(errors.ErrOperateOnClosedNotifier.Equal(err), check.IsTrue)
}

0 comments on commit 59fc7cd

Please sign in to comment.