diff --git a/dm/pkg/binlog/event/common.go b/dm/pkg/binlog/event/common.go index c9280c89e6c..c30e113e57d 100644 --- a/dm/pkg/binlog/event/common.go +++ b/dm/pkg/binlog/event/common.go @@ -36,16 +36,19 @@ type DDLDMLResult struct { // for MySQL: // 1. BinLogFileHeader, [ fe `bin` ] // 2. FormatDescriptionEvent -// 3. PreviousGTIDsEvent +// 3. PreviousGTIDsEvent, depends on genGTID // for MariaDB: // 1. BinLogFileHeader, [ fe `bin` ] // 2. FormatDescriptionEvent -// 3. MariadbGTIDListEvent +// 3. MariadbGTIDListEvent, depends on genGTID // -. MariadbBinlogCheckPointEvent, not added yet -func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*replication.BinlogEvent, []byte, error) { +func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set, genGTID bool, ts int64) ([]*replication.BinlogEvent, []byte, error) { + if ts == 0 { + ts = time.Now().Unix() + } var ( header = &replication.EventHeader{ - Timestamp: uint32(time.Now().Unix()), + Timestamp: uint32(ts), ServerID: serverID, Flags: defaultHeaderFlags, } @@ -59,16 +62,18 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*repl } latestPos += uint32(len(formatDescEv.RawData)) // update latestPos - switch flavor { - case gmysql.MySQLFlavor: - prevGTIDsEv, err = GenPreviousGTIDsEvent(header, latestPos, gSet) - case gmysql.MariaDBFlavor: - prevGTIDsEv, err = GenMariaDBGTIDListEvent(header, latestPos, gSet) - default: - return nil, nil, terror.ErrBinlogFlavorNotSupport.Generate(flavor) - } - if err != nil { - return nil, nil, terror.Annotate(err, "generate PreviousGTIDsEvent/MariadbGTIDListEvent") + if genGTID { + switch flavor { + case gmysql.MySQLFlavor: + prevGTIDsEv, err = GenPreviousGTIDsEvent(header, latestPos, gSet) + case gmysql.MariaDBFlavor: + prevGTIDsEv, err = GenMariaDBGTIDListEvent(header, latestPos, gSet) + default: + return nil, nil, terror.ErrBinlogFlavorNotSupport.Generate(flavor) + } + if err != nil { + return nil, nil, terror.Annotate(err, "generate PreviousGTIDsEvent/MariadbGTIDListEvent") + } } var buf bytes.Buffer @@ -76,29 +81,38 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*repl if err != nil { return nil, nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write binlog file header % X", replication.BinLogFileHeader) } + + var events []*replication.BinlogEvent _, err = buf.Write(formatDescEv.RawData) if err != nil { return nil, nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write FormatDescriptionEvent % X", formatDescEv.RawData) } - _, err = buf.Write(prevGTIDsEv.RawData) - if err != nil { - return nil, nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write PreviousGTIDsEvent/MariadbGTIDListEvent % X", prevGTIDsEv.RawData) + events = append(events, formatDescEv) + + if genGTID { + _, err = buf.Write(prevGTIDsEv.RawData) + if err != nil { + return nil, nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write PreviousGTIDsEvent/MariadbGTIDListEvent % X", prevGTIDsEv.RawData) + } + events = append(events, prevGTIDsEv) } - events := []*replication.BinlogEvent{formatDescEv, prevGTIDsEv} return events, buf.Bytes(), nil } // GenCommonGTIDEvent generates a common GTID event. -func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet gtid.Set) (*replication.BinlogEvent, error) { +func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet gtid.Set, anonymous bool, ts int64) (*replication.BinlogEvent, error) { singleGTID, err := verifySingleGTID(flavor, gSet) if err != nil { return nil, terror.Annotate(err, "verify single GTID in set") } + if ts == 0 { + ts = time.Now().Unix() + } var ( header = &replication.EventHeader{ - Timestamp: uint32(time.Now().Unix()), + Timestamp: uint32(ts), ServerID: serverID, Flags: defaultHeaderFlags, } @@ -109,7 +123,11 @@ func GenCommonGTIDEvent(flavor string, serverID uint32, latestPos uint32, gSet g case gmysql.MySQLFlavor: uuidSet := singleGTID.(*gmysql.UUIDSet) interval := uuidSet.Intervals[0] - gtidEv, err = GenGTIDEvent(header, latestPos, defaultGTIDFlags, uuidSet.SID.String(), interval.Start, defaultLastCommitted, defaultSequenceNumber) + if anonymous { + gtidEv, err = GenAnonymousGTIDEvent(header, latestPos, defaultGTIDFlags, defaultLastCommitted, defaultSequenceNumber) + } else { + gtidEv, err = GenGTIDEvent(header, latestPos, defaultGTIDFlags, uuidSet.SID.String(), interval.Start, defaultLastCommitted, defaultSequenceNumber) + } case gmysql.MariaDBFlavor: mariaGTID := singleGTID.(*gmysql.MariadbGTID) if mariaGTID.ServerID != header.ServerID { diff --git a/dm/pkg/binlog/event/common_test.go b/dm/pkg/binlog/event/common_test.go index ab584675607..5e48d4a2184 100644 --- a/dm/pkg/binlog/event/common_test.go +++ b/dm/pkg/binlog/event/common_test.go @@ -39,7 +39,7 @@ func (t *testCommonSuite) TestGenCommonFileHeader(c *C) { gSet, err := gtid.ParserGTID(flavor, gSetStr) c.Assert(err, IsNil) - events, data, err := GenCommonFileHeader(flavor, serverID, gSet) + events, data, err := GenCommonFileHeader(flavor, serverID, gSet, true, 0) c.Assert(err, IsNil) c.Assert(len(events), Equals, 2) c.Assert(events[0].Header.EventType, Equals, replication.FORMAT_DESCRIPTION_EVENT) @@ -79,7 +79,7 @@ func (t *testCommonSuite) TestGenCommonFileHeader(c *C) { gSet, err = gtid.ParserGTID(flavor, gSetStr) c.Assert(err, IsNil) - events, data, err = GenCommonFileHeader(flavor, serverID, gSet) + events, data, err = GenCommonFileHeader(flavor, serverID, gSet, true, 0) c.Assert(err, IsNil) c.Assert(len(events), Equals, 2) c.Assert(events[0].Header.EventType, Equals, replication.FORMAT_DESCRIPTION_EVENT) @@ -107,7 +107,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) { ) // nil gSet, invalid - gtidEv, err := GenCommonGTIDEvent(flavor, serverID, latestPos, gSet) + gtidEv, err := GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0) c.Assert(err, NotNil) c.Assert(gtidEv, IsNil) @@ -115,7 +115,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) { gSetStr := "03fc0263-28c7-11e7-a653-6c0b84d59f30:1-123,05474d3c-28c7-11e7-8352-203db246dd3d:1-456,10b039fc-c843-11e7-8f6a-1866daf8d810:1-789" gSet, err = gtid.ParserGTID(flavor, gSetStr) c.Assert(err, IsNil) - gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet) + gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0) c.Assert(err, NotNil) c.Assert(gtidEv, IsNil) @@ -123,7 +123,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) { gSetStr = "03fc0263-28c7-11e7-a653-6c0b84d59f30:1-123:200-456" gSet, err = gtid.ParserGTID(flavor, gSetStr) c.Assert(err, IsNil) - gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet) + gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0) c.Assert(err, NotNil) c.Assert(gtidEv, IsNil) @@ -131,7 +131,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) { gSetStr = "03fc0263-28c7-11e7-a653-6c0b84d59f30:1-123" gSet, err = gtid.ParserGTID(flavor, gSetStr) c.Assert(err, IsNil) - gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet) + gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0) c.Assert(err, NotNil) c.Assert(gtidEv, IsNil) @@ -141,7 +141,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) { c.Assert(err, IsNil) sid, err := ParseSID(gSetStr[:len(gSetStr)-4]) c.Assert(err, IsNil) - gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet) + gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0) c.Assert(err, IsNil) c.Assert(gtidEv, NotNil) c.Assert(gtidEv.Header.EventType, Equals, replication.GTID_EVENT) @@ -160,7 +160,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) { flavor = gmysql.MariaDBFlavor // GTID mismatch with flavor - gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet) + gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0) c.Assert(err, NotNil) c.Assert(gtidEv, IsNil) @@ -168,7 +168,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) { gSetStr = "1-2-3,4-5-6" gSet, err = gtid.ParserGTID(flavor, gSetStr) c.Assert(err, IsNil) - gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet) + gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0) c.Assert(err, NotNil) c.Assert(gtidEv, IsNil) @@ -176,7 +176,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) { gSetStr = "1-2-3" gSet, err = gtid.ParserGTID(flavor, gSetStr) c.Assert(err, IsNil) - gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet) + gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0) c.Assert(err, NotNil) c.Assert(gtidEv, IsNil) @@ -184,7 +184,7 @@ func (t *testCommonSuite) TestGenCommonGTIDEvent(c *C) { gSetStr = fmt.Sprintf("1-%d-3", serverID) gSet, err = gtid.ParserGTID(flavor, gSetStr) c.Assert(err, IsNil) - gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet) + gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, gSet, false, 0) c.Assert(err, IsNil) c.Assert(gtidEv, NotNil) c.Assert(gtidEv.Header.EventType, Equals, replication.MARIADB_GTID_EVENT) diff --git a/dm/pkg/binlog/event/ddl.go b/dm/pkg/binlog/event/ddl.go index 7eed3b4ea92..0daaf5cd9c1 100644 --- a/dm/pkg/binlog/event/ddl.go +++ b/dm/pkg/binlog/event/ddl.go @@ -15,7 +15,6 @@ package event import ( "bytes" - "fmt" "time" "github.com/go-mysql-org/go-mysql/replication" @@ -24,51 +23,29 @@ import ( "github.com/pingcap/tiflow/dm/pkg/terror" ) -// GenCreateDatabaseEvents generates binlog events for `CREATE DATABASE`. -// events: [GTIDEvent, QueryEvent] -func GenCreateDatabaseEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, schema string) (*DDLDMLResult, error) { - query := fmt.Sprintf("CREATE DATABASE `%s`", schema) - return GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query) -} - -// GenDropDatabaseEvents generates binlog events for `DROP DATABASE`. -// events: [GTIDEvent, QueryEvent] -func GenDropDatabaseEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, schema string) (*DDLDMLResult, error) { - query := fmt.Sprintf("DROP DATABASE `%s`", schema) - return GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query) -} - -// GenCreateTableEvents generates binlog events for `CREATE TABLE`. -// events: [GTIDEvent, QueryEvent] -// NOTE: we do not support all `column type` and `column meta` for DML now, so the caller should restrict the `query` statement. -func GenCreateTableEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, schema string, query string) (*DDLDMLResult, error) { - return GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query) -} - -// GenDropTableEvents generates binlog events for `DROP TABLE`. -// events: [GTIDEvent, QueryEvent] -func GenDropTableEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, schema string, table string) (*DDLDMLResult, error) { - query := fmt.Sprintf("DROP TABLE `%s`.`%s`", schema, table) - return GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query) -} - // GenDDLEvents generates binlog events for DDL statements. // events: [GTIDEvent, QueryEvent] -func GenDDLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, schema string, query string) (*DDLDMLResult, error) { +func GenDDLEvents(flavor string, serverID, latestPos uint32, latestGTID gtid.Set, schema, query string, genGTID, anonymousGTID bool, ts int64) (*DDLDMLResult, error) { + if ts == 0 { + ts = time.Now().Unix() + } // GTIDEvent, increase GTID first latestGTID, err := GTIDIncrease(flavor, latestGTID) if err != nil { return nil, terror.Annotatef(err, "increase GTID %s", latestGTID) } - gtidEv, err := GenCommonGTIDEvent(flavor, serverID, latestPos, latestGTID) - if err != nil { - return nil, terror.Annotate(err, "generate GTIDEvent") + var gtidEv *replication.BinlogEvent + if genGTID { + gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, latestGTID, anonymousGTID, ts) + if err != nil { + return nil, terror.Annotate(err, "generate GTIDEvent") + } + latestPos = gtidEv.Header.LogPos } - latestPos = gtidEv.Header.LogPos // QueryEvent header := &replication.EventHeader{ - Timestamp: uint32(time.Now().Unix()), + Timestamp: uint32(ts), ServerID: serverID, Flags: defaultHeaderFlags, } @@ -79,17 +56,22 @@ func GenDDLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID g latestPos = queryEv.Header.LogPos var buf bytes.Buffer - _, err = buf.Write(gtidEv.RawData) - if err != nil { - return nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write GTIDEvent data % X", gtidEv.RawData) + var events []*replication.BinlogEvent + if genGTID { + _, err = buf.Write(gtidEv.RawData) + if err != nil { + return nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write GTIDEvent data % X", gtidEv.RawData) + } + events = append(events, gtidEv) } _, err = buf.Write(queryEv.RawData) if err != nil { return nil, terror.ErrBinlogWriteDataToBuffer.AnnotateDelegate(err, "write QueryEvent data % X", queryEv.RawData) } + events = append(events, queryEv) return &DDLDMLResult{ - Events: []*replication.BinlogEvent{gtidEv, queryEv}, + Events: events, Data: buf.Bytes(), LatestPos: latestPos, LatestGTID: latestGTID, diff --git a/dm/pkg/binlog/event/ddl_test.go b/dm/pkg/binlog/event/ddl_test.go index 40c37227037..ba926c47533 100644 --- a/dm/pkg/binlog/event/ddl_test.go +++ b/dm/pkg/binlog/event/ddl_test.go @@ -37,74 +37,19 @@ func (t *testDDLSuite) TestGenDDLEvent(c *C) { // only some simple tests in this case and we can test parsing a binlog file including common header, DDL and DML in another case. - // test CREATE/DROP DATABASE for MySQL - flavor := gmysql.MySQLFlavor - gSetStr := "03fc0263-28c7-11e7-a653-6c0b84d59f30:123" - latestGTID, err := gtid.ParserGTID(flavor, gSetStr) - c.Assert(err, IsNil) - - // CREATE DATABASE - result, err := GenCreateDatabaseEvents(flavor, serverID, latestPos, latestGTID, schema) - c.Assert(err, IsNil) - c.Assert(result.Events, HasLen, 2) - // simply check here, more check did in `event_test.go` - c.Assert(bytes.Contains(result.Data, []byte("CREATE DATABASE")), IsTrue) - c.Assert(bytes.Contains(result.Data, []byte(schema)), IsTrue) - c.Assert(result.LatestPos, Equals, latestPos+uint32(len(result.Data))) - c.Assert(result.LatestGTID.String(), Equals, "03fc0263-28c7-11e7-a653-6c0b84d59f30:124") - - latestPos = result.LatestPos // update latest pos - latestGTID = result.LatestGTID - - // DROP DATABASE - result, err = GenDropDatabaseEvents(flavor, serverID, latestPos, latestGTID, schema) - c.Assert(err, IsNil) - c.Assert(result.Events, HasLen, 2) - c.Assert(bytes.Contains(result.Data, []byte("DROP DATABASE")), IsTrue) - c.Assert(bytes.Contains(result.Data, []byte(schema)), IsTrue) - c.Assert(result.LatestPos, Equals, latestPos+uint32(len(result.Data))) - c.Assert(result.LatestGTID.String(), Equals, "03fc0263-28c7-11e7-a653-6c0b84d59f30:125") - - latestPos = result.LatestPos // update latest pos - // test CREATE/DROP table for MariaDB - flavor = gmysql.MariaDBFlavor - gSetStr = fmt.Sprintf("1-%d-3", serverID) - latestGTID, err = gtid.ParserGTID(flavor, gSetStr) - c.Assert(err, IsNil) - - // CREATE TABLE - query := fmt.Sprintf("CREATE TABLE `%s` (c1 int)", table) - result, err = GenCreateTableEvents(flavor, serverID, latestPos, latestGTID, schema, query) - c.Assert(err, IsNil) - c.Assert(result.Events, HasLen, 2) - c.Assert(bytes.Contains(result.Data, []byte("CREATE TABLE")), IsTrue) - c.Assert(bytes.Contains(result.Data, []byte(table)), IsTrue) - c.Assert(result.LatestPos, Equals, latestPos+uint32(len(result.Data))) - c.Assert(result.LatestGTID.String(), Equals, fmt.Sprintf("1-%d-4", serverID)) - - latestPos = result.LatestPos // update latest pos - latestGTID = result.LatestGTID - - // DROP TABLE - result, err = GenDropTableEvents(flavor, serverID, latestPos, latestGTID, schema, table) + flavor := gmysql.MariaDBFlavor + gSetStr := fmt.Sprintf("1-%d-3", serverID) + latestGTID, err := gtid.ParserGTID(flavor, gSetStr) c.Assert(err, IsNil) - c.Assert(result.Events, HasLen, 2) - c.Assert(bytes.Contains(result.Data, []byte("DROP TABLE")), IsTrue) - c.Assert(bytes.Contains(result.Data, []byte(table)), IsTrue) - c.Assert(result.LatestPos, Equals, latestPos+uint32(len(result.Data))) - c.Assert(result.LatestGTID.String(), Equals, fmt.Sprintf("1-%d-5", serverID)) - - latestPos = result.LatestPos // update latest pos - latestGTID = result.LatestGTID // ALTER TABLE - query = fmt.Sprintf("ALTER TABLE `%s`.`%s` CHANGE COLUMN `c2` `c2` decimal(10,3)", schema, table) - result, err = GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query) + query := fmt.Sprintf("ALTER TABLE `%s`.`%s` CHANGE COLUMN `c2` `c2` decimal(10,3)", schema, table) + result, err := GenDDLEvents(flavor, serverID, latestPos, latestGTID, schema, query, true, false, 0) c.Assert(err, IsNil) c.Assert(result.Events, HasLen, 2) c.Assert(bytes.Contains(result.Data, []byte("ALTER TABLE")), IsTrue) c.Assert(bytes.Contains(result.Data, []byte(table)), IsTrue) c.Assert(result.LatestPos, Equals, latestPos+uint32(len(result.Data))) - c.Assert(result.LatestGTID.String(), Equals, fmt.Sprintf("1-%d-6", serverID)) + c.Assert(result.LatestGTID.String(), Equals, fmt.Sprintf("1-%d-4", serverID)) } diff --git a/dm/pkg/binlog/event/dml.go b/dm/pkg/binlog/event/dml.go index 593275e87f7..079a364555a 100644 --- a/dm/pkg/binlog/event/dml.go +++ b/dm/pkg/binlog/event/dml.go @@ -42,25 +42,32 @@ type DMLData struct { // if DMLData.Query is not empty: // events: [GTIDEvent, QueryEvent, QueryEvent, ..., XIDEvent] // NOTE: multi can be in events. -func GenDMLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, eventType replication.EventType, xid uint64, dmlData []*DMLData) (*DDLDMLResult, error) { +func GenDMLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, eventType replication.EventType, xid uint64, dmlData []*DMLData, genGTID, anonymousGTID bool, ts int64) (*DDLDMLResult, error) { if len(dmlData) == 0 { return nil, terror.ErrBinlogDMLEmptyData.Generate() } + if ts == 0 { + ts = time.Now().Unix() + } + // GTIDEvent, increase GTID first. latestGTID, err := GTIDIncrease(flavor, latestGTID) if err != nil { return nil, terror.Annotatef(err, "increase GTID %s", latestGTID) } - gtidEv, err := GenCommonGTIDEvent(flavor, serverID, latestPos, latestGTID) - if err != nil { - return nil, terror.Annotate(err, "generate GTIDEvent") + var gtidEv *replication.BinlogEvent + if genGTID { + gtidEv, err = GenCommonGTIDEvent(flavor, serverID, latestPos, latestGTID, anonymousGTID, ts) + if err != nil { + return nil, terror.Annotate(err, "generate GTIDEvent") + } + latestPos = gtidEv.Header.LogPos } - latestPos = gtidEv.Header.LogPos // QueryEvent, `BEGIN` header := &replication.EventHeader{ - Timestamp: uint32(time.Now().Unix()), + Timestamp: uint32(ts), ServerID: serverID, Flags: defaultHeaderFlags, } @@ -73,7 +80,9 @@ func GenDMLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID g // all events events := make([]*replication.BinlogEvent, 0, 5) - events = append(events, gtidEv) + if genGTID { + events = append(events, gtidEv) + } events = append(events, queryEv) // pairs or QueryEvent diff --git a/dm/pkg/binlog/event/dml_test.go b/dm/pkg/binlog/event/dml_test.go index a18a0516f3d..e7d0b7f0e73 100644 --- a/dm/pkg/binlog/event/dml_test.go +++ b/dm/pkg/binlog/event/dml_test.go @@ -41,7 +41,7 @@ func (t *testDMLSuite) TestGenDMLEvent(c *C) { c.Assert(err, IsNil) // empty data - result, err := GenDMLEvents(flavor, serverID, latestPos, latestGTID, replication.WRITE_ROWS_EVENTv2, xid, nil) + result, err := GenDMLEvents(flavor, serverID, latestPos, latestGTID, replication.WRITE_ROWS_EVENTv2, xid, nil, true, false, 0) c.Assert(err, NotNil) c.Assert(result, IsNil) @@ -58,7 +58,7 @@ func (t *testDMLSuite) TestGenDMLEvent(c *C) { }, } eventType := replication.WRITE_ROWS_EVENTv2 - result, err = GenDMLEvents(flavor, serverID, latestPos, latestGTID, eventType, xid, insertDMLData) + result, err = GenDMLEvents(flavor, serverID, latestPos, latestGTID, eventType, xid, insertDMLData, true, false, 0) c.Assert(err, IsNil) c.Assert(result, NotNil) c.Assert(result.Events, HasLen, 3+2*len(insertDMLData)) @@ -81,7 +81,7 @@ func (t *testDMLSuite) TestGenDMLEvent(c *C) { ColumnType: []byte{gmysql.MYSQL_TYPE_LONG, gmysql.MYSQL_TYPE_STRING}, Rows: insertRows2, }) - result, err = GenDMLEvents(flavor, serverID, latestPos, latestGTID, replication.WRITE_ROWS_EVENTv2, xid, insertDMLData) + result, err = GenDMLEvents(flavor, serverID, latestPos, latestGTID, replication.WRITE_ROWS_EVENTv2, xid, insertDMLData, true, false, 0) c.Assert(err, IsNil) c.Assert(result, NotNil) c.Assert(result.Events, HasLen, 3+2*len(insertDMLData)) // 2 more events for insertRows2 @@ -106,7 +106,7 @@ func (t *testDMLSuite) TestGenDMLEvent(c *C) { }, } eventType = replication.UPDATE_ROWS_EVENTv2 - result, err = GenDMLEvents(flavor, serverID, latestPos, latestGTID, eventType, xid, updateDMLData) + result, err = GenDMLEvents(flavor, serverID, latestPos, latestGTID, eventType, xid, updateDMLData, true, false, 0) c.Assert(err, IsNil) c.Assert(result, NotNil) c.Assert(result.Events, HasLen, 3+2*len(updateDMLData)) @@ -136,7 +136,7 @@ func (t *testDMLSuite) TestGenDMLEvent(c *C) { }, } eventType = replication.DELETE_ROWS_EVENTv2 - result, err = GenDMLEvents(flavor, serverID, latestPos, latestGTID, eventType, xid, deleteDMLData) + result, err = GenDMLEvents(flavor, serverID, latestPos, latestGTID, eventType, xid, deleteDMLData, true, false, 0) c.Assert(err, IsNil) c.Assert(result, NotNil) c.Assert(result.Events, HasLen, 3+2*(len(deleteDMLData))) diff --git a/dm/pkg/binlog/event/event.go b/dm/pkg/binlog/event/event.go index 63e0bf9bfb9..9f11be68754 100644 --- a/dm/pkg/binlog/event/event.go +++ b/dm/pkg/binlog/event/event.go @@ -225,6 +225,14 @@ func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gS // `uuid` is the UUID part of the GTID, like `9f61c5f9-1eef-11e9-b6cf-0242ac140003`. // `gno` is the GNO part of the GTID, like `6`. func GenGTIDEvent(header *replication.EventHeader, latestPos uint32, gtidFlags uint8, uuid string, gno int64, lastCommitted int64, sequenceNumber int64) (*replication.BinlogEvent, error) { + return genGTIDEventInner(header, latestPos, gtidFlags, uuid, gno, lastCommitted, sequenceNumber, replication.GTID_EVENT) +} + +func GenAnonymousGTIDEvent(header *replication.EventHeader, latestPos uint32, gtidFlags uint8, lastCommitted int64, sequenceNumber int64) (*replication.BinlogEvent, error) { + return genGTIDEventInner(header, latestPos, gtidFlags, "00000000-0000-0000-0000-000000000000", 0, lastCommitted, sequenceNumber, replication.ANONYMOUS_GTID_EVENT) +} + +func genGTIDEventInner(header *replication.EventHeader, latestPos uint32, gtidFlags uint8, uuid string, gno int64, lastCommitted int64, sequenceNumber int64, eventType replication.EventType) (*replication.BinlogEvent, error) { payload := new(bytes.Buffer) // GTID flags, 1 byte @@ -269,7 +277,7 @@ func GenGTIDEvent(header *replication.EventHeader, latestPos uint32, gtidFlags u buf := new(bytes.Buffer) event := &replication.GTIDEvent{} - ev, err := assembleEvent(buf, event, false, *header, replication.GTID_EVENT, latestPos, nil, payload.Bytes()) + ev, err := assembleEvent(buf, event, false, *header, eventType, latestPos, nil, payload.Bytes()) return ev, err } diff --git a/dm/pkg/binlog/event/generator.go b/dm/pkg/binlog/event/generator.go index 431ff3a97d9..790efe4d638 100644 --- a/dm/pkg/binlog/event/generator.go +++ b/dm/pkg/binlog/event/generator.go @@ -14,6 +14,10 @@ package event import ( + "fmt" + "time" + + "github.com/coreos/go-semver/semver" gmysql "github.com/go-mysql-org/go-mysql/mysql" "github.com/go-mysql-org/go-mysql/replication" @@ -27,12 +31,25 @@ type Generator struct { ServerID uint32 LatestPos uint32 LatestGTID gtid.Set - PreviousGTIDs gtid.Set + ExecutedGTIDs gtid.Set LatestXID uint64 + + GenGTID bool + AnonymousGTID bool } // NewGenerator creates a new instance of Generator. func NewGenerator(flavor string, serverID uint32, latestPos uint32, latestGTID gtid.Set, previousGTIDs gtid.Set, latestXID uint64) (*Generator, error) { + return newGenerator(flavor, "5.7.0", serverID, latestPos, latestGTID, previousGTIDs, latestXID, true) +} + +func NewGeneratorV2(flavor, version, latestGTIDStr string, enableGTID bool) (*Generator, error) { + latestGTID, _ := gtid.ParserGTID(flavor, latestGTIDStr) + previousGTIDSet, _ := gtid.ParserGTID(flavor, latestGTIDStr) + return newGenerator(flavor, version, 1, 0, latestGTID, previousGTIDSet, 0, enableGTID) +} + +func newGenerator(flavor, version string, serverID uint32, latestPos uint32, latestGTID gtid.Set, previousGTIDs gtid.Set, latestXID uint64, genGTID bool) (*Generator, error) { prevOrigin := previousGTIDs.Origin() if prevOrigin == nil { return nil, terror.ErrPreviousGTIDsNotValid.Generate(previousGTIDs) @@ -42,6 +59,7 @@ func NewGenerator(flavor string, serverID uint32, latestPos uint32, latestGTID g if err != nil { return nil, terror.Annotate(err, "verify single latest GTID in set") } + var anonymousGTID bool switch flavor { case gmysql.MySQLFlavor: uuidSet := singleGTID.(*gmysql.UUIDSet) @@ -55,6 +73,15 @@ func NewGenerator(flavor string, serverID uint32, latestPos uint32, latestGTID g return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs) } + ver, err := semver.NewVersion(version) + if err != nil { + return nil, err + } + if ver.Compare(*semver.New("5.7.0")) >= 0 && !genGTID { + // 5.7+ add anonymous GTID when GTID is disabled + genGTID = true + anonymousGTID = true + } case gmysql.MariaDBFlavor: mariaGTID := singleGTID.(*gmysql.MariadbGTID) if mariaGTID.ServerID != serverID { @@ -69,6 +96,8 @@ func NewGenerator(flavor string, serverID uint32, latestPos uint32, latestGTID g if !ok || prevGTID.ServerID != mariaGTID.ServerID || prevGTID.SequenceNumber != mariaGTID.SequenceNumber { return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs) } + // MariaDB 10.0.2+ always contains GTID + genGTID = true default: return nil, terror.ErrBinlogFlavorNotSupport.Generate(flavor) } @@ -78,8 +107,10 @@ func NewGenerator(flavor string, serverID uint32, latestPos uint32, latestGTID g ServerID: serverID, LatestPos: latestPos, LatestGTID: latestGTID, - PreviousGTIDs: previousGTIDs, + ExecutedGTIDs: previousGTIDs.Clone(), LatestXID: latestXID, + GenGTID: genGTID, + AnonymousGTID: anonymousGTID, }, nil } @@ -92,8 +123,8 @@ func NewGenerator(flavor string, serverID uint32, latestPos uint32, latestGTID g // 1. BinLogFileHeader, [ fe `bin` ] // 2. FormatDescriptionEvent // 3. MariadbGTIDListEvent -func (g *Generator) GenFileHeader() ([]*replication.BinlogEvent, []byte, error) { - events, data, err := GenCommonFileHeader(g.Flavor, g.ServerID, g.PreviousGTIDs) +func (g *Generator) GenFileHeader(ts int64) ([]*replication.BinlogEvent, []byte, error) { + events, data, err := GenCommonFileHeader(g.Flavor, g.ServerID, g.ExecutedGTIDs, g.GenGTID, ts) if err != nil { return nil, nil, err } @@ -104,7 +135,8 @@ func (g *Generator) GenFileHeader() ([]*replication.BinlogEvent, []byte, error) // GenCreateDatabaseEvents generates binlog events for `CREATE DATABASE`. // events: [GTIDEvent, QueryEvent] func (g *Generator) GenCreateDatabaseEvents(schema string) ([]*replication.BinlogEvent, []byte, error) { - result, err := GenCreateDatabaseEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, schema) + query := fmt.Sprintf("CREATE DATABASE `%s`", schema) + result, err := GenDDLEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, schema, query, g.GenGTID, g.AnonymousGTID, 0) if err != nil { return nil, nil, err } @@ -115,7 +147,8 @@ func (g *Generator) GenCreateDatabaseEvents(schema string) ([]*replication.Binlo // GenDropDatabaseEvents generates binlog events for `DROP DATABASE`. // events: [GTIDEvent, QueryEvent] func (g *Generator) GenDropDatabaseEvents(schema string) ([]*replication.BinlogEvent, []byte, error) { - result, err := GenDropDatabaseEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, schema) + query := fmt.Sprintf("DROP DATABASE `%s`", schema) + result, err := GenDDLEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, schema, query, g.GenGTID, g.AnonymousGTID, 0) if err != nil { return nil, nil, err } @@ -126,7 +159,7 @@ func (g *Generator) GenDropDatabaseEvents(schema string) ([]*replication.BinlogE // GenCreateTableEvents generates binlog events for `CREATE TABLE`. // events: [GTIDEvent, QueryEvent] func (g *Generator) GenCreateTableEvents(schema string, query string) ([]*replication.BinlogEvent, []byte, error) { - result, err := GenCreateTableEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, schema, query) + result, err := GenDDLEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, schema, query, g.GenGTID, g.AnonymousGTID, 0) if err != nil { return nil, nil, err } @@ -137,7 +170,8 @@ func (g *Generator) GenCreateTableEvents(schema string, query string) ([]*replic // GenDropTableEvents generates binlog events for `DROP TABLE`. // events: [GTIDEvent, QueryEvent] func (g *Generator) GenDropTableEvents(schema string, table string) ([]*replication.BinlogEvent, []byte, error) { - result, err := GenDropTableEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, schema, table) + query := fmt.Sprintf("DROP TABLE `%s`.`%s`", schema, table) + result, err := GenDDLEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, schema, query, g.GenGTID, g.AnonymousGTID, 0) if err != nil { return nil, nil, err } @@ -147,8 +181,8 @@ func (g *Generator) GenDropTableEvents(schema string, table string) ([]*replicat // GenDDLEvents generates binlog events for DDL statements. // events: [GTIDEvent, QueryEvent] -func (g *Generator) GenDDLEvents(schema string, query string) ([]*replication.BinlogEvent, []byte, error) { - result, err := GenDDLEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, schema, query) +func (g *Generator) GenDDLEvents(schema string, query string, ts int64) ([]*replication.BinlogEvent, []byte, error) { + result, err := GenDDLEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, schema, query, g.GenGTID, g.AnonymousGTID, ts) if err != nil { return nil, nil, err } @@ -159,8 +193,8 @@ func (g *Generator) GenDDLEvents(schema string, query string) ([]*replication.Bi // GenDMLEvents generates binlog events for `INSERT`/`UPDATE`/`DELETE`. // events: [GTIDEvent, QueryEvent, TableMapEvent, RowsEvent, ..., XIDEvent] // NOTE: multi pairs can be in events. -func (g *Generator) GenDMLEvents(eventType replication.EventType, dmlData []*DMLData) ([]*replication.BinlogEvent, []byte, error) { - result, err := GenDMLEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, eventType, g.LatestXID+1, dmlData) +func (g *Generator) GenDMLEvents(eventType replication.EventType, dmlData []*DMLData, ts int64) ([]*replication.BinlogEvent, []byte, error) { + result, err := GenDMLEvents(g.Flavor, g.ServerID, g.LatestPos, g.LatestGTID, eventType, g.LatestXID+1, dmlData, g.GenGTID, g.AnonymousGTID, ts) if err != nil { return nil, nil, err } @@ -169,7 +203,27 @@ func (g *Generator) GenDMLEvents(eventType replication.EventType, dmlData []*DML return result.Events, result.Data, nil } +func (g *Generator) Rotate(nextName string, ts int64) (*replication.BinlogEvent, []byte, error) { + if ts == 0 { + ts = time.Now().Unix() + } + header := &replication.EventHeader{ + Timestamp: uint32(ts), + ServerID: 11, + Flags: 0x01, + } + ev, err := GenRotateEvent(header, g.LatestPos, []byte(nextName), 4) + if err != nil { + return nil, nil, err + } + g.updateLatestPosGTID(4, nil) + return ev, ev.RawData, nil +} + func (g *Generator) updateLatestPosGTID(latestPos uint32, latestGTID gtid.Set) { g.LatestPos = latestPos - g.LatestGTID = latestGTID + if latestGTID != nil { + g.LatestGTID = latestGTID + _ = g.ExecutedGTIDs.Update(latestGTID.String()) + } } diff --git a/dm/pkg/binlog/event/generator_test.go b/dm/pkg/binlog/event/generator_test.go index bf7ce941279..f570214bf01 100644 --- a/dm/pkg/binlog/event/generator_test.go +++ b/dm/pkg/binlog/event/generator_test.go @@ -135,7 +135,7 @@ func (t *testGeneratorSuite) testGenerate(c *C, flavor string, serverID uint32, allEventTypes := make([]replication.EventType, 0, 50) // file header - currentEvents, data, err := g.GenFileHeader() + currentEvents, data, err := g.GenFileHeader(0) c.Assert(err, IsNil) _, err = f.Write(data) c.Assert(err, IsNil) @@ -178,7 +178,7 @@ func (t *testGeneratorSuite) testGenerate(c *C, flavor string, serverID uint32, }, } eventType := replication.WRITE_ROWS_EVENTv2 - currentEvents, data, err = g.GenDMLEvents(eventType, dmlData) + currentEvents, data, err = g.GenDMLEvents(eventType, dmlData, 0) c.Assert(err, IsNil) _, err = f.Write(data) c.Assert(err, IsNil) @@ -207,7 +207,7 @@ func (t *testGeneratorSuite) testGenerate(c *C, flavor string, serverID uint32, Rows: insertRows2, }, } - currentEvents, data, err = g.GenDMLEvents(eventType, dmlData) + currentEvents, data, err = g.GenDMLEvents(eventType, dmlData, 0) c.Assert(err, IsNil) _, err = f.Write(data) c.Assert(err, IsNil) @@ -237,7 +237,7 @@ func (t *testGeneratorSuite) testGenerate(c *C, flavor string, serverID uint32, }, } eventType = replication.UPDATE_ROWS_EVENTv2 - currentEvents, data, err = g.GenDMLEvents(eventType, dmlData) + currentEvents, data, err = g.GenDMLEvents(eventType, dmlData, 0) c.Assert(err, IsNil) _, err = f.Write(data) c.Assert(err, IsNil) @@ -257,7 +257,7 @@ func (t *testGeneratorSuite) testGenerate(c *C, flavor string, serverID uint32, }, } eventType = replication.DELETE_ROWS_EVENTv2 - currentEvents, data, err = g.GenDMLEvents(eventType, dmlData) + currentEvents, data, err = g.GenDMLEvents(eventType, dmlData, 0) c.Assert(err, IsNil) _, err = f.Write(data) c.Assert(err, IsNil) @@ -266,7 +266,7 @@ func (t *testGeneratorSuite) testGenerate(c *C, flavor string, serverID uint32, // ALTER TABLE query = fmt.Sprintf("ALTER TABLE `%s`.`%s` ADD COLUMN c3 INT", schema, table) - currentEvents, data, err = g.GenDDLEvents(schema, query) + currentEvents, data, err = g.GenDDLEvents(schema, query, 0) c.Assert(err, IsNil) _, err = f.Write(data) c.Assert(err, IsNil) diff --git a/dm/pkg/binlog/file.go b/dm/pkg/binlog/file.go new file mode 100644 index 00000000000..fcf0c506dec --- /dev/null +++ b/dm/pkg/binlog/file.go @@ -0,0 +1,75 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package binlog + +import ( + "os" + "sort" + + "go.uber.org/zap" + + "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/terror" +) + +// ReadSortedBinlogFromDir reads and returns all binlog files (sorted ascending by binlog filename and sequence number). +func ReadSortedBinlogFromDir(dirpath string) ([]string, error) { + dir, err := os.Open(dirpath) + if err != nil { + return nil, terror.ErrReadDir.Delegate(err, dirpath) + } + defer dir.Close() + + names, err := dir.Readdirnames(-1) + if err != nil { + return nil, terror.ErrReadDir.Delegate(err, dirpath) + } + if len(names) == 0 { + return nil, nil + } + + // sorting bin.100000, ..., bin.1000000, ..., bin.999999 + type tuple struct { + filename string + parsed Filename + } + tmp := make([]tuple, 0, len(names)-1) + + for _, f := range names { + p, err2 := ParseFilename(f) + if err2 != nil { + // may contain some file that can't be parsed, like relay meta. ignore them + log.L().Info("collecting binlog file, ignore invalid file", zap.String("file", f)) + continue + } + tmp = append(tmp, tuple{ + filename: f, + parsed: p, + }) + } + + sort.Slice(tmp, func(i, j int) bool { + if tmp[i].parsed.BaseName != tmp[j].parsed.BaseName { + return tmp[i].parsed.BaseName < tmp[j].parsed.BaseName + } + return tmp[i].parsed.LessThan(tmp[j].parsed) + }) + + ret := make([]string, len(tmp)) + for i := range tmp { + ret[i] = tmp[i].filename + } + + return ret, nil +} diff --git a/dm/pkg/binlog/file_test.go b/dm/pkg/binlog/file_test.go new file mode 100644 index 00000000000..d67041a0c31 --- /dev/null +++ b/dm/pkg/binlog/file_test.go @@ -0,0 +1,41 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package binlog + +import ( + "os" + "path/filepath" + + . "github.com/pingcap/check" +) + +var _ = Suite(&testFileSuite{}) + +type testFileSuite struct{} + +func (t *testFileSuite) TestReadSortedBinlogFromDir(c *C) { + dir := c.MkDir() + filenames := []string{ + "bin.000001", "bin.000002", "bin.100000", "bin.100001", "bin.1000000", "bin.1000001", "bin.999999", "relay.meta", + } + expected := []string{ + "bin.000001", "bin.000002", "bin.100000", "bin.100001", "bin.999999", "bin.1000000", "bin.1000001", + } + for _, f := range filenames { + c.Assert(os.WriteFile(filepath.Join(dir, f), nil, 0o600), IsNil) + } + ret, err := ReadSortedBinlogFromDir(dir) + c.Assert(err, IsNil) + c.Assert(ret, DeepEquals, expected) +} diff --git a/dm/pkg/binlog/pos_finder.go b/dm/pkg/binlog/pos_finder.go new file mode 100644 index 00000000000..ccd35bccebb --- /dev/null +++ b/dm/pkg/binlog/pos_finder.go @@ -0,0 +1,367 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package binlog + +import ( + "database/sql" + "path" + + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" + "github.com/pingcap/errors" + "go.uber.org/zap" + + "github.com/pingcap/tiflow/dm/pkg/binlog/common" + "github.com/pingcap/tiflow/dm/pkg/binlog/event" + "github.com/pingcap/tiflow/dm/pkg/binlog/reader" + tcontext "github.com/pingcap/tiflow/dm/pkg/context" + "github.com/pingcap/tiflow/dm/pkg/gtid" + "github.com/pingcap/tiflow/dm/pkg/utils" +) + +type binlogPosFinder struct { + remote bool + tctx *tcontext.Context + enableGTID bool + parser *replication.BinlogParser + flavor string + + // fields used for remote mode + db *sql.DB + syncCfg replication.BinlogSyncerConfig + + // fields used for local relay + relayDir string // should be a directory with current UUID + + // fields used inside FindByTimestamp + targetBinlog binlogSize // target binlog file the timestamp may reside + tsBeforeFirstBinlog bool // whether the timestamp is before the first binlog + lastBinlogFile bool // whether targetBinlog is the last binlog file + + // one binlog file can either be GTID enabled or not, cannot be mixed up + // we mark it using this field to avoid parsing events. + everMetGTIDEvent bool + inTransaction bool // whether in transaction +} + +type PosType int + +func (b PosType) String() string { + switch b { + case BelowLowerBoundBinlogPos: + return "BelowLowerBound" + case InRangeBinlogPos: + return "InRange" + case AboveUpperBoundBinlogPos: + return "AboveUpperBound" + } + return "Invalid" +} + +const ( + InvalidBinlogPos PosType = iota + BelowLowerBoundBinlogPos + InRangeBinlogPos + AboveUpperBoundBinlogPos +) + +func NewLocalBinlogPosFinder(tctx *tcontext.Context, enableGTID bool, flavor string, relayDir string) *binlogPosFinder { + parser := replication.NewBinlogParser() + parser.SetFlavor(flavor) + parser.SetVerifyChecksum(true) + + return &binlogPosFinder{ + remote: false, + tctx: tctx, + enableGTID: enableGTID, + parser: parser, + flavor: flavor, + + relayDir: relayDir, + } +} + +func NewRemoteBinlogPosFinder(tctx *tcontext.Context, db *sql.DB, syncCfg replication.BinlogSyncerConfig, enableGTID bool) *binlogPosFinder { + // make sure raw mode enabled, and MaxReconnectAttempts set + syncCfg.RawModeEnabled = true + if syncCfg.MaxReconnectAttempts == 0 { + syncCfg.MaxReconnectAttempts = common.MaxBinlogSyncerReconnect + } + + parser := replication.NewBinlogParser() + parser.SetFlavor(syncCfg.Flavor) + parser.SetVerifyChecksum(true) + + return &binlogPosFinder{ + remote: true, + tctx: tctx, + enableGTID: enableGTID, + parser: parser, + flavor: syncCfg.Flavor, + + db: db, + syncCfg: syncCfg, + } +} + +func (r *binlogPosFinder) getBinlogFiles() (FileSizes, error) { + if r.remote { + return GetBinaryLogs(r.tctx.Ctx, r.db) + } + return GetLocalBinaryLogs(r.relayDir) +} + +func (r *binlogPosFinder) startSync(position mysql.Position) (reader.Reader, error) { + if r.remote { + binlogReader := reader.NewTCPReader(r.syncCfg) + return binlogReader, binlogReader.StartSyncByPos(position) + } + binlogReader := reader.NewFileReader(&reader.FileReaderConfig{EnableRawMode: true}) + position.Name = path.Join(r.relayDir, position.Name) + return binlogReader, binlogReader.StartSyncByPos(position) +} + +func (r *binlogPosFinder) findMinTimestampOfBinlog(currBinlog binlogSize) (uint32, error) { + var minTS uint32 + binlogReader, err := r.startSync(mysql.Position{Name: currBinlog.name, Pos: FileHeaderLen}) + if err != nil { + return 0, err + } + for { + ev, err := binlogReader.GetEvent(r.tctx.Ctx) + if err != nil { + binlogReader.Close() + return 0, err + } + // break on first non-fake event(must be a format description event) + if !utils.IsFakeRotateEvent(ev.Header) { + minTS = ev.Header.Timestamp + break + } + } + binlogReader.Close() + + return minTS, nil +} + +func (r *binlogPosFinder) initTargetBinlogFile(ts int64) error { + targetTS := uint32(ts) + var lastTS, minTS uint32 + var lastMid int + binaryLogs, err := r.getBinlogFiles() + if err != nil { + return err + } + if len(binaryLogs) == 0 { + // should not happen on a master with binlog enabled + return errors.New("cannot find binlog files") + } + + begin, end := 0, len(binaryLogs)-1 + for begin <= end { + mid := (begin + end) / 2 + currBinlog := binaryLogs[mid] + + minTS, err = r.findMinTimestampOfBinlog(currBinlog) + if err != nil { + return err + } + + r.tctx.L().Debug("min timestamp in binlog file", zap.Reflect("file", currBinlog), zap.Uint32("ts", minTS)) + + lastTS = minTS + lastMid = mid + + if minTS >= targetTS { + end = mid - 1 + } else { + // current binlog maybe the target binlog file, we'll backtrace to it later. + begin = mid + 1 + } + } + if lastTS >= targetTS { + if lastMid == 0 { + // timestamp of first binlog event in first binlog file >= targetTS + r.targetBinlog = binaryLogs[lastMid] + r.tsBeforeFirstBinlog = true + } else { + // timestamp of first event in lastMid >= targetTS, need to search from previous binlog file + r.targetBinlog = binaryLogs[lastMid-1] + } + } else { + r.targetBinlog = binaryLogs[lastMid] + } + r.lastBinlogFile = r.targetBinlog.name == binaryLogs[len(binaryLogs)-1].name + + r.tctx.L().Info("target binlog file", zap.Reflect("file", r.targetBinlog), + zap.Bool("before first binlog", r.tsBeforeFirstBinlog), + zap.Bool("last binlog", r.lastBinlogFile)) + + return nil +} + +func (r *binlogPosFinder) processGTIDRelatedEvent(ev *replication.BinlogEvent, prevSet gtid.Set) (gtid.Set, error) { + ev, err := r.parser.Parse(ev.RawData) + if err != nil { + return nil, err + } + switch ev.Header.EventType { + case replication.PREVIOUS_GTIDS_EVENT: + newSet, err := event.GTIDsFromPreviousGTIDsEvent(ev) + if err != nil { + return nil, err + } + return newSet, nil + case replication.MARIADB_GTID_LIST_EVENT: + newSet, err := event.GTIDsFromMariaDBGTIDListEvent(ev) + if err != nil { + return nil, err + } + return newSet, nil + case replication.MARIADB_GTID_EVENT, replication.GTID_EVENT: + gtidStr, _ := event.GetGTIDStr(ev) + if err := prevSet.Update(gtidStr); err != nil { + return nil, err + } + } + return prevSet, nil +} + +func (r *binlogPosFinder) checkTransactionBeginEvent(ev *replication.BinlogEvent) (bool, error) { + // we find the timestamp at transaction boundary + // When there are GTID events in this binlog file, we use GTID event as the start event, else: + // for DML + // take a 'BEGIN' query event as the start event + // XID event or a 'COMMIT' query event as the end event + // for DDL + // one single query event acts as both the start and end event + var transactionBeginEvent bool + switch ev.Header.EventType { + case replication.FORMAT_DESCRIPTION_EVENT: + _, err := r.parser.Parse(ev.RawData) + if err != nil { + return false, err + } + case replication.GTID_EVENT, replication.ANONYMOUS_GTID_EVENT, replication.MARIADB_GTID_EVENT: + // since 5.7, when GTID not enabled, mysql add a anonymous gtid event. we use this to avoid parsing query event + r.everMetGTIDEvent = true + transactionBeginEvent = true + case replication.QUERY_EVENT: + if !r.everMetGTIDEvent { + // user may change session level binlog-format=statement, but it's an unusual operation, so we parse it every time + // In MySQL 5.6.x without GTID, the timestamp of BEGIN is the timestamp of the first statement in the transaction, + // not the commit timestamp of the transaction. + // To simplify implementation, we use timestamp of BEGIN as the transaction timestamp, + // but this may cause some transaction with timestamp >= target timestamp be skipped. + // TODO maybe add backtrace to support this case later + ev2, err := r.parser.Parse(ev.RawData) + if err != nil { + return false, err + } + e := ev2.Event.(*replication.QueryEvent) + switch string(e.Query) { + case "BEGIN": + transactionBeginEvent = true + r.inTransaction = true + case "COMMIT": // MyISAM use COMMIT to end transaction + r.inTransaction = false + default: + if !r.inTransaction { + // DDL + transactionBeginEvent = true + } + } + } + case replication.XID_EVENT: + r.inTransaction = false + } + return transactionBeginEvent, nil +} + +// FindByTimestamp get binlog location of first event or transaction with timestamp >= ts +// go-mysql has BinlogStreamer.GetEventWithStartTime, but it doesn't fit our need. And we need to support relay log. +// if posType != AboveUpperBoundBinlogPos, then location is the target location we want. +// if posType == BelowLowerBoundBinlogPos, master binlog may have purged. +func (r *binlogPosFinder) FindByTimestamp(ts int64) (*Location, PosType, error) { + r.tctx.L().Info("target timestamp", zap.Int64("ts", ts)) + + if err := r.initTargetBinlogFile(ts); err != nil { + return nil, InvalidBinlogPos, err + } + + targetTS := uint32(ts) + position := mysql.Position{Name: r.targetBinlog.name, Pos: FileHeaderLen} + gtidSet := gtid.MinGTIDSet(r.flavor) + + binlogReader, err := r.startSync(position) + if err != nil { + return nil, InvalidBinlogPos, err + } + defer binlogReader.Close() + for { + ev, err := binlogReader.GetEvent(r.tctx.Ctx) + // let outer layer retry + if err != nil { + return nil, InvalidBinlogPos, err + } + if utils.IsFakeRotateEvent(ev.Header) { + continue + } + + transactionBeginEvent, err := r.checkTransactionBeginEvent(ev) + if err != nil { + return nil, InvalidBinlogPos, err + } + + if transactionBeginEvent && ev.Header.Timestamp >= targetTS { + break + } + position.Pos = ev.Header.LogPos + + if r.enableGTID { + eventType := ev.Header.EventType + if eventType == replication.PREVIOUS_GTIDS_EVENT || + eventType == replication.MARIADB_GTID_LIST_EVENT || + eventType == replication.GTID_EVENT || + eventType == replication.MARIADB_GTID_EVENT { + gtidSet, err = r.processGTIDRelatedEvent(ev, gtidSet) + if err != nil { + return nil, InvalidBinlogPos, err + } + // we meet PREVIOUS_GTIDS_EVENT or MARIADB_GTID_LIST_EVENT first, so break after get previous GTIDs + if r.tsBeforeFirstBinlog { + break + } + } + } + + // still not found the timestamp after reached the end of this binlog file + if int64(position.Pos) >= r.targetBinlog.size { + // if it's the last binlog file, then this timestamp is out of range, + // else the end of this binlog file is the position we want, + // since the timestamp of the first event in next binlog >= target timestamp + if r.lastBinlogFile { + return nil, AboveUpperBoundBinlogPos, nil + } + break + } + } + if r.tsBeforeFirstBinlog { + // always return the position of the first event in target binlog + loc := InitLocation(mysql.Position{Name: r.targetBinlog.name, Pos: FileHeaderLen}, gtidSet) + return &loc, BelowLowerBoundBinlogPos, nil + } + loc := InitLocation(position, gtidSet) + return &loc, InRangeBinlogPos, nil +} diff --git a/dm/pkg/binlog/pos_finder_test.go b/dm/pkg/binlog/pos_finder_test.go new file mode 100644 index 00000000000..0c86e3f9778 --- /dev/null +++ b/dm/pkg/binlog/pos_finder_test.go @@ -0,0 +1,446 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package binlog + +import ( + "bytes" + "context" + "fmt" + "os" + "path" + "strconv" + "time" + + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" + . "github.com/pingcap/check" + + "github.com/pingcap/tiflow/dm/pkg/binlog/event" + tcontext "github.com/pingcap/tiflow/dm/pkg/context" + "github.com/pingcap/tiflow/dm/pkg/log" +) + +var _ = Suite(&testPosFinderSuite{}) + +type testPosFinderSuite struct{} + +func genBinlogFile(generator *event.Generator, start time.Time, nextFile string) ([]*replication.BinlogEvent, []byte) { + insertDMLData := []*event.DMLData{ + { + TableID: uint64(1), + Schema: fmt.Sprintf("db_%d", 1), + Table: strconv.Itoa(1), + ColumnType: []byte{mysql.MYSQL_TYPE_INT24}, + Rows: [][]interface{}{{int32(1)}, {int32(2)}}, + }, + } + allEvents := make([]*replication.BinlogEvent, 0) + var buf bytes.Buffer + events, data, _ := generator.GenFileHeader(start.Add(1 * time.Second).Unix()) + allEvents = append(allEvents, events...) + buf.Write(data) + + events, data, _ = generator.GenDDLEvents("test", "create table t(id int)", start.Add(2*time.Second).Unix()) + allEvents = append(allEvents, events...) + buf.Write(data) + + events, data, _ = generator.GenDMLEvents(replication.WRITE_ROWS_EVENTv2, insertDMLData, start.Add(3*time.Second).Unix()) + allEvents = append(allEvents, events...) + buf.Write(data) + + events, data, _ = generator.GenDMLEvents(replication.WRITE_ROWS_EVENTv2, insertDMLData, start.Add(5*time.Second).Unix()) + allEvents = append(allEvents, events...) + buf.Write(data) + + ev, data, _ := generator.Rotate(nextFile, start.Add(5*time.Second).Unix()) + allEvents = append(allEvents, ev) + buf.Write(data) + + return allEvents, buf.Bytes() +} + +func (t *testPosFinderSuite) TestTransBoundary(c *C) { + flavor := "mysql" + relayDir := c.MkDir() + beforeTime := time.Now() + latestGTIDStr := "ffffffff-ffff-ffff-ffff-ffffffffffff:1" + generator, _ := event.NewGeneratorV2(flavor, "5.6.0", latestGTIDStr, false) + insertDMLData := []*event.DMLData{ + { + TableID: uint64(1), + Schema: fmt.Sprintf("db_%d", 1), + Table: strconv.Itoa(1), + ColumnType: []byte{mysql.MYSQL_TYPE_INT24}, + Rows: [][]interface{}{{int32(1)}, {int32(2)}}, + }, + } + var buf bytes.Buffer + _, data, err := generator.GenFileHeader(beforeTime.Add(1 * time.Second).Unix()) + c.Assert(err, IsNil) + buf.Write(data) + + // first transaction, timestamp of BEGIN = beforeTime.Add(2*time.Second) + // timestamp of other events inside this transaction = beforeTime.Add(3 * time.Second) + ts := beforeTime.Add(2 * time.Second).Unix() + header := &replication.EventHeader{ + Timestamp: uint32(ts), + ServerID: 11, + Flags: 0x01, + } + beginEvent, _ := event.GenQueryEvent(header, generator.LatestPos, 1, 1, 0, []byte("0"), []byte("test"), []byte("BEGIN")) + buf.Write(beginEvent.RawData) + + ts = beforeTime.Add(3 * time.Second).Unix() + header.Timestamp = uint32(ts) + mapEvent, _ := event.GenTableMapEvent(header, beginEvent.Header.LogPos, 1, []byte("test"), []byte("t"), []byte{mysql.MYSQL_TYPE_INT24}) + buf.Write(mapEvent.RawData) + rowsEvent, _ := event.GenRowsEvent(header, mapEvent.Header.LogPos, replication.WRITE_ROWS_EVENTv2, 1, 1, [][]interface{}{{int32(1)}, {int32(2)}}, []byte{mysql.MYSQL_TYPE_INT24}, mapEvent) + buf.Write(rowsEvent.RawData) + xidEvent, _ := event.GenXIDEvent(header, rowsEvent.Header.LogPos, 1) + buf.Write(xidEvent.RawData) + + // second transaction, timestamp of all events = beforeTime.Add(3 * time.Second) + generator.LatestPos = xidEvent.Header.LogPos + dmlEvents, data, _ := generator.GenDMLEvents(replication.WRITE_ROWS_EVENTv2, insertDMLData, ts) + buf.Write(data) + + c.Assert(dmlEvents[len(dmlEvents)-1].Header.LogPos, Equals, uint32(buf.Len())) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000001"), buf.Bytes(), 0o644) + + { + tcctx := tcontext.NewContext(context.Background(), log.L()) + finder := NewLocalBinlogPosFinder(tcctx, false, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(ts) + c.Assert(err, IsNil) + // start of second transaction + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000001", Pos: xidEvent.Header.LogPos}) + c.Assert(location.GTIDSetStr(), Equals, "") + c.Assert(posType, Equals, InRangeBinlogPos) + } +} + +func (t *testPosFinderSuite) TestMySQL56NoGTID(c *C) { + flavor := "mysql" + relayDir := c.MkDir() + beforeTime := time.Now() + latestGTIDStr := "ffffffff-ffff-ffff-ffff-ffffffffffff:1" + + generator, _ := event.NewGeneratorV2(flavor, "5.6.0", latestGTIDStr, false) + + file1Events, data := genBinlogFile(generator, beforeTime, "mysql-bin.000002") + c.Assert(len(file1Events), Equals, 11) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000001"), data, 0o644) + file2Events, data := genBinlogFile(generator, beforeTime.Add(5*time.Second), "mysql-bin.000003") + c.Assert(len(file2Events), Equals, 11) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000002"), data, 0o644) + file3Events, data := genBinlogFile(generator, beforeTime.Add(10*time.Second), "mysql-bin.000004") + c.Assert(len(file3Events), Equals, 11) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000003"), data, 0o644) + + tcctx := tcontext.NewContext(context.Background(), log.L()) + { + var targetEventStart uint32 + var targetEvent *replication.BinlogEvent + for _, ev := range file1Events { + if e, ok := ev.Event.(*replication.QueryEvent); ok && string(e.Query) == "BEGIN" { + targetEvent = ev + break + } + targetEventStart = ev.Header.LogPos + } + finder := NewLocalBinlogPosFinder(tcctx, false, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(int64(targetEvent.Header.Timestamp)) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000001", Pos: targetEventStart}) + c.Assert(location.GTIDSetStr(), Equals, "") + c.Assert(posType, Equals, InRangeBinlogPos) + } + { + targetEventStart := file2Events[len(file2Events)-1].Header.LogPos + finder := NewLocalBinlogPosFinder(tcctx, false, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(int64(file3Events[0].Header.Timestamp)) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000002", Pos: targetEventStart}) + c.Assert(location.GTIDSetStr(), Equals, "") + c.Assert(posType, Equals, InRangeBinlogPos) + } + { + var targetEventStart uint32 + var targetEvent *replication.BinlogEvent + for _, ev := range file3Events { + if _, ok := ev.Event.(*replication.QueryEvent); ok { + targetEvent = ev + break + } + targetEventStart = ev.Header.LogPos + } + finder := NewLocalBinlogPosFinder(tcctx, false, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(int64(targetEvent.Header.Timestamp)) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000003", Pos: targetEventStart}) + c.Assert(location.GTIDSetStr(), Equals, "") + c.Assert(posType, Equals, InRangeBinlogPos) + } +} + +func (t *testPosFinderSuite) TestMySQL57NoGTID(c *C) { + flavor := "mysql" + relayDir := c.MkDir() + beforeTime := time.Now() + latestGTIDStr := "ffffffff-ffff-ffff-ffff-ffffffffffff:1" + + generator, _ := event.NewGeneratorV2(flavor, "5.7.0", latestGTIDStr, false) + + file1Events, data := genBinlogFile(generator, beforeTime, "mysql-bin.000002") + c.Assert(len(file1Events), Equals, 15) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000001"), data, 0o644) + file2Events, data := genBinlogFile(generator, beforeTime.Add(5*time.Second), "mysql-bin.000003") + c.Assert(len(file2Events), Equals, 15) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000002"), data, 0o644) + file3Events, data := genBinlogFile(generator, beforeTime.Add(10*time.Second), "mysql-bin.000004") + c.Assert(len(file3Events), Equals, 15) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000003"), data, 0o644) + + tcctx := tcontext.NewContext(context.Background(), log.L()) + { + var targetEventStart uint32 + var targetEvent *replication.BinlogEvent + cnt := 0 + for _, ev := range file3Events { + if ev.Header.EventType == replication.ANONYMOUS_GTID_EVENT { + targetEvent = ev + // second GTID event + cnt++ + if cnt == 2 { + break + } + } + targetEventStart = ev.Header.LogPos + } + finder := NewLocalBinlogPosFinder(tcctx, false, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(int64(targetEvent.Header.Timestamp)) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000003", Pos: targetEventStart}) + c.Assert(location.GTIDSetStr(), Equals, "") + c.Assert(posType, Equals, InRangeBinlogPos) + } +} + +func (t *testPosFinderSuite) TestErrorCase(c *C) { + flavor := "mysql" + relayDir := c.MkDir() + beforeTime := time.Now() + tcctx := tcontext.NewContext(context.Background(), log.L()) + { + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir+"not-exist") + _, _, err := finder.FindByTimestamp(beforeTime.Add(-time.Minute).Unix()) + c.Assert(err.Error(), Matches, ".*no such file or directory.*") + } + { + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, c.MkDir()) + _, _, err := finder.FindByTimestamp(beforeTime.Add(-time.Minute).Unix()) + c.Assert(err.Error(), Matches, ".*cannot find binlog files.*") + } + { + file, err := os.Create(path.Join(relayDir, "mysql-bin.000001")) + c.Assert(err, IsNil) + file.Close() + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir) + _, _, err = finder.FindByTimestamp(beforeTime.Add(-time.Minute).Unix()) + c.Assert(err.Error(), Matches, "EOF") + } +} + +func (t *testPosFinderSuite) TestMySQL57GTID(c *C) { + flavor := "mysql" + relayDir := c.MkDir() + beforeTime := time.Now() + latestGTIDStr := "ffffffff-ffff-ffff-ffff-ffffffffffff:1" + + generator, _ := event.NewGeneratorV2(flavor, "5.7.0", latestGTIDStr, true) + + file1Events, data := genBinlogFile(generator, beforeTime, "mysql-bin.000002") + c.Assert(len(file1Events), Equals, 15) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000001"), data, 0o644) + file2Events, data := genBinlogFile(generator, beforeTime.Add(5*time.Second), "mysql-bin.000003") + c.Assert(len(file2Events), Equals, 15) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000002"), data, 0o644) + file3Events, data := genBinlogFile(generator, beforeTime.Add(10*time.Second), "mysql-bin.000004") + c.Assert(len(file3Events), Equals, 15) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000003"), data, 0o644) + + tcctx := tcontext.NewContext(context.Background(), log.L()) + + { + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(beforeTime.Add(-time.Minute).Unix()) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000001", Pos: 4}) + c.Assert(location.GTIDSetStr(), Equals, "ffffffff-ffff-ffff-ffff-ffffffffffff:1") + c.Assert(posType, Equals, BelowLowerBoundBinlogPos) + } + { + gtids := []string{ + "ffffffff-ffff-ffff-ffff-ffffffffffff:1", + "ffffffff-ffff-ffff-ffff-ffffffffffff:1-2", + "ffffffff-ffff-ffff-ffff-ffffffffffff:1-3", + } + var targetEventStart uint32 + var targetEvent *replication.BinlogEvent + cnt := 0 + for _, ev := range file1Events { + if ev.Header.EventType == replication.GTID_EVENT { + targetEvent = ev + + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(int64(targetEvent.Header.Timestamp)) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000001", Pos: targetEventStart}) + c.Assert(location.GTIDSetStr(), Equals, gtids[cnt]) + c.Assert(posType, Equals, InRangeBinlogPos) + + cnt++ + } + targetEventStart = ev.Header.LogPos + } + } + { + targetEventStart := file2Events[len(file2Events)-1].Header.LogPos + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(int64(file3Events[0].Header.Timestamp)) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000002", Pos: targetEventStart}) + c.Assert(location.GTIDSetStr(), Equals, "ffffffff-ffff-ffff-ffff-ffffffffffff:1-7") + c.Assert(posType, Equals, InRangeBinlogPos) + } + { + var targetEventStart uint32 + var targetEvent *replication.BinlogEvent + cnt := 0 + for _, ev := range file3Events { + if ev.Header.EventType == replication.GTID_EVENT { + targetEvent = ev + // third GTID event + cnt++ + if cnt == 3 { + break + } + } + targetEventStart = ev.Header.LogPos + } + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(int64(targetEvent.Header.Timestamp)) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000003", Pos: targetEventStart}) + c.Assert(location.GTIDSetStr(), Equals, "ffffffff-ffff-ffff-ffff-ffffffffffff:1-9") + c.Assert(posType, Equals, InRangeBinlogPos) + } + { + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(beforeTime.Add(+time.Minute).Unix()) + c.Assert(err, IsNil) + c.Assert(location, IsNil) + c.Assert(posType, Equals, AboveUpperBoundBinlogPos) + } +} + +func (t *testPosFinderSuite) TestMariadbGTID(c *C) { + flavor := "mariadb" + relayDir := c.MkDir() + beforeTime := time.Now() + latestGTIDStr := "1-1-1" + + generator, _ := event.NewGeneratorV2(flavor, "10.0.2", latestGTIDStr, true) + + file1Events, data := genBinlogFile(generator, beforeTime, "mysql-bin.000002") + c.Assert(len(file1Events), Equals, 15) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000001"), data, 0o644) + file2Events, data := genBinlogFile(generator, beforeTime.Add(5*time.Second), "mysql-bin.000003") + c.Assert(len(file2Events), Equals, 15) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000002"), data, 0o644) + file3Events, data := genBinlogFile(generator, beforeTime.Add(10*time.Second), "mysql-bin.000004") + c.Assert(len(file3Events), Equals, 15) + _ = os.WriteFile(path.Join(relayDir, "mysql-bin.000003"), data, 0o644) + + tcctx := tcontext.NewContext(context.Background(), log.L()) + + { + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(beforeTime.Add(-time.Minute).Unix()) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000001", Pos: 4}) + c.Assert(location.GTIDSetStr(), Equals, "1-1-1") + c.Assert(posType, Equals, BelowLowerBoundBinlogPos) + } + { + var targetEventStart uint32 + var targetEvent *replication.BinlogEvent + cnt := 0 + for _, ev := range file1Events { + if ev.Header.EventType == replication.MARIADB_GTID_EVENT { + targetEvent = ev + // second GTID event + cnt++ + if cnt == 2 { + break + } + } + targetEventStart = ev.Header.LogPos + } + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(int64(targetEvent.Header.Timestamp)) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000001", Pos: targetEventStart}) + c.Assert(location.GTIDSetStr(), Equals, "1-1-2") + c.Assert(posType, Equals, InRangeBinlogPos) + } + { + targetEventStart := file2Events[len(file2Events)-1].Header.LogPos + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(int64(file3Events[0].Header.Timestamp)) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000002", Pos: targetEventStart}) + c.Assert(location.GTIDSetStr(), Equals, "1-1-7") + c.Assert(posType, Equals, InRangeBinlogPos) + } + { + var targetEventStart uint32 + var targetEvent *replication.BinlogEvent + cnt := 0 + for _, ev := range file3Events { + if ev.Header.EventType == replication.MARIADB_GTID_EVENT { + targetEvent = ev + // second GTID event + cnt++ + if cnt == 2 { + break + } + } + targetEventStart = ev.Header.LogPos + } + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(int64(targetEvent.Header.Timestamp)) + c.Assert(err, IsNil) + c.Assert(location.Position, Equals, mysql.Position{Name: "mysql-bin.000003", Pos: targetEventStart}) + c.Assert(location.GTIDSetStr(), Equals, "1-1-8") + c.Assert(posType, Equals, InRangeBinlogPos) + } + { + finder := NewLocalBinlogPosFinder(tcctx, true, flavor, relayDir) + location, posType, err := finder.FindByTimestamp(beforeTime.Add(+time.Minute).Unix()) + c.Assert(err, IsNil) + c.Assert(location, IsNil) + c.Assert(posType, Equals, AboveUpperBoundBinlogPos) + } +} diff --git a/dm/pkg/binlog/status.go b/dm/pkg/binlog/status.go index 0c00f142c98..8f3cdbb2a7c 100644 --- a/dm/pkg/binlog/status.go +++ b/dm/pkg/binlog/status.go @@ -16,11 +16,13 @@ package binlog import ( "context" "database/sql" + "path" "time" gmysql "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/tiflow/dm/pkg/terror" + "github.com/pingcap/tiflow/dm/pkg/utils" ) // in MySQL, we can set `max_binlog_size` to control the max size of a binlog file. @@ -93,6 +95,22 @@ func (b FileSizes) After(fromFile gmysql.Position) int64 { return total } +func GetLocalBinaryLogs(dir string) (FileSizes, error) { + fileNames, err := ReadSortedBinlogFromDir(dir) + if err != nil { + return nil, err + } + files := make([]binlogSize, 0, len(fileNames)) + for _, fileName := range fileNames { + size, err := utils.GetFileSize(path.Join(dir, fileName)) + if err != nil { + return nil, err + } + files = append(files, binlogSize{name: fileName, size: size}) + } + return files, nil +} + // SourceStatus collects all information of upstream. type SourceStatus struct { Location Location diff --git a/dm/pkg/gtid/gtid.go b/dm/pkg/gtid/gtid.go index d1bd972083b..3f3515ad60f 100644 --- a/dm/pkg/gtid/gtid.go +++ b/dm/pkg/gtid/gtid.go @@ -35,6 +35,7 @@ type Set interface { Origin() mysql.GTIDSet Equal(other Set) bool Contain(other Set) bool + Update(gtidStr string) error // Truncate truncates the current GTID sets until the `end` in-place. // NOTE: the original GTID sets should contain the end GTID sets, otherwise it's invalid. @@ -226,6 +227,10 @@ func (g *MySQLGTIDSet) Contain(other Set) bool { return g.set.Contain(other.Origin()) } +func (g *MySQLGTIDSet) Update(gtidStr string) error { + return g.set.Update(gtidStr) +} + // Truncate implements Set.Truncate. func (g *MySQLGTIDSet) Truncate(end Set) error { if end == nil { @@ -392,6 +397,10 @@ func (m *MariadbGTIDSet) Contain(other Set) bool { return m.set.Contain(other.Origin()) } +func (m *MariadbGTIDSet) Update(gtidStr string) error { + return m.set.Update(gtidStr) +} + // Truncate implements Set.Truncate. func (m *MariadbGTIDSet) Truncate(end Set) error { if end == nil { diff --git a/dm/relay/file.go b/dm/relay/file.go index d88c2021ece..8db12f3b10f 100644 --- a/dm/relay/file.go +++ b/dm/relay/file.go @@ -16,7 +16,6 @@ package relay import ( "os" "path/filepath" - "sort" "go.uber.org/zap" @@ -49,7 +48,7 @@ func CollectAllBinlogFiles(dir string) ([]string, error) { if dir == "" { return nil, terror.ErrEmptyRelayDir.Generate() } - return readSortedBinlogFromDir(dir) + return binlog.ReadSortedBinlogFromDir(dir) } // CollectBinlogFilesCmp collects valid binlog files with a compare condition. @@ -109,7 +108,7 @@ func CollectBinlogFilesCmp(dir, baseFile string, cmp FileCmp) ([]string, error) // getFirstBinlogName gets the first binlog file in relay sub directory. func getFirstBinlogName(baseDir, uuid string) (string, error) { subDir := filepath.Join(baseDir, uuid) - files, err := readSortedBinlogFromDir(subDir) + files, err := binlog.ReadSortedBinlogFromDir(subDir) if err != nil { return "", terror.Annotatef(err, "get binlog file for dir %s", subDir) } @@ -120,57 +119,6 @@ func getFirstBinlogName(baseDir, uuid string) (string, error) { return files[0], nil } -// readSortedBinlogFromDir reads and returns all binlog files (sorted ascending by binlog filename and sequence number). -func readSortedBinlogFromDir(dirpath string) ([]string, error) { - dir, err := os.Open(dirpath) - if err != nil { - return nil, terror.ErrReadDir.Delegate(err, dirpath) - } - defer dir.Close() - - names, err := dir.Readdirnames(-1) - if err != nil { - return nil, terror.ErrReadDir.Delegate(err, dirpath) - } - if len(names) == 0 { - return nil, nil - } - - // sorting bin.100000, ..., bin.1000000, ..., bin.999999 - type tuple struct { - filename string - parsed binlog.Filename - } - tmp := make([]tuple, 0, len(names)-1) - - for _, f := range names { - p, err2 := binlog.ParseFilename(f) - if err2 != nil { - // may contain some file that can't be parsed, like relay meta. ignore them - log.L().Info("collecting binlog file, ignore invalid file", zap.String("file", f)) - continue - } - tmp = append(tmp, tuple{ - filename: f, - parsed: p, - }) - } - - sort.Slice(tmp, func(i, j int) bool { - if tmp[i].parsed.BaseName != tmp[j].parsed.BaseName { - return tmp[i].parsed.BaseName < tmp[j].parsed.BaseName - } - return tmp[i].parsed.LessThan(tmp[j].parsed) - }) - - ret := make([]string, len(tmp)) - for i := range tmp { - ret[i] = tmp[i].filename - } - - return ret, nil -} - // fileSizeUpdated checks whether the file's size has updated // return // 0: not updated diff --git a/dm/relay/file_test.go b/dm/relay/file_test.go index 85e6f6fbe20..5a2a86b343d 100644 --- a/dm/relay/file_test.go +++ b/dm/relay/file_test.go @@ -287,19 +287,3 @@ func (t *testFileSuite) TestFileSizeUpdated(c *C) { c.Assert(err, IsNil) c.Assert(cmp, Equals, 1) } - -func (t *testFileSuite) TestReadSortedBinlogFromDir(c *C) { - dir := c.MkDir() - filenames := []string{ - "bin.000001", "bin.000002", "bin.100000", "bin.100001", "bin.1000000", "bin.1000001", "bin.999999", "relay.meta", - } - expected := []string{ - "bin.000001", "bin.000002", "bin.100000", "bin.100001", "bin.999999", "bin.1000000", "bin.1000001", - } - for _, f := range filenames { - c.Assert(os.WriteFile(filepath.Join(dir, f), nil, 0o600), IsNil) - } - ret, err := readSortedBinlogFromDir(dir) - c.Assert(err, IsNil) - c.Assert(ret, DeepEquals, expected) -} diff --git a/dm/relay/file_util_test.go b/dm/relay/file_util_test.go index fd1dd72c95f..e80b41a7222 100644 --- a/dm/relay/file_util_test.go +++ b/dm/relay/file_util_test.go @@ -195,7 +195,7 @@ func (t *testFileUtilSuite) TestCheckIsDuplicateEvent(c *check.C) { g, err := event.NewGenerator(flavor, serverID, latestPos, latestGTID, previousGTIDSet, latestXID) c.Assert(err, check.IsNil) // file header with FormatDescriptionEvent and PreviousGTIDsEvent - events, data, err := g.GenFileHeader() + events, data, err := g.GenFileHeader(0) c.Assert(err, check.IsNil) allEvents = append(allEvents, events...) allData.Write(data) @@ -206,7 +206,7 @@ func (t *testFileUtilSuite) TestCheckIsDuplicateEvent(c *check.C) { "CREATE TABLE `db`.`tbl2` (c1 INT)", } for _, query := range queries { - events, data, err = g.GenDDLEvents("db", query) + events, data, err = g.GenDDLEvents("db", query, 0) c.Assert(err, check.IsNil) allEvents = append(allEvents, events...) allData.Write(data) @@ -224,7 +224,7 @@ func (t *testFileUtilSuite) TestCheckIsDuplicateEvent(c *check.C) { } // event not in the file, because its start pos > file size - events, _, err = g.GenDDLEvents("", "BEGIN") + events, _, err = g.GenDDLEvents("", "BEGIN", 0) c.Assert(err, check.IsNil) duplicate, err := checkIsDuplicateEvent(filename, events[0]) c.Assert(err, check.IsNil) @@ -346,7 +346,7 @@ func (t *testFileUtilSuite) testGetTxnPosGTIDs(c *check.C, filename, flavor, pre Rows: updateRows, }, } - extraEvents, extraData, err := g.GenDMLEvents(eventType, dmlData) + extraEvents, extraData, err := g.GenDMLEvents(eventType, dmlData, 0) c.Assert(err, check.IsNil) c.Assert(extraEvents, check.HasLen, 5) // [GTID, BEGIN, TableMap, UPDATE, XID] diff --git a/dm/relay/local_reader_test.go b/dm/relay/local_reader_test.go index 8d62a2d6c8a..a5889ccb979 100644 --- a/dm/relay/local_reader_test.go +++ b/dm/relay/local_reader_test.go @@ -1068,7 +1068,7 @@ func (t *testReaderSuite) genBinlogEvents(c *C, latestPos uint32, latestGTID gti // for these tests, generates some DDL events is enough count := 5 + rand.Intn(5) for i := 0; i < count; i++ { - evs, err := event.GenDDLEvents(gmysql.MySQLFlavor, 1, latestPos, latestGTID, fmt.Sprintf("db_%d", i), fmt.Sprintf("CREATE TABLE %d (c1 INT)", i)) + evs, err := event.GenDDLEvents(gmysql.MySQLFlavor, 1, latestPos, latestGTID, fmt.Sprintf("db_%d", i), fmt.Sprintf("CREATE TABLE %d (c1 INT)", i), true, false, 0) c.Assert(err, IsNil) events = append(events, evs.Events...) latestPos = evs.LatestPos @@ -1099,7 +1099,7 @@ func (t *testReaderSuite) genEvents(c *C, eventTypes []replication.EventType, la for i, eventType := range eventTypes { switch eventType { case replication.QUERY_EVENT: - evs, err := event.GenDDLEvents(gmysql.MySQLFlavor, 1, latestPos, latestGTID, fmt.Sprintf("db_%d", i), fmt.Sprintf("CREATE TABLE %d (c1 int)", i)) + evs, err := event.GenDDLEvents(gmysql.MySQLFlavor, 1, latestPos, latestGTID, fmt.Sprintf("db_%d", i), fmt.Sprintf("CREATE TABLE %d (c1 int)", i), true, false, 0) c.Assert(err, IsNil) events = append(events, evs.Events...) latestPos = evs.LatestPos @@ -1119,7 +1119,7 @@ func (t *testReaderSuite) genEvents(c *C, eventTypes []replication.EventType, la Rows: [][]interface{}{{int32(1)}, {int32(2)}}, }, } - evs, err := event.GenDMLEvents(gmysql.MySQLFlavor, 1, latestPos, latestGTID, replication.WRITE_ROWS_EVENTv2, 10, insertDMLData) + evs, err := event.GenDMLEvents(gmysql.MySQLFlavor, 1, latestPos, latestGTID, replication.WRITE_ROWS_EVENTv2, 10, insertDMLData, true, false, 0) c.Assert(err, IsNil) events = append(events, evs.Events...) latestPos = evs.LatestPos diff --git a/dm/relay/relay_test.go b/dm/relay/relay_test.go index 07aa40f1032..638c536ad45 100644 --- a/dm/relay/relay_test.go +++ b/dm/relay/relay_test.go @@ -351,7 +351,7 @@ func genBinlogEventsWithGTIDs(c *C, flavor string, previousGTIDSet, latestGTID1, c.Assert(err, IsNil) // file header with FormatDescriptionEvent and PreviousGTIDsEvent - events, data, err := g.GenFileHeader() + events, data, err := g.GenFileHeader(0) c.Assert(err, IsNil) allEvents = append(allEvents, events...) allData.Write(data) @@ -363,7 +363,7 @@ func genBinlogEventsWithGTIDs(c *C, flavor string, previousGTIDSet, latestGTID1, "CREATE TABLE `db`.`tbl2` (c1 INT)", } for _, query := range queries { - events, data, err = g.GenDDLEvents("db", query) + events, data, err = g.GenDDLEvents("db", query, 0) c.Assert(err, IsNil) allEvents = append(allEvents, events...) allData.Write(data) @@ -390,7 +390,7 @@ func genBinlogEventsWithGTIDs(c *C, flavor string, previousGTIDSet, latestGTID1, Rows: insertRows, }, } - events, data, err = g.GenDMLEvents(eventType, dmlData) + events, data, err = g.GenDMLEvents(eventType, dmlData, 0) c.Assert(err, IsNil) allEvents = append(allEvents, events...) allData.Write(data) @@ -759,7 +759,7 @@ func (t *testRelaySuite) TestPreprocessEvent(c *C) { }) // other event type without LOG_EVENT_ARTIFICIAL_F - ev, err = event.GenCommonGTIDEvent(gmysql.MySQLFlavor, header.ServerID, latestPos, gtidSet) + ev, err = event.GenCommonGTIDEvent(gmysql.MySQLFlavor, header.ServerID, latestPos, gtidSet, false, 0) c.Assert(err, IsNil) cases = append(cases, Case{ event: ev, @@ -835,7 +835,7 @@ func (t *testRelaySuite) TestRecoverMySQL(c *C) { c.Assert(fs.Size(), Equals, int64(len(baseData))) // generate another transaction, DDL - extraEvents, extraData, err := g.GenDDLEvents("db2", "CREATE DATABASE db2") + extraEvents, extraData, err := g.GenDDLEvents("db2", "CREATE DATABASE db2", 0) c.Assert(err, IsNil) c.Assert(extraEvents, HasLen, 2) // [GTID, Query] diff --git a/dm/relay/relay_writer_test.go b/dm/relay/relay_writer_test.go index baf0ba87496..002c703181e 100644 --- a/dm/relay/relay_writer_test.go +++ b/dm/relay/relay_writer_test.go @@ -361,7 +361,7 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) { c.Assert(err, check.IsNil) // file header with FormatDescriptionEvent and PreviousGTIDsEvent - events, data, err := g.GenFileHeader() + events, data, err := g.GenFileHeader(0) c.Assert(err, check.IsNil) allEvents = append(allEvents, events...) allData.Write(data) @@ -369,7 +369,7 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) { // CREATE DATABASE/TABLE queries := []string{"CRATE DATABASE `db`", "CREATE TABLE `db`.`tbl` (c1 INT)"} for _, query := range queries { - events, data, err = g.GenDDLEvents("db", query) + events, data, err = g.GenDDLEvents("db", query, 0) c.Assert(err, check.IsNil) allEvents = append(allEvents, events...) allData.Write(data) @@ -384,7 +384,7 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) { insertRows[0] = []interface{}{int32(1)} events, data, err = g.GenDMLEvents(replication.WRITE_ROWS_EVENTv2, []*event.DMLData{ {TableID: tableID, Schema: "db", Table: "tbl", ColumnType: columnType, Rows: insertRows}, - }) + }, 0) c.Assert(err, check.IsNil) allEvents = append(allEvents, events...) allData.Write(data) diff --git a/dm/syncer/binlog_locations_test.go b/dm/syncer/binlog_locations_test.go index 5140938e802..326c3dd7626 100644 --- a/dm/syncer/binlog_locations_test.go +++ b/dm/syncer/binlog_locations_test.go @@ -98,7 +98,7 @@ func (s *testLocationSuite) generateEvents(binlogEvents mockBinlogEvents, c *C) }, } eventType := replication.WRITE_ROWS_EVENTv2 - evs, _, err := s.eventsGenerator.GenDMLEvents(eventType, dmlData) + evs, _, err := s.eventsGenerator.GenDMLEvents(eventType, dmlData, 0) c.Assert(err, IsNil) events = append(events, evs...) @@ -109,7 +109,7 @@ func (s *testLocationSuite) generateEvents(binlogEvents mockBinlogEvents, c *C) Query: e.args[1].(string), }, } - evs, _, err := s.eventsGenerator.GenDMLEvents(replication.UNKNOWN_EVENT, dmlData) + evs, _, err := s.eventsGenerator.GenDMLEvents(replication.UNKNOWN_EVENT, dmlData, 0) c.Assert(err, IsNil) events = append(events, evs...) @@ -119,7 +119,7 @@ func (s *testLocationSuite) generateEvents(binlogEvents mockBinlogEvents, c *C) c.Assert(err, IsNil) events = append(events, fakeRotate) - events1, _, err := event.GenCommonFileHeader(s.flavor, s.serverID, s.prevGSet) + events1, _, err := event.GenCommonFileHeader(s.flavor, s.serverID, s.prevGSet, true, 0) c.Assert(err, IsNil) events = append(events, events1...) diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 6aa68676b9e..0646b8e0340 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -183,7 +183,7 @@ func (s *testSyncerSuite) generateEvents(binlogEvents mockBinlogEvents, c *C) [] events = append(events, evs...) case DDL: - evs, _, err := s.eventsGenerator.GenDDLEvents(e.args[0].(string), e.args[1].(string)) + evs, _, err := s.eventsGenerator.GenDDLEvents(e.args[0].(string), e.args[1].(string), 0) c.Assert(err, IsNil) events = append(events, evs...) @@ -208,7 +208,7 @@ func (s *testSyncerSuite) generateEvents(binlogEvents mockBinlogEvents, c *C) [] default: c.Fatal(fmt.Sprintf("mock event generator don't support event type: %d", e.typ)) } - evs, _, err := s.eventsGenerator.GenDMLEvents(eventType, dmlData) + evs, _, err := s.eventsGenerator.GenDMLEvents(eventType, dmlData, 0) c.Assert(err, IsNil) events = append(events, evs...) }