Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize rollback #356

Merged
merged 4 commits into from
Nov 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/datasource/sql/at.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func (a *ATSourceManager) GetCachedResources() *sync.Map {
// Register a Resource to be managed by Resource Manager
func (a *ATSourceManager) RegisterResource(res rm.Resource) error {
a.resourceCache.Store(res.GetResourceId(), res)

return a.basic.RegisterResource(res)
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/datasource/sql/types/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type RecordImage struct {
// TableName table name
TableName string `json:"tableName"`
// SQLType sql type
SQLType SQLType `json:"-"`
SQLType SQLType `json:"sqlType"`
// Rows data row
Rows []RowImage `json:"rows"`
// TableMeta table information schema
Expand All @@ -109,7 +109,7 @@ type RowImage struct {
func (r *RowImage) GetColumnMap() map[string]*ColumnImage {
m := make(map[string]*ColumnImage, 0)
for _, column := range r.Columns {
m[column.Name] = &column
m[column.ColumnName] = &column
}
return m
}
Expand Down Expand Up @@ -142,8 +142,8 @@ func (r *RowImage) NonPrimaryKeys(cols []ColumnImage) []ColumnImage {
type ColumnImage struct {
// KeyType index type
KeyType IndexType `json:"keyType"`
// Name column name
Name string `json:"name"`
// ColumnName column name
ColumnName string `json:"name"`
// Type column type
Type int16 `json:"type"`
// Value column value
Expand Down
85 changes: 63 additions & 22 deletions pkg/datasource/sql/undo/base/undo.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var (
var (
checkUndoLogTableExistSql = "SELECT 1 FROM " + constant.UndoLogTableName + " LIMIT 1"
insertUndoLogSql = "INSERT INTO " + constant.UndoLogTableName + "(branch_id,xid,context,rollback_info,log_status,log_created,log_modified) VALUES (?, ?, ?, ?, ?, now(6), now(6))"
selectUndoLogSql = "SELECT `branch_id`,`xid`,`context`,`rollback_info`,`log_status` FROM " + constant.UndoLogTableName + " WHERE " + constant.UndoLogBranchXid + " = ? AND " + constant.UndoLogXid + " = ? FOR UPDATE"
selectUndoLogSql = "SELECT `branch_id`,`xid`,`context`,`rollback_info`,`log_status` FROM " + constant.UndoLogTableName + " WHERE " + constant.UndoLogBranchXid + " = ? AND " + constant.UndoLogXid + " = ? FOR UPDATE" // todo 替换成常量吧,不用使用变量来表示字段名
)

const (
Expand Down Expand Up @@ -91,15 +91,27 @@ func (m *BaseUndoLogManager) InsertUndoLog(record undo.UndologRecord, conn drive
return nil
}

func (m *BaseUndoLogManager) InsertUndoLogWithSqlConn(ctx context.Context, record undo.UndologRecord, conn *sql.Conn) error {
stmt, err := conn.PrepareContext(ctx, insertUndoLogSql)
if err != nil {
return err
}
_, err = stmt.Exec([]driver.Value{record.BranchID, record.XID, record.Context, record.RollbackInfo, int64(record.LogStatus)})
if err != nil {
return err
}
return nil
}

// DeleteUndoLog exec delete single undo log operate
func (m *BaseUndoLogManager) DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn driver.Conn) error {
stmt, err := conn.Prepare(constant.DeleteUndoLogSql)
func (m *BaseUndoLogManager) DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn *sql.Conn) error {
stmt, err := conn.PrepareContext(ctx, constant.DeleteUndoLogSql)
if err != nil {
log.Errorf("[DeleteUndoLog] prepare sql fail, err: %v", err)
return err
}

if _, err = stmt.Exec([]driver.Value{branchID, xid}); err != nil {
if _, err = stmt.Exec(branchID, xid); err != nil {
log.Errorf("[DeleteUndoLog] exec delete undo log fail, err: %v", err)
return err
}
Expand Down Expand Up @@ -210,12 +222,16 @@ func (m *BaseUndoLogManager) RunUndo(ctx context.Context, xid string, branchID i
}

// Undo undo sql
func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid string, branchID int64, db *sql.DB, dbName string) error {
tx, err := db.Begin()
func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid string, branchID int64, db *sql.DB, dbName string) (err error) {
conn, err := db.Conn(ctx)
if err != nil {
return err
}

tx, err := conn.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return err
}
defer func() {
if err != nil {
if err = tx.Rollback(); err != nil {
Expand All @@ -225,10 +241,6 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid
}
}()

conn, err := db.Conn(ctx)
if err != nil {
return err
}
stmt, err := conn.PrepareContext(ctx, selectUndoLogSql)
if err != nil {
log.Errorf("prepare sql fail, err: %v", err)
Expand Down Expand Up @@ -263,10 +275,12 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid
undoLogRecords = append(undoLogRecords, record)
}

var exists bool
for _, record := range undoLogRecords {
exists = true
if !record.CanUndo() {
log.Infof("xid %v branch %v, ignore %v undo_log", record.XID, record.BranchID, record.LogStatus)
continue
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one of the multiple undologs does not mark undo, the match will be terminated. However, in the process executed by the branch transaction, undo should be isolated from each other

}

// todo use serializer and decode
Expand Down Expand Up @@ -296,29 +310,56 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid
return err
}

if err = undoExecutor.ExecuteOn(ctx, dbType, undoLog, conn); err != nil {
if err = undoExecutor.ExecuteOn(ctx, dbType, conn); err != nil {
log.Errorf("execute on fail, err: %v", err)
return err
}
}
}

//if exist {
// if err = m.DeleteUndoLog(ctx, xid, branchID, conn); err != nil {
// log.Errorf("[Undo] delete undo log fail, err: %v", err)
// return err
// }
//}
// Todo 等 insertLog 合并后加上 insertUndoLogWithGlobalFinished 功能
/*else {

}*/
if exists {
if err = m.DeleteUndoLog(ctx, xid, branchID, conn); err != nil {
log.Errorf("[Undo] delete undo fail, err: %v", err)
return err
}
log.Infof("xid %v branch %v, undo_log deleted with %v", xid, branchID, undo.UndoLogStatueGlobalFinished)
} else {
if err = m.insertUndoLogWithGlobalFinished(ctx, xid, uint64(branchID), conn); err != nil {
log.Errorf("[Undo] insert undo with global finished fail, err: %v", err)
return err
}
log.Errorf("xid %v branch %v, undo_log added with %v", xid, branchID, undo.UndoLogStatueGlobalFinished)
}

if err = tx.Commit(); err != nil {
log.Errorf("[Undo] execute on fail, err: %v", err)
return nil
}
return nil
}

func (m *BaseUndoLogManager) insertUndoLogWithGlobalFinished(ctx context.Context, xid string, branchID uint64, conn *sql.Conn) error {
// todo use config to replace
parseContext := make(map[string]string, 0)
parseContext[SerializerKey] = "jackson"
parseContext[CompressorTypeKey] = "NONE"
undoLogContent, err := json.Marshal(parseContext)
if err != nil {
return err
}

record := undo.UndologRecord{
BranchID: branchID,
XID: xid,
RollbackInfo: []byte("{}"),
LogStatus: UndoLogStatusGlobalFinished,
Context: undoLogContent,
}
err = m.InsertUndoLogWithSqlConn(ctx, record, conn)
if err != nil {
log.Errorf("insert undo log fail, err: %v", err)
return err
}
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/datasource/sql/undo/builder/basic_undo_log_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,10 @@ func (b *BasicUndoLogBuilder) buildRecordImages(rowsi driver.Rows, tableMetaData
jdbcType := types.GetJDBCTypeByTypeName(columnMeta.ColumnTypeInfo.DatabaseTypeName())

columns = append(columns, types.ColumnImage{
KeyType: keyType,
Name: name,
Type: int16(jdbcType),
Value: ss[i],
KeyType: keyType,
ColumnName: name,
Type: int16(jdbcType),
Value: ss[i],
})
}
rowImages = append(rowImages, types.RowImage{Columns: columns})
Expand Down Expand Up @@ -315,7 +315,7 @@ func (b *BasicUndoLogBuilder) buildLockKey2(records *types.RecordImage, meta typ
for _, column := range row.Columns {
var hasKeyColumn bool
for _, key := range keys {
if column.Name == key {
if column.ColumnName == key {
hasKeyColumn = true
if pkSplitIndex > 0 {
lockKeys.WriteString("_")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func TestBuildLockKey(t *testing.T) {
records := types.RecordImage{
TableName: "test_name",
Rows: []types.RowImage{
{Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, Name: "id", Value: 1}, {KeyType: types.IndexTypePrimaryKey, Name: "userId", Value: "one"}}},
{Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, Name: "id", Value: 2}, {KeyType: types.IndexTypePrimaryKey, Name: "userId", Value: "two"}}},
{Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, ColumnName: "id", Value: 1}, {KeyType: types.IndexTypePrimaryKey, ColumnName: "userId", Value: "one"}}},
{Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, ColumnName: "id", Value: 2}, {KeyType: types.IndexTypePrimaryKey, ColumnName: "userId", Value: "two"}}},
},
}

Expand Down
40 changes: 39 additions & 1 deletion pkg/datasource/sql/undo/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,22 @@ import (

"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/util/log"
)

var _ undo.UndoExecutor = (*BaseExecutor)(nil)

const (
selectSQL = "SELECT * FROM %s WHERE %s FOR UPDATE"
)

type BaseExecutor struct {
sqlUndoLog undo.SQLUndoLog
undoImage *types.RecordImage
}

// ExecuteOn
func (b *BaseExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, sqlUndoLog undo.SQLUndoLog, conn *sql.Conn) error {
func (b *BaseExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, conn *sql.Conn) error {
// check data if valid
return nil
}
Expand All @@ -40,3 +47,34 @@ func (b *BaseExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, sqlUn
func (b *BaseExecutor) UndoPrepare(undoPST *sql.Stmt, undoValues []types.ColumnImage, pkValueList []types.ColumnImage) {

}

func (b *BaseExecutor) dataValidationAndGoOn(conn *sql.Conn) (bool, error) {
beforeImage := b.sqlUndoLog.BeforeImage
afterImage := b.sqlUndoLog.AfterImage

equal, err := IsRecordsEquals(beforeImage, afterImage)
if err != nil {
return false, err
}
if equal {
log.Infof("Stop rollback because there is no data change between the before data snapshot and the after data snapshot.")
return false, nil
}

// todo compare from current db data to old image data

return true, nil
}

// todo
//func (b *BaseExecutor) queryCurrentRecords(conn *sql.Conn) *types.RecordImage {
// tableMeta := b.undoImage.TableMeta
// pkNameList := tableMeta.GetPrimaryKeyOnlyName()
//
// b.undoImage.Rows
//
//}
//
//func (b *BaseExecutor) parsePkValues(rows []types.RowImage, pkNameList []string) {
//
//}
24 changes: 12 additions & 12 deletions pkg/datasource/sql/undo/executor/mysql_undo_delete_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,26 @@ import (
"github.com/seata/seata-go/pkg/datasource/sql/undo"
)

type MySQLUndoDeleteExecutor struct {
type mySQLUndoDeleteExecutor struct {
BaseExecutor *BaseExecutor
sqlUndoLog undo.SQLUndoLog
}

// NewMySQLUndoDeleteExecutor init
func NewMySQLUndoDeleteExecutor() *MySQLUndoUpdateExecutor {
return &MySQLUndoUpdateExecutor{}
// newMySQLUndoDeleteExecutor init
func newMySQLUndoDeleteExecutor(sqlUndoLog undo.SQLUndoLog) *mySQLUndoUpdateExecutor {
return &mySQLUndoUpdateExecutor{sqlUndoLog: sqlUndoLog}
}

func (m *MySQLUndoDeleteExecutor) ExecuteOn(ctx context.Context, dbType types.DBType,
sqlUndoLog undo.SQLUndoLog, conn driver.Conn) error {
func (m *mySQLUndoDeleteExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, conn driver.Conn) error {

undoSql, _ := m.buildUndoSQL(dbType, sqlUndoLog)
undoSql, _ := m.buildUndoSQL(dbType)

stmt, err := conn.Prepare(undoSql)
if err != nil {
return err
}

beforeImage := sqlUndoLog.BeforeImage
beforeImage := m.sqlUndoLog.BeforeImage

for _, row := range beforeImage.Rows {
undoValues := make([]interface{}, 0)
Expand All @@ -75,8 +75,8 @@ func (m *MySQLUndoDeleteExecutor) ExecuteOn(ctx context.Context, dbType types.DB
return nil
}

func (m *MySQLUndoDeleteExecutor) buildUndoSQL(dbType types.DBType, sqlUndoLog undo.SQLUndoLog) (string, error) {
beforeImage := sqlUndoLog.BeforeImage
func (m *mySQLUndoDeleteExecutor) buildUndoSQL(dbType types.DBType) (string, error) {
beforeImage := m.sqlUndoLog.BeforeImage
rows := beforeImage.Rows
if len(rows) == 0 {
return "", errors.New("invalid undo log")
Expand All @@ -97,7 +97,7 @@ func (m *MySQLUndoDeleteExecutor) buildUndoSQL(dbType types.DBType, sqlUndoLog u
)

for key, _ := range fields {
insertColumnSlice = append(insertColumnSlice, AddEscape(fields[key].Name, dbType))
insertColumnSlice = append(insertColumnSlice, AddEscape(fields[key].ColumnName, dbType))
insertValueSlice = append(insertValueSlice, "?")
}

Expand All @@ -106,5 +106,5 @@ func (m *MySQLUndoDeleteExecutor) buildUndoSQL(dbType types.DBType, sqlUndoLog u

// InsertSqlTemplate INSERT INTO a (x, y, z, pk) VALUES (?, ?, ?, ?)
insertSqlTemplate := "INSERT INTO %s (%s) VALUES (%s)"
return fmt.Sprintf(insertSqlTemplate, sqlUndoLog.TableName, insertColumns, insertValues), nil
return fmt.Sprintf(insertSqlTemplate, m.sqlUndoLog.TableName, insertColumns, insertValues), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ func NewMySQLUndoExecutorHolder() undo.UndoExecutorHolder {

// GetInsertExecutor get the mysql Insert UndoExecutor by sqlUndoLog
func (m *MySQLUndoExecutorHolder) GetInsertExecutor(sqlUndoLog undo.SQLUndoLog) undo.UndoExecutor {
return NewMySQLUndoInsertExecutor()
return newMySQLUndoInsertExecutor(sqlUndoLog)
}

// GetUpdateExecutor get the mysql Update UndoExecutor by sqlUndoLog
func (m *MySQLUndoExecutorHolder) GetUpdateExecutor(sqlUndoLog undo.SQLUndoLog) undo.UndoExecutor {
return NewMySQLUndoUpdateExecutor()
return newMySQLUndoUpdateExecutor(sqlUndoLog)
}

// GetDeleteExecutor get the mysql Delete UndoExecutor by sqlUndoLog
func (m *MySQLUndoExecutorHolder) GetDeleteExecutor(sqlUndoLog undo.SQLUndoLog) undo.UndoExecutor {
return NewMySQLUndoDeleteExecutor()
return newMySQLUndoDeleteExecutor(sqlUndoLog)
}
Loading