Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize meta data #352

Merged
merged 11 commits into from
Nov 19, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/datasource/sql/async_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import (
"flag"
"time"

"github.com/seata/seata-go/pkg/rm"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/util/fanout"
"github.com/seata/seata-go/pkg/util/log"
)
Expand Down Expand Up @@ -102,7 +103,7 @@ func NewAsyncWorker(prom prometheus.Registerer, conf AsyncWorkerConfig, sourceMa
}

// BranchCommit commit branch transaction
func (aw *AsyncWorker) BranchCommit(ctx context.Context, req message.BranchCommitRequest) (branch.BranchStatus, error) {
func (aw *AsyncWorker) BranchCommit(ctx context.Context, req rm.BranchResource) (branch.BranchStatus, error) {
phaseCtx := phaseTwoContext{
Xid: req.Xid,
BranchID: req.BranchId,
Expand Down Expand Up @@ -170,7 +171,7 @@ func (aw *AsyncWorker) doBranchCommit(phaseCtxs *[]phaseTwoContext) {
}

func (aw *AsyncWorker) dealWithGroupedContexts(resID string, phaseCtxs []phaseTwoContext) {
val, ok := aw.resourceMgr.GetManagedResources()[resID]
val, ok := aw.resourceMgr.GetCachedResources().Load(resID)
if !ok {
for i := range phaseCtxs {
aw.rePutBackToQueue.Add(1)
Expand All @@ -180,7 +181,7 @@ func (aw *AsyncWorker) dealWithGroupedContexts(resID string, phaseCtxs []phaseTw
}

res := val.(*DBResource)
conn, err := res.target.Conn(context.Background())
conn, err := res.db.Conn(context.Background())
if err != nil {
for i := range phaseCtxs {
aw.commitQueue <- phaseCtxs[i]
Expand Down
73 changes: 32 additions & 41 deletions pkg/datasource/sql/at.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/rm"
)

Expand All @@ -52,7 +51,7 @@ func init() {
_ = fs.Parse([]string{})

atSourceManager.worker = NewAsyncWorker(prometheus.DefaultRegisterer, asyncWorkerConf, atSourceManager)
datasource.RegisterResourceManager(branch.BranchTypeAT, atSourceManager)
rm.GetRmCacheInstance().RegisterResourceManager(atSourceManager)
}

type ATSourceManager struct {
Expand All @@ -62,51 +61,43 @@ type ATSourceManager struct {
rmRemoting *rm.RMRemoting
}

// Register a Resource to be managed by Resource Manager
func (mgr *ATSourceManager) RegisterResource(res rm.Resource) error {
mgr.resourceCache.Store(res.GetResourceId(), res)

return mgr.basic.RegisterResource(res)
}

// Unregister a Resource from the Resource Manager
func (mgr *ATSourceManager) UnregisterResource(res rm.Resource) error {
return mgr.basic.UnregisterResource(res)
func (a *ATSourceManager) GetBranchType() branch.BranchType {
return branch.BranchTypeAT
}

// Get all resources managed by this manager
func (mgr *ATSourceManager) GetManagedResources() map[string]rm.Resource {
ret := make(map[string]rm.Resource)
func (a *ATSourceManager) GetCachedResources() *sync.Map {
return &a.resourceCache
}

mgr.resourceCache.Range(func(key, value interface{}) bool {
ret[key.(string)] = value.(rm.Resource)
return true
})
// Register a Resource to be managed by Resource Manager
func (a *ATSourceManager) RegisterResource(res rm.Resource) error {
a.resourceCache.Store(res.GetResourceId(), res)

return ret
return a.basic.RegisterResource(res)
}

// BranchRollback Rollback the corresponding transactions according to the request
func (mgr *ATSourceManager) BranchRollback(ctx context.Context, req message.BranchRollbackRequest) (branch.BranchStatus, error) {
val, ok := mgr.resourceCache.Load(req.ResourceId)
// Unregister a Resource from the Resource Manager
func (a *ATSourceManager) UnregisterResource(res rm.Resource) error {
return a.basic.UnregisterResource(res)
}

if !ok {
return branch.BranchStatusPhaseoneFailed, fmt.Errorf("resource %s not found", req.ResourceId)
// Rollback a branch transaction
func (a *ATSourceManager) BranchRollback(ctx context.Context, branchResource rm.BranchResource) (branch.BranchStatus, error) {
var dbResource *DBResource
if resource, ok := a.resourceCache.Load(branchResource.ResourceId); !ok {
err := fmt.Errorf("DB resource is not exist, resourceId: %s", branchResource.ResourceId)
return 0, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里不应该用0吧

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改为了
return branch.BranchStatusUnknown, err

} else {
dbResource, _ = resource.(*DBResource)
}

res := val.(*DBResource)

undoMgr, err := undo.GetUndoLogManager(res.dbType)
undoMgr, err := undo.GetUndoLogManager(dbResource.dbType)
if err != nil {
return branch.BranchStatusUnknown, err
}

/*conn, err := res.target.Conn(ctx)
if err != nil {
return branch.BranchStatusUnknown, err
}*/

if err := undoMgr.RunUndo(ctx, req.Xid, req.BranchId, res.conn); err != nil {
if err := undoMgr.RunUndo(ctx, branchResource.Xid, branchResource.BranchId, dbResource.db); err != nil {
transErr, ok := err.(*types.TransactionError)
if !ok {
return branch.BranchStatusPhaseoneFailed, err
Expand All @@ -123,28 +114,28 @@ func (mgr *ATSourceManager) BranchRollback(ctx context.Context, req message.Bran
}

// BranchCommit
func (mgr *ATSourceManager) BranchCommit(ctx context.Context, req message.BranchCommitRequest) (branch.BranchStatus, error) {
mgr.worker.BranchCommit(ctx, req)
func (a *ATSourceManager) BranchCommit(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) {
a.worker.BranchCommit(ctx, resource)
return branch.BranchStatusPhaseoneDone, nil
}

// LockQuery
func (mgr *ATSourceManager) LockQuery(ctx context.Context, req message.GlobalLockQueryRequest) (bool, error) {
func (a *ATSourceManager) LockQuery(ctx context.Context, param rm.LockQueryParam) (bool, error) {
return false, nil
}

// BranchRegister
func (mgr *ATSourceManager) BranchRegister(ctx context.Context, req rm.BranchRegisterParam) (int64, error) {
return mgr.rmRemoting.BranchRegister(req)
func (a *ATSourceManager) BranchRegister(ctx context.Context, req rm.BranchRegisterParam) (int64, error) {
return a.rmRemoting.BranchRegister(req)
}

// BranchReport
func (mgr *ATSourceManager) BranchReport(ctx context.Context, req message.BranchReportRequest) error {
func (a *ATSourceManager) BranchReport(ctx context.Context, param rm.BranchReportParam) error {
return nil
}

// CreateTableMetaCache
func (mgr *ATSourceManager) CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType,
func (a *ATSourceManager) CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType,
db *sql.DB) (datasource.TableMetaCache, error) {
return mgr.basic.CreateTableMetaCache(ctx, resID, dbType, db)
return a.basic.CreateTableMetaCache(ctx, resID, dbType, db)
}
3 changes: 2 additions & 1 deletion pkg/datasource/sql/conn_at.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func (c *ATConn) PrepareContext(ctx context.Context, query string) (driver.Stmt,
c.txCtx = types.NewTxCtx()
}()
}

return c.Conn.PrepareContext(ctx, query)
}

Expand Down Expand Up @@ -125,6 +124,7 @@ func (c *ATConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx,
c.txCtx = types.NewTxCtx()
c.txCtx.DBType = c.res.dbType
c.txCtx.TxOpt = opts
c.txCtx.ResourceID = c.res.resourceID

if tm.IsGlobalTx(ctx) {
c.txCtx.XID = tm.GetXID(ctx)
Expand All @@ -145,6 +145,7 @@ func (c *ATConn) createOnceTxContext(ctx context.Context) bool {
if onceTx {
c.txCtx = types.NewTxCtx()
c.txCtx.DBType = c.res.dbType
c.txCtx.ResourceID = c.res.resourceID
c.txCtx.XID = tm.GetXID(ctx)
c.txCtx.TransType = types.ATMode
c.txCtx.GlobalLockRequire = true
Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/sql/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func initMockAtConnector(t *testing.T, ctrl *gomock.Controller, db *sql.DB, f in
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
reflectx.SetUnexportedField(v.FieldByName("target"), f(t, ctrl))
reflectx.SetUnexportedField(v.FieldByName("db"), f(t, ctrl))

return fieldVal.(driver.Connector)
}
Expand Down Expand Up @@ -101,7 +101,7 @@ func initMockXaConnector(t *testing.T, ctrl *gomock.Controller, db *sql.DB, f in
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
reflectx.SetUnexportedField(v.FieldByName("target"), f(t, ctrl))
reflectx.SetUnexportedField(v.FieldByName("db"), f(t, ctrl))

return fieldVal.(driver.Connector)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/datasource/sql/datasource/base/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package base

import (
"context"
"database/sql/driver"
"database/sql"
"errors"
"sync"
"time"
Expand All @@ -30,7 +30,7 @@ import (
type (
// trigger
trigger interface {
LoadOne(ctx context.Context, dbName string, table string, conn driver.Conn) (*types.TableMeta, error)
LoadOne(ctx context.Context, dbName string, table string, conn *sql.Conn) (*types.TableMeta, error)

LoadAll() ([]types.TableMeta, error)
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func (c *BaseTableMetaCache) scanExpire(ctx context.Context) {
}

// GetTableMeta
func (c *BaseTableMetaCache) GetTableMeta(ctx context.Context, dbName, tableName string, conn driver.Conn) (types.TableMeta, error) {
func (c *BaseTableMetaCache) GetTableMeta(ctx context.Context, dbName, tableName string, conn *sql.Conn) (types.TableMeta, error) {
c.lock.Lock()
defer c.lock.Unlock()

Expand Down
55 changes: 17 additions & 38 deletions pkg/datasource/sql/datasource/datasource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package datasource
import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"sync"

Expand All @@ -31,56 +30,34 @@ import (
)

var (
atOnce sync.Once
atMgr DataSourceManager
xaMgr DataSourceManager
solts = map[types.DBType]func() TableMetaCache{}
atOnce sync.Once
tableMetaCacheMap = map[types.DBType]TableMetaCache{}
)

// RegisterTableCache
func RegisterTableCache(dbType types.DBType, builder func() TableMetaCache) {
solts[dbType] = builder
func RegisterTableCache(dbType types.DBType, tableMetaCache TableMetaCache) {
tableMetaCacheMap[dbType] = tableMetaCache
}

func RegisterResourceManager(b branch.BranchType, d DataSourceManager) {
if b == branch.BranchTypeAT {
atMgr = d
}

if b == branch.BranchTypeXA {
xaMgr = d
}
func GetTableCache(dbType types.DBType) TableMetaCache {
return tableMetaCacheMap[dbType]
}

func GetDataSourceManager(b branch.BranchType) DataSourceManager {
if b == branch.BranchTypeAT {
return atMgr
func GetDataSourceManager(branchType branch.BranchType) DataSourceManager {
resourceManager := rm.GetRmCacheInstance().GetResourceManager(branchType)
if resourceManager == nil {
return nil
}
if b == branch.BranchTypeXA {
return xaMgr
if d, ok := resourceManager.(DataSourceManager); ok {
return d
}
return nil
}

// todo implements ResourceManagerOutbound interface
// DataSourceManager
type DataSourceManager interface {
// Register a Resource to be managed by Resource Manager
RegisterResource(resource rm.Resource) error
// Unregister a Resource from the Resource Manager
UnregisterResource(resource rm.Resource) error
// GetManagedResources Get all resources managed by this manager
GetManagedResources() map[string]rm.Resource
// BranchRollback
BranchRollback(ctx context.Context, req message.BranchRollbackRequest) (branch.BranchStatus, error)
// BranchCommit
BranchCommit(ctx context.Context, req message.BranchCommitRequest) (branch.BranchStatus, error)
// LockQuery
LockQuery(ctx context.Context, req message.GlobalLockQueryRequest) (bool, error)
// BranchRegister
BranchRegister(ctx context.Context, req rm.BranchRegisterParam) (int64, error)
// BranchReport
BranchReport(ctx context.Context, req message.BranchReportRequest) error
rm.ResourceManager
// CreateTableMetaCache
CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType, db *sql.DB) (TableMetaCache, error)
}
Expand All @@ -95,6 +72,7 @@ type BasicSourceManager struct {
// lock
lock sync.RWMutex
// tableMetaCache
// todo do not put meta cache here
tableMetaCache map[string]*entry
}

Expand Down Expand Up @@ -178,14 +156,15 @@ type TableMetaCache interface {
// Init
Init(ctx context.Context, conn *sql.DB) error
// GetTableMeta
GetTableMeta(ctx context.Context, dbName, table string, conn driver.Conn) (*types.TableMeta, error)
GetTableMeta(ctx context.Context, dbName, table string) (*types.TableMeta, error)
// Destroy
Destroy() error
}

// buildResource
// todo not here
func buildResource(ctx context.Context, dbType types.DBType, db *sql.DB) (*entry, error) {
cache := solts[dbType]()
cache := tableMetaCacheMap[dbType]

if err := cache.Init(ctx, db); err != nil {
return nil, err
Expand Down
30 changes: 0 additions & 30 deletions pkg/datasource/sql/datasource/mysql/default.go

This file was deleted.

Loading