Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay: use new Reader, Transformer and Writer #171

Merged
merged 31 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
fec35ed
*: use `reader` to reader binlog from upstream; do not check gap anymore
csuzhangxc Jun 12, 2019
3e6a305
*: let `reader` handle more errors
csuzhangxc Jun 12, 2019
f22c105
*: use `transformer` and `writer`; TODO, handle duplicated PreviousGT…
csuzhangxc Jun 12, 2019
12d4cac
relay: fix meta update
csuzhangxc Jun 13, 2019
242ea36
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-3
csuzhangxc Jun 17, 2019
fece692
relay: remove some useless code
csuzhangxc Jun 17, 2019
481b624
relay: refine code
csuzhangxc Jun 17, 2019
c5460f9
relay: test for handling events
csuzhangxc Jun 17, 2019
dd604c0
relay: move some test only code into _test.go file
csuzhangxc Jun 18, 2019
c314dd4
relay: extract and test for `isNewServer`
csuzhangxc Jun 18, 2019
92d9983
relay: test `reSetupMeta`
csuzhangxc Jun 18, 2019
67c2f7c
relay: test processing
csuzhangxc Jun 18, 2019
f255895
relay: try fix CI
csuzhangxc Jun 18, 2019
93ec22e
reader: try fix CI
csuzhangxc Jun 19, 2019
9f22aaa
reader: try fix CI
csuzhangxc Jun 19, 2019
9e108d7
relay: try fix CI
csuzhangxc Jun 19, 2019
8e83cf1
*: writer return value rather than pointer
csuzhangxc Jun 19, 2019
a18e395
*: recover relay log file before starting to read binlog events from …
csuzhangxc Jun 19, 2019
2f0de38
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-3
csuzhangxc Jun 21, 2019
6c3fab5
*: address comments, add a `WaitSomething` function
csuzhangxc Jun 21, 2019
201f960
relay: address comments
csuzhangxc Jun 21, 2019
2e8ed95
relay: check GTID sets `Equal` before `Contain`
csuzhangxc Jun 21, 2019
f20bf92
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-3
csuzhangxc Jun 21, 2019
fed7086
Merge branch 'master' into relay-writer-3
IANTHEREAL Jun 21, 2019
071bc72
*: address comments
csuzhangxc Jun 21, 2019
c93c496
Merge remote-tracking branch 'origin/relay-writer-3' into relay-writer-3
csuzhangxc Jun 21, 2019
0527d16
reader: address comment, change `eventTimeout`
csuzhangxc Jun 22, 2019
184d1b8
Update relay/relay.go
csuzhangxc Jun 26, 2019
f9b1663
*: address comments
csuzhangxc Jun 26, 2019
88f6455
Merge branch 'master' into relay-writer-3
csuzhangxc Jun 26, 2019
3bc0cc1
Merge branch 'master' into relay-writer-3
csuzhangxc Jun 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 103 additions & 6 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,119 @@ package worker

import (
"context"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
pkgstreamer "github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay"
"github.com/pingcap/dm/relay/purger"
"github.com/pingcap/errors"
)

type testRelay struct{}

var _ = Suite(&testRelay{})

/*********** dummy relay log process unit, used only for testing *************/

// DummyRelay is a dummy relay
type DummyRelay struct {
initErr error

processResult pb.ProcessResult
errorInfo *pb.RelayError
reloadErr error
}

// NewDummyRelay creates an instance of dummy Relay.
func NewDummyRelay(cfg *relay.Config) relay.Process {
return &DummyRelay{}
}

// Init implements Process interface
func (d *DummyRelay) Init() error {
return d.initErr
}

// InjectInitError injects init error
func (d *DummyRelay) InjectInitError(err error) {
d.initErr = err
}

// Process implements Process interface
func (d *DummyRelay) Process(ctx context.Context, pr chan pb.ProcessResult) {
select {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we can use a simple channel send/receive instead of select with a single case

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refined in f9b1663.

case <-ctx.Done():
pr <- d.processResult
}
}

// InjectProcessResult injects process result
func (d *DummyRelay) InjectProcessResult(result pb.ProcessResult) {
d.processResult = result
}

// SwitchMaster implements Process interface
func (d *DummyRelay) SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error {
return nil
}

// Migrate implements Process interface
func (d *DummyRelay) Migrate(ctx context.Context, binlogName string, binlogPos uint32) error {
return nil
}

// ActiveRelayLog implements Process interface
func (d *DummyRelay) ActiveRelayLog() *pkgstreamer.RelayLogInfo {
return nil
}

// Reload implements Process interface
func (d *DummyRelay) Reload(newCfg *relay.Config) error {
return d.reloadErr
}

// InjectReloadError injects reload error
func (d *DummyRelay) InjectReloadError(err error) {
d.reloadErr = err
}

// Update implements Process interface
func (d *DummyRelay) Update(cfg *config.SubTaskConfig) error {
return nil
}

// Resume implements Process interface
func (d *DummyRelay) Resume(ctx context.Context, pr chan pb.ProcessResult) {}

// Pause implements Process interface
func (d *DummyRelay) Pause() {}

// Error implements Process interface
func (d *DummyRelay) Error() interface{} {
return d.errorInfo
}

// Status implements Process interface
func (d *DummyRelay) Status() interface{} {
return &pb.RelayStatus{
Stage: pb.Stage_New,
}
}

// Close implements Process interface
func (d *DummyRelay) Close() {}

// IsClosed implements Process interface
func (d *DummyRelay) IsClosed() bool { return false }

func (t *testRelay) TestRelay(c *C) {
originNewRelay := relay.NewRelay
relay.NewRelay = relay.NewDummyRelay
relay.NewRelay = NewDummyRelay
originNewPurger := purger.NewPurger
purger.NewPurger = purger.NewDummyPurger
defer func() {
Expand Down Expand Up @@ -62,7 +159,7 @@ func (t *testRelay) testInit(c *C, holder *realRelayHolder) {
_, err := holder.Init(nil)
c.Assert(err, IsNil)

r, ok := holder.relay.(*relay.DummyRelay)
r, ok := holder.relay.(*DummyRelay)
c.Assert(ok, IsTrue)

initErr := errors.New("init error")
Expand Down Expand Up @@ -106,7 +203,7 @@ func (t *testRelay) testStart(c *C, holder *realRelayHolder) {
}

func (t *testRelay) testClose(c *C, holder *realRelayHolder) {
r, ok := holder.relay.(*relay.DummyRelay)
r, ok := holder.relay.(*DummyRelay)
c.Assert(ok, IsTrue)
processResult := &pb.ProcessResult{
IsCanceled: true,
Expand Down Expand Up @@ -191,7 +288,7 @@ func (t *testRelay) testUpdate(c *C, holder *realRelayHolder) {
c.Assert(waitRelayStage(holder, originStage, 10), IsTrue)
c.Assert(holder.closed.Get(), Equals, closedFalse)

r, ok := holder.relay.(*relay.DummyRelay)
r, ok := holder.relay.(*DummyRelay)
c.Assert(ok, IsTrue)

err := errors.New("reload error")
Expand All @@ -211,7 +308,7 @@ func (t *testRelay) testStop(c *C, holder *realRelayHolder) {
}

func waitRelayStage(holder *realRelayHolder, expect pb.Stage, backoff int) bool {
return waitSomething(backoff, func() bool {
return utils.WaitSomething(backoff, 10*time.Millisecond, func() bool {
return holder.Stage() == expect
})
}
20 changes: 5 additions & 15 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/dm/dm/pb"
"google.golang.org/grpc"

"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/utils"
)

func TestServer(t *testing.T) {
Expand Down Expand Up @@ -53,7 +55,7 @@ func (t *testServer) TestServer(c *C) {
c.Assert(err1, IsNil)
}()

c.Assert(waitSomething(30, func() bool {
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
return !s.closed.Get()
}), IsTrue)

Expand Down Expand Up @@ -124,7 +126,7 @@ func (t *testServer) TestServer(c *C) {
// close
s.Close()

c.Assert(waitSomething(10, func() bool {
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
return s.closed.Get()
}), IsTrue)

Expand All @@ -146,15 +148,3 @@ func (t *testServer) createClient(c *C) pb.WorkerClient {
c.Assert(err, IsNil)
return pb.NewWorkerClient(conn)
}

func waitSomething(backoff int, fn func() bool) bool {
for i := 0; i < backoff; i++ {
if fn() {
return true
}

time.Sleep(10 * time.Millisecond)
}

return false
}
21 changes: 17 additions & 4 deletions pkg/binlog/reader/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,23 @@ func (t *testTCPReaderSuite) setUpData(c *C) {
query := fmt.Sprintf("DROP DATABASE `%s`", dbName)
_, err := t.db.Exec(query)

// delete previous binlog files/events.
query = "RESET MASTER"
_, err = t.db.Exec(query)
c.Assert(err, IsNil)
backoff := 5
waitTime := 5 * time.Second
waitFn := func() bool {
// delete previous binlog files/events. if other test cases writing events, they may be failed.
query = "RESET MASTER"
_, err = t.db.Exec(query)
c.Assert(err, IsNil)
// check whether other test cases have wrote any events.
time.Sleep(time.Second)
_, gs, err2 := utils.GetMasterStatus(t.db, flavor)
c.Assert(err2, IsNil)
if len(gs.String()) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return gs.String() == "" is simpler?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in f9b1663.

return false // some events exist now, try again later.
}
return true
}
utils.WaitSomething(backoff, waitTime, waitFn)
Copy link
Contributor

@amyangfei amyangfei Jun 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it need to check the wait result?
besides, in which scenario will other test case write binlog even after this test case is started

Copy link
Member Author

@csuzhangxc csuzhangxc Jun 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WaitSomething can only decrease the possibility of errors.
I observed after RESET MASTER, other test cases write binlog event, may it because multi cases run in different goroutines at the same time?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes?

Copy link
Member Author

@csuzhangxc csuzhangxc Jun 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// execute some SQL statements to generate binlog events.
query = fmt.Sprintf("CREATE DATABASE `%s`", dbName)
Expand Down
6 changes: 0 additions & 6 deletions pkg/streamer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,6 @@ func (r *BinlogReader) parseFile(

onEventFunc := func(e *replication.BinlogEvent) error {
log.Debugf("[streamer] read event %+v", e.Header)
if e.Header.Flags&0x0040 != 0 {
// now LOG_EVENT_RELAY_LOG_F is only used for events which used to fill the gap in relay log file when switching the master server
log.Debugf("skip event %+v created by relay writer", e.Header)
return nil
}

r.latestServerID = e.Header.ServerID // record server_id

switch e.Header.EventType {
Expand Down
14 changes: 14 additions & 0 deletions pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"os"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"
Expand Down Expand Up @@ -76,3 +77,16 @@ func CompareBinlogPos(a, b mysql.Position, deviation float64) int {

return 1
}

// WaitSomething waits for something done with `true`.
func WaitSomething(backoff int, waitTime time.Duration, fn func() bool) bool {
for i := 0; i < backoff; i++ {
if fn() {
return true
}

time.Sleep(waitTime)
}

return false
}
31 changes: 31 additions & 0 deletions pkg/utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package utils

import (
"time"

. "github.com/pingcap/check"
"github.com/siddontang/go-mysql/mysql"
)
Expand Down Expand Up @@ -142,3 +144,32 @@ func (t *testUtilsSuite) TestCompareBinlogPos(c *C) {
}

}

func (t *testUtilsSuite) TestWaitSomething(c *C) {
var (
backoff = 10
waitTime = 10 * time.Millisecond
count = 0
)

// wait fail
f1 := func() bool {
count++
return false
}
c.Assert(WaitSomething(backoff, waitTime, f1), IsFalse)
c.Assert(count, Equals, backoff)

count = 0 // reset
// wait success
f2 := func() bool {
count++
if count >= 5 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return count >= 5

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in f9b1663.

return true
}
return false
}

c.Assert(WaitSomething(backoff, waitTime, f2), IsTrue)
c.Assert(count, Equals, 5)
}
11 changes: 3 additions & 8 deletions relay/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@ import (
"io/ioutil"
"os"
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/gtid"
)

var _ = Suite(&testRelaySuite{})
var _ = Suite(&testMetaSuite{})

func TestSuite(t *testing.T) {
TestingT(t)
}

type testRelaySuite struct {
type testMetaSuite struct {
}

type MetaTestCase struct {
Expand All @@ -41,7 +36,7 @@ type MetaTestCase struct {
gset gtid.Set
}

func (r *testRelaySuite) TestLocalMeta(c *C) {
func (r *testMetaSuite) TestLocalMeta(c *C) {
dir, err := ioutil.TempDir("", "test_local_meta")
c.Assert(err, IsNil)
defer os.RemoveAll(dir)
Expand Down
6 changes: 3 additions & 3 deletions relay/reader/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"github.com/pingcap/errors"
)

// isIgnorableError checks whether the error is ignorable.
func isIgnorableError(err error) bool {
// isRetryableError checks whether the error is retryable.
func isRetryableError(err error) bool {
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Jun 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we rename it to a better name? add some worker like retry from, it would be more clear

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let Reader itself to retry in 071bc72. keep its name still be isRetryableError.

err = errors.Cause(err)
switch err {
case context.Canceled:
case context.DeadlineExceeded:
return true
}
return false
Expand Down
14 changes: 7 additions & 7 deletions relay/reader/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,26 @@ package reader
import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/check"
"github.com/pingcap/errors"
)

var (
_ = Suite(&testErrorSuite{})
_ = check.Suite(&testErrorSuite{})
)

type testErrorSuite struct {
}

func (t *testErrorSuite) TestIgnorable(c *C) {
func (t *testErrorSuite) TestRetryable(c *check.C) {
err := errors.New("custom error")
c.Assert(isIgnorableError(err), IsFalse)
c.Assert(isRetryableError(err), check.IsFalse)

cases := []error{
context.Canceled,
errors.Annotate(context.Canceled, "annotated"),
context.DeadlineExceeded,
errors.Annotate(context.DeadlineExceeded, "annotated"),
}
for _, cs := range cases {
c.Assert(isIgnorableError(cs), IsTrue)
c.Assert(isRetryableError(cs), check.IsTrue)
}
}
Loading