diff --git a/pkg/runtime/transaction/trx_log.go b/pkg/runtime/transaction/trx_log.go index 89b1f297..02fcaa4c 100644 --- a/pkg/runtime/transaction/trx_log.go +++ b/pkg/runtime/transaction/trx_log.go @@ -26,6 +26,7 @@ import ( import ( "github.com/arana-db/arana/pkg/proto" + "github.com/arana-db/arana/pkg/runtime" ) var ( @@ -59,6 +60,9 @@ CREATE TABLE IF NOT EXISTS __arana_trx_log ) ENGINE = InnoDB CHARSET = utf8 ` + insSql = "REPLACE INTO __arana_trx_log(trx_id, tenant, server_id, status, participant, start_time, update_time) VALUES (?,?,?,?,?,sysdate(),sysdate())" + delSql = "DELETE FROM __arana_trx_log WHERE trx_id = ?" + selectSql = "SELECT trx_id, tenant, server_id, status, participant, start_time, update_time FROM __arana_trx_log WHERE 1=1 %s ORDER BY update_time LIMIT ? OFFSET ?" ) // TxLogManager Transaction log management @@ -79,12 +83,6 @@ func (gm *TxLogManager) init() error { // AddOrUpdateTxLog Add or update transaction log func (gm *TxLogManager) AddOrUpdateTxLog(l TrxLog) error { - insSql := ` -REPLACE INTO - __arana_trx_log(trx_id, tenant, server_id, status, participant, start_time, update_time) -VALUES - (?,?,?,?,?,sysdate(),sysdate()) -` participants, err := json.Marshal(l.Participants) if err != nil { return err @@ -92,8 +90,8 @@ VALUES trxIdVal, _ := proto.NewValue(l.TrxID) tenantVal, _ := proto.NewValue(l.Tenant) serverIdVal, _ := proto.NewValue(l.ServerID) - stateVal, _ := proto.NewValue(l.State) - participantsVal, _ := proto.NewValue(participants) + stateVal, _ := proto.NewValue(int32(l.State)) + participantsVal, _ := proto.NewValue(string(participants)) args := []proto.Value{ trxIdVal, tenantVal, @@ -107,7 +105,6 @@ VALUES // DeleteTxLog Delete transaction log func (gm *TxLogManager) DeleteTxLog(l TrxLog) error { - delSql := "DELETE FROM __arana_trx_log WHERE trx_id = ?" trxIdVal, _ := proto.NewValue(l.TrxID) args := []proto.Value{ trxIdVal, @@ -118,18 +115,16 @@ func (gm *TxLogManager) DeleteTxLog(l TrxLog) error { // ScanTxLog Scanning transaction func (gm *TxLogManager) ScanTxLog(pageNo, pageSize uint64, conditions []Condition) (uint32, []TrxLog, error) { - selectSql := ` -SELECT - trx_id, tenant, server_id, status, participant, start_time, update_time -FROM - __arana_trx_log -WHERE - 1=1 %s LIMIT ? OFFSET ? ORDER BY update_time -` - var ( whereBuilder []string args []proto.Value + logs []TrxLog + num uint32 + dest []proto.Value + log TrxLog + participants []TrxParticipant + serverId int64 + state int64 ) for i := range conditions { @@ -138,7 +133,6 @@ WHERE return 0, nil, fmt.Errorf("ScanTxLog filter attribute=%s not allowed", condition.FiledName) } whereBuilder = append(whereBuilder, fmt.Sprintf("%s %s ?", condition.FiledName, condition.Operation)) - val, _ := proto.NewValue(condition.Value) args = append(args, val) } @@ -147,17 +141,38 @@ WHERE offset := proto.NewValueUint64((pageNo - 1) * pageSize) args = append(args, limit, offset) - - selectSql = fmt.Sprintf(selectSql, strings.Join(whereBuilder, " ")) - - _, _, err := gm.sysDB.Call(context.Background(), selectSql, args...) + conditionSelectSql := fmt.Sprintf(selectSql, strings.Join(whereBuilder, " ")) + rows, _, err := gm.sysDB.Call(context.Background(), conditionSelectSql, args...) if err != nil { return 0, nil, err } - - // TODO convert sql.Rows to []TrxLog - - return 0, nil, nil + dataset, _ := rows.Dataset() + for { + row, err := dataset.Next() + if err != nil { + return 0, nil, err + } + if row == nil { + break + } + if err := row.Scan(dest[:]); err != nil { + return 0, nil, err + } + log.TrxID = dest[0].String() + log.Tenant = dest[1].String() + serverId, _ = dest[2].Int64() + log.ServerID = int32(serverId) + state, _ = dest[3].Int64() + log.State = runtime.TxState(int32(state)) + + if err := json.Unmarshal([]byte(dest[4].String()), &participants); err != nil { + return 0, nil, err + } + log.Participants = participants + logs = append(logs, log) + num++ + } + return num, logs, nil } // runCleanTxLogTask execute the transaction log cleanup action, and clean up the __arana_tx_log secondary diff --git a/pkg/runtime/transaction/trx_log_test.go b/pkg/runtime/transaction/trx_log_test.go new file mode 100644 index 00000000..9bfdf686 --- /dev/null +++ b/pkg/runtime/transaction/trx_log_test.go @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package transaction + +import ( + "context" + "encoding/json" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" + "github.com/arana-db/arana/pkg/runtime" + "github.com/arana-db/arana/testdata" +) + +func TestDeleteTxLog(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockDB := testdata.NewMockDB(ctrl) + txLogManager := &TxLogManager{ + sysDB: mockDB, + } + testTrxLog := TrxLog{ + TrxID: "test_delete_id", + ServerID: 1, + State: runtime.TrxActive, + Participants: []TrxParticipant{{NodeID: "1", RemoteAddr: "127.0.0.1", Schema: "schema"}}, + Tenant: "test_tenant", + } + trxIdVal, _ := proto.NewValue("test_delete_id") + mockDB.EXPECT().Call( + context.Background(), + "DELETE FROM __arana_trx_log WHERE trx_id = ?", + gomock.Eq([]proto.Value{trxIdVal}), + ).Return(nil, uint16(0), nil).Times(1) + err := txLogManager.DeleteTxLog(testTrxLog) + assert.NoError(t, err) +} + +func TestAddOrUpdateTxLog(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockDB := testdata.NewMockDB(ctrl) + txLogManager := &TxLogManager{ + sysDB: mockDB, + } + testTrxLog := TrxLog{ + TrxID: "test_add_or_update_id", + ServerID: 1, + State: runtime.TrxActive, + Participants: []TrxParticipant{{NodeID: "1", RemoteAddr: "127.0.0.1", Schema: "schema"}}, + Tenant: "test_tenant", + } + participants, err := json.Marshal(testTrxLog.Participants) + assert.NoError(t, err) + trxIdVal, _ := proto.NewValue(testTrxLog.TrxID) + tenantVal, _ := proto.NewValue(testTrxLog.Tenant) + serverIdVal, _ := proto.NewValue(testTrxLog.ServerID) + stateVal, _ := proto.NewValue(int32(testTrxLog.State)) + participantsVal, _ := proto.NewValue(string(participants)) + + args := []proto.Value{ + trxIdVal, + tenantVal, + serverIdVal, + stateVal, + participantsVal, + } + mockDB.EXPECT().Call( + context.Background(), + "REPLACE INTO __arana_trx_log(trx_id, tenant, server_id, status, participant, start_time, update_time) VALUES (?,?,?,?,?,sysdate(),sysdate())", + args, + ).Return(nil, uint16(0), nil).Times(1) + err = txLogManager.AddOrUpdateTxLog(testTrxLog) + assert.NoError(t, err) +}