Skip to content

Commit

Permalink
dm: MySQL 8.0 Compressed Binlogs (#8373)
Browse files Browse the repository at this point in the history
close #6381
  • Loading branch information
dveeden committed Jun 21, 2023
1 parent d52a23d commit 8d202f4
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 9 deletions.
5 changes: 4 additions & 1 deletion dm/syncer/binlogstream/binlog_locations.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import (

func isDataEvent(e *replication.BinlogEvent) bool {
switch e.Event.(type) {
case *replication.TableMapEvent, *replication.RowsEvent, *replication.QueryEvent:
case *replication.TableMapEvent,
*replication.RowsEvent,
*replication.QueryEvent,
*replication.TransactionPayloadEvent:
return true
}
return false
Expand Down
29 changes: 29 additions & 0 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2345,6 +2345,35 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err2 != nil {
return err2
}
case *replication.TransactionPayloadEvent:
for _, tpev := range ev.Events {
switch tpevt := tpev.Event.(type) {
case *replication.RowsEvent:
eventIndex++
s.metricsProxies.Metrics.BinlogEventRowHistogram.Observe(float64(len(tpevt.Rows)))
ec.header.EventType = tpev.Header.EventType
sourceTable, err2 = s.handleRowsEvent(tpevt, ec)
if sourceTable != nil && err2 == nil && s.cfg.EnableGTID {
if _, ok := affectedSourceTables[sourceTable.Schema]; !ok {
affectedSourceTables[sourceTable.Schema] = make(map[string]struct{})
}
affectedSourceTables[sourceTable.Schema][sourceTable.Name] = struct{}{}
}
case *replication.QueryEvent:
originSQL = strings.TrimSpace(string(tpevt.Query))
err2 = s.ddlWorker.HandleQueryEvent(tpevt, ec, originSQL)
case *replication.XIDEvent:
eventType = "XID"
needContinue, err2 = funcCommit()
default:
s.tctx.L().Warn("unhandled event from transaction payload", zap.String("type", fmt.Sprintf("%T", tpevt)))
}
}
if needContinue {
continue
}
default:
s.tctx.L().Warn("unhandled event", zap.String("type", fmt.Sprintf("%T", ev)))
}
if err2 != nil {
if err := s.handleEventError(err2, startLocation, endLocation, e.Header.EventType == replication.QUERY_EVENT, originSQL); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/gin-gonic/gin v1.8.1
github.com/glebarez/go-sqlite v1.17.3
github.com/glebarez/sqlite v1.4.6
github.com/go-mysql-org/go-mysql v1.6.1-0.20221223014230-81966e15b9c5
github.com/go-mysql-org/go-mysql v1.7.1-0.20230619063055-fd67d94318fd
github.com/go-ozzo/ozzo-validation/v4 v4.3.0
github.com/go-sql-driver/mysql v1.7.1
github.com/goccy/go-json v0.9.11
Expand Down Expand Up @@ -199,7 +199,7 @@ require (
github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/compress v1.16.6 // indirect
github.com/klauspost/cpuid v1.3.1 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
github.com/go-mysql-org/go-mysql v1.6.1-0.20221223014230-81966e15b9c5 h1:RYZm5JbJelQnV/7wojD4T04kXd5O46gfOageTLzjJoc=
github.com/go-mysql-org/go-mysql v1.6.1-0.20221223014230-81966e15b9c5/go.mod h1:9cRWLtuXNKhamUPMkrDVzBhaomGvqLRLtBiyjvjc4pk=
github.com/go-mysql-org/go-mysql v1.7.1-0.20230619063055-fd67d94318fd h1:lqWdv8GEYqF1deivEmnSx81GfcAUZ/FoxilGxm/kwWs=
github.com/go-mysql-org/go-mysql v1.7.1-0.20230619063055-fd67d94318fd/go.mod h1:kOk/pFv3q5EPspyQfDRGLmEA6wfMvIeV4DmThwzkNzs=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/jsonpointer v0.17.0/go.mod h1:cOnomiV+CVVwFLk0A/MExoFMjwdsUdVpsRhURCKh+3M=
Expand Down Expand Up @@ -697,8 +697,8 @@ github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.16.6 h1:91SKEy4K37vkp255cJ8QesJhjyRO0hn9i9G0GoUwLsk=
github.com/klauspost/compress v1.16.6/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s=
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
Expand Down Expand Up @@ -917,7 +917,6 @@ github.com/pingcap/tidb-tools v7.0.0+incompatible h1:CHjAva2ON13HZAB0HRNI69fC/1A
github.com/pingcap/tidb-tools v7.0.0+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e/go.mod h1:e1MGCA9Sg3T8jid8PKAEq5eYVuMMCq4n8gJ+Kqp4Plg=
github.com/pingcap/tidb/parser v0.0.0-20220511160835-98c31070d958/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI=
github.com/pingcap/tidb/parser v0.0.0-20221126021158-6b02a5d8ba7d/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI=
github.com/pingcap/tidb/parser v0.0.0-20230505070356-cb609bb39c80 h1:HBvVVneDewwoGfkTtuacVCDqMN9D6ciEbCg2L0J2ZkI=
github.com/pingcap/tidb/parser v0.0.0-20230505070356-cb609bb39c80/go.mod h1:cjYHlmExmReRJXGvncysWwYJ1X1OGp9QfENrBFynd8A=
github.com/pingcap/tipb v0.0.0-20220215045658-d12dec7a7609/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
Expand Down Expand Up @@ -1012,7 +1011,6 @@ github.com/shoenig/go-m1cpu v0.1.5 h1:LF57Z/Fpb/WdGLjt2HZilNnmZOxg/q2bSKTQhgbrLr
github.com/shoenig/go-m1cpu v0.1.5/go.mod h1:Wwvst4LR89UxjeFtLRMrpgRiyY4xPsejnVZym39dbAQ=
github.com/shoenig/test v0.6.3 h1:GVXWJFk9PiOjN0KoJ7VrJGH6uLPnqxR7/fe3HUPfE0c=
github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v1.3.0 h1:KK3gWIXskZ2O1U/JNTisNcvH+jveJxZYrjbTsrbbnh8=
github.com/shopspring/decimal v1.3.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk=
Expand Down

0 comments on commit 8d202f4

Please sign in to comment.