Skip to content

Commit

Permalink
optimize meta data (#352)
Browse files Browse the repository at this point in the history
optimize meta data
  • Loading branch information
luky116 committed Nov 19, 2022
1 parent ef1759e commit 7481327
Show file tree
Hide file tree
Showing 31 changed files with 372 additions and 500 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand Down Expand Up @@ -146,7 +145,6 @@ require (
golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41 // indirect
google.golang.org/genproto v0.0.0-20220630174209-ad1d48641aa7 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
gotest.tools v2.2.0+incompatible
moul.io/http2curl v1.0.0 // indirect
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect
)
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,6 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
9 changes: 5 additions & 4 deletions pkg/datasource/sql/async_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import (
"flag"
"time"

"github.com/seata/seata-go/pkg/rm"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/util/fanout"
"github.com/seata/seata-go/pkg/util/log"
)
Expand Down Expand Up @@ -102,7 +103,7 @@ func NewAsyncWorker(prom prometheus.Registerer, conf AsyncWorkerConfig, sourceMa
}

// BranchCommit commit branch transaction
func (aw *AsyncWorker) BranchCommit(ctx context.Context, req message.BranchCommitRequest) (branch.BranchStatus, error) {
func (aw *AsyncWorker) BranchCommit(ctx context.Context, req rm.BranchResource) (branch.BranchStatus, error) {
phaseCtx := phaseTwoContext{
Xid: req.Xid,
BranchID: req.BranchId,
Expand Down Expand Up @@ -170,7 +171,7 @@ func (aw *AsyncWorker) doBranchCommit(phaseCtxs *[]phaseTwoContext) {
}

func (aw *AsyncWorker) dealWithGroupedContexts(resID string, phaseCtxs []phaseTwoContext) {
val, ok := aw.resourceMgr.GetManagedResources()[resID]
val, ok := aw.resourceMgr.GetCachedResources().Load(resID)
if !ok {
for i := range phaseCtxs {
aw.rePutBackToQueue.Add(1)
Expand All @@ -180,7 +181,7 @@ func (aw *AsyncWorker) dealWithGroupedContexts(resID string, phaseCtxs []phaseTw
}

res := val.(*DBResource)
conn, err := res.target.Conn(context.Background())
conn, err := res.db.Conn(context.Background())
if err != nil {
for i := range phaseCtxs {
aw.commitQueue <- phaseCtxs[i]
Expand Down
73 changes: 32 additions & 41 deletions pkg/datasource/sql/at.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/rm"
)

Expand All @@ -52,7 +51,7 @@ func init() {
_ = fs.Parse([]string{})

atSourceManager.worker = NewAsyncWorker(prometheus.DefaultRegisterer, asyncWorkerConf, atSourceManager)
datasource.RegisterResourceManager(branch.BranchTypeAT, atSourceManager)
rm.GetRmCacheInstance().RegisterResourceManager(atSourceManager)
}

type ATSourceManager struct {
Expand All @@ -62,51 +61,43 @@ type ATSourceManager struct {
rmRemoting *rm.RMRemoting
}

// Register a Resource to be managed by Resource Manager
func (mgr *ATSourceManager) RegisterResource(res rm.Resource) error {
mgr.resourceCache.Store(res.GetResourceId(), res)

return mgr.basic.RegisterResource(res)
}

// Unregister a Resource from the Resource Manager
func (mgr *ATSourceManager) UnregisterResource(res rm.Resource) error {
return mgr.basic.UnregisterResource(res)
func (a *ATSourceManager) GetBranchType() branch.BranchType {
return branch.BranchTypeAT
}

// Get all resources managed by this manager
func (mgr *ATSourceManager) GetManagedResources() map[string]rm.Resource {
ret := make(map[string]rm.Resource)

mgr.resourceCache.Range(func(key, value interface{}) bool {
ret[key.(string)] = value.(rm.Resource)
return true
})

return ret
func (a *ATSourceManager) GetCachedResources() *sync.Map {
return &a.resourceCache
}

// BranchRollback Rollback the corresponding transactions according to the request
func (mgr *ATSourceManager) BranchRollback(ctx context.Context, req message.BranchRollbackRequest) (branch.BranchStatus, error) {
val, ok := mgr.resourceCache.Load(req.ResourceId)
// Register a Resource to be managed by Resource Manager
func (a *ATSourceManager) RegisterResource(res rm.Resource) error {
a.resourceCache.Store(res.GetResourceId(), res)

if !ok {
return branch.BranchStatusPhaseoneFailed, fmt.Errorf("resource %s not found", req.ResourceId)
}
return a.basic.RegisterResource(res)
}

res := val.(*DBResource)
// Unregister a Resource from the Resource Manager
func (a *ATSourceManager) UnregisterResource(res rm.Resource) error {
return a.basic.UnregisterResource(res)
}

undoMgr, err := undo.GetUndoLogManager(res.dbType)
if err != nil {
// Rollback a branch transaction
func (a *ATSourceManager) BranchRollback(ctx context.Context, branchResource rm.BranchResource) (branch.BranchStatus, error) {
var dbResource *DBResource
if resource, ok := a.resourceCache.Load(branchResource.ResourceId); !ok {
err := fmt.Errorf("DB resource is not exist, resourceId: %s", branchResource.ResourceId)
return branch.BranchStatusUnknown, err
} else {
dbResource, _ = resource.(*DBResource)
}

/*conn, err := res.target.Conn(ctx)
undoMgr, err := undo.GetUndoLogManager(dbResource.dbType)
if err != nil {
return branch.BranchStatusUnknown, err
}*/
}

if err := undoMgr.RunUndo(ctx, req.Xid, req.BranchId, res.conn); err != nil {
if err := undoMgr.RunUndo(ctx, branchResource.Xid, branchResource.BranchId, dbResource.db, dbResource.dbName); err != nil {
transErr, ok := err.(*types.TransactionError)
if !ok {
return branch.BranchStatusPhaseoneFailed, err
Expand All @@ -123,28 +114,28 @@ func (mgr *ATSourceManager) BranchRollback(ctx context.Context, req message.Bran
}

// BranchCommit
func (mgr *ATSourceManager) BranchCommit(ctx context.Context, req message.BranchCommitRequest) (branch.BranchStatus, error) {
mgr.worker.BranchCommit(ctx, req)
func (a *ATSourceManager) BranchCommit(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) {
a.worker.BranchCommit(ctx, resource)
return branch.BranchStatusPhaseoneDone, nil
}

// LockQuery
func (mgr *ATSourceManager) LockQuery(ctx context.Context, req message.GlobalLockQueryRequest) (bool, error) {
func (a *ATSourceManager) LockQuery(ctx context.Context, param rm.LockQueryParam) (bool, error) {
return false, nil
}

// BranchRegister
func (mgr *ATSourceManager) BranchRegister(ctx context.Context, req rm.BranchRegisterParam) (int64, error) {
return mgr.rmRemoting.BranchRegister(req)
func (a *ATSourceManager) BranchRegister(ctx context.Context, req rm.BranchRegisterParam) (int64, error) {
return a.rmRemoting.BranchRegister(req)
}

// BranchReport
func (mgr *ATSourceManager) BranchReport(ctx context.Context, req message.BranchReportRequest) error {
func (a *ATSourceManager) BranchReport(ctx context.Context, param rm.BranchReportParam) error {
return nil
}

// CreateTableMetaCache
func (mgr *ATSourceManager) CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType,
func (a *ATSourceManager) CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType,
db *sql.DB) (datasource.TableMetaCache, error) {
return mgr.basic.CreateTableMetaCache(ctx, resID, dbType, db)
return a.basic.CreateTableMetaCache(ctx, resID, dbType, db)
}
3 changes: 2 additions & 1 deletion pkg/datasource/sql/conn_at.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (c *ATConn) PrepareContext(ctx context.Context, query string) (driver.Stmt,
c.txCtx = types.NewTxCtx()
}()
}

return c.Conn.PrepareContext(ctx, query)
}

Expand Down Expand Up @@ -127,6 +126,7 @@ func (c *ATConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx,
c.txCtx = types.NewTxCtx()
c.txCtx.DBType = c.res.dbType
c.txCtx.TxOpt = opts
c.txCtx.ResourceID = c.res.resourceID

if tm.IsGlobalTx(ctx) {
c.txCtx.XID = tm.GetXID(ctx)
Expand All @@ -147,6 +147,7 @@ func (c *ATConn) createOnceTxContext(ctx context.Context) bool {
if onceTx {
c.txCtx = types.NewTxCtx()
c.txCtx.DBType = c.res.dbType
c.txCtx.ResourceID = c.res.resourceID
c.txCtx.XID = tm.GetXID(ctx)
c.txCtx.TransType = types.ATMode
c.txCtx.GlobalLockRequire = true
Expand Down
6 changes: 3 additions & 3 deletions pkg/datasource/sql/datasource/base/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package base

import (
"context"
"database/sql/driver"
"database/sql"
"errors"
"sync"
"time"
Expand All @@ -30,7 +30,7 @@ import (
type (
// trigger
trigger interface {
LoadOne(ctx context.Context, dbName string, table string, conn driver.Conn) (*types.TableMeta, error)
LoadOne(ctx context.Context, dbName string, table string, conn *sql.Conn) (*types.TableMeta, error)

LoadAll() ([]types.TableMeta, error)
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func (c *BaseTableMetaCache) scanExpire(ctx context.Context) {
}

// GetTableMeta
func (c *BaseTableMetaCache) GetTableMeta(ctx context.Context, dbName, tableName string, conn driver.Conn) (types.TableMeta, error) {
func (c *BaseTableMetaCache) GetTableMeta(ctx context.Context, dbName, tableName string, conn *sql.Conn) (types.TableMeta, error) {
c.lock.Lock()
defer c.lock.Unlock()

Expand Down
55 changes: 17 additions & 38 deletions pkg/datasource/sql/datasource/datasource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package datasource
import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"sync"

Expand All @@ -31,56 +30,34 @@ import (
)

var (
atOnce sync.Once
atMgr DataSourceManager
xaMgr DataSourceManager
solts = map[types.DBType]func() TableMetaCache{}
atOnce sync.Once
tableMetaCacheMap = map[types.DBType]TableMetaCache{}
)

// RegisterTableCache
func RegisterTableCache(dbType types.DBType, builder func() TableMetaCache) {
solts[dbType] = builder
func RegisterTableCache(dbType types.DBType, tableMetaCache TableMetaCache) {
tableMetaCacheMap[dbType] = tableMetaCache
}

func RegisterResourceManager(b branch.BranchType, d DataSourceManager) {
if b == branch.BranchTypeAT {
atMgr = d
}

if b == branch.BranchTypeXA {
xaMgr = d
}
func GetTableCache(dbType types.DBType) TableMetaCache {
return tableMetaCacheMap[dbType]
}

func GetDataSourceManager(b branch.BranchType) DataSourceManager {
if b == branch.BranchTypeAT {
return atMgr
func GetDataSourceManager(branchType branch.BranchType) DataSourceManager {
resourceManager := rm.GetRmCacheInstance().GetResourceManager(branchType)
if resourceManager == nil {
return nil
}
if b == branch.BranchTypeXA {
return xaMgr
if d, ok := resourceManager.(DataSourceManager); ok {
return d
}
return nil
}

// todo implements ResourceManagerOutbound interface
// DataSourceManager
type DataSourceManager interface {
// Register a Resource to be managed by Resource Manager
RegisterResource(resource rm.Resource) error
// Unregister a Resource from the Resource Manager
UnregisterResource(resource rm.Resource) error
// GetManagedResources Get all resources managed by this manager
GetManagedResources() map[string]rm.Resource
// BranchRollback
BranchRollback(ctx context.Context, req message.BranchRollbackRequest) (branch.BranchStatus, error)
// BranchCommit
BranchCommit(ctx context.Context, req message.BranchCommitRequest) (branch.BranchStatus, error)
// LockQuery
LockQuery(ctx context.Context, req message.GlobalLockQueryRequest) (bool, error)
// BranchRegister
BranchRegister(ctx context.Context, req rm.BranchRegisterParam) (int64, error)
// BranchReport
BranchReport(ctx context.Context, req message.BranchReportRequest) error
rm.ResourceManager
// CreateTableMetaCache
CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType, db *sql.DB) (TableMetaCache, error)
}
Expand All @@ -95,6 +72,7 @@ type BasicSourceManager struct {
// lock
lock sync.RWMutex
// tableMetaCache
// todo do not put meta cache here
tableMetaCache map[string]*entry
}

Expand Down Expand Up @@ -178,14 +156,15 @@ type TableMetaCache interface {
// Init
Init(ctx context.Context, conn *sql.DB) error
// GetTableMeta
GetTableMeta(ctx context.Context, dbName, table string, conn driver.Conn) (*types.TableMeta, error)
GetTableMeta(ctx context.Context, dbName, table string) (*types.TableMeta, error)
// Destroy
Destroy() error
}

// buildResource
// todo not here
func buildResource(ctx context.Context, dbType types.DBType, db *sql.DB) (*entry, error) {
cache := solts[dbType]()
cache := tableMetaCacheMap[dbType]

if err := cache.Init(ctx, db); err != nil {
return nil, err
Expand Down
30 changes: 0 additions & 30 deletions pkg/datasource/sql/datasource/mysql/default.go

This file was deleted.

Loading

0 comments on commit 7481327

Please sign in to comment.