Skip to content

Commit

Permalink
reactor: select for update executor (apache#478)
Browse files Browse the repository at this point in the history
* reactor: select for update executor
  • Loading branch information
Charlie17Li authored and georgehao committed May 7, 2023
1 parent 07dcc15 commit fcc067d
Show file tree
Hide file tree
Showing 19 changed files with 588 additions and 38 deletions.
4 changes: 3 additions & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"sync"

"github.com/seata/seata-go/pkg/datasource"

at "github.com/seata/seata-go/pkg/datasource/sql"
"github.com/seata/seata-go/pkg/datasource/sql/exec/config"
"github.com/seata/seata-go/pkg/integration"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/remoting/processor/client"
Expand Down Expand Up @@ -75,9 +75,11 @@ func initRmClient(cfg *Config) {
log.Init()
initRemoting(cfg)
rm.InitRm(rm.RmConfig{
Config: cfg.ClientConfig.RmConfig,
ApplicationID: cfg.ApplicationID,
TxServiceGroup: cfg.TxServiceGroup,
})
config.Init(cfg.ClientConfig.RmConfig.LockConfig)
client.RegisterProcessor()
integration.Init()
tcc.InitTCC()
Expand Down
1 change: 1 addition & 0 deletions pkg/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"

"github.com/seata/seata-go/pkg/datasource/sql"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/remoting/getty"
Expand Down
10 changes: 5 additions & 5 deletions pkg/client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func TestLoadPath(t *testing.T) {
assert.Equal(t, false, cfg.ClientConfig.RmConfig.SagaRetryPersistModeUpdate)
assert.Equal(t, -2147482648, cfg.ClientConfig.RmConfig.TccActionInterceptorOrder)
assert.Equal(t, "druid", cfg.ClientConfig.RmConfig.SqlParserType)
assert.Equal(t, 10, cfg.ClientConfig.RmConfig.LockConfig.RetryInterval)
assert.Equal(t, time.Second*30, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes)
assert.Equal(t, 30*time.Second, cfg.ClientConfig.RmConfig.LockConfig.RetryInterval)
assert.Equal(t, 10, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes)
assert.Equal(t, true, cfg.ClientConfig.RmConfig.LockConfig.RetryPolicyBranchRollbackOnConflict)

assert.NotNil(t, cfg.ClientConfig.UndoConfig)
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestLoadPath(t *testing.T) {
}

func TestLoadJson(t *testing.T) {
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":10,"retry-times":"30s","retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}`
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":"30s","retry-times":10,"retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}`
cfg := LoadJson([]byte(confJson))
assert.NotNil(t, cfg)
assert.Equal(t, false, cfg.Enabled)
Expand All @@ -138,8 +138,8 @@ func TestLoadJson(t *testing.T) {
assert.Equal(t, false, cfg.ClientConfig.RmConfig.SagaRetryPersistModeUpdate)
assert.Equal(t, -2147482648, cfg.ClientConfig.RmConfig.TccActionInterceptorOrder)
assert.Equal(t, "druid", cfg.ClientConfig.RmConfig.SqlParserType)
assert.Equal(t, 10, cfg.ClientConfig.RmConfig.LockConfig.RetryInterval)
assert.Equal(t, time.Second*30, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes)
assert.Equal(t, 30*time.Second, cfg.ClientConfig.RmConfig.LockConfig.RetryInterval)
assert.Equal(t, 10, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes)
assert.Equal(t, true, cfg.ClientConfig.RmConfig.LockConfig.RetryPolicyBranchRollbackOnConflict)

assert.NotNil(t, cfg.ClientConfig.UndoConfig)
Expand Down
8 changes: 4 additions & 4 deletions pkg/datasource/sql/exec/at/at_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ package at
import (
"context"

"github.com/seata/seata-go/pkg/datasource/sql/util"
"github.com/seata/seata-go/pkg/tm"

"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/parser"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/util"
"github.com/seata/seata-go/pkg/tm"
)

func Init() {
Expand Down Expand Up @@ -63,7 +62,8 @@ func (e *ATExecutor) ExecWithNamedValue(ctx context.Context, execCtx *types.Exec
exec = NewUpdateExecutor(parser, execCtx, e.hooks)
case types.SQLTypeDelete:
exec = NewDeleteExecutor(parser, execCtx, e.hooks)
//case types.SQLTypeSelectForUpdate:
case types.SQLTypeSelectForUpdate:
exec = NewSelectForUpdateExecutor(parser, execCtx, e.hooks)
//case types.SQLTypeMultiDelete:
//case types.SQLTypeMultiUpdate:
default:
Expand Down
6 changes: 3 additions & 3 deletions pkg/datasource/sql/exec/at/base_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ func (b *baseExecutor) afterHooks(ctx context.Context, execCtx *types.ExecContex
// todo to use ColumnInfo get slice
func (*baseExecutor) GetScanSlice(columnNames []string, tableMeta *types.TableMeta) []interface{} {
scanSlice := make([]interface{}, 0, len(columnNames))
for _, columnNmae := range columnNames {
for _, columnName := range columnNames {
var (
// 从metData获取该列的元信息
columnMeta = tableMeta.Columns[columnNmae]
// get from metaData from this column
columnMeta = tableMeta.Columns[columnName]
)
switch strings.ToUpper(columnMeta.DatabaseTypeString) {
case "VARCHAR", "NVARCHAR", "VARCHAR2", "CHAR", "TEXT", "JSON", "TINYTEXT":
Expand Down
22 changes: 22 additions & 0 deletions pkg/datasource/sql/exec/at/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package at

import "github.com/seata/seata-go/pkg/rm"

var LockConfig rm.LockConfig
6 changes: 4 additions & 2 deletions pkg/datasource/sql/exec/at/delete_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ func (d *deleteExecutor) beforeImage(ctx context.Context) (*types.RecordImage, e
return nil, fmt.Errorf("invalid conn")
}

tableName, _ := d.parserCtx.GteTableName()
tableName, _ := d.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, d.execContext.DBName, tableName)

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -155,10 +156,11 @@ func (d *deleteExecutor) buildBeforeImageSQL(query string, args []driver.NamedVa

// afterImage build after image
func (d *deleteExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) {
tableName, _ := d.parserCtx.GteTableName()
tableName, _ := d.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, d.execContext.DBName, tableName)
if err != nil {
return nil, err
}

return types.NewEmptyRecordImage(metaData, types.SQLTypeDelete), nil
}
14 changes: 9 additions & 5 deletions pkg/datasource/sql/exec/at/insert_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"

"github.com/arana-db/parser/ast"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/types"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (i *insertExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNam

// beforeImage build before image
func (i *insertExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) {
tableName, _ := i.parserCtx.GteTableName()
tableName, _ := i.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
if err != nil {
return nil, err
Expand All @@ -96,7 +97,7 @@ func (i *insertExecutor) afterImage(ctx context.Context) (*types.RecordImage, er
return nil, nil
}

tableName, _ := i.parserCtx.GteTableName()
tableName, _ := i.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -141,7 +142,7 @@ func (i *insertExecutor) afterImage(ctx context.Context) (*types.RecordImage, er
// buildAfterImageSQL build select sql from insert sql
func (i *insertExecutor) buildAfterImageSQL(ctx context.Context) (string, []driver.NamedValue, error) {
// get all pk value
tableName, _ := i.parserCtx.GteTableName()
tableName, _ := i.parserCtx.GetTableName()

meta, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
if err != nil {
Expand Down Expand Up @@ -414,7 +415,8 @@ func (i *insertExecutor) getPkValuesByColumn(ctx context.Context, execCtx *types
if !i.isAstStmtValid() {
return nil, nil
}
tableName, _ := i.parserCtx.GteTableName()

tableName, _ := i.parserCtx.GetTableName()
meta, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -452,11 +454,13 @@ func (i *insertExecutor) getPkValuesByAuto(ctx context.Context, execCtx *types.E
if !i.isAstStmtValid() {
return nil, nil
}
tableName, _ := i.parserCtx.GteTableName()

tableName, _ := i.parserCtx.GetTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContext.DBName, tableName)
if err != nil {
return nil, err
}

pkValuesMap := make(map[string][]interface{})
pkMetaMap := metaData.GetPrimaryKeyMap()
if len(pkMetaMap) == 0 {
Expand Down
Loading

0 comments on commit fcc067d

Please sign in to comment.