Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay: align with MySQL when using GTID #1390

Merged
merged 17 commits into from
Jan 26, 2021
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ ErrNoUUIDDirMatchGTID,[code=11120:class=functional:scope=internal:level=high], "
ErrNoRelayPosMatchGTID,[code=11121:class=functional:scope=internal:level=high], "Message: no relay pos match gtid %s"
ErrReaderReachEndOfFile,[code=11122:class=functional:scope=internal:level=low]
ErrMetadataNoBinlogLoc,[code=11123:class=functional:scope=upstream:level=low], "Message: didn't found binlog location in dumped metadata file %s, Workaround: Please check log of dump unit, there maybe errors when read upstream binlog status"
ErrPreviousGTIDNotExist,[code=11124:class=functional:scope=internal:level=high], "Message: no previous gtid event from binlog %s"
ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium], "Message: checking item %s is not supported\n%s, Workaround: Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`."
ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct TOML format."
ErrConfigYamlTransform,[code=20003:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct YAML format."
Expand Down
2 changes: 1 addition & 1 deletion chaos/cases/conf/source3.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
source-id: "replica-03"
enable-gtid: false
enable-gtid: true
enable-relay: true

from:
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,12 @@ description = ""
workaround = "Please check log of dump unit, there maybe errors when read upstream binlog status"
tags = ["upstream", "low"]

[error.DM-functional-11124]
message = "no previous gtid event from binlog %s"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-config-20001]
message = "checking item %s is not supported\n%s"
description = ""
Expand Down
16 changes: 16 additions & 0 deletions pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,3 +851,19 @@ func GenDummyEvent(header *replication.EventHeader, latestPos uint32, eventSize
ev, err := GenQueryEvent(&headerClone, latestPos, 0, 0, 0, nil, nil, queryBytes)
return ev, err
}

// GenHeartbeatEvent generates a heartbeat event.
// ref: https://dev.mysql.com/doc/internals/en/heartbeat-event.html
func GenHeartbeatEvent(header *replication.EventHeader) *replication.BinlogEvent {
// modify header
headerClone := *header // do a copy
headerClone.Flags = 0
headerClone.EventSize = 39
Copy link
Collaborator

@lance6716 lance6716 Jan 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does MySQL choose not keeping size? I think this may cause positions in query-status mismatched which may confuse user

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It keeps the LogPos but not EventSize.

Copy link
Collaborator Author

@GMHDBJD GMHDBJD Jan 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That meets problem when we calculate startPosition, but we don't calculate it for heartbeat event.

headerClone.Timestamp = 0
headerClone.EventType = replication.HEARTBEAT_EVENT

eventBytes := make([]byte, 39)
ev := &replication.BinlogEvent{Header: &headerClone, Event: &replication.GenericEvent{Data: eventBytes}}

return ev
}
36 changes: 36 additions & 0 deletions pkg/binlog/reader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,39 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid
}
}
}

// GetPreviousGTIDFromGTIDSet tries to get previous GTID sets from Previous_GTID_EVENT GTID for the specified GITD Set.
// events should be [fake_rotate_event,format_description_event,previous_gtids_event/mariadb_gtid_list_event]
func GetPreviousGTIDFromGTIDSet(ctx context.Context, r Reader, gset gtid.Set) (gtid.Set, error) {
err := r.StartSyncByGTID(gset)
if err != nil {
return nil, err
}
defer r.Close()

for {
var e *replication.BinlogEvent
e, err = r.GetEvent(ctx)
if err != nil {
return nil, err
}

switch e.Header.EventType {
case replication.ROTATE_EVENT:
if e.Header.Timestamp == 0 || e.Header.LogPos == 0 { // fake rotate event
continue
}
return nil, terror.ErrPreviousGTIDNotExist.Generate(gset.String())
case replication.FORMAT_DESCRIPTION_EVENT:
continue
case replication.PREVIOUS_GTIDS_EVENT:
previousGset, err := event.GTIDsFromPreviousGTIDsEvent(e)
return previousGset, err
case replication.MARIADB_GTID_LIST_EVENT:
previousGset, err := event.GTIDsFromMariaDBGTIDListEvent(e)
return previousGset, err
default:
return nil, terror.ErrPreviousGTIDNotExist.Generate(gset.String())
}
}
}
29 changes: 29 additions & 0 deletions pkg/binlog/reader/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,32 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) {
c.Assert(err, ErrorMatches, ".*invalid position .* or GTID not enabled in upstream.*")
c.Assert(gs, IsNil)
}

// added to testTCPReaderSuite to re-use DB connection.
func (t *testTCPReaderSuite) TestGetPreviousGTIDFromGTIDSet(c *C) {
var (
cfg = replication.BinlogSyncerConfig{
ServerID: serverIDs[1],
Flavor: flavor,
Host: t.host,
Port: uint16(t.port),
User: t.user,
Password: t.password,
UseDecimal: true,
VerifyChecksum: true,
}
ctx, cancel = context.WithTimeout(context.Background(), utils.DefaultDBTimeout)
)
defer cancel()

_, endGS, err := utils.GetMasterStatus(ctx, t.db, flavor)
c.Assert(err, IsNil)

r1 := NewTCPReader(cfg)
c.Assert(r1, NotNil)
defer r1.Close()

gs, err := GetPreviousGTIDFromGTIDSet(ctx, r1, endGS)
c.Assert(err, IsNil)
c.Assert(endGS.Contain(gs), IsTrue)
}
Loading