-
Notifications
You must be signed in to change notification settings - Fork 188
Combine syncer localReader into one interface #215
Conversation
@@ -383,12 +383,11 @@ func (r *BinlogReader) updateUUIDs() error { | |||
} | |||
|
|||
// Close closes BinlogReader. | |||
func (r *BinlogReader) Close() error { | |||
func (r *BinlogReader) Close() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this implementation aligned with *BinlogSyncer Close() in go-mysql
Codecov Report
@@ Coverage Diff @@
## master #215 +/- ##
================================================
- Coverage 59.4612% 58.8729% -0.5884%
================================================
Files 123 123
Lines 14887 14054 -833
================================================
- Hits 8852 8274 -578
+ Misses 5150 4953 -197
+ Partials 885 827 -58 |
syncer/syncer.go
Outdated
if s.localReader != nil { | ||
s.localReader.Close() | ||
if s.streamerProducer != nil { | ||
rr, ok := s.streamerProducer.(*remoteBinlogReader) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
switch r := s.streamerProducer.(type) {
case *remoteBinlogReader:
// process remote binlog reader
case *localBinlogReader:
// process local binlog reader
}
use type-switch is more clear
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} else if s.binlogType == LocalBinlog { | ||
s.streamer, err = s.getBinlogStreamer(s.localReader, pos) | ||
} | ||
s.streamer, err = s.streamerProducer.generateStreamer(pos) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can remove getBinlogStreamer
, getLocalBinlogStreamer
and getRemoteBinlogStreamer
after we extract the interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
syncer/syncer.go
Outdated
@@ -79,6 +79,46 @@ const ( | |||
LocalBinlog | |||
) | |||
|
|||
// StreamerProducer provide the ability to generate binlog streamer by StartSync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// StreamerProducer provide the ability to generate binlog streamer by StartSync() | |
// StreamerProducer provides the ability to generate binlog streamer by StartSync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
syncer/syncer.go
Outdated
@@ -79,6 +79,46 @@ const ( | |||
LocalBinlog | |||
) | |||
|
|||
// StreamerProducer provide the ability to generate binlog streamer by StartSync() | |||
// but go-mysql StartSync() return (struct, err) rather than (interface, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// but go-mysql StartSync() return (struct, err) rather than (interface, err) | |
// but go-mysql StartSync() returns (struct, err) rather than (interface, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
syncer/syncer.go
Outdated
}() | ||
if r.EnableGTID { | ||
// NOTE: our (per-table based) checkpoint does not support GTID yet | ||
return nil, errors.New("[syncer] now support GTID mode yet") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return nil, errors.New("[syncer] now support GTID mode yet") | |
return nil, errors.New("[syncer] not support GTID mode yet") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -589,7 +589,6 @@ func (t *testReaderSuite) TestStartSync(c *C) { | |||
// NOTE: load new UUIDs dynamically not supported yet | |||
|
|||
// close the reader | |||
err = r.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although Close
doesn't return error anymore, we'd better reserve it to form a complete usage flow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I forgot to use it, it's been fixed already in the latest commit
pkg/streamer/reader_test.go
Outdated
@@ -589,8 +589,8 @@ func (t *testReaderSuite) TestStartSync(c *C) { | |||
// NOTE: load new UUIDs dynamically not supported yet | |||
|
|||
// close the reader | |||
err = r.Close() | |||
c.Assert(err, IsNil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
already added back
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
if s.syncer != nil { | ||
err := s.closeBinlogSyncer(s.syncer) | ||
s.syncer = nil | ||
if s.streamerProducer != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we check whether it's remote binlog reader
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only remote binlog reader call this function currently, but adding a check is better
Rest LGTM |
syncer/syncer.go
Outdated
return nil, errors.Trace(err) | ||
} | ||
case *localBinlogReader: | ||
return nil, errors.New("[syncer] not support local relay reader reopen currently") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return nil, errors.New("[syncer] not support local relay reader reopen currently") | |
return nil, errors.New("don't support reopen %T currently", r) |
syncer/syncer.go
Outdated
}() | ||
if r.EnableGTID { | ||
// NOTE: our (per-table based) checkpoint does not support GTID yet | ||
return nil, errors.New("[syncer] not support GTID mode yet") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return nil, errors.New("[syncer] not support GTID mode yet") | |
return nil, errors.New("don't support open streamer with GTID mode") |
syncer/syncer.go
Outdated
if err != nil { | ||
return nil, errors.Trace(err) | ||
if s.streamerProducer != nil { | ||
switch s.streamerProducer.(type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
switch s.streamerProducer.(type) { | |
switch r := s.streamerProducer.(type) { |
syncer/syncer.go
Outdated
if s.streamerProducer != nil { | ||
switch s.streamerProducer.(type) { | ||
case *remoteBinlogReader: | ||
err := s.closeBinlogSyncer(s.streamerProducer.(*remoteBinlogReader).reader) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err := s.closeBinlogSyncer(s.streamerProducer.(*remoteBinlogReader).reader) | |
err := s.closeBinlogSyncer(r.reader) |
syncer/syncer.go
Outdated
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
case *localBinlogReader: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case *localBinlogReader: | |
default: |
/run-all-tests |
1 similar comment
/run-all-tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good Job! LGTM
* extract syncer localReader to one interface * use generateStreamer wrap function * update some implementations and remove useless code * remove useless err check * update check syncer reopen * update error message
What problem does this PR solve?
Combine two Syncer members (
Syncer.syncer
andSyncer.localReader
) into one variable (streamerProducer), base on two reasons:Syncer.syncer
andSyncer.localReader
has same ability to generate stream which can generate events, And Syncer only need one not two at any timeSyncer.streamerProducer
, such as a mock testWhat is changed and how it works?
Combine two Syncer members into one
streamerProducer
Check List
Tests