From f9b1663d8a755052f0307248dd1f9a62ead84dc5 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 26 Jun 2019 17:10:42 +0800 Subject: [PATCH] *: address comments --- dm/worker/relay_test.go | 6 ++---- pkg/binlog/reader/tcp_test.go | 5 +---- pkg/utils/util_test.go | 5 +---- relay/relay.go | 15 +++++++++++++++ 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/dm/worker/relay_test.go b/dm/worker/relay_test.go index 2abc207112..6fc5903398 100644 --- a/dm/worker/relay_test.go +++ b/dm/worker/relay_test.go @@ -60,10 +60,8 @@ func (d *DummyRelay) InjectInitError(err error) { // Process implements Process interface func (d *DummyRelay) Process(ctx context.Context, pr chan pb.ProcessResult) { - select { - case <-ctx.Done(): - pr <- d.processResult - } + <-ctx.Done() + pr <- d.processResult } // InjectProcessResult injects process result diff --git a/pkg/binlog/reader/tcp_test.go b/pkg/binlog/reader/tcp_test.go index 0a1d47b281..a4cdd8057f 100644 --- a/pkg/binlog/reader/tcp_test.go +++ b/pkg/binlog/reader/tcp_test.go @@ -101,10 +101,7 @@ func (t *testTCPReaderSuite) setUpData(c *C) { time.Sleep(time.Second) _, gs, err2 := utils.GetMasterStatus(t.db, flavor) c.Assert(err2, IsNil) - if len(gs.String()) > 0 { - return false // some events exist now, try again later. - } - return true + return gs.String() == "" // break waiting if no other case wrote any events } utils.WaitSomething(backoff, waitTime, waitFn) diff --git a/pkg/utils/util_test.go b/pkg/utils/util_test.go index ae73756310..75d154dc05 100644 --- a/pkg/utils/util_test.go +++ b/pkg/utils/util_test.go @@ -164,10 +164,7 @@ func (t *testUtilsSuite) TestWaitSomething(c *C) { // wait success f2 := func() bool { count++ - if count >= 5 { - return true - } - return false + return count >= 5 } c.Assert(WaitSomething(backoff, waitTime, f2), IsTrue) diff --git a/relay/relay.go b/relay/relay.go index 58cf9551d9..cc1c81ef3e 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -351,6 +351,9 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo default: if utils.IsErrBinlogPurged(err) { // TODO: try auto fix GTID, and can support auto switching between upstream server later. + cfg := r.cfg.From + log.Errorf("[relay] the requested binlog files have purged in the master server or the master server behind %s:%d have switched, currently DM do no support to handle this error %v", + cfg.Host, cfg.Port, err) } binlogReadErrorCounter.Inc() } @@ -382,9 +385,11 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo return errors.Trace(err) } else if wResult.Ignore { log.Infof("[relay] ignore event %+v by writer", e.Header) + r.tryUpdateActiveRelayLog(e, lastPos.Name) // even the event ignored we still need to try this update. continue } relayLogWriteDurationHistogram.Observe(time.Since(writeTimer).Seconds()) + r.tryUpdateActiveRelayLog(e, lastPos.Name) // wrote a event, try update the current active relay log. // 4. update meta and metrics needSavePos := tResult.CanSaveGTID @@ -422,6 +427,16 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo } } +// tryUpdateActiveRelayLog tries to update current active relay log file. +// we should to update after received/wrote a FormatDescriptionEvent because it means switched to a new relay log file. +// NOTE: we can refactor active (writer/read) relay log mechanism later. +func (r *Relay) tryUpdateActiveRelayLog(e *replication.BinlogEvent, filename string) { + if e.Header.EventType == replication.FORMAT_DESCRIPTION_EVENT { + r.setActiveRelayLog(filename) + log.Infof("[relay] the active relay log file change to %s", filename) + } +} + // reSetupMeta re-setup the metadata when switching to a new upstream master server. func (r *Relay) reSetupMeta() error { uuid, err := utils.GetServerUUID(r.db, r.cfg.Flavor)