From 308ccb726eac111733475de88bd9129576971baa Mon Sep 17 00:00:00 2001 From: Luky116Liuyuecai Date: Sun, 20 Nov 2022 00:26:20 +0800 Subject: [PATCH 1/3] optimize commit --- pkg/datasource/sql/at.go | 4 +- pkg/datasource/sql/driver_test.go | 3 +- .../sql/mock/mock_datasource_manager.go | 17 +++++ pkg/datasource/sql/tx.go | 2 +- .../builder/basic_undo_log_builder_test.go | 21 ++++++ .../builder/mysql_update_undo_log_builder.go | 2 - .../mysql_update_undo_log_builder_test.go | 3 +- .../executor/mysql_undo_update_executor.go | 2 +- pkg/protocol/codec/branch_report_req_codec.go | 63 ++++++++++++++++++ .../codec/branch_report_req_codec_test.go | 43 ++++++++++++ .../branch_statue_report_response_codec.go | 66 +++++++++++++++++++ ...ranch_statue_report_response_codec_test.go | 44 +++++++++++++ .../client/rm_branch_commit_processor.go | 2 +- pkg/rm/rm_remoting.go | 3 +- 14 files changed, 264 insertions(+), 11 deletions(-) create mode 100644 pkg/protocol/codec/branch_report_req_codec.go create mode 100644 pkg/protocol/codec/branch_report_req_codec_test.go create mode 100644 pkg/protocol/codec/branch_statue_report_response_codec.go create mode 100644 pkg/protocol/codec/branch_statue_report_response_codec_test.go diff --git a/pkg/datasource/sql/at.go b/pkg/datasource/sql/at.go index 0a38e065f..006646584 100644 --- a/pkg/datasource/sql/at.go +++ b/pkg/datasource/sql/at.go @@ -116,7 +116,7 @@ func (a *ATSourceManager) BranchRollback(ctx context.Context, branchResource rm. // BranchCommit func (a *ATSourceManager) BranchCommit(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) { a.worker.BranchCommit(ctx, resource) - return branch.BranchStatusPhaseoneDone, nil + return branch.BranchStatusPhasetwoCommitted, nil } // LockQuery @@ -131,7 +131,7 @@ func (a *ATSourceManager) BranchRegister(ctx context.Context, req rm.BranchRegis // BranchReport func (a *ATSourceManager) BranchReport(ctx context.Context, param rm.BranchReportParam) error { - return nil + return a.rmRemoting.BranchReport(param) } // CreateTableMetaCache diff --git a/pkg/datasource/sql/driver_test.go b/pkg/datasource/sql/driver_test.go index 8e6715b49..482b984b8 100644 --- a/pkg/datasource/sql/driver_test.go +++ b/pkg/datasource/sql/driver_test.go @@ -21,10 +21,11 @@ import ( "context" "database/sql" "database/sql/driver" - "github.com/seata/seata-go/pkg/rm" "reflect" "testing" + "github.com/seata/seata-go/pkg/rm" + "github.com/golang/mock/gomock" "github.com/seata/seata-go/pkg/datasource/sql/mock" "github.com/seata/seata-go/pkg/util/reflectx" diff --git a/pkg/datasource/sql/mock/mock_datasource_manager.go b/pkg/datasource/sql/mock/mock_datasource_manager.go index 81fc3c02e..6b1f9209e 100644 --- a/pkg/datasource/sql/mock/mock_datasource_manager.go +++ b/pkg/datasource/sql/mock/mock_datasource_manager.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + // Code generated by MockGen. DO NOT EDIT. // Source: datasource_manager.go diff --git a/pkg/datasource/sql/tx.go b/pkg/datasource/sql/tx.go index a77e34b3d..6ef4dbe08 100644 --- a/pkg/datasource/sql/tx.go +++ b/pkg/datasource/sql/tx.go @@ -179,7 +179,7 @@ func (tx *Tx) report(success bool) error { BranchId: int64(tx.tranCtx.BranchID), Status: status, } - dataSourceManager := datasource.GetDataSourceManager(branch.BranchType(tx.tranCtx.TransType)) + dataSourceManager := datasource.GetDataSourceManager(tx.tranCtx.TransType.GetBranchType()) retry := REPORT_RETRY_COUNT for retry > 0 { err := dataSourceManager.BranchReport(context.Background(), request) diff --git a/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go b/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go index a95a0204e..9dfe1641c 100644 --- a/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go +++ b/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go @@ -20,6 +20,7 @@ package builder import ( "testing" + "github.com/seata/seata-go/pkg/datasource/sql/types" "github.com/stretchr/testify/assert" ) @@ -45,5 +46,25 @@ func TestBuildWhereConditionByPKs(t *testing.T) { assert.Equal(t, test.expectSQL, sql) }) } +} + +func TestBuildLockKey(t *testing.T) { + metaData := types.TableMeta{ + TableName: "test_name", + Indexs: map[string]types.IndexMeta{ + "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{{ColumnName: "id"}, {ColumnName: "userId"}}}, + }, + } + records := types.RecordImage{ + TableName: "test_name", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, Name: "id", Value: 1}, {KeyType: types.IndexTypePrimaryKey, Name: "userId", Value: "one"}}}, + {Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, Name: "id", Value: 2}, {KeyType: types.IndexTypePrimaryKey, Name: "userId", Value: "two"}}}, + }, + } + + builder := BasicUndoLogBuilder{} + lockKeys := builder.buildLockKey2(&records, metaData) + assert.Equal(t, "test_name:1_one,2_two", lockKeys) } diff --git a/pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder.go b/pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder.go index 87ca69a96..5d9df4e44 100644 --- a/pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder.go +++ b/pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder.go @@ -134,8 +134,6 @@ func (u *MySQLUpdateUndoLogBuilder) AfterImage(ctx context.Context, execCtx *typ return nil, err } - lockKey := u.buildLockKey(rows, *metaData) - execCtx.TxCtx.LockKeys[lockKey] = struct{}{} image.SQLType = execCtx.ParseContext.SQLType return []*types.RecordImage{image}, nil diff --git a/pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder_test.go b/pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder_test.go index 11bb44035..6a7a36d93 100644 --- a/pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder_test.go +++ b/pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder_test.go @@ -20,10 +20,11 @@ package builder import ( "context" "database/sql/driver" - "github.com/seata/seata-go/pkg/datasource/sql/datasource" "reflect" "testing" + "github.com/seata/seata-go/pkg/datasource/sql/datasource" + "github.com/agiledragon/gomonkey" "github.com/seata/seata-go/pkg/datasource/sql/datasource/mysql" "github.com/seata/seata-go/pkg/datasource/sql/types" diff --git a/pkg/datasource/sql/undo/executor/mysql_undo_update_executor.go b/pkg/datasource/sql/undo/executor/mysql_undo_update_executor.go index b0329b914..8630bcc53 100644 --- a/pkg/datasource/sql/undo/executor/mysql_undo_update_executor.go +++ b/pkg/datasource/sql/undo/executor/mysql_undo_update_executor.go @@ -65,7 +65,7 @@ func (m *MySQLUndoUpdateExecutor) ExecuteOn(ctx context.Context, dbType types.DB undoValues = append(undoValues, col.Value) } - if _, err = stmt.Exec(undoValues); err != nil { + if _, err = stmt.Exec(undoValues...); err != nil { return err } } diff --git a/pkg/protocol/codec/branch_report_req_codec.go b/pkg/protocol/codec/branch_report_req_codec.go new file mode 100644 index 000000000..fdba6cc70 --- /dev/null +++ b/pkg/protocol/codec/branch_report_req_codec.go @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package codec + +import ( + "github.com/seata/seata-go/pkg/protocol/branch" + "github.com/seata/seata-go/pkg/protocol/message" + "github.com/seata/seata-go/pkg/util/bytes" +) + +func init() { + GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchReportRequestCodec{}) +} + +type BranchReportRequestCodec struct { +} + +func (g *BranchReportRequestCodec) Decode(in []byte) interface{} { + data := message.BranchReportRequest{} + buf := bytes.NewByteBuffer(in) + + data.Xid = bytes.ReadString16Length(buf) + data.BranchId = int64(bytes.ReadUInt64(buf)) + data.Status = branch.BranchStatus(bytes.ReadByte(buf)) + data.ResourceId = bytes.ReadString16Length(buf) + data.ApplicationData = []byte(bytes.ReadString32Length(buf)) + data.BranchType = branch.BranchType(bytes.ReadByte(buf)) + + return data +} + +func (g *BranchReportRequestCodec) Encode(in interface{}) []byte { + data, _ := in.(message.BranchReportRequest) + buf := bytes.NewByteBuffer([]byte{}) + + bytes.WriteString16Length(data.Xid, buf) + buf.WriteInt64(data.BranchId) + buf.WriteByte(byte(data.Status)) + bytes.WriteString16Length(data.ResourceId, buf) + bytes.WriteString32Length(string(data.ApplicationData), buf) + buf.WriteByte(byte(data.BranchType)) + + return buf.Bytes() +} + +func (g *BranchReportRequestCodec) GetMessageType() message.MessageType { + return message.MessageTypeBranchStatusReport +} diff --git a/pkg/protocol/codec/branch_report_req_codec_test.go b/pkg/protocol/codec/branch_report_req_codec_test.go new file mode 100644 index 000000000..60a70708e --- /dev/null +++ b/pkg/protocol/codec/branch_report_req_codec_test.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package codec + +import ( + "testing" + + model2 "github.com/seata/seata-go/pkg/protocol/branch" + "github.com/seata/seata-go/pkg/protocol/message" + "github.com/stretchr/testify/assert" +) + +func TestBranchReportRequestCodec(t *testing.T) { + msg := message.BranchReportRequest{ + Xid: "123344", + ResourceId: "root:12345678@tcp(127.0.0.1:3306)/seata_client", + Status: model2.BranchStatusPhaseoneDone, + BranchId: 56678, + BranchType: model2.BranchTypeAT, + ApplicationData: []byte("TestExtraData"), + } + + codec := BranchReportRequestCodec{} + bytes := codec.Encode(msg) + msg2 := codec.Decode(bytes) + + assert.Equal(t, msg, msg2) +} diff --git a/pkg/protocol/codec/branch_statue_report_response_codec.go b/pkg/protocol/codec/branch_statue_report_response_codec.go new file mode 100644 index 000000000..632c01038 --- /dev/null +++ b/pkg/protocol/codec/branch_statue_report_response_codec.go @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package codec + +import ( + "math" + + "github.com/seata/seata-go/pkg/protocol/message" + "github.com/seata/seata-go/pkg/util/bytes" + serror "github.com/seata/seata-go/pkg/util/errors" +) + +func init() { + GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchReportResponseCodec{}) +} + +type BranchReportResponseCodec struct{} + +func (g *BranchReportResponseCodec) Decode(in []byte) interface{} { + data := message.BranchReportResponse{} + buf := bytes.NewByteBuffer(in) + + data.ResultCode = message.ResultCode(bytes.ReadByte(buf)) + if data.ResultCode == message.ResultCodeFailed { + data.Msg = bytes.ReadString8Length(buf) + } + data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf)) + + return data +} + +func (g *BranchReportResponseCodec) Encode(in interface{}) []byte { + data, _ := in.(message.BranchReportResponse) + buf := bytes.NewByteBuffer([]byte{}) + + buf.WriteByte(byte(data.ResultCode)) + if data.ResultCode == message.ResultCodeFailed { + msg := data.Msg + if len(data.Msg) > math.MaxInt8 { + msg = data.Msg[:math.MaxInt8] + } + bytes.WriteString8Length(msg, buf) + } + buf.WriteByte(byte(data.TransactionErrorCode)) + + return buf.Bytes() +} + +func (g *BranchReportResponseCodec) GetMessageType() message.MessageType { + return message.MessageTypeBranchStatusReportResult +} diff --git a/pkg/protocol/codec/branch_statue_report_response_codec_test.go b/pkg/protocol/codec/branch_statue_report_response_codec_test.go new file mode 100644 index 000000000..95a92fc98 --- /dev/null +++ b/pkg/protocol/codec/branch_statue_report_response_codec_test.go @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package codec + +import ( + "testing" + + "github.com/seata/seata-go/pkg/protocol/message" + serror "github.com/seata/seata-go/pkg/util/errors" + "github.com/stretchr/testify/assert" +) + +func TestBranchReportResponseCodec(t *testing.T) { + msg := message.BranchReportResponse{ + AbstractTransactionResponse: message.AbstractTransactionResponse{ + TransactionErrorCode: serror.TransactionErrorCodeBeginFailed, + AbstractResultMessage: message.AbstractResultMessage{ + ResultCode: message.ResultCodeFailed, + Msg: "FAILED", + }, + }, + } + + codec := BranchReportResponseCodec{} + bytes := codec.Encode(msg) + msg2 := codec.Decode(bytes) + + assert.Equal(t, msg, msg2) +} diff --git a/pkg/remoting/processor/client/rm_branch_commit_processor.go b/pkg/remoting/processor/client/rm_branch_commit_processor.go index f392191d4..ffa4d9892 100644 --- a/pkg/remoting/processor/client/rm_branch_commit_processor.go +++ b/pkg/remoting/processor/client/rm_branch_commit_processor.go @@ -87,6 +87,6 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag log.Errorf("send branch commit response error: {%#v}", err.Error()) return err } - log.Infof("send branch commit response success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData) + log.Infof("send branch commit success: xid %v, branchID %v, resourceID %v, applicationData %v", xid, branchID, resourceID, applicationData) return nil } diff --git a/pkg/rm/rm_remoting.go b/pkg/rm/rm_remoting.go index 34f907f23..b840e969e 100644 --- a/pkg/rm/rm_remoting.go +++ b/pkg/rm/rm_remoting.go @@ -22,7 +22,6 @@ import ( "github.com/pkg/errors" - "github.com/seata/seata-go/pkg/protocol/branch" "github.com/seata/seata-go/pkg/protocol/message" "github.com/seata/seata-go/pkg/remoting/getty" "github.com/seata/seata-go/pkg/util/log" @@ -70,7 +69,7 @@ func (r *RMRemoting) BranchReport(param BranchReportParam) error { BranchId: param.BranchId, Status: param.Status, ApplicationData: []byte(param.ApplicationData), - BranchType: branch.BranchTypeAT, + BranchType: param.BranchType, } resp, err := getty.GetGettyRemotingClient().SendSyncRequest(request) From e82d0f2576c21ed646ac2a16e1cfb6a75753738c Mon Sep 17 00:00:00 2001 From: Luky116Liuyuecai Date: Sun, 20 Nov 2022 17:39:43 +0800 Subject: [PATCH 2/3] optimize rollback --- pkg/datasource/sql/at.go | 1 - pkg/datasource/sql/types/image.go | 8 +- pkg/datasource/sql/undo/base/undo.go | 82 ++++++++++++----- .../undo/builder/basic_undo_log_builder.go | 10 +- .../builder/basic_undo_log_builder_test.go | 4 +- pkg/datasource/sql/undo/executor/executor.go | 40 +++++++- .../executor/mysql_undo_delete_executor.go | 24 ++--- .../executor/mysql_undo_executor_holder.go | 6 +- .../executor/mysql_undo_insert_executor.go | 28 +++--- .../executor/mysql_undo_update_executor.go | 42 +++++---- pkg/datasource/sql/undo/executor/sql.go | 29 +----- pkg/datasource/sql/undo/executor/utils.go | 91 +++++++++++++++++++ pkg/datasource/sql/undo/mysql/undo.go | 2 +- pkg/datasource/sql/undo/undo.go | 2 +- pkg/datasource/sql/undo/undo_executor.go | 2 +- pkg/rm/tcc/fence/store/db/dao/store_api.go | 2 +- sample/at/basic/main.go | 6 +- 17 files changed, 264 insertions(+), 115 deletions(-) create mode 100644 pkg/datasource/sql/undo/executor/utils.go diff --git a/pkg/datasource/sql/at.go b/pkg/datasource/sql/at.go index 006646584..46cd91d4a 100644 --- a/pkg/datasource/sql/at.go +++ b/pkg/datasource/sql/at.go @@ -73,7 +73,6 @@ func (a *ATSourceManager) GetCachedResources() *sync.Map { // Register a Resource to be managed by Resource Manager func (a *ATSourceManager) RegisterResource(res rm.Resource) error { a.resourceCache.Store(res.GetResourceId(), res) - return a.basic.RegisterResource(res) } diff --git a/pkg/datasource/sql/types/image.go b/pkg/datasource/sql/types/image.go index b275849d1..f413b1f85 100644 --- a/pkg/datasource/sql/types/image.go +++ b/pkg/datasource/sql/types/image.go @@ -93,7 +93,7 @@ type RecordImage struct { // TableName table name TableName string `json:"tableName"` // SQLType sql type - SQLType SQLType `json:"-"` + SQLType SQLType `json:"sqlType"` // Rows data row Rows []RowImage `json:"rows"` // TableMeta table information schema @@ -109,7 +109,7 @@ type RowImage struct { func (r *RowImage) GetColumnMap() map[string]*ColumnImage { m := make(map[string]*ColumnImage, 0) for _, column := range r.Columns { - m[column.Name] = &column + m[column.ColumnName] = &column } return m } @@ -142,8 +142,8 @@ func (r *RowImage) NonPrimaryKeys(cols []ColumnImage) []ColumnImage { type ColumnImage struct { // KeyType index type KeyType IndexType `json:"keyType"` - // Name column name - Name string `json:"name"` + // ColumnName column name + ColumnName string `json:"name"` // Type column type Type int16 `json:"type"` // Value column value diff --git a/pkg/datasource/sql/undo/base/undo.go b/pkg/datasource/sql/undo/base/undo.go index 2b0e929b2..3012527c4 100644 --- a/pkg/datasource/sql/undo/base/undo.go +++ b/pkg/datasource/sql/undo/base/undo.go @@ -43,7 +43,7 @@ var ( var ( checkUndoLogTableExistSql = "SELECT 1 FROM " + constant.UndoLogTableName + " LIMIT 1" insertUndoLogSql = "INSERT INTO " + constant.UndoLogTableName + "(branch_id,xid,context,rollback_info,log_status,log_created,log_modified) VALUES (?, ?, ?, ?, ?, now(6), now(6))" - selectUndoLogSql = "SELECT `branch_id`,`xid`,`context`,`rollback_info`,`log_status` FROM " + constant.UndoLogTableName + " WHERE " + constant.UndoLogBranchXid + " = ? AND " + constant.UndoLogXid + " = ? FOR UPDATE" + selectUndoLogSql = "SELECT `branch_id`,`xid`,`context`,`rollback_info`,`log_status` FROM " + constant.UndoLogTableName + " WHERE " + constant.UndoLogBranchXid + " = ? AND " + constant.UndoLogXid + " = ? FOR UPDATE" // todo 替换成常量吧,不用使用变量来表示字段名 ) const ( @@ -91,15 +91,27 @@ func (m *BaseUndoLogManager) InsertUndoLog(record undo.UndologRecord, conn drive return nil } +func (m *BaseUndoLogManager) InsertUndoLogWithSqlConn(ctx context.Context, record undo.UndologRecord, conn *sql.Conn) error { + stmt, err := conn.PrepareContext(ctx, insertUndoLogSql) + if err != nil { + return err + } + _, err = stmt.Exec([]driver.Value{record.BranchID, record.XID, record.Context, record.RollbackInfo, int64(record.LogStatus)}) + if err != nil { + return err + } + return nil +} + // DeleteUndoLog exec delete single undo log operate -func (m *BaseUndoLogManager) DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn driver.Conn) error { - stmt, err := conn.Prepare(constant.DeleteUndoLogSql) +func (m *BaseUndoLogManager) DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn *sql.Conn) error { + stmt, err := conn.PrepareContext(ctx, constant.DeleteUndoLogSql) if err != nil { log.Errorf("[DeleteUndoLog] prepare sql fail, err: %v", err) return err } - if _, err = stmt.Exec([]driver.Value{branchID, xid}); err != nil { + if _, err = stmt.Exec(branchID, xid); err != nil { log.Errorf("[DeleteUndoLog] exec delete undo log fail, err: %v", err) return err } @@ -210,12 +222,16 @@ func (m *BaseUndoLogManager) RunUndo(ctx context.Context, xid string, branchID i } // Undo undo sql -func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid string, branchID int64, db *sql.DB, dbName string) error { - tx, err := db.Begin() +func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid string, branchID int64, db *sql.DB, dbName string) (err error) { + conn, err := db.Conn(ctx) if err != nil { return err } + tx, err := conn.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return err + } defer func() { if err != nil { if err = tx.Rollback(); err != nil { @@ -225,10 +241,6 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid } }() - conn, err := db.Conn(ctx) - if err != nil { - return err - } stmt, err := conn.PrepareContext(ctx, selectUndoLogSql) if err != nil { log.Errorf("prepare sql fail, err: %v", err) @@ -263,10 +275,12 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid undoLogRecords = append(undoLogRecords, record) } + var exists bool for _, record := range undoLogRecords { + exists = true if !record.CanUndo() { log.Infof("xid %v branch %v, ignore %v undo_log", record.XID, record.BranchID, record.LogStatus) - continue + return nil } // todo use serializer and decode @@ -296,29 +310,53 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid return err } - if err = undoExecutor.ExecuteOn(ctx, dbType, undoLog, conn); err != nil { + if err = undoExecutor.ExecuteOn(ctx, dbType, conn); err != nil { log.Errorf("execute on fail, err: %v", err) return err } } } - //if exist { - // if err = m.DeleteUndoLog(ctx, xid, branchID, conn); err != nil { - // log.Errorf("[Undo] delete undo log fail, err: %v", err) - // return err - // } - //} - // Todo 等 insertLog 合并后加上 insertUndoLogWithGlobalFinished 功能 - /*else { - - }*/ + if exists { + if err = m.DeleteUndoLog(ctx, xid, branchID, conn); err != nil { + log.Errorf("[Undo] delete undo fail, err: %v", err) + return err + } + log.Infof("xid %v branch %v, undo_log deleted with %v", xid, branchID, undo.UndoLogStatueGlobalFinished) + } else { + if err = m.insertUndoLogWithGlobalFinished(ctx, xid, uint64(branchID), conn); err != nil { + log.Errorf("[Undo] insert undo with global finished fail, err: %v", err) + return err + } + log.Errorf("xid %v branch %v, undo_log added with %v", xid, branchID, undo.UndoLogStatueGlobalFinished) + } if err = tx.Commit(); err != nil { log.Errorf("[Undo] execute on fail, err: %v", err) return nil } + return nil +} +func (m *BaseUndoLogManager) insertUndoLogWithGlobalFinished(ctx context.Context, xid string, branchID uint64, conn *sql.Conn) error { + // todo use config to replace + parseContext := make(map[string]string, 0) + parseContext[SerializerKey] = "jackson" + parseContext[CompressorTypeKey] = "NONE" + undoLogContent, err := json.Marshal(parseContext) + + record := undo.UndologRecord{ + BranchID: branchID, + XID: xid, + RollbackInfo: []byte("{}"), + LogStatus: UndoLogStatusGlobalFinished, + Context: undoLogContent, + } + err = m.InsertUndoLogWithSqlConn(ctx, record, conn) + if err != nil { + log.Errorf("insert undo log fail, err: %v", err) + return err + } return nil } diff --git a/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go b/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go index dcb19a88b..b5867d374 100644 --- a/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go +++ b/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go @@ -180,10 +180,10 @@ func (b *BasicUndoLogBuilder) buildRecordImages(rowsi driver.Rows, tableMetaData jdbcType := types.GetJDBCTypeByTypeName(columnMeta.ColumnTypeInfo.DatabaseTypeName()) columns = append(columns, types.ColumnImage{ - KeyType: keyType, - Name: name, - Type: int16(jdbcType), - Value: ss[i], + KeyType: keyType, + ColumnName: name, + Type: int16(jdbcType), + Value: ss[i], }) } rowImages = append(rowImages, types.RowImage{Columns: columns}) @@ -315,7 +315,7 @@ func (s BasicUndoLogBuilder) buildLockKey2(records *types.RecordImage, meta type for _, column := range row.Columns { var hasKeyColumn bool for _, key := range keys { - if column.Name == key { + if column.ColumnName == key { hasKeyColumn = true if pkSplitIndex > 0 { lockKeys.WriteString("_") diff --git a/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go b/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go index 9dfe1641c..edc99a6cf 100644 --- a/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go +++ b/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go @@ -59,8 +59,8 @@ func TestBuildLockKey(t *testing.T) { records := types.RecordImage{ TableName: "test_name", Rows: []types.RowImage{ - {Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, Name: "id", Value: 1}, {KeyType: types.IndexTypePrimaryKey, Name: "userId", Value: "one"}}}, - {Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, Name: "id", Value: 2}, {KeyType: types.IndexTypePrimaryKey, Name: "userId", Value: "two"}}}, + {Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, ColumnName: "id", Value: 1}, {KeyType: types.IndexTypePrimaryKey, ColumnName: "userId", Value: "one"}}}, + {Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, ColumnName: "id", Value: 2}, {KeyType: types.IndexTypePrimaryKey, ColumnName: "userId", Value: "two"}}}, }, } diff --git a/pkg/datasource/sql/undo/executor/executor.go b/pkg/datasource/sql/undo/executor/executor.go index 79e383226..cde5a3a82 100644 --- a/pkg/datasource/sql/undo/executor/executor.go +++ b/pkg/datasource/sql/undo/executor/executor.go @@ -23,15 +23,22 @@ import ( "github.com/seata/seata-go/pkg/datasource/sql/types" "github.com/seata/seata-go/pkg/datasource/sql/undo" + "github.com/seata/seata-go/pkg/util/log" ) var _ undo.UndoExecutor = (*BaseExecutor)(nil) +const ( + selectSQL = "SELECT * FROM %s WHERE %s FOR UPDATE" +) + type BaseExecutor struct { + sqlUndoLog undo.SQLUndoLog + undoImage *types.RecordImage } // ExecuteOn -func (b *BaseExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, sqlUndoLog undo.SQLUndoLog, conn *sql.Conn) error { +func (b *BaseExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, conn *sql.Conn) error { // check data if valid return nil } @@ -40,3 +47,34 @@ func (b *BaseExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, sqlUn func (b *BaseExecutor) UndoPrepare(undoPST *sql.Stmt, undoValues []types.ColumnImage, pkValueList []types.ColumnImage) { } + +func (b *BaseExecutor) dataValidationAndGoOn(conn *sql.Conn) (bool, error) { + beforeImage := b.sqlUndoLog.BeforeImage + afterImage := b.sqlUndoLog.AfterImage + + equal, err := IsRecordsEquals(beforeImage, afterImage) + if err != nil { + return false, err + } + if equal { + log.Infof("Stop rollback because there is no data change between the before data snapshot and the after data snapshot.") + return false, nil + } + + // todo compare from current db data to old image data + + return true, nil +} + +// todo +//func (b *BaseExecutor) queryCurrentRecords(conn *sql.Conn) *types.RecordImage { +// tableMeta := b.undoImage.TableMeta +// pkNameList := tableMeta.GetPrimaryKeyOnlyName() +// +// b.undoImage.Rows +// +//} +// +//func (b *BaseExecutor) parsePkValues(rows []types.RowImage, pkNameList []string) { +// +//} diff --git a/pkg/datasource/sql/undo/executor/mysql_undo_delete_executor.go b/pkg/datasource/sql/undo/executor/mysql_undo_delete_executor.go index c24d5382d..856e400b8 100644 --- a/pkg/datasource/sql/undo/executor/mysql_undo_delete_executor.go +++ b/pkg/datasource/sql/undo/executor/mysql_undo_delete_executor.go @@ -29,26 +29,26 @@ import ( "github.com/seata/seata-go/pkg/datasource/sql/undo" ) -type MySQLUndoDeleteExecutor struct { +type mySQLUndoDeleteExecutor struct { BaseExecutor *BaseExecutor + sqlUndoLog undo.SQLUndoLog } -// NewMySQLUndoDeleteExecutor init -func NewMySQLUndoDeleteExecutor() *MySQLUndoUpdateExecutor { - return &MySQLUndoUpdateExecutor{} +// newMySQLUndoDeleteExecutor init +func newMySQLUndoDeleteExecutor(sqlUndoLog undo.SQLUndoLog) *mySQLUndoUpdateExecutor { + return &mySQLUndoUpdateExecutor{sqlUndoLog: sqlUndoLog} } -func (m *MySQLUndoDeleteExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, - sqlUndoLog undo.SQLUndoLog, conn driver.Conn) error { +func (m *mySQLUndoDeleteExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, conn driver.Conn) error { - undoSql, _ := m.buildUndoSQL(dbType, sqlUndoLog) + undoSql, _ := m.buildUndoSQL(dbType) stmt, err := conn.Prepare(undoSql) if err != nil { return err } - beforeImage := sqlUndoLog.BeforeImage + beforeImage := m.sqlUndoLog.BeforeImage for _, row := range beforeImage.Rows { undoValues := make([]interface{}, 0) @@ -75,8 +75,8 @@ func (m *MySQLUndoDeleteExecutor) ExecuteOn(ctx context.Context, dbType types.DB return nil } -func (m *MySQLUndoDeleteExecutor) buildUndoSQL(dbType types.DBType, sqlUndoLog undo.SQLUndoLog) (string, error) { - beforeImage := sqlUndoLog.BeforeImage +func (m *mySQLUndoDeleteExecutor) buildUndoSQL(dbType types.DBType) (string, error) { + beforeImage := m.sqlUndoLog.BeforeImage rows := beforeImage.Rows if len(rows) == 0 { return "", errors.New("invalid undo log") @@ -97,7 +97,7 @@ func (m *MySQLUndoDeleteExecutor) buildUndoSQL(dbType types.DBType, sqlUndoLog u ) for key, _ := range fields { - insertColumnSlice = append(insertColumnSlice, AddEscape(fields[key].Name, dbType)) + insertColumnSlice = append(insertColumnSlice, AddEscape(fields[key].ColumnName, dbType)) insertValueSlice = append(insertValueSlice, "?") } @@ -106,5 +106,5 @@ func (m *MySQLUndoDeleteExecutor) buildUndoSQL(dbType types.DBType, sqlUndoLog u // InsertSqlTemplate INSERT INTO a (x, y, z, pk) VALUES (?, ?, ?, ?) insertSqlTemplate := "INSERT INTO %s (%s) VALUES (%s)" - return fmt.Sprintf(insertSqlTemplate, sqlUndoLog.TableName, insertColumns, insertValues), nil + return fmt.Sprintf(insertSqlTemplate, m.sqlUndoLog.TableName, insertColumns, insertValues), nil } diff --git a/pkg/datasource/sql/undo/executor/mysql_undo_executor_holder.go b/pkg/datasource/sql/undo/executor/mysql_undo_executor_holder.go index bb46092bf..1b0117260 100644 --- a/pkg/datasource/sql/undo/executor/mysql_undo_executor_holder.go +++ b/pkg/datasource/sql/undo/executor/mysql_undo_executor_holder.go @@ -30,15 +30,15 @@ func NewMySQLUndoExecutorHolder() undo.UndoExecutorHolder { // GetInsertExecutor get the mysql Insert UndoExecutor by sqlUndoLog func (m *MySQLUndoExecutorHolder) GetInsertExecutor(sqlUndoLog undo.SQLUndoLog) undo.UndoExecutor { - return NewMySQLUndoInsertExecutor() + return newMySQLUndoInsertExecutor(sqlUndoLog) } // GetUpdateExecutor get the mysql Update UndoExecutor by sqlUndoLog func (m *MySQLUndoExecutorHolder) GetUpdateExecutor(sqlUndoLog undo.SQLUndoLog) undo.UndoExecutor { - return NewMySQLUndoUpdateExecutor() + return newMySQLUndoUpdateExecutor(sqlUndoLog) } // GetDeleteExecutor get the mysql Delete UndoExecutor by sqlUndoLog func (m *MySQLUndoExecutorHolder) GetDeleteExecutor(sqlUndoLog undo.SQLUndoLog) undo.UndoExecutor { - return NewMySQLUndoDeleteExecutor() + return newMySQLUndoDeleteExecutor(sqlUndoLog) } diff --git a/pkg/datasource/sql/undo/executor/mysql_undo_insert_executor.go b/pkg/datasource/sql/undo/executor/mysql_undo_insert_executor.go index 3a9661f74..dd2378636 100644 --- a/pkg/datasource/sql/undo/executor/mysql_undo_insert_executor.go +++ b/pkg/datasource/sql/undo/executor/mysql_undo_insert_executor.go @@ -27,32 +27,32 @@ import ( "github.com/seata/seata-go/pkg/datasource/sql/undo" ) -type MySQLUndoInsertExecutor struct { +type mySQLUndoInsertExecutor struct { BaseExecutor *BaseExecutor + sqlUndoLog undo.SQLUndoLog } -// NewMySQLUndoInsertExecutor init -func NewMySQLUndoInsertExecutor() *MySQLUndoInsertExecutor { - return &MySQLUndoInsertExecutor{} +// newMySQLUndoInsertExecutor init +func newMySQLUndoInsertExecutor(sqlUndoLog undo.SQLUndoLog) *mySQLUndoInsertExecutor { + return &mySQLUndoInsertExecutor{sqlUndoLog: sqlUndoLog} } // ExecuteOn execute insert undo logic -func (m *MySQLUndoInsertExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, - sqlUndoLog undo.SQLUndoLog, conn *sql.Conn) error { +func (m *mySQLUndoInsertExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, conn *sql.Conn) error { - if err := m.BaseExecutor.ExecuteOn(ctx, dbType, sqlUndoLog, conn); err != nil { + if err := m.BaseExecutor.ExecuteOn(ctx, dbType, conn); err != nil { return err } // build delete sql - undoSql, _ := m.buildUndoSQL(dbType, sqlUndoLog) + undoSql, _ := m.buildUndoSQL(dbType) stmt, err := conn.PrepareContext(ctx, undoSql) if err != nil { return err } - afterImage := sqlUndoLog.AfterImage + afterImage := m.sqlUndoLog.AfterImage for _, row := range afterImage.Rows { pkValueList := make([]interface{}, 0) @@ -71,14 +71,14 @@ func (m *MySQLUndoInsertExecutor) ExecuteOn(ctx context.Context, dbType types.DB } // buildUndoSQL build insert undo log -func (m *MySQLUndoInsertExecutor) buildUndoSQL(dbType types.DBType, sqlUndoLog undo.SQLUndoLog) (string, error) { - afterImage := sqlUndoLog.AfterImage +func (m *mySQLUndoInsertExecutor) buildUndoSQL(dbType types.DBType) (string, error) { + afterImage := m.sqlUndoLog.AfterImage rows := afterImage.Rows if len(rows) == 0 { return "", errors.New("invalid undo log") } - str, err := m.generateDeleteSql(afterImage, rows, dbType, sqlUndoLog) + str, err := m.generateDeleteSql(afterImage, rows, dbType, m.sqlUndoLog) if err != nil { return "", err } @@ -87,7 +87,7 @@ func (m *MySQLUndoInsertExecutor) buildUndoSQL(dbType types.DBType, sqlUndoLog u } // generateDeleteSql generate delete sql -func (m *MySQLUndoInsertExecutor) generateDeleteSql( +func (m *mySQLUndoInsertExecutor) generateDeleteSql( image *types.RecordImage, rows []types.RowImage, dbType types.DBType, sqlUndoLog undo.SQLUndoLog) (string, error) { @@ -98,7 +98,7 @@ func (m *MySQLUndoInsertExecutor) generateDeleteSql( var pkList []string for key, _ := range colImages { - pkList = append(pkList, colImages[key].Name) + pkList = append(pkList, colImages[key].ColumnName) } whereSql := BuildWhereConditionByPKs(pkList, dbType) diff --git a/pkg/datasource/sql/undo/executor/mysql_undo_update_executor.go b/pkg/datasource/sql/undo/executor/mysql_undo_update_executor.go index 8630bcc53..86b22c74b 100644 --- a/pkg/datasource/sql/undo/executor/mysql_undo_update_executor.go +++ b/pkg/datasource/sql/undo/executor/mysql_undo_update_executor.go @@ -27,27 +27,35 @@ import ( "github.com/seata/seata-go/pkg/datasource/sql/undo" ) -type MySQLUndoUpdateExecutor struct { - BaseExecutor *BaseExecutor +type mySQLUndoUpdateExecutor struct { + baseExecutor *BaseExecutor + sqlUndoLog undo.SQLUndoLog } -// NewMySQLUndoUpdateExecutor init -func NewMySQLUndoUpdateExecutor() *MySQLUndoUpdateExecutor { - return &MySQLUndoUpdateExecutor{} +// newMySQLUndoUpdateExecutor init +func newMySQLUndoUpdateExecutor(sqlUndoLog undo.SQLUndoLog) *mySQLUndoUpdateExecutor { + return &mySQLUndoUpdateExecutor{ + sqlUndoLog: sqlUndoLog, + baseExecutor: &BaseExecutor{sqlUndoLog: sqlUndoLog}, + } } -func (m *MySQLUndoUpdateExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, sqlUndoLog undo.SQLUndoLog, conn *sql.Conn) error { - - //m.BaseExecutor.ExecuteOn(ctx, dbType, sqlUndoLog, conn) - undoSql, _ := m.buildUndoSQL(dbType, sqlUndoLog) +func (m *mySQLUndoUpdateExecutor) ExecuteOn(ctx context.Context, dbType types.DBType, conn *sql.Conn) error { + ok, err := m.baseExecutor.dataValidationAndGoOn(conn) + if err != nil { + return err + } + if !ok { + return nil + } + undoSql, _ := m.buildUndoSQL(dbType) stmt, err := conn.PrepareContext(ctx, undoSql) if err != nil { return err } - beforeImage := sqlUndoLog.BeforeImage - + beforeImage := m.sqlUndoLog.BeforeImage for _, row := range beforeImage.Rows { undoValues := make([]interface{}, 0) pkList, err := GetOrderedPkList(beforeImage, row, dbType) @@ -56,7 +64,7 @@ func (m *MySQLUndoUpdateExecutor) ExecuteOn(ctx context.Context, dbType types.DB } for _, col := range row.Columns { - if col.KeyType != types.PrimaryKey.Number() { + if col.KeyType != types.IndexTypePrimaryKey { undoValues = append(undoValues, col.Value) } } @@ -74,8 +82,8 @@ func (m *MySQLUndoUpdateExecutor) ExecuteOn(ctx context.Context, dbType types.DB } // BuildUndoSQL -func (m *MySQLUndoUpdateExecutor) buildUndoSQL(dbType types.DBType, sqlUndoLog undo.SQLUndoLog) (string, error) { - beforeImage := sqlUndoLog.BeforeImage +func (m *mySQLUndoUpdateExecutor) buildUndoSQL(dbType types.DBType) (string, error) { + beforeImage := m.sqlUndoLog.BeforeImage rows := beforeImage.Rows row := rows[0] @@ -86,7 +94,7 @@ func (m *MySQLUndoUpdateExecutor) buildUndoSQL(dbType types.DBType, sqlUndoLog u nonPkFields := row.NonPrimaryKeys(row.Columns) for key, _ := range nonPkFields { - updateColumnSlice = append(updateColumnSlice, AddEscape(nonPkFields[key].Name, dbType)+" = ? ") + updateColumnSlice = append(updateColumnSlice, AddEscape(nonPkFields[key].ColumnName, dbType)+" = ? ") } updateColumns = strings.Join(updateColumnSlice, ", ") @@ -96,12 +104,12 @@ func (m *MySQLUndoUpdateExecutor) buildUndoSQL(dbType types.DBType, sqlUndoLog u } for key, _ := range pkList { - pkNameList = append(pkNameList, pkList[key].Name) + pkNameList = append(pkNameList, pkList[key].ColumnName) } whereSql := BuildWhereConditionByPKs(pkNameList, dbType) // UpdateSqlTemplate UPDATE a SET x = ?, y = ?, z = ? WHERE pk1 in (?) pk2 in (?) updateSqlTemplate := "UPDATE %s SET %s WHERE %s " - return fmt.Sprintf(updateSqlTemplate, sqlUndoLog.TableName, updateColumns, whereSql), nil + return fmt.Sprintf(updateSqlTemplate, m.sqlUndoLog.TableName, updateColumns, whereSql), nil } diff --git a/pkg/datasource/sql/undo/executor/sql.go b/pkg/datasource/sql/undo/executor/sql.go index f6931b2a0..25ffabe19 100644 --- a/pkg/datasource/sql/undo/executor/sql.go +++ b/pkg/datasource/sql/undo/executor/sql.go @@ -186,31 +186,6 @@ func DataValidationAndGoOn(sqlUndoLog undo.SQLUndoLog, conn *sql.Conn) bool { return true } -// IsRecordsEquals check before record and after record if equal -func IsRecordsEquals(before types.RecordImages, after types.RecordImages) bool { - lenBefore, lenAfter := len(before), len(after) - if lenBefore == 0 && lenAfter == 0 { - return true - } - - if lenBefore > 0 && lenAfter == 0 || lenBefore == 0 && lenAfter > 0 { - return false - } - - for key, _ := range before { - if strings.EqualFold(before[key].TableName, after[key].TableName) && - len(before[key].Rows) == len(after[key].Rows) { - // when image is EmptyTableRecords, getTableMeta will throw an exception - if len(before[key].Rows) == 0 { - return true - } - - } - } - - return true -} - func GetOrderedPkList(image *types.RecordImage, row types.RowImage, dbType types.DBType) ([]types.ColumnImage, error) { pkColumnNameListByOrder := image.TableMeta.GetPrimaryKeyOnlyName() @@ -219,13 +194,13 @@ func GetOrderedPkList(image *types.RecordImage, row types.RowImage, dbType types pkFields := make([]types.ColumnImage, 0) for _, column := range row.PrimaryKeys(row.Columns) { - column.Name = DelEscape(column.Name, dbType) + column.ColumnName = DelEscape(column.ColumnName, dbType) pkColumnNameListNoOrder = append(pkColumnNameListNoOrder, column) } for _, pkName := range pkColumnNameListByOrder { for _, col := range pkColumnNameListNoOrder { - if strings.Index(col.Name, pkName) > -1 { + if strings.Index(col.ColumnName, pkName) > -1 { pkFields = append(pkFields, col) } } diff --git a/pkg/datasource/sql/undo/executor/utils.go b/pkg/datasource/sql/undo/executor/utils.go new file mode 100644 index 000000000..b1e24f1fe --- /dev/null +++ b/pkg/datasource/sql/undo/executor/utils.go @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package executor + +import ( + "fmt" + "strings" + + "github.com/seata/seata-go/pkg/datasource/sql/types" +) + +// IsRecordsEquals check before record and after record if equal +func IsRecordsEquals(beforeImage *types.RecordImage, afterImage *types.RecordImage) (bool, error) { + if beforeImage == nil && afterImage == nil { + return true, nil + } + if beforeImage == nil || afterImage == nil { + return false, nil + } + + if !strings.EqualFold(beforeImage.TableName, afterImage.TableName) || len(beforeImage.Rows) != len(afterImage.Rows) { + return false, nil + } + if len(beforeImage.Rows) == 0 { + return true, nil + } + + return compareRows(beforeImage.TableMeta, beforeImage.Rows, afterImage.Rows) +} + +func compareRows(tableMeta types.TableMeta, oldRows []types.RowImage, newRows []types.RowImage) (bool, error) { + oldRowMap := rowListToMap(oldRows, tableMeta.GetPrimaryKeyOnlyName()) + newRowMap := rowListToMap(newRows, tableMeta.GetPrimaryKeyOnlyName()) + + for key, oldRow := range oldRowMap { + newRow := newRowMap[key] + if newRow == nil { + return false, fmt.Errorf("compare row failed, rowKey %s, reason [newField is null]", key) + } + for fieldName, oldValue := range oldRow { + newValue := newRow[fieldName] + if newValue == nil { + return false, fmt.Errorf("compare row failed, rowKey %s, fieldName %s, reason [newField is null]", key, fieldName) + } + if newValue != oldValue { + return false, nil + } + } + } + return true, nil +} + +func rowListToMap(rows []types.RowImage, primaryKeyList []string) map[string]map[string]interface{} { + rowMap := make(map[string]map[string]interface{}, 0) + for _, row := range rows { + fieldMap := make(map[string]interface{}, 0) + var rowKey string + var firstUnderline bool + + for _, column := range row.Columns { + for i, key := range primaryKeyList { + if column.ColumnName == key { + if firstUnderline && i > 0 { + rowKey += "_##$$_" + } + // todo make value more accurate + rowKey = fmt.Sprintf("%v%v", rowKey, column.Value) + firstUnderline = true + } + } + fieldMap[strings.ToUpper(column.ColumnName)] = column.Value + } + rowMap[rowKey] = fieldMap + } + return rowMap +} diff --git a/pkg/datasource/sql/undo/mysql/undo.go b/pkg/datasource/sql/undo/mysql/undo.go index 67b8942a6..1a464549d 100644 --- a/pkg/datasource/sql/undo/mysql/undo.go +++ b/pkg/datasource/sql/undo/mysql/undo.go @@ -42,7 +42,7 @@ func (m *undoLogManager) Init() { } // DeleteUndoLog -func (m *undoLogManager) DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn driver.Conn) error { +func (m *undoLogManager) DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn *sql.Conn) error { return m.Base.DeleteUndoLog(ctx, xid, branchID, conn) } diff --git a/pkg/datasource/sql/undo/undo.go b/pkg/datasource/sql/undo/undo.go index e7cc08b86..420228105 100644 --- a/pkg/datasource/sql/undo/undo.go +++ b/pkg/datasource/sql/undo/undo.go @@ -66,7 +66,7 @@ func GetUndologBuilder(sqlType types.ExecutorType) UndoLogBuilder { type UndoLogManager interface { Init() // DeleteUndoLog - DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn driver.Conn) error + DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn *sql.Conn) error // BatchDeleteUndoLog BatchDeleteUndoLog(xid []string, branchID []int64, conn *sql.Conn) error //FlushUndoLog diff --git a/pkg/datasource/sql/undo/undo_executor.go b/pkg/datasource/sql/undo/undo_executor.go index 1728e6b0b..fea533efa 100644 --- a/pkg/datasource/sql/undo/undo_executor.go +++ b/pkg/datasource/sql/undo/undo_executor.go @@ -25,5 +25,5 @@ import ( ) type UndoExecutor interface { - ExecuteOn(ctx context.Context, dbType types.DBType, sqlUndoLog SQLUndoLog, conn *sql.Conn) error + ExecuteOn(ctx context.Context, dbType types.DBType, conn *sql.Conn) error } diff --git a/pkg/rm/tcc/fence/store/db/dao/store_api.go b/pkg/rm/tcc/fence/store/db/dao/store_api.go index 8dd2a37f9..e292c6719 100644 --- a/pkg/rm/tcc/fence/store/db/dao/store_api.go +++ b/pkg/rm/tcc/fence/store/db/dao/store_api.go @@ -63,7 +63,7 @@ type TCCFenceStore interface { // return the error msg DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error - // SetLogTableName LogTable Name + // SetLogTableName LogTable ColumnName // param logTableName logTableName SetLogTableName(logTable string) } diff --git a/sample/at/basic/main.go b/sample/at/basic/main.go index 2c60f0d8d..ca2ce4280 100644 --- a/sample/at/basic/main.go +++ b/sample/at/basic/main.go @@ -60,13 +60,13 @@ func updateData(ctx context.Context) error { ret, err := db.ExecContext(ctx, sql, fmt.Sprintf("NewDescs-%d", time.Now().UnixMilli()), 1) if err != nil { fmt.Printf("update failed, err:%v\n", err) - return nil + return err } rows, err := ret.RowsAffected() if err != nil { fmt.Printf("update failed, err:%v\n", err) - return nil + return err } fmt.Printf("update success: %d.\n", rows) - return nil + return fmt.Errorf("test error") } From ec06d5df953765ac9428059d78ec04837451f773 Mon Sep 17 00:00:00 2001 From: Luky116Liuyuecai Date: Sun, 20 Nov 2022 18:56:31 +0800 Subject: [PATCH 3/3] fix err --- pkg/datasource/sql/undo/base/undo.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/datasource/sql/undo/base/undo.go b/pkg/datasource/sql/undo/base/undo.go index 3012527c4..da8fd9f98 100644 --- a/pkg/datasource/sql/undo/base/undo.go +++ b/pkg/datasource/sql/undo/base/undo.go @@ -344,6 +344,9 @@ func (m *BaseUndoLogManager) insertUndoLogWithGlobalFinished(ctx context.Context parseContext[SerializerKey] = "jackson" parseContext[CompressorTypeKey] = "NONE" undoLogContent, err := json.Marshal(parseContext) + if err != nil { + return err + } record := undo.UndologRecord{ BranchID: branchID,