Skip to content

Commit

Permalink
Combine syncer localReader into one interface (pingcap#215)
Browse files Browse the repository at this point in the history
* extract syncer localReader to one interface

* use generateStreamer wrap function

* update some implementations and remove useless code

* remove useless err check

* update check syncer reopen

* update error message
  • Loading branch information
3pointer authored Jul 29, 2019
1 parent e37f9cc commit a2a0655
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 81 deletions.
3 changes: 1 addition & 2 deletions pkg/streamer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,12 +383,11 @@ func (r *BinlogReader) updateUUIDs() error {
}

// Close closes BinlogReader.
func (r *BinlogReader) Close() error {
func (r *BinlogReader) Close() {
r.tctx.L().Info("binlog reader closing")
r.running = false
r.cancel()
r.parser.Stop()
r.wg.Wait()
r.tctx.L().Info("binlog reader closed")
return nil
}
8 changes: 3 additions & 5 deletions pkg/streamer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,7 @@ func (t *testReaderSuite) TestStartSync(c *C) {
// NOTE: load new UUIDs dynamically not supported yet

// close the reader
err = r.Close()
c.Assert(err, IsNil)
r.Close()
}

func (t *testReaderSuite) TestStartSyncError(c *C) {
Expand Down Expand Up @@ -620,7 +619,7 @@ func (t *testReaderSuite) TestStartSyncError(c *C) {
ev, err := s.GetEvent(ctx)
c.Assert(err, ErrorMatches, ".*empty UUIDs not valid.*")
c.Assert(ev, IsNil)
c.Assert(r.Close(), IsNil)
r.Close()

// write UUIDs into index file
r = NewBinlogReader(tctx, cfg) // create a new reader
Expand All @@ -639,8 +638,7 @@ func (t *testReaderSuite) TestStartSyncError(c *C) {
s, err = r.StartSync(startPos)
c.Assert(errors.Cause(err), Equals, ErrReaderRunning)
c.Assert(s, IsNil)

c.Assert(r.Close(), IsNil)
r.Close()
}

func (t *testReaderSuite) genBinlogEvents(c *C, latestPos uint32) []*replication.BinlogEvent {
Expand Down
146 changes: 72 additions & 74 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,46 @@ const (
LocalBinlog
)

// StreamerProducer provides the ability to generate a binlog streamer via StartSync().
// go-mysql's StartSync() returns a (struct, error) rather than an (interface, error),
// so we can't simply embed StartSync() in StreamerProducer directly.
// Instead, generateStreamer wraps StartSync() so that *BinlogSyncer (remote) and
// *BinlogReader (local relay log) can be used through the same interface.
// Other implementations of StreamerProducer/Streamer can then replace
// Syncer.streamerProducer easily, which also makes mocking in tests easy.
type StreamerProducer interface {
	generateStreamer(pos mysql.Position) (streamer.Streamer, error)
}

// localBinlogReader produces streamers that read binlog events from the
// local relay log through a streamer.BinlogReader.
type localBinlogReader struct {
	reader *streamer.BinlogReader
}

// generateStreamer implements StreamerProducer: it starts syncing the
// local relay log from the given binlog position and returns the streamer.
func (l *localBinlogReader) generateStreamer(pos mysql.Position) (streamer.Streamer, error) {
	s, err := l.reader.StartSync(pos)
	return s, err
}

// remoteBinlogReader produces streamers that read binlog events directly
// from the upstream master through a go-mysql replication.BinlogSyncer.
type remoteBinlogReader struct {
	reader *replication.BinlogSyncer
	tctx *tcontext.Context // used for logging the slave connection ID
	EnableGTID bool // whether to sync in GTID mode (rejected by generateStreamer; not supported yet)
}

// generateStreamer implements StreamerProducer: it starts a remote binlog
// sync from the given position and returns the resulting streamer.
// GTID mode is rejected because the (per-table based) checkpoint does not
// support GTID yet.
func (r *remoteBinlogReader) generateStreamer(pos mysql.Position) (streamer.Streamer, error) {
	// Log the slave connection ID of this sync attempt, even when StartSync fails.
	defer func() {
		lastSlaveConnectionID := r.reader.LastConnectionID()
		r.tctx.L().Info("last slave connection", zap.Uint32("connection ID", lastSlaveConnectionID))
	}()
	if r.EnableGTID {
		// NOTE: our (per-table based) checkpoint does not support GTID yet
		return nil, errors.New("don't support open streamer with GTID mode")
	}

	// Name the result `s` so it does not shadow the imported `streamer`
	// package used in this function's signature.
	s, err := r.reader.StartSync(pos)
	return s, errors.Trace(err)
}

// Syncer can sync your MySQL data to another MySQL database.
type Syncer struct {
sync.RWMutex
Expand All @@ -94,11 +134,9 @@ type Syncer struct {
ddlExecInfo *DDLExecInfo // DDL execute (ignore) info
injectEventCh chan *replication.BinlogEvent // extra binlog event chan, used to inject binlog event into the main for loop

// TODO: extract to interface?
syncer *replication.BinlogSyncer
localReader *streamer.BinlogReader
binlogType BinlogType
streamer streamer.Streamer
streamerProducer StreamerProducer
binlogType BinlogType
streamer streamer.Streamer

wg sync.WaitGroup
jobWg sync.WaitGroup
Expand Down Expand Up @@ -430,16 +468,13 @@ func (s *Syncer) IsFreshTask() (bool, error) {
func (s *Syncer) resetReplicationSyncer() {
if s.binlogType == RemoteBinlog {
// create new binlog-syncer
if s.syncer != nil {
s.closeBinlogSyncer(s.syncer)
if s.streamerProducer != nil {
s.closeBinlogSyncer(s.streamerProducer.(*remoteBinlogReader).reader)
}
s.syncer = replication.NewBinlogSyncer(s.syncCfg)
s.streamerProducer = &remoteBinlogReader{replication.NewBinlogSyncer(s.syncCfg), s.tctx, s.cfg.EnableGTID}
} else if s.binlogType == LocalBinlog {
// TODO: close old local reader before creating a new one
s.localReader = streamer.NewBinlogReader(s.tctx, &streamer.BinlogReaderConfig{
RelayDir: s.cfg.RelayDir,
Timezone: s.timezone,
})
s.streamerProducer = &localBinlogReader{streamer.NewBinlogReader(s.tctx, &streamer.BinlogReaderConfig{RelayDir: s.cfg.RelayDir, Timezone: s.timezone})}
}
}

Expand Down Expand Up @@ -925,11 +960,7 @@ func (s *Syncer) redirectStreamer(pos mysql.Position) error {
var err error
s.tctx.L().Info("reset global streamer", zap.Stringer("position", pos))
s.resetReplicationSyncer()
if s.binlogType == RemoteBinlog {
s.streamer, err = s.getBinlogStreamer(s.syncer, pos)
} else if s.binlogType == LocalBinlog {
s.streamer, err = s.getBinlogStreamer(s.localReader, pos)
}
s.streamer, err = s.streamerProducer.generateStreamer(pos)
return errors.Trace(err)
}

Expand Down Expand Up @@ -965,11 +996,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
)
s.tctx.L().Info("replicate binlog from checkpoint", zap.Stringer("checkpoint", lastPos))

if s.binlogType == RemoteBinlog {
s.streamer, err = s.getBinlogStreamer(s.syncer, lastPos)
} else if s.binlogType == LocalBinlog {
s.streamer, err = s.getBinlogStreamer(s.localReader, lastPos)
}
s.streamer, err = s.streamerProducer.generateStreamer(lastPos)

if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1944,39 +1972,6 @@ func (s *Syncer) printStatus(ctx context.Context) {
}
}

// NOTE: refactor with remote and local streamer later
func (s *Syncer) getBinlogStreamer(syncerOrReader interface{}, pos mysql.Position) (streamer.Streamer, error) {
if s.binlogType == RemoteBinlog {
return s.getRemoteBinlogStreamer(syncerOrReader, pos)
}
return s.getLocalBinlogStreamer(syncerOrReader, pos)
}

func (s *Syncer) getLocalBinlogStreamer(syncerOrReader interface{}, pos mysql.Position) (streamer.Streamer, error) {
reader, ok := syncerOrReader.(*streamer.BinlogReader)
if !ok {
return nil, errors.NotValidf("BinlogReader %v", syncerOrReader)
}
return reader.StartSync(pos)
}

func (s *Syncer) getRemoteBinlogStreamer(syncerOrReader interface{}, pos mysql.Position) (streamer.Streamer, error) {
syncer, ok := syncerOrReader.(*replication.BinlogSyncer)
if !ok {
return nil, errors.NotValidf("replication.BinlogSyncer %v", syncerOrReader)
}
defer func() {
lastSlaveConnectionID := syncer.LastConnectionID()
s.tctx.L().Info("last slave connection", zap.Uint32("connection ID", lastSlaveConnectionID))
}()
if s.cfg.EnableGTID {
// NOTE: our (per-table based) checkpoint does not support GTID yet
return nil, errors.New("[syncer] now support GTID mode yet")
}

return s.startSyncByPosition(syncer, pos)
}

func (s *Syncer) createDBs() error {
var err error
s.fromDB, err = createDB(s.cfg, s.cfg.From, maxDMLConnectionTimeout)
Expand Down Expand Up @@ -2050,22 +2045,20 @@ func (s *Syncer) reopenWithRetry(cfg replication.BinlogSyncerConfig) error {
}

func (s *Syncer) reopen(cfg replication.BinlogSyncerConfig) (streamer.Streamer, error) {
if s.syncer != nil {
err := s.closeBinlogSyncer(s.syncer)
s.syncer = nil
if err != nil {
return nil, errors.Trace(err)
if s.streamerProducer != nil {
switch r := s.streamerProducer.(type) {
case *remoteBinlogReader:
err := s.closeBinlogSyncer(r.reader)
if err != nil {
return nil, errors.Trace(err)
}
default:
return nil, errors.Errorf("don’t support to reopen %T", r)
}
}

// TODO: refactor to support relay
s.syncer = replication.NewBinlogSyncer(cfg)
return s.getBinlogStreamer(s.syncer, s.checkpoint.GlobalPoint())
}

func (s *Syncer) startSyncByPosition(syncer *replication.BinlogSyncer, pos mysql.Position) (streamer.Streamer, error) {
streamer, err := syncer.StartSync(pos)
return streamer, errors.Trace(err)
s.streamerProducer = &remoteBinlogReader{replication.NewBinlogSyncer(cfg), s.tctx, s.cfg.EnableGTID}
return s.streamerProducer.generateStreamer(s.checkpoint.GlobalPoint())
}

func (s *Syncer) renameShardingSchema(schema, table string) (string, string) {
Expand Down Expand Up @@ -2129,12 +2122,17 @@ func (s *Syncer) stopSync() {

// before re-write workflow for s.syncer, simply close it
// when resuming, re-create s.syncer
if s.syncer != nil {
s.closeBinlogSyncer(s.syncer)
s.syncer = nil
}
if s.localReader != nil {
s.localReader.Close()

if s.streamerProducer != nil {
switch r := s.streamerProducer.(type) {
case *remoteBinlogReader:
// process remote binlog reader
s.closeBinlogSyncer(r.reader)
s.streamerProducer = nil
case *localBinlogReader:
// process local binlog reader
r.reader.Close()
}
}
}

Expand Down

0 comments on commit a2a0655

Please sign in to comment.