Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter (ticdc): simplify filter interface and apply event filter rule in ddlPuller (#9549) #9569

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,20 +246,47 @@ func (s *schemaWrap4Owner) BuildDDLEvents(
return s.filterDDLEvents(ddlEvents)
}

// TODO: delete this function after integration test passed.
func (s *schemaWrap4Owner) filterDDLEvents(ddlEvents []*model.DDLEvent) ([]*model.DDLEvent, error) {
res := make([]*model.DDLEvent, 0, len(ddlEvents))
for _, event := range ddlEvents {
ignored, err := s.filter.ShouldIgnoreDDLEvent(event)
if err != nil {
return nil, errors.Trace(err)
var (
ignored bool
err error
)
if event.Type == timodel.ActionRenameTable {
ignored, err = s.filter.ShouldDiscardDDL(
event.StartTs,
event.Type,
event.PreTableInfo.TableName.Schema,
event.PreTableInfo.TableName.Table,
event.Query)
if err != nil {
return nil, errors.Trace(err)
}
} else {
ignored, err = s.filter.ShouldDiscardDDL(
event.StartTs,
event.Type,
event.TableInfo.TableName.Schema,
event.TableInfo.TableName.Table,
event.Query)
if err != nil {
return nil, errors.Trace(err)
}
}
if ignored {
s.metricIgnoreDDLEventCounter.Inc()
log.Info(
"DDL event ignored",
log.Error(
"ignored DDL event should not be sent to owner"+
"please report a bug to TiCDC if you see this log"+
"but it is no harm to your replication",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("query", event.Query),
zap.String("type", event.Type.String()),
zap.String("schema", event.TableInfo.TableName.Schema),
zap.String("table", event.TableInfo.TableName.Table),
zap.Uint64("startTs", event.StartTs),
zap.Uint64("commitTs", event.CommitTs),
)
Expand Down
81 changes: 57 additions & 24 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ func (p *ddlJobPullerImpl) Run(ctx context.Context, _ ...chan<- error) error {
if job != nil {
skip, err := p.handleJob(job)
if err != nil {
return errors.Trace(err)
return cerror.WrapError(cerror.ErrHandleDDLFailed,
err, job.String(), job.Query, job.StartTS, job.StartTS)
}
log.Info("handle ddl job",
zap.String("namespace", p.changefeedID.Namespace),
Expand Down Expand Up @@ -260,15 +261,23 @@ func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err
if !ok {
shouldDiscardOldTable = true
} else {
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, oldSchemaNames[i].O, oldTable.Name.O)
shouldDiscardOldTable, err = p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, oldSchemaNames[i].O, oldTable.Name.O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
}

newSchemaName, ok := snap.SchemaByID(newSchemaIDs[i])
if !ok {
// the new table name does not hit the filter rule, so we should discard the table.
shouldDiscardNewTable = true
} else {
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, newTableNames[i].O)
shouldDiscardNewTable, err = p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, newSchemaName.Name.O, newTableNames[i].O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
}

if shouldDiscardOldTable && shouldDiscardNewTable {
Expand Down Expand Up @@ -348,32 +357,20 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.String("job", job.String()))
zap.Stringer("job", job))
}
if err != nil {
log.Info("handle ddl job failed",
log.Warn("handle ddl job failed",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.String("query", job.Query),
zap.String("schema", job.SchemaName),
zap.String("table", job.BinlogInfo.TableInfo.Name.O),
zap.String("job", job.String()),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Stringer("job", job),
zap.Error(err))
}
}()

snap := p.schemaStorage.GetLastSnapshot()
// Do this first to fill the schema name to its origin schema name.
if err := snap.FillSchemaName(job); err != nil {
log.Info("failed to fill schema name for ddl job", zap.Error(err))
// If we can't find a job's schema, check if it's been filtered.
if p.filter.ShouldIgnoreTable(job.SchemaName, job.TableName) ||
p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) {
return true, nil
}
return true, errors.Trace(err)
}

if job.BinlogInfo.FinishedTS <= p.getResolvedTs() ||
job.BinlogInfo.SchemaVersion <= p.schemaVersion {
log.Info("ddl job finishedTs less than puller resolvedTs,"+
Expand All @@ -389,6 +386,20 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
return true, nil
}

snap := p.schemaStorage.GetLastSnapshot()
if err := snap.FillSchemaName(job); err != nil {
log.Info("failed to fill schema name for ddl job", zap.Error(err))
discard, fErr := p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.TableName, job.Query)
if fErr != nil {
return false, errors.Trace(fErr)
}
if discard {
return true, nil
}
return true, errors.Trace(err)
}

switch job.Type {
case timodel.ActionRenameTables:
skip, err = p.handleRenameTables(job)
Expand All @@ -406,28 +417,50 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
oldTable, ok := snap.PhysicalTableByID(job.TableID)
if !ok {
// 1. If we can not find the old table, and the new table name is in filter rule, return error.
if !p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) {
discard, err := p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
if !discard {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
skip = true
} else {
log.Info("rename table ddl job",
zap.String("oldTableName", oldTable.TableName.Table),
zap.String("oldSchemaName", oldTable.TableName.Schema))
// since we can find the old table, we must can find the old schema.
// 2. If we can find the preTableInfo, we filter it by the old table name.
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table)
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
skipByOldTableName, err := p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, oldTable.TableName.Schema, oldTable.TableName.Table, job.Query)
if err != nil {
return true, errors.Trace(err)
}
skipByNewTableName, err := p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
// 3. If its old table name is not in filter rule, and its new table name in filter rule, return error.
if skipByOldTableName && !skipByNewTableName {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
if skipByOldTableName && skipByNewTableName {
skip = true
return true, nil
}
}
default:
// nil means it is a schema ddl job, it's no need to fill the table name.
if job.BinlogInfo.TableInfo != nil {
job.TableName = job.BinlogInfo.TableInfo.Name.O
}
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName)
skip, err = p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.TableName, job.Query)
if err != nil {
return false, errors.Trace(err)
}
}

if skip {
Expand Down
27 changes: 27 additions & 0 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,18 @@ func TestHandleJob(t *testing.T) {
cfg.Filter.Rules = []string{
"test1.t1",
"test1.t2",
"test1.testStartTs",
}
// test start ts filter
cfg.Filter.IgnoreTxnStartTs = []uint64{1}
// test event filter
cfg.Filter.EventFilters = []*config.EventFilterRule{
{
Matcher: []string{"test1.*"},
IgnoreSQL: []string{"alter table test1.t1 add column c1 int"},
},
}

f, err := filter.NewFilter(cfg, "")
require.NoError(t, err)
ddlJobPullerImpl.filter = f
Expand Down Expand Up @@ -423,6 +434,22 @@ func TestHandleJob(t *testing.T) {
require.NoError(t, err)
require.False(t, skip)

job = helper.DDL2Job("alter table test1.t1 add column c1 int")
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.True(t, skip)

job = helper.DDL2Job("create table test1.testStartTs(id int)")
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.False(t, skip)

job = helper.DDL2Job("alter table test1.testStartTs add column c1 int")
job.StartTS = 1
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.True(t, skip)

job = helper.DDL2Job("create table test1.t2(id int)")
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ error = '''
get tikv grpc context failed
'''

["CDC:ErrHandleDDLFailed"]
error = '''
handle ddl failed, job: %s, query: %s, startTs: %d. If you want to skip this DDL and continue with replication, you can manually execute this DDL downstream. Afterwards, add `ignore-txn-start-ts=[%d]` to the changefeed in the filter configuration.
'''

["CDC:ErrIllegalSorterParameter"]
error = '''
illegal parameter for sorter: %s
Expand Down
8 changes: 8 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,4 +872,12 @@ var (
"invalid replica config, %s",
errors.RFCCodeText("CDC:ErrInvalidReplicaConfig"),
)

ErrHandleDDLFailed = errors.Normalize(
"handle ddl failed, job: %s, query: %s, startTs: %d. "+
"If you want to skip this DDL and continue with replication, "+
"you can manually execute this DDL downstream. Afterwards, "+
"add `ignore-txn-start-ts=[%d]` to the changefeed in the filter configuration.",
errors.RFCCodeText("CDC:ErrHandleDDLFailed"),
)
)
61 changes: 20 additions & 41 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,10 @@ var allowDDLList = []timodel.ActionType{
type Filter interface {
// ShouldIgnoreDMLEvent returns true and nil if the DML event should be ignored.
ShouldIgnoreDMLEvent(dml *model.RowChangedEvent, rawRow model.RowChangedDatums, tableInfo *model.TableInfo) (bool, error)
// ShouldIgnoreDDLEvent returns true and nil if the DDL event should be ignored.
// If a ddl is ignored, it will applied to cdc's schema storage,
// but not sent to downstream.
ShouldIgnoreDDLEvent(ddl *model.DDLEvent) (bool, error)
// ShouldDiscardDDL returns true if this DDL should be discarded.
// If a ddl is discarded, it will neither be applied to cdc's schema storage
// nor sent to downstream.
ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) bool
ShouldDiscardDDL(startTs uint64, ddlType timodel.ActionType, schema, table, query string) (bool, error)
// ShouldIgnoreTable returns true if the table should be ignored.
ShouldIgnoreTable(schema, table string) bool
// ShouldIgnoreSchema returns true if the schema should be ignored.
Expand Down Expand Up @@ -148,46 +144,16 @@ func (f *filter) ShouldIgnoreDMLEvent(
return f.dmlExprFilter.shouldSkipDML(dml, rawRow, ti)
}

// ShouldIgnoreDDLEvent checks if a DDL Event should be ignore by conditions below:
// 0. By startTs.
// 1. By schema name.
// 2. By table name.
// 3. By type.
// 4. By query.
func (f *filter) ShouldIgnoreDDLEvent(ddl *model.DDLEvent) (bool, error) {
if f.shouldIgnoreStartTs(ddl.StartTs) {
return true, nil
}

var shouldIgnoreTableOrSchema bool
switch ddl.Type {
case timodel.ActionCreateSchema, timodel.ActionDropSchema,
timodel.ActionModifySchemaCharsetAndCollate:
shouldIgnoreTableOrSchema = !f.tableFilter.MatchSchema(ddl.TableInfo.TableName.Schema)
case timodel.ActionRenameTable:
shouldIgnoreTableOrSchema = f.ShouldIgnoreTable(ddl.PreTableInfo.TableName.Schema, ddl.PreTableInfo.TableName.Table)
default:
shouldIgnoreTableOrSchema = f.ShouldIgnoreTable(ddl.TableInfo.TableName.Schema, ddl.TableInfo.TableName.Table)
}
if shouldIgnoreTableOrSchema {
return true, nil
}
return f.sqlEventFilter.shouldSkipDDL(ddl)
}

// ShouldDiscardDDL returns true if this DDL should be discarded.
// If a ddl is discarded, it will not be applied to cdc's schema storage
// and sent to downstream.
func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) (discard bool) {
discard = true

for _, actionType := range allowDDLList {
if ddlType == actionType {
discard = false
break
}
func (f *filter) ShouldDiscardDDL(startTs uint64, ddlType timodel.ActionType, schema, table, query string) (discard bool, err error) {
discard = !isAllowedDDL(ddlType)
if discard {
return
}

discard = f.shouldIgnoreStartTs(startTs)
if discard {
return
}
Expand All @@ -199,7 +165,11 @@ func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table stri
default:
discard = f.ShouldIgnoreTable(schema, table)
}
return
if discard {
return
}

return f.sqlEventFilter.shouldSkipDDL(ddlType, schema, table, query)
}

// ShouldIgnoreTable returns true if the specified table should be ignored by this changefeed.
Expand Down Expand Up @@ -228,3 +198,12 @@ func (f *filter) shouldIgnoreStartTs(ts uint64) bool {
}
return false
}

func isAllowedDDL(actionType timodel.ActionType) bool {
for _, action := range allowDDLList {
if actionType == action {
return true
}
}
return false
}
Loading
Loading