Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter: using startts to filter txn #589

Merged
merged 1 commit into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) {
}
// FinishedTS is only set when the job is synced,
// but we can use the entry's ts here
job.StartTS = raw.StartTs
job.BinlogInfo.FinishedTS = raw.CRTs
return job, nil
}
Expand Down
14 changes: 8 additions & 6 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ type Column struct {

// DDLEvent represents a DDL event
type DDLEvent struct {
Ts uint64
Schema string
Table string
Query string
Type model.ActionType
StartTs uint64
CommitTs uint64
Schema string
Table string
Query string
Type model.ActionType
}

// FromJob fills the values of DDLEvent from DDL job
Expand All @@ -77,7 +78,8 @@ func (e *DDLEvent) FromJob(job *model.Job) {
if job.BinlogInfo.TableInfo != nil {
tableName = job.BinlogInfo.TableInfo.Name.O
}
e.Ts = job.BinlogInfo.FinishedTS
e.StartTs = job.StartTS
e.CommitTs = job.BinlogInfo.FinishedTS
e.Query = job.Query
e.Schema = job.SchemaName
e.Table = tableName
Expand Down
8 changes: 6 additions & 2 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func rowEventToMqMessage(e *model.RowChangedEvent) (*mqMessageKey, *mqMessageRow

func mqMessageToRowEvent(key *mqMessageKey, value *mqMessageRow) *model.RowChangedEvent {
e := new(model.RowChangedEvent)
// TODO: the startTs is not carried in the Kafka message, so it is lost here
// and the startTs-based txn filter does not work for decoded row events
e.CommitTs = key.Ts
e.Table = &model.TableName{
Schema: key.Schema,
Expand All @@ -153,7 +155,7 @@ func mqMessageToRowEvent(key *mqMessageKey, value *mqMessageRow) *model.RowChang

func ddlEventtoMqMessage(e *model.DDLEvent) (*mqMessageKey, *mqMessageDDL) {
key := &mqMessageKey{
Ts: e.Ts,
Ts: e.CommitTs,
Schema: e.Schema,
Table: e.Table,
Type: model.MqMessageTypeDDL,
Expand All @@ -167,7 +169,9 @@ func ddlEventtoMqMessage(e *model.DDLEvent) (*mqMessageKey, *mqMessageDDL) {

func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent {
e := new(model.DDLEvent)
e.Ts = key.Ts
// TODO: the startTs is not carried in the Kafka message, so it is lost here
// and the startTs-based txn filter does not work for decoded DDL events
e.CommitTs = key.Ts
e.Table = key.Table
e.Schema = key.Schema
e.Type = value.Type
Expand Down
40 changes: 20 additions & 20 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,29 @@ var _ = check.Suite(&batchSuite{
Columns: map[string]*model.Column{"col1": {Type: 1, Value: "bb"}},
}}, {}},
ddlCases: [][]*model.DDLEvent{{{
Ts: 1,
Schema: "a",
Table: "b",
Query: "create table a",
Type: 1,
CommitTs: 1,
Schema: "a",
Table: "b",
Query: "create table a",
Type: 1,
}}, {{
Ts: 1,
Schema: "a",
Table: "b",
Query: "create table a",
Type: 1,
CommitTs: 1,
Schema: "a",
Table: "b",
Query: "create table a",
Type: 1,
}, {
Ts: 2,
Schema: "a",
Table: "b",
Query: "create table b",
Type: 2,
CommitTs: 2,
Schema: "a",
Table: "b",
Query: "create table b",
Type: 2,
}, {
Ts: 3,
Schema: "a",
Table: "b",
Query: "create table c",
Type: 3,
CommitTs: 3,
Schema: "a",
Table: "b",
Query: "create table c",
Type: 3,
}}, {}},
resolvedTsCases: [][]uint64{{1}, {1, 2, 3}, {}},
})
Expand Down
7 changes: 4 additions & 3 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func newMqSink(ctx context.Context, mqProducer mqProducer.Producer, filter *filt

func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
if k.filter.ShouldIgnoreDMLEvent(row.CommitTs, row.Table.Schema, row.Table.Table) {
if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("ts", row.CommitTs))
continue
}
Expand Down Expand Up @@ -166,11 +166,12 @@ func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
}

func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
if k.filter.ShouldIgnoreDDLEvent(ddl.Ts, ddl.Schema, ddl.Table) {
if k.filter.ShouldIgnoreDDLEvent(ddl.StartTs, ddl.Schema, ddl.Table) {
log.Info(
"DDL event ignored",
zap.String("query", ddl.Query),
zap.Uint64("ts", ddl.Ts),
zap.Uint64("startTs", ddl.StartTs),
zap.Uint64("commitTs", ddl.CommitTs),
)
return nil
}
Expand Down
7 changes: 4 additions & 3 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row
s.unresolvedRowsMu.Lock()
defer s.unresolvedRowsMu.Unlock()
for _, row := range rows {
if s.filter.ShouldIgnoreDMLEvent(row.CommitTs, row.Table.Schema, row.Table.Table) {
if s.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("ts", row.CommitTs))
continue
}
Expand Down Expand Up @@ -122,11 +122,12 @@ func (s *mysqlSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
}

func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
if s.filter.ShouldIgnoreDDLEvent(ddl.Ts, ddl.Schema, ddl.Table) {
if s.filter.ShouldIgnoreDDLEvent(ddl.StartTs, ddl.Schema, ddl.Table) {
log.Info(
"DDL event ignored",
zap.String("query", ddl.Query),
zap.Uint64("ts", ddl.Ts),
zap.Uint64("startTs", ddl.StartTs),
zap.Uint64("commitTs", ddl.CommitTs),
)
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/changefeed.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
case-sensitive = true

[filter]
# 忽略哪些 CommitTS 的事务
# Transactions with the following CommitTS will be ignored
ignore-txn-commit-ts = [1, 2]
# 忽略哪些 StartTs 的事务
# Transactions with the following StartTs will be ignored
ignore-txn-start-ts = [1, 2]

# 同步哪些库
# The following databases(schema) will be replicated
Expand Down
14 changes: 7 additions & 7 deletions cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *decodeFileSuite) TestCanDecodeTOML(c *check.C) {
case-sensitive = false

[filter]
ignore-txn-commit-ts = [1, 2]
ignore-txn-start-ts = [1, 2]
ddl-white-list = [1, 2]
ignore-dbs = ["test", "sys"]
do-dbs = ["test1", "sys1"]
Expand Down Expand Up @@ -77,8 +77,8 @@ sync-ddl = true

c.Assert(cfg.CaseSensitive, check.IsFalse)
c.Assert(cfg.Filter, check.DeepEquals, &config.FilterConfig{
IgnoreTxnCommitTs: []uint64{1, 2},
DDLWhitelist: []model.ActionType{1, 2},
IgnoreTxnStartTs: []uint64{1, 2},
DDLWhitelist: []model.ActionType{1, 2},
Rules: &filter.Rules{
IgnoreDBs: []string{"test", "sys"},
DoDBs: []string{"test1", "sys1"},
Expand Down Expand Up @@ -120,9 +120,9 @@ func (s *decodeFileSuite) TestAndWriteExampleTOML(c *check.C) {
case-sensitive = true

[filter]
# 忽略哪些 CommitTS 的事务
# Transactions with the following CommitTS will be ignored
ignore-txn-commit-ts = [1, 2]
# 忽略哪些 StartTs 的事务
# Transactions with the following StartTs will be ignored
ignore-txn-start-ts = [1, 2]

# 同步哪些库
# The following databases(schema) will be replicated
Expand Down Expand Up @@ -184,7 +184,7 @@ sync-ddl = true

c.Assert(cfg.CaseSensitive, check.IsTrue)
c.Assert(cfg.Filter, check.DeepEquals, &config.FilterConfig{
IgnoreTxnCommitTs: []uint64{1, 2},
IgnoreTxnStartTs: []uint64{1, 2},
Rules: &filter.Rules{
IgnoreDBs: []string{"test", "sys"},
DoDBs: []string{"test1", "sys1"},
Expand Down
16 changes: 8 additions & 8 deletions kafka_consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,16 +405,16 @@ ClaimMessages:
func (c *Consumer) appendDDL(ddl *model.DDLEvent) {
c.ddlListMu.Lock()
defer c.ddlListMu.Unlock()
if ddl.Ts <= c.maxDDLReceivedTs {
if ddl.CommitTs <= c.maxDDLReceivedTs {
return
}
globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
if ddl.Ts <= globalResolvedTs {
log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.Ts), zap.Uint64("globalResolvedTs", globalResolvedTs))
if ddl.CommitTs <= globalResolvedTs {
log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
return
}
c.ddlList = append(c.ddlList, ddl)
c.maxDDLReceivedTs = ddl.Ts
c.maxDDLReceivedTs = ddl.CommitTs
}

func (c *Consumer) getFrontDDL() *model.DDLEvent {
Expand Down Expand Up @@ -477,13 +477,13 @@ func (c *Consumer) Run(ctx context.Context) error {
return errors.Trace(err)
}
todoDDL := c.getFrontDDL()
if todoDDL != nil && globalResolvedTs >= todoDDL.Ts {
if todoDDL != nil && globalResolvedTs >= todoDDL.CommitTs {
//flush DMLs
err := c.forEachSink(func(sink *struct {
sink.Sink
resolvedTs uint64
}) error {
return sink.FlushRowChangedEvents(ctx, todoDDL.Ts)
return sink.FlushRowChangedEvents(ctx, todoDDL.CommitTs)
})
if err != nil {
return errors.Trace(err)
Expand All @@ -498,8 +498,8 @@ func (c *Consumer) Run(ctx context.Context) error {
continue
}

if todoDDL != nil && todoDDL.Ts < globalResolvedTs {
globalResolvedTs = todoDDL.Ts
if todoDDL != nil && todoDDL.CommitTs < globalResolvedTs {
globalResolvedTs = todoDDL.CommitTs
}
if lastGlobalResolvedTs == globalResolvedTs {
continue
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ import (
// FilterConfig represents filter config for a changefeed
type FilterConfig struct {
*filter.Rules
IgnoreTxnCommitTs []uint64 `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"`
DDLWhitelist []model.ActionType `toml:"ddl-white-list" json:"ddl-white-list"`
IgnoreTxnStartTs []uint64 `toml:"ignore-txn-start-ts" json:"ignore-txn-start-ts"`
DDLWhitelist []model.ActionType `toml:"ddl-white-list" json:"ddl-white-list"`
}
2 changes: 0 additions & 2 deletions pkg/cyclic/mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,12 @@ func CreateMarkTable(sourceSchema, sourceTable string) []*model.DDLEvent {
schema, table := MarkTableName(sourceSchema, sourceTable)
events := []*model.DDLEvent{
{
Ts: 0,
Schema: schema,
Table: table,
Query: fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", schema),
Type: timodel.ActionCreateSchema,
},
{
Ts: 0,
Schema: schema,
Table: table,
Query: fmt.Sprintf(
Expand Down
20 changes: 10 additions & 10 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ const OptCyclicConfig string = "_cyclic_relax_sql_mode"

// Filter is a event filter implementation
type Filter struct {
filter *filter.Filter
ignoreTxnCommitTs []uint64
ddlWhitelist []model.ActionType
filter *filter.Filter
ignoreTxnStartTs []uint64
ddlWhitelist []model.ActionType
}

// NewFilter creates a filter
Expand All @@ -39,15 +39,15 @@ func NewFilter(cfg *config.ReplicaConfig) (*Filter, error) {
return nil, err
}
return &Filter{
filter: filter,
ignoreTxnCommitTs: cfg.Filter.IgnoreTxnCommitTs,
ddlWhitelist: cfg.Filter.DDLWhitelist,
filter: filter,
ignoreTxnStartTs: cfg.Filter.IgnoreTxnStartTs,
ddlWhitelist: cfg.Filter.DDLWhitelist,
}, nil
}

// shouldIgnoreStartTs returns true if the txn with the given ts should be ignored
func (f *Filter) shouldIgnoreCommitTs(ts uint64) bool {
for _, ignoreTs := range f.ignoreTxnCommitTs {
func (f *Filter) shouldIgnoreStartTs(ts uint64) bool {
for _, ignoreTs := range f.ignoreTxnStartTs {
if ignoreTs == ts {
return true
}
Expand All @@ -69,13 +69,13 @@ func (f *Filter) ShouldIgnoreTable(db, tbl string) bool {
// ShouldIgnoreDMLEvent reports whether a DML event is not wanted by this changefeed.
// CDC currently supports filtering only by the txn ts and by database/table.
func (f *Filter) ShouldIgnoreDMLEvent(ts uint64, schema, table string) bool {
return f.shouldIgnoreCommitTs(ts) || f.ShouldIgnoreTable(schema, table)
return f.shouldIgnoreStartTs(ts) || f.ShouldIgnoreTable(schema, table)
}

// ShouldIgnoreDDLEvent reports whether a DDL event is not wanted by this changefeed.
// CDC currently supports filtering only by the txn ts and by database/table.
func (f *Filter) ShouldIgnoreDDLEvent(ts uint64, schema, table string) bool {
return f.shouldIgnoreCommitTs(ts) || f.ShouldIgnoreTable(schema, table)
return f.shouldIgnoreStartTs(ts) || f.ShouldIgnoreTable(schema, table)
}

// ShouldDiscardDDL returns true if this DDL should be discarded
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *filterSuite) TestShouldUseCustomRules(c *check.C) {
func (s *filterSuite) TestShouldIgnoreTxn(c *check.C) {
filter, err := NewFilter(&config.ReplicaConfig{
Filter: &config.FilterConfig{
IgnoreTxnCommitTs: []uint64{1, 3},
IgnoreTxnStartTs: []uint64{1, 3},
Rules: &filter.Rules{
DoDBs: []string{"sns", "ecom"},
IgnoreTables: []*filter.Table{
Expand Down