Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature add undo log manager delete #240

Merged
merged 8 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ require (
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/apache/dubbo-getty v1.4.8
github.com/dubbogo/gost v1.12.5
github.com/golang/mock v1.6.0
github.com/go-sql-driver/mysql v1.6.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pingcap/tidb v1.1.0-beta.0.20211124132551-4a1b2e9fe5b5
Expand Down
2 changes: 1 addition & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1848,4 +1848,4 @@ sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.m
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10 h1:pxt6fVJP67Hxo1qk8JalUghLlk3abYByl+3e0JYfUlE=
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10/go.mod h1:fl9OF22g6MTKgvHA1hqMXe/L7+ULWofVTwbC9loGu7A=
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d h1:Mp6WiHHuiwHaknxTdxJ8pvC9/B4pOgW1PamKGexG7Fs=
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d/go.mod h1:zHGDKp2tyvF4IAfLti4pKYqCJucXYmmKMb3UMrCHK/4=
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d/go.mod h1:zHGDKp2tyvF4IAfLti4pKYqCJucXYmmKMb3UMrCHK/4=
41 changes: 41 additions & 0 deletions pkg/common/util/slice_to_str.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package util

import (
"errors"
"strconv"
"strings"
)

// Int64Slice2Str
func Int64Slice2Str(values interface{}, sep string) (string, error) {
v, ok := values.([]int64)
if !ok {
return "", errors.New("param type is fault")
}

var valuesText []string

for i := range v {
text := strconv.FormatInt(v[i], 10)
valuesText = append(valuesText, text)
}

return strings.Join(valuesText, sep), nil
}
44 changes: 44 additions & 0 deletions pkg/constant/client_table_columns_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package constant

AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
const (
// UndoLogId The constant undo_log column name xid, this field is not use in mysql
UndoLogId string = "id"

// UndoLogXid The constant undo_log column name xid
UndoLogXid = "xid"

// UndoLogBranchXid The constant undo_log column name branch_id
UndoLogBranchXid = "branch_id"

// UndoLogContext The constant undo_log column name context
UndoLogContext = "context"

// UndoLogRollBackInfo The constant undo_log column name rollback_info
UndoLogRollBackInfo = "rollback_info"

// UndoLogLogStatus The constant undo_log column name log_status
UndoLogLogStatus = "log_status"

// UndoLogLogCreated The constant undo_log column name log_created
UndoLogLogCreated = "log_created"

// UndoLogLogModified The constant undo_log column name log_modified
UndoLogLogModified = "log_modified"
)
26 changes: 26 additions & 0 deletions pkg/constant/undo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package constant

const (
DeleteFrom = "DELETE FROM "
DefaultTransactionUndoLogTable = " undo_log "
// UndoLogTableName Todo get from config
UndoLogTableName = DefaultTransactionUndoLogTable
DeleteUndoLogSql = DeleteFrom + UndoLogTableName + " WHERE " + UndoLogBranchXid + " = ? AND " + UndoLogXid + " = ?"
)
5 changes: 3 additions & 2 deletions pkg/datasource/sql/at.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
"sync"
"time"

"github.com/seata/seata-go/pkg/datasource/sql/undo"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/rm"
Expand Down Expand Up @@ -242,7 +243,7 @@ func (w *asyncATWorker) dealWithGroupedContexts(resID string, phaseCtxs []phaseT

for i := range phaseCtxs {
phaseCtx := phaseCtxs[i]
if err := undoMgr.DeleteUndoLogs([]string{phaseCtx.Xid}, []int64{phaseCtx.BranchID}, conn); err != nil {
if err := undoMgr.BatchDeleteUndoLog([]string{phaseCtx.Xid}, []int64{phaseCtx.BranchID}, conn); err != nil {
w.commitQueue <- phaseCtx
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/datasource/sql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"context"
gosql "database/sql"

"github.com/seata/seata-go/pkg/datasource/sql/undo"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/protocol/branch"
)

Expand Down
3 changes: 2 additions & 1 deletion pkg/datasource/sql/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"context"
"database/sql/driver"

"github.com/seata/seata-go/pkg/datasource/sql/undo"

"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"

"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
)

const REPORT_RETRY_COUNT = 5
Expand Down
97 changes: 94 additions & 3 deletions pkg/datasource/sql/undo/base/undo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,24 @@
package base

import (
"context"
"database/sql"
"database/sql/driver"
"strings"

"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/common/util"
"github.com/seata/seata-go/pkg/constant"
"github.com/seata/seata-go/pkg/datasource/sql/undo"

"github.com/seata/seata-go/pkg/datasource/sql/types"
)

var _ undo.UndoLogManager = (*BaseUndoLogManager)(nil)

var ErrorDeleteUndoLogParamsFault = errors.New("xid or branch_id can't nil")

// BaseUndoLogManager
type BaseUndoLogManager struct{}

Expand All @@ -39,8 +48,52 @@ func (m *BaseUndoLogManager) InsertUndoLog(l []undo.BranchUndoLog, tx driver.Tx)
return nil
}

// DeleteUndoLog
func (m *BaseUndoLogManager) DeleteUndoLogs(xid []string, branchID []int64, conn *sql.Conn) error {
// DeleteUndoLog exec delete single undo log operate
func (m *BaseUndoLogManager) DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn *sql.Conn) error {
stmt, err := conn.PrepareContext(ctx, constant.DeleteUndoLogSql)
if err != nil {
log.Errorf("[DeleteUndoLog] prepare sql fail, err: %v", err)
return err
}

if _, err = stmt.ExecContext(ctx, branchID, xid); err != nil {
log.Errorf("[DeleteUndoLog] exec delete undo log fail, err: %v", err)
return err
}

return nil
}

// BatchDeleteUndoLog exec delete undo log operate
func (m *BaseUndoLogManager) BatchDeleteUndoLog(xid []string, branchID []int64, conn *sql.Conn) error {
// build delete undo log sql
batchDeleteSql, err := m.getBatchDeleteUndoLogSql(xid, branchID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getBatchDeleteUndoLogSql 是不是也可以把SQL写死?和DeleteUndoLogSql类似,保持统一

if err != nil {
log.Errorf("get undo sql log fail, err: %v", err)
return err
}

ctx := context.Background()

// prepare deal sql
stmt, err := conn.PrepareContext(ctx, batchDeleteSql)
if err != nil {
log.Errorf("prepare sql fail, err: %v", err)
return err
}

branchIDStr, err := util.Int64Slice2Str(branchID, ",")
if err != nil {
log.Errorf("slice to string transfer fail, err: %v", err)
return err
}

// exec sql stmt
if _, err = stmt.ExecContext(ctx, branchIDStr, strings.Join(xid, ",")); err != nil {
log.Errorf("exec delete undo log fail, err: %v", err)
return err
}

return nil
}

Expand All @@ -58,3 +111,41 @@ func (m *BaseUndoLogManager) RunUndo(xid string, branchID int64, conn *sql.Conn)
func (m *BaseUndoLogManager) DBType() types.DBType {
panic("implement me")
}

// getBatchDeleteUndoLogSql build batch delete undo log
func (m *BaseUndoLogManager) getBatchDeleteUndoLogSql(xid []string, branchID []int64) (string, error) {
if len(xid) == 0 || len(branchID) == 0 {
return "", ErrorDeleteUndoLogParamsFault
}

var undoLogDeleteSql strings.Builder
undoLogDeleteSql.WriteString(constant.DeleteFrom)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里直接把SQL字符串写死成常量,感觉效率会高一点?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯,因为这是一个批量删除的方法,所以这里会根据参数数量动态拼接 "?"占位符,就可能没法整个SQL定义为一个常量

undoLogDeleteSql.WriteString(constant.UndoLogTableName)
undoLogDeleteSql.WriteString(" WHERE ")
undoLogDeleteSql.WriteString(constant.UndoLogBranchXid)
undoLogDeleteSql.WriteString(" IN ")
m.appendInParam(len(branchID), &undoLogDeleteSql)
undoLogDeleteSql.WriteString(" AND ")
undoLogDeleteSql.WriteString(constant.UndoLogXid)
undoLogDeleteSql.WriteString(" IN ")
m.appendInParam(len(xid), &undoLogDeleteSql)

return undoLogDeleteSql.String(), nil
}

// appendInParam build in param
func (m *BaseUndoLogManager) appendInParam(size int, str *strings.Builder) {
if size <= 0 {
return
}

str.WriteString(" (")
for i := 0; i < size; i++ {
str.WriteString("?")
if i < size-1 {
str.WriteString(",")
}
}

str.WriteString(") ")
}
13 changes: 10 additions & 3 deletions pkg/datasource/sql/undo/mysql/undo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package mysql

import (
"context"
"database/sql"
"database/sql/driver"

"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"

"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo/base"
)

Expand All @@ -42,8 +44,13 @@ func (m *undoLogManager) InsertUndoLog(l []undo.BranchUndoLog, tx driver.Tx) err
}

// DeleteUndoLog
func (m *undoLogManager) DeleteUndoLogs(xid []string, branchID []int64, conn *sql.Conn) error {
return m.Base.DeleteUndoLogs(xid, branchID, conn)
func (m *undoLogManager) DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn *sql.Conn) error {
return m.Base.DeleteUndoLog(ctx, xid, branchID, conn)
}

// BatchDeleteUndoLog
func (m *undoLogManager) BatchDeleteUndoLog(xid []string, branchID []int64, conn *sql.Conn) error {
return m.Base.BatchDeleteUndoLog(xid, branchID, conn)
}

// FlushUndoLog
Expand Down
5 changes: 4 additions & 1 deletion pkg/datasource/sql/undo/undo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package undo

import (
"context"
"database/sql"
"database/sql/driver"
"errors"
Expand Down Expand Up @@ -51,7 +52,9 @@ type UndoLogManager interface {
// InsertUndoLog
InsertUndoLog(l []BranchUndoLog, tx driver.Tx) error
// DeleteUndoLog
DeleteUndoLogs(xid []string, branchID []int64, conn *sql.Conn) error
DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn *sql.Conn) error
// BatchDeleteUndoLog
BatchDeleteUndoLog(xid []string, branchID []int64, conn *sql.Conn) error
// FlushUndoLog
FlushUndoLog(txCtx *types.TransactionContext, tx driver.Tx) error
// RunUndo
Expand Down
Loading