Skip to content

Commit

Permalink
cherry pick pingcap#1390 to release-2.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
GMHDBJD authored and ti-srebot committed Jan 26, 2021
1 parent 2124999 commit 971c2ad
Show file tree
Hide file tree
Showing 12 changed files with 434 additions and 223 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ ErrNoUUIDDirMatchGTID,[code=11120:class=functional:scope=internal:level=high], "
ErrNoRelayPosMatchGTID,[code=11121:class=functional:scope=internal:level=high], "Message: no relay pos match gtid %s"
ErrReaderReachEndOfFile,[code=11122:class=functional:scope=internal:level=low]
ErrMetadataNoBinlogLoc,[code=11123:class=functional:scope=upstream:level=low], "Message: didn't found binlog location in dumped metadata file %s, Workaround: Please check log of dump unit, there maybe errors when read upstream binlog status"
ErrPreviousGTIDNotExist,[code=11124:class=functional:scope=internal:level=high], "Message: no previous gtid event from binlog %s"
ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium], "Message: checking item %s is not supported\n%s, Workaround: Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`."
ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct TOML format."
ErrConfigYamlTransform,[code=20003:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct YAML format."
Expand Down
2 changes: 1 addition & 1 deletion chaos/cases/conf/source3.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
source-id: "replica-03"
enable-gtid: false
enable-gtid: true
enable-relay: true

from:
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,12 @@ description = ""
workaround = "Please check log of dump unit, there maybe errors when read upstream binlog status"
tags = ["upstream", "low"]

[error.DM-functional-11124]
message = "no previous gtid event from binlog %s"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-config-20001]
message = "checking item %s is not supported\n%s"
description = ""
Expand Down
16 changes: 16 additions & 0 deletions pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,3 +851,19 @@ func GenDummyEvent(header *replication.EventHeader, latestPos uint32, eventSize
ev, err := GenQueryEvent(&headerClone, latestPos, 0, 0, 0, nil, nil, queryBytes)
return ev, err
}

// GenHeartbeatEvent generates a heartbeat event.
// ref: https://dev.mysql.com/doc/internals/en/heartbeat-event.html
func GenHeartbeatEvent(header *replication.EventHeader) *replication.BinlogEvent {
// modify header
headerClone := *header // do a copy
headerClone.Flags = 0
headerClone.EventSize = 39
headerClone.Timestamp = 0
headerClone.EventType = replication.HEARTBEAT_EVENT

eventBytes := make([]byte, 39)
ev := &replication.BinlogEvent{Header: &headerClone, Event: &replication.GenericEvent{Data: eventBytes}}

return ev
}
36 changes: 36 additions & 0 deletions pkg/binlog/reader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,39 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid
}
}
}

// GetPreviousGTIDFromGTIDSet tries to get previous GTID sets from Previous_GTID_EVENT GTID for the specified GITD Set.
// events should be [fake_rotate_event,format_description_event,previous_gtids_event/mariadb_gtid_list_event]
func GetPreviousGTIDFromGTIDSet(ctx context.Context, r Reader, gset gtid.Set) (gtid.Set, error) {
err := r.StartSyncByGTID(gset)
if err != nil {
return nil, err
}
defer r.Close()

for {
var e *replication.BinlogEvent
e, err = r.GetEvent(ctx)
if err != nil {
return nil, err
}

switch e.Header.EventType {
case replication.ROTATE_EVENT:
if e.Header.Timestamp == 0 || e.Header.LogPos == 0 { // fake rotate event
continue
}
return nil, terror.ErrPreviousGTIDNotExist.Generate(gset.String())
case replication.FORMAT_DESCRIPTION_EVENT:
continue
case replication.PREVIOUS_GTIDS_EVENT:
previousGset, err := event.GTIDsFromPreviousGTIDsEvent(e)
return previousGset, err
case replication.MARIADB_GTID_LIST_EVENT:
previousGset, err := event.GTIDsFromMariaDBGTIDListEvent(e)
return previousGset, err
default:
return nil, terror.ErrPreviousGTIDNotExist.Generate(gset.String())
}
}
}
29 changes: 29 additions & 0 deletions pkg/binlog/reader/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,32 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) {
c.Assert(err, ErrorMatches, ".*invalid position .* or GTID not enabled in upstream.*")
c.Assert(gs, IsNil)
}

// added to testTCPReaderSuite to re-use DB connection.
func (t *testTCPReaderSuite) TestGetPreviousGTIDFromGTIDSet(c *C) {
var (
cfg = replication.BinlogSyncerConfig{
ServerID: serverIDs[1],
Flavor: flavor,
Host: t.host,
Port: uint16(t.port),
User: t.user,
Password: t.password,
UseDecimal: true,
VerifyChecksum: true,
}
ctx, cancel = context.WithTimeout(context.Background(), utils.DefaultDBTimeout)
)
defer cancel()

_, endGS, err := utils.GetMasterStatus(ctx, t.db, flavor)
c.Assert(err, IsNil)

r1 := NewTCPReader(cfg)
c.Assert(r1, NotNil)
defer r1.Close()

gs, err := GetPreviousGTIDFromGTIDSet(ctx, r1, endGS)
c.Assert(err, IsNil)
c.Assert(endGS.Contain(gs), IsTrue)
}
Loading

0 comments on commit 971c2ad

Please sign in to comment.