Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
relay: use new Reader, Transformer and Writer (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Jun 27, 2019
1 parent b01e7c1 commit 6fbddf3
Show file tree
Hide file tree
Showing 20 changed files with 1,051 additions and 876 deletions.
107 changes: 101 additions & 6 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,117 @@ package worker

import (
"context"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
pkgstreamer "github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay"
"github.com/pingcap/dm/relay/purger"
"github.com/pingcap/errors"
)

type testRelay struct{}

var _ = Suite(&testRelay{})

/*********** dummy relay log process unit, used only for testing *************/

// DummyRelay is a dummy relay
type DummyRelay struct {
initErr error

processResult pb.ProcessResult
errorInfo *pb.RelayError
reloadErr error
}

// NewDummyRelay creates an instance of dummy Relay.
func NewDummyRelay(cfg *relay.Config) relay.Process {
return &DummyRelay{}
}

// Init implements Process interface
func (d *DummyRelay) Init() error {
return d.initErr
}

// InjectInitError injects init error
func (d *DummyRelay) InjectInitError(err error) {
d.initErr = err
}

// Process implements Process interface
func (d *DummyRelay) Process(ctx context.Context, pr chan pb.ProcessResult) {
<-ctx.Done()
pr <- d.processResult
}

// InjectProcessResult injects process result
func (d *DummyRelay) InjectProcessResult(result pb.ProcessResult) {
d.processResult = result
}

// SwitchMaster implements Process interface
func (d *DummyRelay) SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error {
return nil
}

// Migrate implements Process interface
func (d *DummyRelay) Migrate(ctx context.Context, binlogName string, binlogPos uint32) error {
return nil
}

// ActiveRelayLog implements Process interface
func (d *DummyRelay) ActiveRelayLog() *pkgstreamer.RelayLogInfo {
return nil
}

// Reload implements Process interface
func (d *DummyRelay) Reload(newCfg *relay.Config) error {
return d.reloadErr
}

// InjectReloadError injects reload error
func (d *DummyRelay) InjectReloadError(err error) {
d.reloadErr = err
}

// Update implements Process interface
func (d *DummyRelay) Update(cfg *config.SubTaskConfig) error {
return nil
}

// Resume implements Process interface
func (d *DummyRelay) Resume(ctx context.Context, pr chan pb.ProcessResult) {}

// Pause implements Process interface
func (d *DummyRelay) Pause() {}

// Error implements Process interface
func (d *DummyRelay) Error() interface{} {
return d.errorInfo
}

// Status implements Process interface
func (d *DummyRelay) Status() interface{} {
return &pb.RelayStatus{
Stage: pb.Stage_New,
}
}

// Close implements Process interface
func (d *DummyRelay) Close() {}

// IsClosed implements Process interface
func (d *DummyRelay) IsClosed() bool { return false }

func (t *testRelay) TestRelay(c *C) {
originNewRelay := relay.NewRelay
relay.NewRelay = relay.NewDummyRelay
relay.NewRelay = NewDummyRelay
originNewPurger := purger.NewPurger
purger.NewPurger = purger.NewDummyPurger
defer func() {
Expand Down Expand Up @@ -62,7 +157,7 @@ func (t *testRelay) testInit(c *C, holder *realRelayHolder) {
_, err := holder.Init(nil)
c.Assert(err, IsNil)

r, ok := holder.relay.(*relay.DummyRelay)
r, ok := holder.relay.(*DummyRelay)
c.Assert(ok, IsTrue)

initErr := errors.New("init error")
Expand Down Expand Up @@ -106,7 +201,7 @@ func (t *testRelay) testStart(c *C, holder *realRelayHolder) {
}

func (t *testRelay) testClose(c *C, holder *realRelayHolder) {
r, ok := holder.relay.(*relay.DummyRelay)
r, ok := holder.relay.(*DummyRelay)
c.Assert(ok, IsTrue)
processResult := &pb.ProcessResult{
IsCanceled: true,
Expand Down Expand Up @@ -191,7 +286,7 @@ func (t *testRelay) testUpdate(c *C, holder *realRelayHolder) {
c.Assert(waitRelayStage(holder, originStage, 10), IsTrue)
c.Assert(holder.closed.Get(), Equals, closedFalse)

r, ok := holder.relay.(*relay.DummyRelay)
r, ok := holder.relay.(*DummyRelay)
c.Assert(ok, IsTrue)

err := errors.New("reload error")
Expand All @@ -211,7 +306,7 @@ func (t *testRelay) testStop(c *C, holder *realRelayHolder) {
}

func waitRelayStage(holder *realRelayHolder, expect pb.Stage, backoff int) bool {
return waitSomething(backoff, func() bool {
return utils.WaitSomething(backoff, 10*time.Millisecond, func() bool {
return holder.Stage() == expect
})
}
20 changes: 5 additions & 15 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/dm/dm/pb"
"google.golang.org/grpc"

"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/utils"
)

func TestServer(t *testing.T) {
Expand Down Expand Up @@ -53,7 +55,7 @@ func (t *testServer) TestServer(c *C) {
c.Assert(err1, IsNil)
}()

c.Assert(waitSomething(30, func() bool {
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
return !s.closed.Get()
}), IsTrue)

Expand Down Expand Up @@ -124,7 +126,7 @@ func (t *testServer) TestServer(c *C) {
// close
s.Close()

c.Assert(waitSomething(10, func() bool {
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
return s.closed.Get()
}), IsTrue)

Expand All @@ -146,15 +148,3 @@ func (t *testServer) createClient(c *C) pb.WorkerClient {
c.Assert(err, IsNil)
return pb.NewWorkerClient(conn)
}

func waitSomething(backoff int, fn func() bool) bool {
for i := 0; i < backoff; i++ {
if fn() {
return true
}

time.Sleep(10 * time.Millisecond)
}

return false
}
18 changes: 14 additions & 4 deletions pkg/binlog/reader/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,20 @@ func (t *testTCPReaderSuite) setUpData(c *C) {
query := fmt.Sprintf("DROP DATABASE `%s`", dbName)
_, err := t.db.Exec(query)

// delete previous binlog files/events.
query = "RESET MASTER"
_, err = t.db.Exec(query)
c.Assert(err, IsNil)
backoff := 5
waitTime := 5 * time.Second
waitFn := func() bool {
// delete previous binlog files/events. if other test cases writing events, they may be failed.
query = "RESET MASTER"
_, err = t.db.Exec(query)
c.Assert(err, IsNil)
// check whether other test cases have wrote any events.
time.Sleep(time.Second)
_, gs, err2 := utils.GetMasterStatus(t.db, flavor)
c.Assert(err2, IsNil)
return gs.String() == "" // break waiting if no other case wrote any events
}
utils.WaitSomething(backoff, waitTime, waitFn)

// execute some SQL statements to generate binlog events.
query = fmt.Sprintf("CREATE DATABASE `%s`", dbName)
Expand Down
6 changes: 0 additions & 6 deletions pkg/streamer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,6 @@ func (r *BinlogReader) parseFile(

onEventFunc := func(e *replication.BinlogEvent) error {
log.Debugf("[streamer] read event %+v", e.Header)
if e.Header.Flags&0x0040 != 0 {
// now LOG_EVENT_RELAY_LOG_F is only used for events which used to fill the gap in relay log file when switching the master server
log.Debugf("skip event %+v created by relay writer", e.Header)
return nil
}

r.latestServerID = e.Header.ServerID // record server_id

switch e.Header.EventType {
Expand Down
14 changes: 14 additions & 0 deletions pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"os"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"
Expand Down Expand Up @@ -76,3 +77,16 @@ func CompareBinlogPos(a, b mysql.Position, deviation float64) int {

return 1
}

// WaitSomething waits for something done with `true`.
func WaitSomething(backoff int, waitTime time.Duration, fn func() bool) bool {
for i := 0; i < backoff; i++ {
if fn() {
return true
}

time.Sleep(waitTime)
}

return false
}
28 changes: 28 additions & 0 deletions pkg/utils/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package utils

import (
"time"

. "github.com/pingcap/check"
"github.com/siddontang/go-mysql/mysql"
)
Expand Down Expand Up @@ -142,3 +144,29 @@ func (t *testUtilsSuite) TestCompareBinlogPos(c *C) {
}

}

func (t *testUtilsSuite) TestWaitSomething(c *C) {
var (
backoff = 10
waitTime = 10 * time.Millisecond
count = 0
)

// wait fail
f1 := func() bool {
count++
return false
}
c.Assert(WaitSomething(backoff, waitTime, f1), IsFalse)
c.Assert(count, Equals, backoff)

count = 0 // reset
// wait success
f2 := func() bool {
count++
return count >= 5
}

c.Assert(WaitSomething(backoff, waitTime, f2), IsTrue)
c.Assert(count, Equals, 5)
}
11 changes: 3 additions & 8 deletions relay/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@ import (
"io/ioutil"
"os"
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/gtid"
)

var _ = Suite(&testRelaySuite{})
var _ = Suite(&testMetaSuite{})

func TestSuite(t *testing.T) {
TestingT(t)
}

type testRelaySuite struct {
type testMetaSuite struct {
}

type MetaTestCase struct {
Expand All @@ -41,7 +36,7 @@ type MetaTestCase struct {
gset gtid.Set
}

func (r *testRelaySuite) TestLocalMeta(c *C) {
func (r *testMetaSuite) TestLocalMeta(c *C) {
dir, err := ioutil.TempDir("", "test_local_meta")
c.Assert(err, IsNil)
defer os.RemoveAll(dir)
Expand Down
6 changes: 3 additions & 3 deletions relay/reader/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"github.com/pingcap/errors"
)

// isIgnorableError checks whether the error is ignorable.
func isIgnorableError(err error) bool {
// isRetryableError checks whether the error is retryable.
func isRetryableError(err error) bool {
err = errors.Cause(err)
switch err {
case context.Canceled:
case context.DeadlineExceeded:
return true
}
return false
Expand Down
14 changes: 7 additions & 7 deletions relay/reader/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,26 @@ package reader
import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/check"
"github.com/pingcap/errors"
)

var (
_ = Suite(&testErrorSuite{})
_ = check.Suite(&testErrorSuite{})
)

type testErrorSuite struct {
}

func (t *testErrorSuite) TestIgnorable(c *C) {
func (t *testErrorSuite) TestRetryable(c *check.C) {
err := errors.New("custom error")
c.Assert(isIgnorableError(err), IsFalse)
c.Assert(isRetryableError(err), check.IsFalse)

cases := []error{
context.Canceled,
errors.Annotate(context.Canceled, "annotated"),
context.DeadlineExceeded,
errors.Annotate(context.DeadlineExceeded, "annotated"),
}
for _, cs := range cases {
c.Assert(isIgnorableError(cs), IsTrue)
c.Assert(isRetryableError(cs), check.IsTrue)
}
}
Loading

0 comments on commit 6fbddf3

Please sign in to comment.