diff --git a/go.mod b/go.mod index 543679c89..306306683 100644 --- a/go.mod +++ b/go.mod @@ -66,7 +66,7 @@ require ( github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/universal-translator v0.18.0 // indirect github.com/go-resty/resty/v2 v2.7.0 // indirect - github.com/goccy/go-json v0.9.7 + github.com/goccy/go-json v0.9.7 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect diff --git a/pkg/datasource/sql/conn_at.go b/pkg/datasource/sql/conn_at.go index e444d9774..e8b0b4582 100644 --- a/pkg/datasource/sql/conn_at.go +++ b/pkg/datasource/sql/conn_at.go @@ -22,6 +22,8 @@ import ( gosql "database/sql" "database/sql/driver" + "github.com/seata/seata-go/pkg/util/log" + "github.com/seata/seata-go/pkg/datasource/sql/exec" "github.com/seata/seata-go/pkg/datasource/sql/types" "github.com/seata/seata-go/pkg/tm" @@ -165,10 +167,14 @@ func (c *ATConn) createNewTxOnExecIfNeed(ctx context.Context, f func() (types.Ex return nil, err } } - defer func() { - if tx != nil { - tx.Rollback() + recoverErr := recover() + if err != nil || recoverErr != nil { + log.Errorf("conn at rollback error:%v or recoverErr:%v", err, recoverErr) + if tx != nil { + rollbackErr := tx.Rollback() + log.Errorf("conn at rollback error:%v", rollbackErr) + } } }() diff --git a/pkg/datasource/sql/undo/base/undo.go b/pkg/datasource/sql/undo/base/undo.go index aee41c008..7b9b6b7c9 100644 --- a/pkg/datasource/sql/undo/base/undo.go +++ b/pkg/datasource/sql/undo/base/undo.go @@ -23,10 +23,11 @@ import ( "database/sql/driver" "encoding/json" "fmt" - "github.com/seata/seata-go/pkg/util/convert" "strconv" "strings" + "github.com/seata/seata-go/pkg/util/convert" + "github.com/arana-db/parser/mysql" "github.com/pkg/errors" diff --git a/pkg/rm/tcc/tcc_service.go b/pkg/rm/tcc/tcc_service.go index bdc7918b8..222c497f1 100644 --- a/pkg/rm/tcc/tcc_service.go +++ b/pkg/rm/tcc/tcc_service.go @@ -244,7 +244,7 @@ func obtainStructValueType(o interface{}) (bool, reflect.Value, reflect.Type) { func (t *TCCServiceProxy) GetTransactionInfo() tm.TransactionInfo { // todo replace with config return tm.TransactionInfo{ - TimeOut: 10000, + TimeOut: time.Second * 10, Name: t.GetActionName(), // Propagation, Propagation // LockRetryInternal, int64 diff --git a/pkg/rm/tcc/tcc_service_test.go b/pkg/rm/tcc/tcc_service_test.go index 495c65803..9c0383ff3 100644 --- a/pkg/rm/tcc/tcc_service_test.go +++ b/pkg/rm/tcc/tcc_service_test.go @@ -309,7 +309,7 @@ func TestTCCGetTransactionInfo(t1 *testing.T) { TwoPhaseAction: twoPhaseAction1, }, }, - tm.TransactionInfo{Name: "TwoPhaseDemoService", TimeOut: 10000, Propagation: 0, LockRetryInternal: 0, LockRetryTimes: 0}, + tm.TransactionInfo{Name: "TwoPhaseDemoService", TimeOut: time.Second * 10, Propagation: 0, LockRetryInternal: 0, LockRetryTimes: 0}, } t1.Run(tests.name, func(t1 *testing.T) { diff --git a/pkg/tm/transaction_executor.go b/pkg/tm/transaction_executor.go index da383f1fe..c5fe5a69e 100644 --- a/pkg/tm/transaction_executor.go +++ b/pkg/tm/transaction_executor.go @@ -28,8 +28,10 @@ import ( "github.com/seata/seata-go/pkg/util/log" ) +const DefaultTimeOut = time.Second * 30 + type TransactionInfo struct { - TimeOut int32 + TimeOut time.Duration Name string Propagation Propagation LockRetryInternal int64 @@ -48,12 +50,16 @@ func WithGlobalTx(ctx context.Context, ti *TransactionInfo, business CallbackWit return errors.New("global transaction name is required.") } - if ctx, re = begin(ctx, ti.Name); re != nil { + if ctx, re = begin(ctx, ti); re != nil { return } defer func() { // business maybe to throw panic, so need to recover it here. - re = commitOrRollback(ctx, recover() == nil && re == nil) + err := recover() + if err != nil { + log.Errorf("business callback panic:%v", err) + } + re = commitOrRollback(ctx, err == nil && re == nil) log.Infof("global transaction result %v", re) }() @@ -63,12 +69,18 @@ func WithGlobalTx(ctx context.Context, ti *TransactionInfo, business CallbackWit } // begin a global transaction, it will obtain a xid from tc in tcp call. -func begin(ctx context.Context, name string) (rc context.Context, re error) { +func begin(ctx context.Context, ti *TransactionInfo) (rc context.Context, re error) { + if ti == nil { + return nil, errors.New("transaction info is nil") + } + if ti.TimeOut == 0 { + ti.TimeOut = DefaultTimeOut + } if !IsSeataContext(ctx) { ctx = InitSeataContext(ctx) } - SetTxName(ctx, name) + SetTxName(ctx, ti.Name) if GetTransactionRole(ctx) == nil { SetTransactionRole(ctx, LAUNCHER) } @@ -93,8 +105,7 @@ func begin(ctx context.Context, name string) (rc context.Context, re error) { SetTxStatus(ctx, message.GlobalStatusUnKnown) } - // todo timeout should read from config - err := GetGlobalTransactionManager().Begin(ctx, tx, time.Second*30, name) + err := GetGlobalTransactionManager().Begin(ctx, tx, ti.TimeOut, ti.Name) if err != nil { re = fmt.Errorf("transactionTemplate: begin transaction failed, error %v", err) } diff --git a/pkg/tm/transaction_executor_test.go b/pkg/tm/transaction_executor_test.go index 86e3e03d3..28907dfa7 100644 --- a/pkg/tm/transaction_executor_test.go +++ b/pkg/tm/transaction_executor_test.go @@ -85,7 +85,9 @@ func TestTransactionExecutorBegin(t *testing.T) { assert.Equal(t, v.wantErrString, err.Error) } }(v) - begin(v.ctx, v.name) + begin(v.ctx, &TransactionInfo{ + Name: v.name, + }) }() // rest up stub diff --git a/sample/at/basic/main.go b/sample/at/basic/main.go index 210eff2b4..d517b9240 100644 --- a/sample/at/basic/main.go +++ b/sample/at/basic/main.go @@ -40,7 +40,8 @@ func main() { initService() selectData() tm.WithGlobalTx(context.Background(), &tm.TransactionInfo{ - Name: "ATSampleLocalGlobalTx", + Name: "ATSampleLocalGlobalTx", + TimeOut: time.Second * 30, }, updateData) <-make(chan struct{}) } diff --git a/sample/dockercompose/mysql/order.sql b/sample/dockercompose/mysql/order.sql index 726f45baa..d00e75e20 100644 --- a/sample/dockercompose/mysql/order.sql +++ b/sample/dockercompose/mysql/order.sql @@ -1,3 +1,18 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + CREATE database if NOT EXISTS `seata_client` default character set utf8mb4 collate utf8mb4_unicode_ci; USE `seata_client`;