Skip to content

Commit

Permalink
Feat multi undo log builder (apache#301)
Browse files Browse the repository at this point in the history
* feat: add mysql update undo log builder

* add log

* add comment for JDBCType

* add name

* optimize basic builder

* fix type

* fix type

* fix switch

* add update after iamge builder

* format code

* format

* add multi undo log builder

* fix conflict

* fix conflict

* fix conflict

* fix conflict

* fix conflict

* fix conflict

* fix conflict
  • Loading branch information
luky116 authored Oct 15, 2022
1 parent 7d1dfdf commit 4109db9
Show file tree
Hide file tree
Showing 14 changed files with 417 additions and 211 deletions.
66 changes: 37 additions & 29 deletions pkg/datasource/sql/exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,25 @@ import (
"database/sql/driver"

"github.com/seata/seata-go/pkg/datasource/sql/parser"

"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/datasource/sql/undo/builder"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/util/log"
)

func init() {
undo.RegistrUndoLogBuilder(types.UpdateExecutor, builder.GetMySQLUpdateUndoLogBuilder)
undo.RegistrUndoLogBuilder(types.MultiExecutor, builder.GetMySQLMultiUndoLogBuilder)
}

// executorSolts
var executorSolts = make(map[types.DBType]map[parser.ExecutorType]func() SQLExecutor)
var executorSolts = make(map[types.DBType]map[types.ExecutorType]func() SQLExecutor)

func RegisterExecutor(dt types.DBType, et parser.ExecutorType, builder func() SQLExecutor) {
func RegisterExecutor(dt types.DBType, et types.ExecutorType, builder func() SQLExecutor) {
if _, ok := executorSolts[dt]; !ok {
executorSolts[dt] = make(map[parser.ExecutorType]func() SQLExecutor)
executorSolts[dt] = make(map[types.ExecutorType]func() SQLExecutor)
}

val := executorSolts[dt]
Expand Down Expand Up @@ -89,7 +96,7 @@ func BuildExecutor(dbType types.DBType, txType types.TransactionType, query stri
supplier, ok := factories[parseCtx.ExecutorType]
if !ok {
log.Debugf("%s not found executor for %s, return default Executor",
dbType.String(), parseCtx.ExecutorType.String())
dbType.String(), parseCtx.ExecutorType)
e := &BaseExecutor{}
e.interceptors(hooks)
return e, nil
Expand Down Expand Up @@ -117,18 +124,18 @@ func (e *BaseExecutor) ExecWithNamedValue(ctx context.Context, execCtx *types.Ex
}

var (
beforeImage *types.RecordImage
afterImage *types.RecordImage
result types.ExecResult
err error
beforeImages []*types.RecordImage
afterImages []*types.RecordImage
result types.ExecResult
err error
)

beforeImage, err = e.beforeImage(ctx, execCtx)
beforeImages, err = e.beforeImage(ctx, execCtx)
if err != nil {
return nil, err
}
if beforeImage != nil {
execCtx.TxCtx.RoundImages.AppendBeofreImage(beforeImage)
if beforeImages != nil {
execCtx.TxCtx.RoundImages.AppendBeofreImages(beforeImages)
}

defer func() {
Expand All @@ -147,12 +154,12 @@ func (e *BaseExecutor) ExecWithNamedValue(ctx context.Context, execCtx *types.Ex
return nil, err
}

afterImage, err = e.afterImage(ctx, execCtx, beforeImage)
afterImages, err = e.afterImage(ctx, execCtx, beforeImages)
if err != nil {
return nil, err
}
if afterImage != nil {
execCtx.TxCtx.RoundImages.AppendAfterImage(afterImage)
if afterImages != nil {
execCtx.TxCtx.RoundImages.AppendAfterImages(afterImages)
}

return result, err
Expand All @@ -165,18 +172,18 @@ func (e *BaseExecutor) ExecWithValue(ctx context.Context, execCtx *types.ExecCon
}

var (
beforeImage *types.RecordImage
afterImage *types.RecordImage
result types.ExecResult
err error
beforeImages []*types.RecordImage
afterImages []*types.RecordImage
result types.ExecResult
err error
)

beforeImage, err = e.beforeImage(ctx, execCtx)
beforeImages, err = e.beforeImage(ctx, execCtx)
if err != nil {
return nil, err
}
if beforeImage != nil {
execCtx.TxCtx.RoundImages.AppendBeofreImage(beforeImage)
if beforeImages != nil {
execCtx.TxCtx.RoundImages.AppendBeofreImages(beforeImages)
}

defer func() {
Expand All @@ -194,18 +201,18 @@ func (e *BaseExecutor) ExecWithValue(ctx context.Context, execCtx *types.ExecCon
return nil, err
}

afterImage, err = e.afterImage(ctx, execCtx, beforeImage)
afterImages, err = e.afterImage(ctx, execCtx, beforeImages)
if err != nil {
return nil, err
}
if afterImage != nil {
execCtx.TxCtx.RoundImages.AppendAfterImage(afterImage)
if afterImages != nil {
execCtx.TxCtx.RoundImages.AppendAfterImages(afterImages)
}

return result, err
}

func (h *BaseExecutor) beforeImage(ctx context.Context, execCtx *types.ExecContext) (*types.RecordImage, error) {
func (h *BaseExecutor) beforeImage(ctx context.Context, execCtx *types.ExecContext) ([]*types.RecordImage, error) {
if !tm.IsTransactionOpened(ctx) {
return nil, nil
}
Expand All @@ -218,14 +225,15 @@ func (h *BaseExecutor) beforeImage(ctx context.Context, execCtx *types.ExecConte
return nil, nil
}

builder := undo.GetUndologBuilder(pc.SQLType)
builder := undo.GetUndologBuilder(pc.ExecutorType)
if builder == nil {
return nil, nil
}
return builder.BeforeImage(ctx, execCtx)
}

func (h *BaseExecutor) afterImage(ctx context.Context, execCtx *types.ExecContext, beforeImage *types.RecordImage) (*types.RecordImage, error) {
// After
func (h *BaseExecutor) afterImage(ctx context.Context, execCtx *types.ExecContext, beforeImages []*types.RecordImage) ([]*types.RecordImage, error) {
if !tm.IsTransactionOpened(ctx) {
return nil, nil
}
Expand All @@ -236,9 +244,9 @@ func (h *BaseExecutor) afterImage(ctx context.Context, execCtx *types.ExecContex
if !pc.HasValidStmt() {
return nil, nil
}
builder := undo.GetUndologBuilder(pc.SQLType)
builder := undo.GetUndologBuilder(pc.ExecutorType)
if builder == nil {
return nil, nil
}
return builder.AfterImage(ctx, execCtx, beforeImage)
return builder.AfterImage(ctx, execCtx, beforeImages)
}
46 changes: 0 additions & 46 deletions pkg/datasource/sql/parser/executortype_string.go

This file was deleted.

68 changes: 34 additions & 34 deletions pkg/datasource/sql/parser/parse_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,47 +29,47 @@ func TestDoParser(t *testing.T) {
type tt struct {
sql string
sqlType types.SQLType
types ExecutorType
types types.ExecutorType
}

for _, t2 := range [...]tt{
// replace
{sql: "REPLACE INTO foo VALUES (1 || 2)", types: ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo VALUES (1 | 2)", types: ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo VALUES (false || true)", types: ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo VALUES (bar(5678))", types: ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo VALUES ()", types: ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo (a,b) VALUES (42,314)", types: ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo () VALUES ()", types: ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo VALUE ()", types: ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO ta TABLE tb", types: ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO t.a TABLE t.b", types: ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo VALUES (1 || 2)", types: types.ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo VALUES (1 | 2)", types: types.ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo VALUES (false || true)", types: types.ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo VALUES (bar(5678))", types: types.ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo VALUES ()", types: types.ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo (a,b) VALUES (42,314)", types: types.ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo () VALUES ()", types: types.ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO foo VALUE ()", types: types.ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO ta TABLE tb", types: types.ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
{sql: "REPLACE INTO t.a TABLE t.b", types: types.ReplaceIntoExecutor, sqlType: types.SQLTypeInsert},
// insert
{sql: "INSERT INTO foo VALUES (1234)", types: InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo VALUES (1234, 5678)", types: InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO t1 (SELECT * FROM t2)", types: InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo VALUES (1 || 2)", types: InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo VALUES (1 | 2)", types: InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo VALUES (false || true)", types: InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo VALUES (bar(5678))", types: InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo (a) VALUES (42)", types: InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo VALUES (1234)", types: types.InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo VALUES (1234, 5678)", types: types.InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO t1 (SELECT * FROM t2)", types: types.InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo VALUES (1 || 2)", types: types.InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo VALUES (1 | 2)", types: types.InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo VALUES (false || true)", types: types.InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo VALUES (bar(5678))", types: types.InsertExecutor, sqlType: types.SQLTypeInsert},
{sql: "INSERT INTO foo (a) VALUES (42)", types: types.InsertExecutor, sqlType: types.SQLTypeInsert},
// update
{sql: "UPDATE LOW_PRIORITY IGNORE t SET id = id + 1 ORDER BY id DESC;", types: UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE t SET id = id + 1 ORDER BY id DESC;", types: UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE t SET id = id + 1 ORDER BY id DESC limit 3 ;", types: UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE t SET id = id + 1, name = 'jojo';", types: UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE items,month SET items.price=month.price WHERE items.id=month.id;", types: UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE user T0 LEFT OUTER JOIN user_profile T1 ON T1.id = T0.profile_id SET T0.profile_id = 1 WHERE T0.profile_id IN (1);", types: UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE t1, t2 set t1.profile_id = 1, t2.profile_id = 1 where ta.a=t.ba", types: UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE LOW_PRIORITY IGNORE t SET id = id + 1 ORDER BY id DESC;", types: types.UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE t SET id = id + 1 ORDER BY id DESC;", types: types.UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE t SET id = id + 1 ORDER BY id DESC limit 3 ;", types: types.UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE t SET id = id + 1, name = 'jojo';", types: types.UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE items,month SET items.price=month.price WHERE items.id=month.id;", types: types.UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE user T0 LEFT OUTER JOIN user_profile T1 ON T1.id = T0.profile_id SET T0.profile_id = 1 WHERE T0.profile_id IN (1);", types: types.UpdateExecutor, sqlType: types.SQLTypeUpdate},
{sql: "UPDATE t1, t2 set t1.profile_id = 1, t2.profile_id = 1 where ta.a=t.ba", types: types.UpdateExecutor, sqlType: types.SQLTypeUpdate},
// delete
{sql: "DELETE from t1 where a=1 limit 1", types: DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "DELETE FROM t1 WHERE t1.a > 0 ORDER BY t1.a LIMIT 1", types: DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "DELETE FROM x.y z WHERE z.a > 0", types: DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "DELETE FROM t1 AS w WHERE a > 0", types: DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "DELETE from t1 partition (p0,p1)", types: DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "delete low_priority t1, t2 from t1, t2", types: DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "delete quick t1, t2 from t1, t2", types: DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "delete ignore t1, t2 from t1, t2", types: DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "DELETE from t1 where a=1 limit 1", types: types.DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "DELETE FROM t1 WHERE t1.a > 0 ORDER BY t1.a LIMIT 1", types: types.DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "DELETE FROM x.y z WHERE z.a > 0", types: types.DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "DELETE FROM t1 AS w WHERE a > 0", types: types.DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "DELETE from t1 partition (p0,p1)", types: types.DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "delete low_priority t1, t2 from t1, t2", types: types.DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "delete quick t1, t2 from t1, t2", types: types.DeleteExecutor, sqlType: types.SQLTypeDelete},
{sql: "delete ignore t1, t2 from t1, t2", types: types.DeleteExecutor, sqlType: types.SQLTypeDelete},
} {
parser, err := DoParser(t2.sql)
assert.NoError(t, err)
Expand Down
67 changes: 27 additions & 40 deletions pkg/datasource/sql/parser/parser_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,68 +23,55 @@ import (
"github.com/seata/seata-go/pkg/datasource/sql/types"
)

// ExecutorType
//go:generate stringer -type=ExecutorType
type ExecutorType int32

const (
_ ExecutorType = iota
UnSupportExecutor
InsertExecutor
UpdateExecutor
DeleteExecutor
ReplaceIntoExecutor
InsertOnDuplicateExecutor
)

type ParseContext struct {
// SQLType
SQLType types.SQLType
// ExecutorType
ExecutorType ExecutorType
// InsertStmt
InsertStmt *ast.InsertStmt
// UpdateStmt
UpdateStmt *ast.UpdateStmt
// DeleteStmt
DeleteStmt *ast.DeleteStmt
}

func (p *ParseContext) HasValidStmt() bool {
return p.InsertStmt != nil || p.UpdateStmt != nil || p.DeleteStmt != nil
}

func DoParser(query string) (*ParseContext, error) {
func DoParser(query string) (*types.ParseContext, error) {
p := aparser.New()
stmtNode, err := p.ParseOneStmt(query, "", "")
stmtNodes, _, err := p.Parse(query, "", "")
if err != nil {
return nil, err
}

parserCtx := new(ParseContext)
if len(stmtNodes) == 1 {
return parseParseContext(stmtNodes[0]), err
}

parserCtx := types.ParseContext{
SQLType: types.SQLTypeMulti,
ExecutorType: types.MultiExecutor,
MultiStmt: make([]*types.ParseContext, 0, len(stmtNodes)),
}

for _, node := range stmtNodes {
parserCtx.MultiStmt = append(parserCtx.MultiStmt, parseParseContext(node))
}

return &parserCtx, nil
}

func parseParseContext(stmtNode ast.StmtNode) *types.ParseContext {
parserCtx := new(types.ParseContext)

switch stmt := stmtNode.(type) {
case *ast.InsertStmt:
parserCtx.SQLType = types.SQLTypeInsert
parserCtx.InsertStmt = stmt
parserCtx.ExecutorType = InsertExecutor
parserCtx.ExecutorType = types.InsertExecutor

if stmt.IsReplace {
parserCtx.ExecutorType = ReplaceIntoExecutor
parserCtx.ExecutorType = types.ReplaceIntoExecutor
}

if len(stmt.OnDuplicate) != 0 {
parserCtx.ExecutorType = InsertOnDuplicateExecutor
parserCtx.ExecutorType = types.InsertOnDuplicateExecutor
}
case *ast.UpdateStmt:
parserCtx.SQLType = types.SQLTypeUpdate
parserCtx.UpdateStmt = stmt
parserCtx.ExecutorType = UpdateExecutor
parserCtx.ExecutorType = types.UpdateExecutor
case *ast.DeleteStmt:
parserCtx.SQLType = types.SQLTypeDelete
parserCtx.DeleteStmt = stmt
parserCtx.ExecutorType = DeleteExecutor
parserCtx.ExecutorType = types.DeleteExecutor
}

return parserCtx, nil
return parserCtx
}
Loading

0 comments on commit 4109db9

Please sign in to comment.