From 7c7097cd3ee84d8fa88a3d677ae4aff595fe5ca8 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 28 Jan 2021 16:24:39 +0800 Subject: [PATCH] streamer: add heartbeat for local streamer (#1404) --- pkg/streamer/reader_test.go | 6 ++++++ pkg/streamer/streamer.go | 16 ++++++++++++++++ pkg/streamer/streamer_test.go | 20 ++++++++++++++++++++ 3 files changed, 42 insertions(+) diff --git a/pkg/streamer/reader_test.go b/pkg/streamer/reader_test.go index f9d9e7d44f..863f471359 100644 --- a/pkg/streamer/reader_test.go +++ b/pkg/streamer/reader_test.go @@ -33,6 +33,7 @@ import ( "github.com/google/uuid" . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/siddontang/go-mysql/mysql" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" @@ -62,6 +63,11 @@ func (t *testReaderSuite) SetUpSuite(c *C) { t.lastPos = 0 t.lastGTID, err = gtid.ParserGTID(mysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:0") c.Assert(err, IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(10000)"), IsNil) +} + +func (t *testReaderSuite) TearDownSuite(c *C) { + c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil) } func (t *testReaderSuite) TestParseFileBase(c *C) { diff --git a/pkg/streamer/streamer.go b/pkg/streamer/streamer.go index b426eb17be..a3c72519dd 100644 --- a/pkg/streamer/streamer.go +++ b/pkg/streamer/streamer.go @@ -15,7 +15,9 @@ package streamer import ( "context" + "time" + "github.com/pingcap/dm/pkg/binlog/event" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" @@ -24,6 +26,10 @@ import ( "go.uber.org/zap" ) +var ( + heartbeatInterval = 30 * time.Second +) + // TODO: maybe one day we can make a pull request to go-mysql to support LocalStreamer. // Streamer provides the ability to get binlog event from remote server or local file. @@ -51,7 +57,17 @@ func (s *LocalStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, failpoint.Return(nil, terror.ErrSyncClosed.Generate()) }) + failpoint.Inject("SetHeartbeatInterval", func(v failpoint.Value) { + i := v.(int) + log.L().Info("will change heartbeat interval", zap.Int("new", i)) + heartbeatInterval = time.Duration(i) * time.Second + }) + select { + case <-time.After(heartbeatInterval): + // MySQL will send heartbeat event 30s by default + heartbeatHeader := &replication.EventHeader{} + return event.GenHeartbeatEvent(heartbeatHeader), nil case c := <-s.ch: return c, nil case s.err = <-s.ech: diff --git a/pkg/streamer/streamer_test.go b/pkg/streamer/streamer_test.go index e4c178cad5..cd2026fe61 100644 --- a/pkg/streamer/streamer_test.go +++ b/pkg/streamer/streamer_test.go @@ -19,6 +19,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/siddontang/go-mysql/replication" "github.com/pingcap/dm/pkg/binlog/event" @@ -31,6 +32,11 @@ type testStreamerSuite struct { } func (t *testStreamerSuite) TestStreamer(c *C) { + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(10000)"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil) + }() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -98,3 +104,17 @@ func (t *testStreamerSuite) TestStreamer(c *C) { c.Assert(terror.ErrNeedSyncAgain.Equal(err), IsTrue) c.Assert(ev2, IsNil) } + +func (t *testStreamerSuite) TestHeartbeat(c *C) { + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval", "return(1)"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/streamer/SetHeartbeatInterval"), IsNil) + }() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + s := newLocalStreamer() + ev, err := s.GetEvent(ctx) + c.Assert(err, IsNil) + c.Assert(ev.Header.EventType, Equals, replication.HEARTBEAT_EVENT) +}