-
Notifications
You must be signed in to change notification settings - Fork 188
relay: use new Reader
, Transformer
and Writer
#171
Conversation
Codecov Report
@@ Coverage Diff @@
## master #171 +/- ##
===========================================
Coverage 54.7612% 54.7612%
===========================================
Files 122 122
Lines 14492 14492
===========================================
Hits 7936 7936
Misses 5766 5766
Partials 790 790 |
/run-all-tests |
1 similar comment
/run-all-tests |
/run-all-tests |
1 similar comment
/run-all-tests |
@GregoryIan @amyangfei PTAL |
pkg/binlog/reader/tcp_test.go
Outdated
_, err = t.db.Exec(query) | ||
c.Assert(err, IsNil) | ||
maxRetryCount := 5 | ||
for i := 0; i < maxRetryCount; i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we can have a function in pkg/unit
, like
func WaitSomething(backoff int, fn func() bool) bool {
for i := 0; i < backoff; i++ {
if fn() {
return true
}
time.Sleep(10 * time.Millisecond)
}
return false
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done in 6c3fab5.
case replication.ErrChecksumMismatch: | ||
relayLogDataCorruptionCounter.Inc() | ||
case replication.ErrSyncClosed, replication.ErrNeedSyncAgain: | ||
// do nothing | ||
// do nothing, but the error will be returned |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also stat it into metric?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will be recorded in relayExitWithErrorCounter
, should we need to add another metric like relayGoMySQLError
?
relay/relay.go
Outdated
lastPos.Pos = tResult.LogPos | ||
err = lastGTID.Set(tResult.GTIDSet) | ||
if err != nil { | ||
log.Errorf("[relay] update last GTID set to %v error %v", tResult.GTIDSet, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's different with
err = r.meta.Save(lastPos, lastGTID)
if err != nil {
return errors.Trace(err)
}
one is lenient, one is strict, can both be lenient?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change both to be strict in 201f960.
Rest LGTM |
} | ||
return true | ||
} | ||
utils.WaitSomething(backoff, waitTime, waitFn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it need to check the wait result?
besides, in which scenario will other test case write binlog even after this test case is started
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WaitSomething
can only decrease the possibility of errors.
I observed after RESET MASTER
, other test cases write binlog event, may it because multi cases run in different goroutines at the same time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I observed the fact
- see https://stackoverflow.com/questions/24375966/does-go-test-run-unit-tests-concurrently.
relay/reader/reader.go
Outdated
@@ -27,6 +28,16 @@ import ( | |||
"github.com/pingcap/dm/pkg/log" | |||
) | |||
|
|||
const ( | |||
// event timeout when trying to read events from upstream master server. | |||
eventTimeout = 1 * time.Hour |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to the delay requirements of replication, let it 10 or 1minute?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It only takes effect for getting events from go-mysql (the events are already read from upstream server).
In fact, we can even use a Cancel
rather than a Timeout
, but use a Timeout
we can log something to indicate still running.
the timeout between master and salve is controlled by slaveReadTimeout
(default 1m) and masterHeartbeatPeriod
(default 30s).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
every step will have an impact on the delay, for example, if a master-slave switch happend, there may be no data in 1h, delay would be 1h
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
every step will have an impact on the delay
agree.
if a master-slave switch happend
An error will get with small delay
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, set a reasonable small value and let user or dm know something ASAP
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to 10m
in 0527d16.
LGTM |
/run-all-tests |
dm/worker/relay_test.go
Outdated
|
||
// Process implements Process interface | ||
func (d *DummyRelay) Process(ctx context.Context, pr chan pb.ProcessResult) { | ||
select { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we can use a simple channel send/receive instead of select with a single case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refined in f9b1663.
pkg/binlog/reader/tcp_test.go
Outdated
time.Sleep(time.Second) | ||
_, gs, err2 := utils.GetMasterStatus(t.db, flavor) | ||
c.Assert(err2, IsNil) | ||
if len(gs.String()) > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return gs.String() == ""
is simpler?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done in f9b1663.
pkg/utils/util_test.go
Outdated
// wait success | ||
f2 := func() bool { | ||
count++ | ||
if count >= 5 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return count >= 5
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done in f9b1663.
tryReSync = false // do not support repeat try re-sync | ||
continue | ||
} | ||
// TODO: try auto fix GTID, and can support auto switching between upstream server later. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to add some log here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added some log in f9b1663.
|
||
// record current active relay log file, and keep it until newer file opened | ||
// when current file's fd closed, we should not reset this, because it may re-open again | ||
r.setActiveRelayLog(filename) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to call setActiveRelayLog
somewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! add a call to setActiveRelayLog
in f9b1663.
Co-Authored-By: amyangfei <amyangfei@gmail.com>
/run-all-tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What problem does this PR solve?
this PR is a part of #91.
What is changed and how it works?
use new
Reader
,Transformer
andWriter
to:Check List
Tests
Code changes
Side effects