Skip to content

Commit

Permalink
expression_filter(dm): fix updateOldExprs and updateNewExprs length n…
Browse files Browse the repository at this point in the history
…ot same (#7779)

close #7774
  • Loading branch information
lance6716 authored Dec 7, 2022
1 parent 6dcdaab commit 387b81a
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 74 deletions.
41 changes: 16 additions & 25 deletions dm/syncer/expr_filter_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ type ExprFilterGroup struct {
updateNewExprs map[string][]expression.Expression // tableName -> expr
deleteExprs map[string][]expression.Expression // tableName -> expr

hasInsertFilter map[string]struct{} // set(tableName)
hasUpdateOldFilter map[string]struct{} // set(tableName)
hasUpdateNewFilter map[string]struct{} // set(tableName)
hasDeleteFilter map[string]struct{} // set(tableName)
hasInsertFilter map[string]struct{} // set(tableName)
hasUpdateFilter map[string]struct{} // set(tableName)
hasDeleteFilter map[string]struct{} // set(tableName)

tidbCtx sessionctx.Context
logCtx *tcontext.Context
Expand All @@ -48,17 +47,16 @@ type ExprFilterGroup struct {
// NewExprFilterGroup creates an ExprFilterGroup.
func NewExprFilterGroup(logCtx *tcontext.Context, tidbCtx sessionctx.Context, exprConfig []*config.ExpressionFilter) *ExprFilterGroup {
ret := &ExprFilterGroup{
configs: map[string][]*config.ExpressionFilter{},
insertExprs: map[string][]expression.Expression{},
updateOldExprs: map[string][]expression.Expression{},
updateNewExprs: map[string][]expression.Expression{},
deleteExprs: map[string][]expression.Expression{},
hasInsertFilter: map[string]struct{}{},
hasUpdateOldFilter: map[string]struct{}{},
hasUpdateNewFilter: map[string]struct{}{},
hasDeleteFilter: map[string]struct{}{},
tidbCtx: tidbCtx,
logCtx: logCtx,
configs: map[string][]*config.ExpressionFilter{},
insertExprs: map[string][]expression.Expression{},
updateOldExprs: map[string][]expression.Expression{},
updateNewExprs: map[string][]expression.Expression{},
deleteExprs: map[string][]expression.Expression{},
hasInsertFilter: map[string]struct{}{},
hasUpdateFilter: map[string]struct{}{},
hasDeleteFilter: map[string]struct{}{},
tidbCtx: tidbCtx,
logCtx: logCtx,
}
for _, c := range exprConfig {
tableName := dbutil.TableName(c.Schema, c.Table)
Expand All @@ -67,11 +65,8 @@ func NewExprFilterGroup(logCtx *tcontext.Context, tidbCtx sessionctx.Context, ex
if c.InsertValueExpr != "" {
ret.hasInsertFilter[tableName] = struct{}{}
}
if c.UpdateOldValueExpr != "" {
ret.hasUpdateOldFilter[tableName] = struct{}{}
}
if c.UpdateNewValueExpr != "" {
ret.hasUpdateNewFilter[tableName] = struct{}{}
if c.UpdateOldValueExpr != "" || c.UpdateNewValueExpr != "" {
ret.hasUpdateFilter[tableName] = struct{}{}
}
if c.DeleteValueExpr != "" {
ret.hasDeleteFilter[tableName] = struct{}{}
Expand Down Expand Up @@ -117,7 +112,7 @@ func (g *ExprFilterGroup) GetUpdateExprs(table *filter.Table, ti *model.TableInf
return retOld, retNew, nil
}

if _, ok := g.hasUpdateOldFilter[tableID]; ok {
if _, ok := g.hasUpdateFilter[tableID]; ok {
for _, c := range g.configs[tableID] {
if c.UpdateOldValueExpr != "" {
expr, err := getSimpleExprOfTable(g.tidbCtx, c.UpdateOldValueExpr, ti, g.logCtx.L())
Expand All @@ -129,11 +124,7 @@ func (g *ExprFilterGroup) GetUpdateExprs(table *filter.Table, ti *model.TableInf
} else {
g.updateOldExprs[tableID] = append(g.updateOldExprs[tableID], expression.NewOne())
}
}
}

if _, ok := g.hasUpdateNewFilter[tableID]; ok {
for _, c := range g.configs[tableID] {
if c.UpdateNewValueExpr != "" {
expr, err := getSimpleExprOfTable(g.tidbCtx, c.UpdateNewValueExpr, ti, g.logCtx.L())
if err != nil {
Expand Down
149 changes: 101 additions & 48 deletions dm/syncer/expr_filter_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@ package syncer

import (
"context"
"testing"

. "github.com/pingcap/check"
ddl2 "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/util/filter"
"github.com/pingcap/tiflow/dm/config"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/schema"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/dm/syncer/dbconn"
"github.com/stretchr/testify/require"
)

func (s *testFilterSuite) TestSkipDMLByExpression(c *C) {
func TestSkipDMLByExpression(t *testing.T) {
cases := []struct {
exprStr string
tableStr string
Expand Down Expand Up @@ -91,19 +93,18 @@ create table t (
Name: tblName,
}
)
c.Assert(log.InitLogger(&log.Config{Level: "debug"}), IsNil)
require.NoError(t, log.InitLogger(&log.Config{Level: "debug"}))

dbConn := dbconn.NewDBConn(&config.SubTaskConfig{}, s.baseConn)
for _, ca := range cases {
schemaTracker, err := schema.NewTestTracker(ctx, "unit-test", dbConn, log.L())
c.Assert(err, IsNil)
c.Assert(schemaTracker.CreateSchemaIfNotExists(dbName), IsNil)
schemaTracker, err := schema.NewTestTracker(ctx, "unit-test", nil, log.L())
require.NoError(t, err)
require.NoError(t, schemaTracker.CreateSchemaIfNotExists(dbName))
stmt, err := parseSQL(ca.tableStr)
c.Assert(err, IsNil)
c.Assert(schemaTracker.Exec(ctx, dbName, stmt), IsNil)
require.NoError(t, err)
require.NoError(t, schemaTracker.Exec(ctx, dbName, stmt))

ti, err := schemaTracker.GetTableInfo(table)
c.Assert(err, IsNil)
require.NoError(t, err)

exprConfig := []*config.ExpressionFilter{
{
Expand All @@ -115,26 +116,26 @@ create table t (
sessCtx := utils.NewSessionCtx(map[string]string{"time_zone": "UTC"})
g := NewExprFilterGroup(tcontext.Background(), sessCtx, exprConfig)
exprs, err := g.GetInsertExprs(table, ti)
c.Assert(err, IsNil)
c.Assert(exprs, HasLen, 1)
require.NoError(t, err)
require.Len(t, exprs, 1)
expr := exprs[0]

ca.skippedRow = extractValueFromData(ca.skippedRow, ti.Columns, ti)
ca.passedRow = extractValueFromData(ca.passedRow, ti.Columns, ti)

skip, err := SkipDMLByExpression(sessCtx, ca.skippedRow, expr, ti.Columns)
c.Assert(err, IsNil)
c.Assert(skip, Equals, true)
require.NoError(t, err)
require.True(t, skip)

skip, err = SkipDMLByExpression(sessCtx, ca.passedRow, expr, ti.Columns)
c.Assert(err, IsNil)
c.Assert(skip, Equals, false)
require.NoError(t, err)
require.False(t, skip)

schemaTracker.Close()
}
}

func (s *testFilterSuite) TestAllBinaryProtocolTypes(c *C) {
func TestAllBinaryProtocolTypes(t *testing.T) {
cases := []struct {
exprStr string
tableStr string
Expand Down Expand Up @@ -355,20 +356,19 @@ create table t (
Name: tblName,
}
)
c.Assert(log.InitLogger(&log.Config{Level: "debug"}), IsNil)
require.NoError(t, log.InitLogger(&log.Config{Level: "debug"}))

dbConn := dbconn.NewDBConn(&config.SubTaskConfig{}, s.baseConn)
for _, ca := range cases {
c.Log(ca.tableStr)
schemaTracker, err := schema.NewTestTracker(ctx, "unit-test", dbConn, log.L())
c.Assert(err, IsNil)
c.Assert(schemaTracker.CreateSchemaIfNotExists(dbName), IsNil)
t.Log(ca.tableStr)
schemaTracker, err := schema.NewTestTracker(ctx, "unit-test", nil, log.L())
require.NoError(t, err)
require.NoError(t, schemaTracker.CreateSchemaIfNotExists(dbName))
stmt, err := parseSQL(ca.tableStr)
c.Assert(err, IsNil)
c.Assert(schemaTracker.Exec(ctx, dbName, stmt), IsNil)
require.NoError(t, err)
require.NoError(t, schemaTracker.Exec(ctx, dbName, stmt))

ti, err := schemaTracker.GetTableInfo(table)
c.Assert(err, IsNil)
require.NoError(t, err)

exprConfig := []*config.ExpressionFilter{
{
Expand All @@ -380,26 +380,26 @@ create table t (
sessCtx := utils.NewSessionCtx(map[string]string{"time_zone": "UTC"})
g := NewExprFilterGroup(tcontext.Background(), sessCtx, exprConfig)
exprs, err := g.GetInsertExprs(table, ti)
c.Assert(err, IsNil)
c.Assert(exprs, HasLen, 1)
require.NoError(t, err)
require.Len(t, exprs, 1)
expr := exprs[0]

ca.skippedRow = extractValueFromData(ca.skippedRow, ti.Columns, ti)
ca.passedRow = extractValueFromData(ca.passedRow, ti.Columns, ti)

skip, err := SkipDMLByExpression(sessCtx, ca.skippedRow, expr, ti.Columns)
c.Assert(err, IsNil)
c.Assert(skip, Equals, true)
require.NoError(t, err)
require.True(t, skip)

skip, err = SkipDMLByExpression(sessCtx, ca.passedRow, expr, ti.Columns)
c.Assert(err, IsNil)
c.Assert(skip, Equals, false)
require.NoError(t, err)
require.False(t, skip)

schemaTracker.Close()
}
}

func (s *testFilterSuite) TestExpressionContainsNonExistColumn(c *C) {
func TestExpressionContainsNonExistColumn(t *testing.T) {
var (
ctx = context.Background()
dbName = "test"
Expand All @@ -415,16 +415,15 @@ create table t (
exprStr = "d > 1"
)

dbConn := dbconn.NewDBConn(&config.SubTaskConfig{}, s.baseConn)
schemaTracker, err := schema.NewTestTracker(ctx, "unit-test", dbConn, log.L())
c.Assert(err, IsNil)
c.Assert(schemaTracker.CreateSchemaIfNotExists(dbName), IsNil)
schemaTracker, err := schema.NewTestTracker(ctx, "unit-test", nil, log.L())
require.NoError(t, err)
require.NoError(t, schemaTracker.CreateSchemaIfNotExists(dbName))
stmt, err := parseSQL(tableStr)
c.Assert(err, IsNil)
c.Assert(schemaTracker.Exec(ctx, dbName, stmt), IsNil)
require.NoError(t, err)
require.NoError(t, schemaTracker.Exec(ctx, dbName, stmt))

ti, err := schemaTracker.GetTableInfo(table)
c.Assert(err, IsNil)
require.NoError(t, err)

exprConfig := []*config.ExpressionFilter{
{
Expand All @@ -436,16 +435,70 @@ create table t (
sessCtx := utils.NewSessionCtx(map[string]string{"time_zone": "UTC"})
g := NewExprFilterGroup(tcontext.Background(), sessCtx, exprConfig)
exprs, err := g.GetInsertExprs(table, ti)
c.Assert(err, IsNil)
c.Assert(exprs, HasLen, 1)
require.NoError(t, err)
require.Len(t, exprs, 1)
expr := exprs[0]
c.Assert(expr.String(), Equals, "0")
require.Equal(t, "0", expr.String())

// skip nothing
skip, err := SkipDMLByExpression(sessCtx, []interface{}{0}, expr, ti.Columns)
c.Assert(err, IsNil)
c.Assert(skip, Equals, false)
require.NoError(t, err)
require.False(t, skip)
skip, err = SkipDMLByExpression(sessCtx, []interface{}{2}, expr, ti.Columns)
c.Assert(err, IsNil)
c.Assert(skip, Equals, false)
require.NoError(t, err)
require.False(t, skip)
}

func TestGetUpdateExprsSameLength(t *testing.T) {
var (
dbName = "test"
tblName = "t"
table = &filter.Table{
Schema: dbName,
Name: tblName,
}
tableStr = `
create table t (
c varchar(20)
);`
exprStr = "c > 1"
sessCtx = utils.NewSessionCtx(map[string]string{"time_zone": "UTC"})
)

cases := []*config.ExpressionFilter{
{
Schema: dbName,
Table: tblName,
InsertValueExpr: exprStr,
},
{
Schema: dbName,
Table: tblName,
UpdateOldValueExpr: exprStr,
},
{
Schema: dbName,
Table: tblName,
UpdateNewValueExpr: exprStr,
},
{
Schema: dbName,
Table: tblName,
UpdateOldValueExpr: exprStr,
UpdateNewValueExpr: exprStr,
},
}

stmt, err := parseSQL(tableStr)
require.NoError(t, err)
tableInfo, err := ddl2.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)

for i, c := range cases {
t.Logf("case #%d", i)
g := NewExprFilterGroup(tcontext.Background(), sessCtx, []*config.ExpressionFilter{c})
oldExprs, newExprs, err := g.GetUpdateExprs(table, tableInfo)
require.NoError(t, err)
require.Equal(t, len(oldExprs), len(newExprs))
}
}
5 changes: 5 additions & 0 deletions dm/tests/expression_filter/conf/dm-task2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mysql-instances:
- "update_new_lt_100"
- "update_old_and_new"
- "only_muller"
- "e02"

expression-filter:
even_c:
Expand Down Expand Up @@ -50,6 +51,10 @@ expression-filter:
schema: "expr_filter"
table: "t6"
insert-value-expr: "name != 'Müller'"
e02:
schema: expr_filter
table: t7
update-new-value-expr: "r = 'a'"

black-white-list: # compatible with deprecated config
instance:
Expand Down
3 changes: 3 additions & 0 deletions dm/tests/expression_filter/data/db1.increment2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,8 @@ update t5 set should_skip = 0, c = 3 where c = 1;
insert into t6 (id, name, msg) values (1, 'Müller', 'Müller'), (2, 'X Æ A-12', 'X Æ A-12');
alter table t6 add column name2 varchar(20) character set latin1 default 'Müller';

-- test https://github.com/pingcap/tiflow/issues/7774
UPDATE t7 SET s = s + 1 WHERE a = 1;

-- trigger a flush
alter table t5 add column dummy int;
2 changes: 2 additions & 0 deletions dm/tests/expression_filter/data/db1.prepare2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ create table t2 (id int primary key,

create table t6 (id int, name varchar(20), msg text, primary key(`id`)) character set latin1;
insert into t6 (id, name, msg) values (0, 'Müller', 'Müller');
CREATE TABLE t7 (a BIGINT PRIMARY KEY, r VARCHAR(10), s INT);
INSERT INTO t7 VALUES (1, 'a', 2);
7 changes: 6 additions & 1 deletion dm/tests/expression_filter/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,15 @@ function complex_behaviour() {
run_sql_tidb "select count(10) from expr_filter.t6 where name != 'Müller'"
check_contains "count(10): 0"

run_sql_tidb "select count(11) from expr_filter.t7 where r = 'a' and s = 2"
check_contains "count(11): 1"
run_sql_tidb "select count(12) from expr_filter.t7 where r = 'a' and s = 3"
check_contains "count(12): 0"

insert_num=$(grep -o '"number of filtered insert"=[0-9]\+' $WORK_DIR/worker1/log/dm-worker.log | grep -o '[0-9]\+' | awk '{n += $1}; END{print n}')
[ $insert_num -eq 6 ]
update_num=$(grep -o '"number of filtered update"=[0-9]\+' $WORK_DIR/worker1/log/dm-worker.log | grep -o '[0-9]\+' | awk '{n += $1}; END{print n}')
[ $update_num -eq 3 ]
[ $update_num -eq 4 ]

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test"
Expand Down

0 comments on commit 387b81a

Please sign in to comment.