Skip to content

Commit

Permalink
【XA】build transaction log ability (#666)
Browse files Browse the repository at this point in the history
* complete crud of tx_log & add ut

* reformat import
Optimize code to use "var" in for

* format import of trx_log_test.go

* format import of trx_log_test.go by imports-formatter

* reformat import of trx_log_test.go by imports-formatter
  • Loading branch information
RevSoy authored Apr 9, 2023
1 parent 41901cc commit a8c1c6a
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 27 deletions.
69 changes: 42 additions & 27 deletions pkg/runtime/transaction/trx_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

import (
"github.com/arana-db/arana/pkg/proto"
"github.com/arana-db/arana/pkg/runtime"
)

var (
Expand Down Expand Up @@ -59,6 +60,9 @@ CREATE TABLE IF NOT EXISTS __arana_trx_log
) ENGINE = InnoDB
CHARSET = utf8
`
insSql = "REPLACE INTO __arana_trx_log(trx_id, tenant, server_id, status, participant, start_time, update_time) VALUES (?,?,?,?,?,sysdate(),sysdate())"
delSql = "DELETE FROM __arana_trx_log WHERE trx_id = ?"
selectSql = "SELECT trx_id, tenant, server_id, status, participant, start_time, update_time FROM __arana_trx_log WHERE 1=1 %s ORDER BY update_time LIMIT ? OFFSET ?"
)

// TxLogManager Transaction log management
Expand All @@ -79,21 +83,15 @@ func (gm *TxLogManager) init() error {

// AddOrUpdateTxLog Add or update transaction log
func (gm *TxLogManager) AddOrUpdateTxLog(l TrxLog) error {
insSql := `
REPLACE INTO
__arana_trx_log(trx_id, tenant, server_id, status, participant, start_time, update_time)
VALUES
(?,?,?,?,?,sysdate(),sysdate())
`
participants, err := json.Marshal(l.Participants)
if err != nil {
return err
}
trxIdVal, _ := proto.NewValue(l.TrxID)
tenantVal, _ := proto.NewValue(l.Tenant)
serverIdVal, _ := proto.NewValue(l.ServerID)
stateVal, _ := proto.NewValue(l.State)
participantsVal, _ := proto.NewValue(participants)
stateVal, _ := proto.NewValue(int32(l.State))
participantsVal, _ := proto.NewValue(string(participants))
args := []proto.Value{
trxIdVal,
tenantVal,
Expand All @@ -107,7 +105,6 @@ VALUES

// DeleteTxLog Delete transaction log
func (gm *TxLogManager) DeleteTxLog(l TrxLog) error {
delSql := "DELETE FROM __arana_trx_log WHERE trx_id = ?"
trxIdVal, _ := proto.NewValue(l.TrxID)
args := []proto.Value{
trxIdVal,
Expand All @@ -118,18 +115,16 @@ func (gm *TxLogManager) DeleteTxLog(l TrxLog) error {

// ScanTxLog Scanning transaction
func (gm *TxLogManager) ScanTxLog(pageNo, pageSize uint64, conditions []Condition) (uint32, []TrxLog, error) {
selectSql := `
SELECT
trx_id, tenant, server_id, status, participant, start_time, update_time
FROM
__arana_trx_log
WHERE
1=1 %s LIMIT ? OFFSET ? ORDER BY update_time
`

var (
whereBuilder []string
args []proto.Value
logs []TrxLog
num uint32
dest []proto.Value
log TrxLog
participants []TrxParticipant
serverId int64
state int64
)

for i := range conditions {
Expand All @@ -138,7 +133,6 @@ WHERE
return 0, nil, fmt.Errorf("ScanTxLog filter attribute=%s not allowed", condition.FiledName)
}
whereBuilder = append(whereBuilder, fmt.Sprintf("%s %s ?", condition.FiledName, condition.Operation))

val, _ := proto.NewValue(condition.Value)
args = append(args, val)
}
Expand All @@ -147,17 +141,38 @@ WHERE
offset := proto.NewValueUint64((pageNo - 1) * pageSize)

args = append(args, limit, offset)

selectSql = fmt.Sprintf(selectSql, strings.Join(whereBuilder, " "))

_, _, err := gm.sysDB.Call(context.Background(), selectSql, args...)
conditionSelectSql := fmt.Sprintf(selectSql, strings.Join(whereBuilder, " "))
rows, _, err := gm.sysDB.Call(context.Background(), conditionSelectSql, args...)
if err != nil {
return 0, nil, err
}

// TODO convert sql.Rows to []TrxLog

return 0, nil, nil
dataset, _ := rows.Dataset()
for {
row, err := dataset.Next()
if err != nil {
return 0, nil, err
}
if row == nil {
break
}
if err := row.Scan(dest[:]); err != nil {
return 0, nil, err
}
log.TrxID = dest[0].String()
log.Tenant = dest[1].String()
serverId, _ = dest[2].Int64()
log.ServerID = int32(serverId)
state, _ = dest[3].Int64()
log.State = runtime.TxState(int32(state))

if err := json.Unmarshal([]byte(dest[4].String()), &participants); err != nil {
return 0, nil, err
}
log.Participants = participants
logs = append(logs, log)
num++
}
return num, logs, nil
}

// runCleanTxLogTask execute the transaction log cleanup action, and clean up the __arana_tx_log secondary
Expand Down
98 changes: 98 additions & 0 deletions pkg/runtime/transaction/trx_log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package transaction

import (
"context"
"encoding/json"
"testing"
)

import (
"github.com/golang/mock/gomock"

"github.com/stretchr/testify/assert"
)

import (
"github.com/arana-db/arana/pkg/proto"
"github.com/arana-db/arana/pkg/runtime"
"github.com/arana-db/arana/testdata"
)

func TestDeleteTxLog(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockDB := testdata.NewMockDB(ctrl)
txLogManager := &TxLogManager{
sysDB: mockDB,
}
testTrxLog := TrxLog{
TrxID: "test_delete_id",
ServerID: 1,
State: runtime.TrxActive,
Participants: []TrxParticipant{{NodeID: "1", RemoteAddr: "127.0.0.1", Schema: "schema"}},
Tenant: "test_tenant",
}
trxIdVal, _ := proto.NewValue("test_delete_id")
mockDB.EXPECT().Call(
context.Background(),
"DELETE FROM __arana_trx_log WHERE trx_id = ?",
gomock.Eq([]proto.Value{trxIdVal}),
).Return(nil, uint16(0), nil).Times(1)
err := txLogManager.DeleteTxLog(testTrxLog)
assert.NoError(t, err)
}

func TestAddOrUpdateTxLog(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockDB := testdata.NewMockDB(ctrl)
txLogManager := &TxLogManager{
sysDB: mockDB,
}
testTrxLog := TrxLog{
TrxID: "test_add_or_update_id",
ServerID: 1,
State: runtime.TrxActive,
Participants: []TrxParticipant{{NodeID: "1", RemoteAddr: "127.0.0.1", Schema: "schema"}},
Tenant: "test_tenant",
}
participants, err := json.Marshal(testTrxLog.Participants)
assert.NoError(t, err)
trxIdVal, _ := proto.NewValue(testTrxLog.TrxID)
tenantVal, _ := proto.NewValue(testTrxLog.Tenant)
serverIdVal, _ := proto.NewValue(testTrxLog.ServerID)
stateVal, _ := proto.NewValue(int32(testTrxLog.State))
participantsVal, _ := proto.NewValue(string(participants))

args := []proto.Value{
trxIdVal,
tenantVal,
serverIdVal,
stateVal,
participantsVal,
}
mockDB.EXPECT().Call(
context.Background(),
"REPLACE INTO __arana_trx_log(trx_id, tenant, server_id, status, participant, start_time, update_time) VALUES (?,?,?,?,?,sysdate(),sysdate())",
args,
).Return(nil, uint16(0), nil).Times(1)
err = txLogManager.AddOrUpdateTxLog(testTrxLog)
assert.NoError(t, err)
}

0 comments on commit a8c1c6a

Please sign in to comment.