Skip to content

Commit

Permalink
dm/binlog: find binlog location by timestamp (pingcap#4220)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored and zhaoxinyu committed Jan 20, 2022
1 parent 8d8f895 commit 3516043
Show file tree
Hide file tree
Showing 23 changed files with 1,158 additions and 254 deletions.
60 changes: 39 additions & 21 deletions dm/pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,19 @@ type DDLDMLResult struct {
// for MySQL:
// 1. BinLogFileHeader, [ fe `bin` ]
// 2. FormatDescriptionEvent
// 3. PreviousGTIDsEvent
// 3. PreviousGTIDsEvent, depends on genGTID
// for MariaDB:
// 1. BinLogFileHeader, [ fe `bin` ]
// 2. FormatDescriptionEvent
// 3. MariadbGTIDListEvent
// 3. MariadbGTIDListEvent, depends on genGTID
// -. MariadbBinlogCheckPointEvent, not added yet
func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*replication.BinlogEvent, []byte, error) {
func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set, genGTID bool, ts int64) ([]*replication.BinlogEvent, []byte, error) {
if ts == 0 {
ts = time.Now().Unix()
}
var (
header = &replication.EventHeader{
Timestamp: uint32(time.Now().Unix()),
Timestamp: uint32(ts),
ServerID: serverID,
Flags: defaultHeaderFlags,
}
Expand All @@ -59,46 +62,57 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*repl
}
latestPos += uint32(len(formatDescEv.RawData)) // update latestPos

switch flavor {
case gmysql.MySQLFlavor:
prevGTIDsEv, err = GenPreviousGTIDsEvent(header, latestPos, gSet)
case gmysql.MariaDBFlavor:
prevGTIDsEv, err = GenMariaDBGTIDListEvent(header, latestPos, gSet)
default:
return nil, nil, terror.ErrBinlogFlavorNotSupport.Generate(flavor)
}
if err != nil {
return nil, nil, terror.Annotate(err, "generate PreviousGTIDsEvent/MariadbGTIDListEvent")
if genGTID {
switch flavor {
case gmysql.MySQLFlavor:
prevGTIDsEv, err = GenPreviousGTIDsEvent(header, latestPos, gSet)
case gmysql.MariaDBFlavor:
prevGTIDsEv, err = GenMariaDBGTIDListEvent(header, latestPos, gSet)
default:
return nil, nil, terror.ErrBinlogFlavorNotSupport.Generate(flavor)
}
if err != nil {
return nil, nil, terror.Annotate(err, "generate PreviousGTIDsEvent/MariadbGTIDListEvent")
}
}

var buf bytes.Buffer
_, err = buf.Write(replication.BinLogFileHeader)
if err != nil {
return nil, nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write binlog file header % X", replication.BinLogFileHeader)
}

var events []*replication.BinlogEvent
_, err = buf.Write(formatDescEv.RawData)
if err != nil {
return nil, nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write FormatDescriptionEvent % X", formatDescEv.RawData)
}
_, err = buf.Write(prevGTIDsEv.RawData)
if err != nil {
return nil, nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write PreviousGTIDsEvent/MariadbGTIDListEvent % X", prevGTIDsEv.RawData)
events = append(events, formatDescEv)

if genGTID {
_, err = buf.Write(prevGTIDsEv.RawData)
if err != nil {
return nil, nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write PreviousGTIDsEvent/MariadbGTIDListEvent % X", prevGTIDsEv.RawData)
}
events = append(events, prevGTIDsEv)
}

events := []*replication.BinlogEvent{formatDescEv, prevGTIDsEv}
return events, buf.Bytes(), nil
}

// GenCommonGTIDEvent generates a common GTID event.
func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet gtid.Set) (*replication.BinlogEvent, error) {
func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet gtid.Set, anonymous bool, ts int64) (*replication.BinlogEvent, error) {
singleGTID, err := verifySingleGTID(flavor, gSet)
if err != nil {
return nil, terror.Annotate(err, "verify single GTID in set")
}

if ts == 0 {
ts = time.Now().Unix()
}
var (
header = &replication.EventHeader{
Timestamp: uint32(time.Now().Unix()),
Timestamp: uint32(ts),
ServerID: serverID,
Flags: defaultHeaderFlags,
}
Expand All @@ -109,7 +123,11 @@ func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet g
case gmysql.MySQLFlavor:
uuidSet := singleGTID.(*gmysql.UUIDSet)
interval := uuidSet.Intervals[0]
gtidEv, err = GenGTIDEvent(header, latestPos, defaultGTIDFlags, uuidSet.SID.String(), interval.Start, defaultLastCommitted, defaultSequenceNumber)
if anonymous {
gtidEv, err = GenAnonymousGTIDEvent(header, latestPos, defaultGTIDFlags, defaultLastCommitted, defaultSequenceNumber)
} else {
gtidEv, err = GenGTIDEvent(header, latestPos, defaultGTIDFlags, uuidSet.SID.String(), interval.Start, defaultLastCommitted, defaultSequenceNumber)
}
case gmysql.MariaDBFlavor:
mariaGTID := singleGTID.(*gmysql.MariadbGTID)
if mariaGTID.ServerID != header.ServerID {
Expand Down
22 changes: 11 additions & 11 deletions dm/pkg/binlog/event/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (t *testCommonSuite) TestGenCommonFileHeader(c *C) {
gSet, err := gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)

events, data, err := GenCommonFileHeader(flavor, serverID, gSet)
events, data, err := GenCommonFileHeader(flavor, serverID, gSet, true, 0)
c.Assert(err, IsNil)
c.Assert(len(events), Equals, 2)
c.Assert(events[0].Header.EventType, Equals, replication.FORMAT_DESCRIPTION_EVENT)
Expand Down Expand Up @@ -79,7 +79,7 @@ func (t *testCommonSuite) TestGenCommonFileHeader(c *C) {
gSet, err = gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)

events, data, err = GenCommonFileHeader(flavor, serverID, gSet)
events, data, err = GenCommonFileHeader(flavor, serverID, gSet, true, 0)
c.Assert(err, IsNil)
c.Assert(len(events), Equals, 2)
c.Assert(events[0].Header.EventType, Equals, replication.FORMAT_DESCRIPTION_EVENT)
Expand Down Expand Up @@ -107,31 +107,31 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) {
)

// nil gSet, invalid
gtidEv, err := GenCommonGTIDEvent(flavor, serverID, latestPos, gSet)
gtidEv, err := GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0)
c.Assert(err, NotNil)
c.Assert(gtidEv, IsNil)

// multi GTID in set, invalid
gSetStr := "03fc0263-28c7-11e7-a653-6c0b84d59f30:1-123,05474d3c-28c7-11e7-8352-203db246dd3d:1-456,10b039fc-c843-11e7-8f6a-1866daf8d810:1-789"
gSet, err = gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0)
c.Assert(err, NotNil)
c.Assert(gtidEv, IsNil)

// multi intervals, invalid
gSetStr = "03fc0263-28c7-11e7-a653-6c0b84d59f30:1-123:200-456"
gSet, err = gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0)
c.Assert(err, NotNil)
c.Assert(gtidEv, IsNil)

// interval > 1, invalid
gSetStr = "03fc0263-28c7-11e7-a653-6c0b84d59f30:1-123"
gSet, err = gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0)
c.Assert(err, NotNil)
c.Assert(gtidEv, IsNil)

Expand All @@ -141,7 +141,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) {
c.Assert(err, IsNil)
sid, err := ParseSID(gSetStr[:len(gSetStr)-4])
c.Assert(err, IsNil)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0)
c.Assert(err, IsNil)
c.Assert(gtidEv, NotNil)
c.Assert(gtidEv.Header.EventType, Equals, replication.GTID_EVENT)
Expand All @@ -160,31 +160,31 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) {
flavor = gmysql.MariaDBFlavor

// GTID mismatch with flavor
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0)
c.Assert(err, NotNil)
c.Assert(gtidEv, IsNil)

// multi GTID in set, invalid
gSetStr = "1-2-3,4-5-6"
gSet, err = gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0)
c.Assert(err, NotNil)
c.Assert(gtidEv, IsNil)

// server_id mismatch, invalid
gSetStr = "1-2-3"
gSet, err = gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0)
c.Assert(err, NotNil)
c.Assert(gtidEv, IsNil)

// valid
gSetStr = fmt.Sprintf("1-%d-3", serverID)
gSet, err = gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet)
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0)
c.Assert(err, IsNil)
c.Assert(gtidEv, NotNil)
c.Assert(gtidEv.Header.EventType, Equals, replication.MARIADB_GTID_EVENT)
Expand Down
60 changes: 21 additions & 39 deletions dm/pkg/binlog/event/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package event

import (
"bytes"
"fmt"
"time"

"github.com/go-mysql-org/go-mysql/replication"
Expand All @@ -24,51 +23,29 @@ import (
"github.com/pingcap/tiflow/dm/pkg/terror"
)

// GenCreateDatabaseEvents generates binlog events for `CREATE DATABASE`.
// events: [GTIDEvent, QueryEvent]
func GenCreateDatabaseEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, schema string) (*DDLDMLResult, error) {
query := fmt.Sprintf("CREATE DATABASE `%s`", schema)
return GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query)
}

// GenDropDatabaseEvents generates binlog events for `DROP DATABASE`.
// events: [GTIDEvent, QueryEvent]
func GenDropDatabaseEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, schema string) (*DDLDMLResult, error) {
query := fmt.Sprintf("DROP DATABASE `%s`", schema)
return GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query)
}

// GenCreateTableEvents generates binlog events for `CREATE TABLE`.
// events: [GTIDEvent, QueryEvent]
// NOTE: we do not support all `column type` and `column meta` for DML now, so the caller should restrict the `query` statement.
func GenCreateTableEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, schema string, query string) (*DDLDMLResult, error) {
return GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query)
}

// GenDropTableEvents generates binlog events for `DROP TABLE`.
// events: [GTIDEvent, QueryEvent]
func GenDropTableEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, schema string, table string) (*DDLDMLResult, error) {
query := fmt.Sprintf("DROP TABLE `%s`.`%s`", schema, table)
return GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query)
}

// GenDDLEvents generates binlog events for DDL statements.
// events: [GTIDEvent, QueryEvent]
func GenDDLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, schema string, query string) (*DDLDMLResult, error) {
func GenDDLEvents(flavor string, serverID, latestPos uint32, latestGTID gtid.Set, schema, query string, genGTID, anonymousGTID bool, ts int64) (*DDLDMLResult, error) {
if ts == 0 {
ts = time.Now().Unix()
}
// GTIDEvent, increase GTID first
latestGTID, err := GTIDIncrease(flavor, latestGTID)
if err != nil {
return nil, terror.Annotatef(err, "increase GTID %s", latestGTID)
}
gtidEv, err := GenCommonGTIDEvent(flavor, serverID, latestPos, latestGTID)
if err != nil {
return nil, terror.Annotate(err, "generate GTIDEvent")
var gtidEv *replication.BinlogEvent
if genGTID {
gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, latestGTID, anonymousGTID, ts)
if err != nil {
return nil, terror.Annotate(err, "generate GTIDEvent")
}
latestPos = gtidEv.Header.LogPos
}
latestPos = gtidEv.Header.LogPos

// QueryEvent
header := &replication.EventHeader{
Timestamp: uint32(time.Now().Unix()),
Timestamp: uint32(ts),
ServerID: serverID,
Flags: defaultHeaderFlags,
}
Expand All @@ -79,17 +56,22 @@ func GenDDLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID g
latestPos = queryEv.Header.LogPos

var buf bytes.Buffer
_, err = buf.Write(gtidEv.RawData)
if err != nil {
return nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write GTIDEvent data % X", gtidEv.RawData)
var events []*replication.BinlogEvent
if genGTID {
_, err = buf.Write(gtidEv.RawData)
if err != nil {
return nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write GTIDEvent data % X", gtidEv.RawData)
}
events = append(events, gtidEv)
}
_, err = buf.Write(queryEv.RawData)
if err != nil {
return nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write QueryEvent data % X", queryEv.RawData)
}
events = append(events, queryEv)

return &DDLDMLResult{
Events: []*replication.BinlogEvent{gtidEv, queryEv},
Events: events,
Data: buf.Bytes(),
LatestPos: latestPos,
LatestGTID: latestGTID,
Expand Down
67 changes: 6 additions & 61 deletions dm/pkg/binlog/event/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,74 +37,19 @@ func (t *testDDLSuite) TestGenDDLEvent(c *C) {

// only some simple tests in this case and we can test parsing a binlog file including common header, DDL and DML in another case.

// test CREATE/DROP DATABASE for MySQL
flavor := gmysql.MySQLFlavor
gSetStr := "03fc0263-28c7-11e7-a653-6c0b84d59f30:123"
latestGTID, err := gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)

// CREATE DATABASE
result, err := GenCreateDatabaseEvents(flavor, serverID, latestPos, latestGTID, schema)
c.Assert(err, IsNil)
c.Assert(result.Events, HasLen, 2)
// simply check here, more check did in `event_test.go`
c.Assert(bytes.Contains(result.Data, []byte("CREATE DATABASE")), IsTrue)
c.Assert(bytes.Contains(result.Data, []byte(schema)), IsTrue)
c.Assert(result.LatestPos, Equals, latestPos+uint32(len(result.Data)))
c.Assert(result.LatestGTID.String(), Equals, "03fc0263-28c7-11e7-a653-6c0b84d59f30:124")

latestPos = result.LatestPos // update latest pos
latestGTID = result.LatestGTID

// DROP DATABASE
result, err = GenDropDatabaseEvents(flavor, serverID, latestPos, latestGTID, schema)
c.Assert(err, IsNil)
c.Assert(result.Events, HasLen, 2)
c.Assert(bytes.Contains(result.Data, []byte("DROP DATABASE")), IsTrue)
c.Assert(bytes.Contains(result.Data, []byte(schema)), IsTrue)
c.Assert(result.LatestPos, Equals, latestPos+uint32(len(result.Data)))
c.Assert(result.LatestGTID.String(), Equals, "03fc0263-28c7-11e7-a653-6c0b84d59f30:125")

latestPos = result.LatestPos // update latest pos

// test CREATE/DROP table for MariaDB
flavor = gmysql.MariaDBFlavor
gSetStr = fmt.Sprintf("1-%d-3", serverID)
latestGTID, err = gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)

// CREATE TABLE
query := fmt.Sprintf("CREATE TABLE `%s` (c1 int)", table)
result, err = GenCreateTableEvents(flavor, serverID, latestPos, latestGTID, schema, query)
c.Assert(err, IsNil)
c.Assert(result.Events, HasLen, 2)
c.Assert(bytes.Contains(result.Data, []byte("CREATE TABLE")), IsTrue)
c.Assert(bytes.Contains(result.Data, []byte(table)), IsTrue)
c.Assert(result.LatestPos, Equals, latestPos+uint32(len(result.Data)))
c.Assert(result.LatestGTID.String(), Equals, fmt.Sprintf("1-%d-4", serverID))

latestPos = result.LatestPos // update latest pos
latestGTID = result.LatestGTID

// DROP TABLE
result, err = GenDropTableEvents(flavor, serverID, latestPos, latestGTID, schema, table)
flavor := gmysql.MariaDBFlavor
gSetStr := fmt.Sprintf("1-%d-3", serverID)
latestGTID, err := gtid.ParserGTID(flavor, gSetStr)
c.Assert(err, IsNil)
c.Assert(result.Events, HasLen, 2)
c.Assert(bytes.Contains(result.Data, []byte("DROP TABLE")), IsTrue)
c.Assert(bytes.Contains(result.Data, []byte(table)), IsTrue)
c.Assert(result.LatestPos, Equals, latestPos+uint32(len(result.Data)))
c.Assert(result.LatestGTID.String(), Equals, fmt.Sprintf("1-%d-5", serverID))

latestPos = result.LatestPos // update latest pos
latestGTID = result.LatestGTID

// ALTER TABLE
query = fmt.Sprintf("ALTER TABLE `%s`.`%s` CHANGE COLUMN `c2` `c2` decimal(10,3)", schema, table)
result, err = GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query)
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` CHANGE COLUMN `c2` `c2` decimal(10,3)", schema, table)
result, err := GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query, true, false, 0)
c.Assert(err, IsNil)
c.Assert(result.Events, HasLen, 2)
c.Assert(bytes.Contains(result.Data, []byte("ALTER TABLE")), IsTrue)
c.Assert(bytes.Contains(result.Data, []byte(table)), IsTrue)
c.Assert(result.LatestPos, Equals, latestPos+uint32(len(result.Data)))
c.Assert(result.LatestGTID.String(), Equals, fmt.Sprintf("1-%d-6", serverID))
c.Assert(result.LatestGTID.String(), Equals, fmt.Sprintf("1-%d-4", serverID))
}
Loading

0 comments on commit 3516043

Please sign in to comment.