Skip to content

Commit

Permalink
This is an automated cherry-pick of #9549
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
asddongmen authored and ti-chi-bot committed Aug 14, 2023
1 parent db5fb4d commit bec3840
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 112 deletions.
37 changes: 32 additions & 5 deletions cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,20 +246,47 @@ func (s *schemaWrap4Owner) BuildDDLEvents(
return s.filterDDLEvents(ddlEvents)
}

// TODO: delete this function after integration test passed.
func (s *schemaWrap4Owner) filterDDLEvents(ddlEvents []*model.DDLEvent) ([]*model.DDLEvent, error) {
res := make([]*model.DDLEvent, 0, len(ddlEvents))
for _, event := range ddlEvents {
ignored, err := s.filter.ShouldIgnoreDDLEvent(event)
if err != nil {
return nil, errors.Trace(err)
var (
ignored bool
err error
)
if event.Type == timodel.ActionRenameTable {
ignored, err = s.filter.ShouldDiscardDDL(
event.StartTs,
event.Type,
event.PreTableInfo.TableName.Schema,
event.PreTableInfo.TableName.Table,
event.Query)
if err != nil {
return nil, errors.Trace(err)
}
} else {
ignored, err = s.filter.ShouldDiscardDDL(
event.StartTs,
event.Type,
event.TableInfo.TableName.Schema,
event.TableInfo.TableName.Table,
event.Query)
if err != nil {
return nil, errors.Trace(err)
}
}
if ignored {
s.metricIgnoreDDLEventCounter.Inc()
log.Info(
"DDL event ignored",
log.Error(
"ignored DDL event should not be sent to owner"+
"please report a bug to TiCDC if you see this log"+
"but it is no harm to your replication",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.String("query", event.Query),
zap.String("type", event.Type.String()),
zap.String("schema", event.TableInfo.TableName.Schema),
zap.String("table", event.TableInfo.TableName.Table),
zap.Uint64("startTs", event.StartTs),
zap.Uint64("commitTs", event.CommitTs),
)
Expand Down
147 changes: 145 additions & 2 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,66 @@ type ddlJobPullerImpl struct {

// Run starts the DDLJobPuller.
func (p *ddlJobPullerImpl) Run(ctx context.Context, _ ...chan<- error) error {
<<<<<<< HEAD

Check failure on line 86 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expected }

Check failure on line 86 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expected }
=======
if p.multiplexing {
return p.runMultiplexing(ctx)
}
return p.run(ctx)
}

func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model.RawKVEntry) error {
if ddlRawKV == nil {
return nil
}

if ddlRawKV.OpType == model.OpTypeResolved {
// Only nil in unit test case.
if p.schemaStorage != nil {
p.schemaStorage.AdvanceResolvedTs(ddlRawKV.CRTs)
}
if ddlRawKV.CRTs > p.getResolvedTs() {
p.setResolvedTs(ddlRawKV.CRTs)
}
}

job, err := p.unmarshalDDL(ddlRawKV)
if err != nil {
return errors.Trace(err)
}

if job != nil {
skip, err := p.handleJob(job)
if err != nil {
return cerror.WrapError(cerror.ErrHandleDDLFailed,
err, job.String(), job.Query, job.StartTS, job.StartTS)
}
log.Info("handle ddl job",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.String("query", job.Query),
zap.Stringer("job", job), zap.Bool("skip", skip))
if skip {
return nil
}
}

jobEntry := &model.DDLJobEntry{
Job: job,
OpType: ddlRawKV.OpType,
CRTs: ddlRawKV.CRTs,
Err: err,
}
select {
case <-ctx.Done():
return ctx.Err()
case p.outputCh <- jobEntry:
}
return nil
}

func (p *ddlJobPullerImpl) run(ctx context.Context) error {
>>>>>>> aab72b1b92 (filter (ticdc): simplify filter interface and apply event filter rule in ddlPuller (#9549))

Check failure on line 145 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected >>, expected }

Check failure on line 145 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'

Check failure on line 145 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected >>, expected }

Check failure on line 145 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'
eg, ctx := errgroup.WithContext(ctx)

eg.Go(func() error {

Check failure on line 148 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

method has no receiver

Check failure on line 148 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected {, expected (

Check failure on line 148 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

method has no receiver

Check failure on line 148 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected {, expected (
Expand Down Expand Up @@ -253,14 +313,43 @@ func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err
for i, tableInfo := range multiTableInfos {
schema, ok := snap.SchemaByID(newSchemaIDs[i])
if !ok {
<<<<<<< HEAD
return true, cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(newSchemaIDs[i])
=======
shouldDiscardOldTable = true
} else {
shouldDiscardOldTable, err = p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, oldSchemaNames[i].O, oldTable.Name.O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
>>>>>>> aab72b1b92 (filter (ticdc): simplify filter interface and apply event filter rule in ddlPuller (#9549))
}
table, ok := snap.PhysicalTableByID(tableInfo.ID)
if !ok {
<<<<<<< HEAD
// if a table is not found and its new name is in filter rule, return error.
if !p.filter.ShouldDiscardDDL(job.Type, schema.Name.O, newTableNames[i].O) {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(tableInfo.ID, job.Query)
}
=======
// the new table name does not hit the filter rule, so we should discard the table.
shouldDiscardNewTable = true
} else {
shouldDiscardNewTable, err = p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, newSchemaName.Name.O, newTableNames[i].O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
}

if shouldDiscardOldTable && shouldDiscardNewTable {
// skip a rename table ddl only when its old table name and new table name are both filtered.
log.Info("RenameTables is filtered",
zap.Int64("tableID", tableInfo.ID),
zap.String("schema", oldSchemaNames[i].O),
zap.String("query", job.Query))
>>>>>>> aab72b1b92 (filter (ticdc): simplify filter interface and apply event filter rule in ddlPuller (#9549))
continue
}
// we skip a rename table ddl only when its old table name and new table name are both filtered.
Expand Down Expand Up @@ -331,6 +420,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
if p.schemaStorage == nil {
return false, nil
}
<<<<<<< HEAD
snap := p.schemaStorage.GetLastSnapshot()
// Do this first to fill the schema name to its origin schema name.
if err := snap.FillSchemaName(job); err != nil {
Expand All @@ -341,6 +431,20 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
}
return true, errors.Trace(err)
}
=======

defer func() {
if skip && err == nil {
log.Info("ddl job schema or table does not match, discard it",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Stringer("job", job))
}
}()
>>>>>>> aab72b1b92 (filter (ticdc): simplify filter interface and apply event filter rule in ddlPuller (#9549))

if job.BinlogInfo.FinishedTS <= p.getResolvedTs() ||
job.BinlogInfo.SchemaVersion <= p.schemaVersion {
Expand All @@ -357,6 +461,19 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
return true, nil
}

snap := p.schemaStorage.GetLastSnapshot()
if err := snap.FillSchemaName(job); err != nil {
log.Info("failed to fill schema name for ddl job", zap.Error(err))
discard, fErr := p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.TableName, job.Query)
if fErr != nil {
return false, errors.Trace(fErr)
}
if discard {
return true, nil
}
}

switch job.Type {
case timodel.ActionRenameTables:
skip, err = p.handleRenameTables(job)
Expand All @@ -367,24 +484,50 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
oldTable, ok := snap.PhysicalTableByID(job.TableID)
if !ok {
// 1. If we can not find the old table, and the new table name is in filter rule, return error.
if !p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) {
discard, err := p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
if !discard {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
skip = true
} else {
// 2. If we can find the preTableInfo, we filter it by the old table name.
<<<<<<< HEAD
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, oldTable.Name.O)
=======
skipByOldTableName, err := p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, oldTable.TableName.Schema, oldTable.TableName.Table, job.Query)
if err != nil {
return true, errors.Trace(err)
}
skipByNewTableName, err := p.filter.ShouldDiscardDDL(job.StartTS,
job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.Query)
if err != nil {
return true, errors.Trace(err)
}
>>>>>>> aab72b1b92 (filter (ticdc): simplify filter interface and apply event filter rule in ddlPuller (#9549))
// 3. If its old table name is not in filter rule, and its new table name in filter rule, return error.
if skip && !p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
if skipByOldTableName && skipByNewTableName {
skip = true
return true, nil
}
}
default:
// nil means it is a schema ddl job, it's no need to fill the table name.
if job.BinlogInfo.TableInfo != nil {
job.TableName = job.BinlogInfo.TableInfo.Name.O
}
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName)
skip, err = p.filter.
ShouldDiscardDDL(job.StartTS, job.Type, job.SchemaName, job.TableName, job.Query)
if err != nil {
return false, errors.Trace(err)
}
}

if skip {
Expand Down
27 changes: 27 additions & 0 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,18 @@ func TestHandleJob(t *testing.T) {
cfg.Filter.Rules = []string{
"test1.t1",
"test1.t2",
"test1.testStartTs",
}
// test start ts filter
cfg.Filter.IgnoreTxnStartTs = []uint64{1}
// test event filter
cfg.Filter.EventFilters = []*config.EventFilterRule{
{
Matcher: []string{"test1.*"},
IgnoreSQL: []string{"alter table test1.t1 add column c1 int"},
},
}

f, err := filter.NewFilter(cfg, "")
require.NoError(t, err)
ddlJobPullerImpl.filter = f
Expand Down Expand Up @@ -380,6 +391,22 @@ func TestHandleJob(t *testing.T) {
require.NoError(t, err)
require.False(t, skip)

job = helper.DDL2Job("alter table test1.t1 add column c1 int")
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.True(t, skip)

job = helper.DDL2Job("create table test1.testStartTs(id int)")
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.False(t, skip)

job = helper.DDL2Job("alter table test1.testStartTs add column c1 int")
job.StartTS = 1
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.True(t, skip)

job = helper.DDL2Job("create table test1.t2(id int)")
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ error = '''
get tikv grpc context failed
'''

["CDC:ErrHandleDDLFailed"]
error = '''
handle ddl failed, job: %s, query: %s, startTs: %d. If you want to skip this DDL and continue with replication, you can manually execute this DDL downstream. Afterwards, add `ignore-txn-start-ts=[%d]` to the changefeed in the filter configuration.
'''

["CDC:ErrIllegalSorterParameter"]
error = '''
illegal parameter for sorter: %s
Expand Down
8 changes: 8 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,4 +872,12 @@ var (
"invalid replica config, %s",
errors.RFCCodeText("CDC:ErrInvalidReplicaConfig"),
)

ErrHandleDDLFailed = errors.Normalize(
"handle ddl failed, job: %s, query: %s, startTs: %d. "+
"If you want to skip this DDL and continue with replication, "+
"you can manually execute this DDL downstream. Afterwards, "+
"add `ignore-txn-start-ts=[%d]` to the changefeed in the filter configuration.",
errors.RFCCodeText("CDC:ErrHandleDDLFailed"),
)
)
Loading

0 comments on commit bec3840

Please sign in to comment.