Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc committed Jun 26, 2019
1 parent 184d1b8 commit f9b1663
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 12 deletions.
6 changes: 2 additions & 4 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ func (d *DummyRelay) InjectInitError(err error) {

// Process implements Process interface
func (d *DummyRelay) Process(ctx context.Context, pr chan pb.ProcessResult) {
select {
case <-ctx.Done():
pr <- d.processResult
}
<-ctx.Done()
pr <- d.processResult
}

// InjectProcessResult injects process result
Expand Down
5 changes: 1 addition & 4 deletions pkg/binlog/reader/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ func (t *testTCPReaderSuite) setUpData(c *C) {
time.Sleep(time.Second)
_, gs, err2 := utils.GetMasterStatus(t.db, flavor)
c.Assert(err2, IsNil)
if len(gs.String()) > 0 {
return false // some events exist now, try again later.
}
return true
return gs.String() == "" // break waiting if no other case wrote any events
}
utils.WaitSomething(backoff, waitTime, waitFn)

Expand Down
5 changes: 1 addition & 4 deletions pkg/utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,7 @@ func (t *testUtilsSuite) TestWaitSomething(c *C) {
// wait success
f2 := func() bool {
count++
if count >= 5 {
return true
}
return false
return count >= 5
}

c.Assert(WaitSomething(backoff, waitTime, f2), IsTrue)
Expand Down
15 changes: 15 additions & 0 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo
default:
if utils.IsErrBinlogPurged(err) {
// TODO: try auto fix GTID, and can support auto switching between upstream server later.
cfg := r.cfg.From
log.Errorf("[relay] the requested binlog files have purged in the master server or the master server behind %s:%d have switched, currently DM do no support to handle this error %v",
cfg.Host, cfg.Port, err)
}
binlogReadErrorCounter.Inc()
}
Expand Down Expand Up @@ -382,9 +385,11 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo
return errors.Trace(err)
} else if wResult.Ignore {
log.Infof("[relay] ignore event %+v by writer", e.Header)
r.tryUpdateActiveRelayLog(e, lastPos.Name) // even the event ignored we still need to try this update.
continue
}
relayLogWriteDurationHistogram.Observe(time.Since(writeTimer).Seconds())
r.tryUpdateActiveRelayLog(e, lastPos.Name) // wrote a event, try update the current active relay log.

// 4. update meta and metrics
needSavePos := tResult.CanSaveGTID
Expand Down Expand Up @@ -422,6 +427,16 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo
}
}

// tryUpdateActiveRelayLog tries to update current active relay log file.
// we should to update after received/wrote a FormatDescriptionEvent because it means switched to a new relay log file.
// NOTE: we can refactor active (writer/read) relay log mechanism later.
func (r *Relay) tryUpdateActiveRelayLog(e *replication.BinlogEvent, filename string) {
if e.Header.EventType == replication.FORMAT_DESCRIPTION_EVENT {
r.setActiveRelayLog(filename)
log.Infof("[relay] the active relay log file change to %s", filename)
}
}

// reSetupMeta re-setup the metadata when switching to a new upstream master server.
func (r *Relay) reSetupMeta() error {
uuid, err := utils.GetServerUUID(r.db, r.cfg.Flavor)
Expand Down

0 comments on commit f9b1663

Please sign in to comment.