Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/notify: fix create receiver on closed notifier (#1185) #1199

Merged
merged 3 commits into from
Dec 22, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,19 @@ func newProcessor(
sinkEmittedResolvedNotifier := new(notify.Notifier)
localResolvedNotifier := new(notify.Notifier)
localCheckpointTsNotifier := new(notify.Notifier)
sinkEmittedResolvedReceiver, err := sinkEmittedResolvedNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
localResolvedReceiver, err := localResolvedNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
localCheckpointTsReceiver, err := localCheckpointTsNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}

p := &processor{
id: uuid.New().String(),
limitter: limitter,
Expand All @@ -208,14 +221,14 @@ func newProcessor(
output: make(chan *model.PolymorphicEvent, defaultOutputChanSize),

sinkEmittedResolvedNotifier: sinkEmittedResolvedNotifier,
sinkEmittedResolvedReceiver: sinkEmittedResolvedNotifier.NewReceiver(50 * time.Millisecond),
sinkEmittedResolvedReceiver: sinkEmittedResolvedReceiver,

localResolvedNotifier: localResolvedNotifier,
localResolvedReceiver: localResolvedNotifier.NewReceiver(50 * time.Millisecond),
localResolvedReceiver: localResolvedReceiver,

checkpointTs: checkpointTs,
localCheckpointTsNotifier: localCheckpointTsNotifier,
localCheckpointTsReceiver: localCheckpointTsNotifier.NewReceiver(50 * time.Millisecond),
localCheckpointTsReceiver: localCheckpointTsReceiver,

tables: make(map[int64]*tableInfo),
markTableIDs: make(map[int64]struct{}),
Expand Down Expand Up @@ -667,9 +680,12 @@ func (p *processor) globalStatusWorker(ctx context.Context) error {
lastResolvedTs uint64
watchKey = kv.GetEtcdKeyJob(p.changefeedID)
globalResolvedTsNotifier = new(notify.Notifier)
globalResolvedTsReceiver = globalResolvedTsNotifier.NewReceiver(1 * time.Second)
)
defer globalResolvedTsNotifier.Close()
globalResolvedTsReceiver, err := globalResolvedTsNotifier.NewReceiver(1 * time.Second)
if err != nil {
return err
}

updateStatus := func(changefeedStatus *model.ChangeFeedStatus) {
atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs)
Expand Down
5 changes: 4 additions & 1 deletion cdc/puller/entry_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ func (es *EntrySorter) Run(ctx context.Context) error {
}
}
})
receiver := es.resolvedNotifier.NewReceiver(1000 * time.Millisecond)
receiver, err := es.resolvedNotifier.NewReceiver(1000 * time.Millisecond)
if err != nil {
return err
}
defer es.resolvedNotifier.Close()
errg.Go(func() error {
var sorted []*model.PolymorphicEvent
Expand Down
11 changes: 10 additions & 1 deletion cdc/puller/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/regionspan"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/txnutil"
"github.com/pingcap/ticdc/pkg/util/testleak"
Expand Down Expand Up @@ -165,7 +167,14 @@ func (s *pullerSuite) TestPullerResolvedForward(c *check.C) {
c.Assert(ev.OpType, check.Equals, model.OpTypeResolved)
c.Assert(ev.CRTs, check.Equals, uint64(1000))
c.Assert(plr.IsInitialized(), check.IsTrue)
c.Assert(plr.GetResolvedTs(), check.Equals, uint64(1000))
err := retry.Run(time.Millisecond*10, 10, func() error {
ts := plr.GetResolvedTs()
if ts != uint64(1000) {
return errors.Errorf("resolved ts %d of puller does not forward to 1000", ts)
}
return nil
})
c.Assert(err, check.IsNil)

store.Close()
cancel()
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func newMqSink(
return ret
}

resolvedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
k := &mqSink{
mqProducer: mqProducer,
dispatcher: d,
Expand All @@ -137,7 +141,7 @@ func newMqSink(
partitionInput: partitionInput,
partitionResolvedTs: make([]uint64, partitionNum),
resolvedNotifier: notifier,
resolvedReceiver: notifier.NewReceiver(50 * time.Millisecond),
resolvedReceiver: resolvedReceiver,

statistics: NewStatistics(ctx, "MQ", opts),
}
Expand Down
19 changes: 15 additions & 4 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64
}

func (s *mysqlSink) flushRowChangedEvents(ctx context.Context) {
receiver := s.resolvedNotifier.NewReceiver(50 * time.Millisecond)
receiver, err := s.resolvedNotifier.NewReceiver(50 * time.Millisecond)
if err != nil {
log.Error("flush row changed events routine starts failed", zap.Error(err))
return
}
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -580,17 +584,23 @@ func newMySQLSink(

sink.execWaitNotifier = new(notify.Notifier)
sink.resolvedNotifier = new(notify.Notifier)
sink.createSinkWorkers(ctx)
err = sink.createSinkWorkers(ctx)
if err != nil {
return nil, err
}

go sink.flushRowChangedEvents(ctx)

return sink, nil
}

func (s *mysqlSink) createSinkWorkers(ctx context.Context) {
func (s *mysqlSink) createSinkWorkers(ctx context.Context) error {
s.workers = make([]*mysqlSinkWorker, s.params.workerCount)
for i := range s.workers {
receiver := s.execWaitNotifier.NewReceiver(defaultFlushInterval)
receiver, err := s.execWaitNotifier.NewReceiver(defaultFlushInterval)
if err != nil {
return err
}
worker := newMySQLSinkWorker(
s.params.maxTxnRow, i, s.metricBucketSizeCounters[i], receiver, s.execDMLs)
s.workers[i] = worker
Expand All @@ -604,6 +614,7 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) {
}
}()
}
return nil
}

func (s *mysqlSink) notifyAndWaitExec(ctx context.Context) {
Expand Down
7 changes: 3 additions & 4 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,15 @@ func (s MySQLSinkSuite) TestMysqlSinkWorker(c *check.C) {
ctx := context.Background()

notifier := new(notify.Notifier)
receiver, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
for i, tc := range testCases {
cctx, cancel := context.WithCancel(ctx)
var outputRows [][]*model.RowChangedEvent
var outputReplicaIDs []uint64
w := newMySQLSinkWorker(tc.maxTxnRow, 1,
bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"),
notifier.NewReceiver(-1),
receiver,
func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error {
outputRows = append(outputRows, events)
outputReplicaIDs = append(outputReplicaIDs, replicaID)
Expand Down Expand Up @@ -1036,9 +1038,6 @@ func (s MySQLSinkSuite) TestAdjustSQLMode(c *check.C) {
sink, err := newMySQLSink(ctx, changefeed, sinkURI, f, rc, opts)
c.Assert(err, check.IsNil)

// TODO: remove this line after https://github.com/pingcap/ticdc/issues/1169 is fixed
time.Sleep(time.Millisecond * 500)

err = sink.Close()
c.Assert(err, check.IsNil)
}
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
}

notifier := new(notify.Notifier)
flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
k := &kafkaSaramaProducer{
asyncClient: asyncClient,
syncClient: syncClient,
Expand All @@ -352,7 +356,7 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
sent uint64
}, partitionNum),
flushedNotifier: notifier,
flushedReceiver: notifier.NewReceiver(50 * time.Millisecond),
flushedReceiver: flushedReceiver,
closeCh: make(chan struct{}),
failpointCh: make(chan error, 1),
}
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,11 @@ error = '''
old value is not enabled
'''

["CDC:ErrOperateOnClosedNotifier"]
error = '''
operate on a closed notifier
'''

["CDC:ErrOwnerCampaignKeyDeleted"]
error = '''
owner campaign key deleted
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ var (
ErrLoadTimezone = errors.Normalize("load timezone", errors.RFCCodeText("CDC:ErrLoadTimezone"))
ErrURLFormatInvalid = errors.Normalize("url format is invalid", errors.RFCCodeText("CDC:ErrURLFormatInvalid"))
ErrIntersectNoOverlap = errors.Normalize("span doesn't overlap: %+v vs %+v", errors.RFCCodeText("CDC:ErrIntersectNoOverlap"))
ErrOperateOnClosedNotifier = errors.Normalize("operate on a closed notifier", errors.RFCCodeText("CDC:ErrOperateOnClosedNotifier"))

// encode/decode, data format and data integrity errors
ErrInvalidRecordKey = errors.Normalize("invalid record key - %q", errors.RFCCodeText("CDC:ErrInvalidRecordKey"))
Expand Down
13 changes: 11 additions & 2 deletions pkg/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package notify
import (
"sync"
"time"

"github.com/pingcap/ticdc/pkg/errors"
)

// Notifier provides a one-to-many notification mechanism
Expand All @@ -26,6 +28,7 @@ type Notifier struct {
}
maxIndex int
mu sync.RWMutex
closed bool
}

// Notify sends a signal to the Receivers
Expand Down Expand Up @@ -77,9 +80,12 @@ func (r *Receiver) signalTickLoop() {

// NewReceiver creates a receiver that is signaled by this notifier.
// It returns the receiver, or ErrOperateOnClosedNotifier if the notifier has already been closed.
func (n *Notifier) NewReceiver(tickTime time.Duration) *Receiver {
func (n *Notifier) NewReceiver(tickTime time.Duration) (*Receiver, error) {
n.mu.Lock()
defer n.mu.Unlock()
if n.closed {
return nil, errors.ErrOperateOnClosedNotifier.GenWithStackByArgs()
}
currentIndex := n.maxIndex
n.maxIndex++
receiverCh := make(chan struct{}, 1)
Expand All @@ -104,7 +110,7 @@ func (n *Notifier) NewReceiver(tickTime time.Duration) *Receiver {
rec *Receiver
index int
}{rec: rec, index: currentIndex})
return rec
return rec, nil
}

func (n *Notifier) remove(index int) {
Expand All @@ -123,6 +129,8 @@ func (n *Notifier) remove(index int) {
}

// Close closes the notifier and stops all receivers of this notifier.
// Note that we must call `Close` on the notifier if we can't ensure that `Stop`
// is called on every receiver of this notifier, in order to prevent goroutine leaks.
func (n *Notifier) Close() {
n.mu.Lock()
defer n.mu.Unlock()
Expand All @@ -133,4 +141,5 @@ func (n *Notifier) Close() {
close(receiver.rec.closeCh)
}
n.receivers = nil
n.closed = true
}
28 changes: 22 additions & 6 deletions pkg/notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/check"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/util/testleak"
)

Expand All @@ -33,9 +34,12 @@ var _ = check.Suite(&notifySuite{})
func (s *notifySuite) TestNotifyHub(c *check.C) {
defer testleak.AfterTest(c)()
notifier := new(Notifier)
r1 := notifier.NewReceiver(-1)
r2 := notifier.NewReceiver(-1)
r3 := notifier.NewReceiver(-1)
r1, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
r2, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
r3, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
finishedCh := make(chan struct{})
go func() {
for i := 0; i < 5; i++ {
Expand All @@ -52,12 +56,14 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
r2.Stop()
r3.Stop()
c.Assert(len(notifier.receivers), check.Equals, 0)
r4 := notifier.NewReceiver(-1)
r4, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
<-r4.C
r4.Stop()

notifier2 := new(Notifier)
r5 := notifier2.NewReceiver(10 * time.Millisecond)
r5, err := notifier2.NewReceiver(10 * time.Millisecond)
c.Assert(err, check.IsNil)
<-r5.C
r5.Stop()
<-finishedCh // To make the leak checker happy
Expand All @@ -80,8 +86,10 @@ func (s *notifySuite) TestContinusStop(c *check.C) {
}()
n := 50
receivers := make([]*Receiver, n)
var err error
for i := 0; i < n; i++ {
receivers[i] = notifier.NewReceiver(10 * time.Millisecond)
receivers[i], err = notifier.NewReceiver(10 * time.Millisecond)
c.Assert(err, check.IsNil)
}
for i := 0; i < n; i++ {
i := i
Expand All @@ -100,3 +108,11 @@ func (s *notifySuite) TestContinusStop(c *check.C) {
}
<-ctx.Done()
}

// TestNewReceiverWithClosedNotifier closes a fresh Notifier and then checks
// that NewReceiver on it fails with ErrOperateOnClosedNotifier.
// NOTE(review): testleak.AfterTest presumably verifies that no goroutines are
// leaked by the test — confirm against pkg/util/testleak.
func (s *notifySuite) TestNewReceiverWithClosedNotifier(c *check.C) {
defer testleak.AfterTest(c)()
notifier := new(Notifier)
notifier.Close()
_, err := notifier.NewReceiver(50 * time.Millisecond)
c.Assert(errors.ErrOperateOnClosedNotifier.Equal(err), check.IsTrue)
}