-
Notifications
You must be signed in to change notification settings - Fork 188
relay: use new Reader
, Transformer
and Writer
#171
Changes from 22 commits
fec35ed
3e6a305
f22c105
12d4cac
242ea36
fece692
481b624
c5460f9
dd604c0
c314dd4
92d9983
67c2f7c
f255895
93ec22e
9f22aaa
9e108d7
8e83cf1
a18e395
2f0de38
6c3fab5
201f960
2e8ed95
f20bf92
fed7086
071bc72
c93c496
0527d16
184d1b8
f9b1663
88f6455
3bc0cc1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -90,10 +90,23 @@ func (t *testTCPReaderSuite) setUpData(c *C) { | |
query := fmt.Sprintf("DROP DATABASE `%s`", dbName) | ||
_, err := t.db.Exec(query) | ||
|
||
// delete previous binlog files/events. | ||
query = "RESET MASTER" | ||
_, err = t.db.Exec(query) | ||
c.Assert(err, IsNil) | ||
backoff := 5 | ||
waitTime := 5 * time.Second | ||
waitFn := func() bool { | ||
// delete previous binlog files/events. if other test cases writing events, they may be failed. | ||
query = "RESET MASTER" | ||
_, err = t.db.Exec(query) | ||
c.Assert(err, IsNil) | ||
// check whether other test cases have wrote any events. | ||
time.Sleep(time.Second) | ||
_, gs, err2 := utils.GetMasterStatus(t.db, flavor) | ||
c.Assert(err2, IsNil) | ||
if len(gs.String()) > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done in f9b1663. |
||
return false // some events exist now, try again later. | ||
} | ||
return true | ||
} | ||
utils.WaitSomething(backoff, waitTime, waitFn) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does it need to check the wait result? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
// execute some SQL statements to generate binlog events. | ||
query = fmt.Sprintf("CREATE DATABASE `%s`", dbName) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,8 @@ | |
package utils | ||
|
||
import ( | ||
"time" | ||
|
||
. "github.com/pingcap/check" | ||
"github.com/siddontang/go-mysql/mysql" | ||
) | ||
|
@@ -142,3 +144,32 @@ func (t *testUtilsSuite) TestCompareBinlogPos(c *C) { | |
} | ||
|
||
} | ||
|
||
func (t *testUtilsSuite) TestWaitSomething(c *C) { | ||
var ( | ||
backoff = 10 | ||
waitTime = 10 * time.Millisecond | ||
count = 0 | ||
) | ||
|
||
// wait fail | ||
f1 := func() bool { | ||
count++ | ||
return false | ||
} | ||
c.Assert(WaitSomething(backoff, waitTime, f1), IsFalse) | ||
c.Assert(count, Equals, backoff) | ||
|
||
count = 0 // reset | ||
// wait success | ||
f2 := func() bool { | ||
count++ | ||
if count >= 5 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done in f9b1663. |
||
return true | ||
} | ||
return false | ||
} | ||
|
||
c.Assert(WaitSomething(backoff, waitTime, f2), IsTrue) | ||
c.Assert(count, Equals, 5) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,3 +28,13 @@ func isIgnorableError(err error) bool { | |
} | ||
return false | ||
} | ||
|
||
// isRetryableError checks whether the error is retryable. | ||
func isRetryableError(err error) bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we rename it to a better name? add some worker like retry from, it would be more clear There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let |
||
err = errors.Cause(err) | ||
switch err { | ||
case context.DeadlineExceeded: | ||
return true | ||
} | ||
return false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we can use a simple channel send/receive instead of select with a single case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refined in f9b1663.