Skip to content

Commit

Permalink
feat: Auto clean transaction log (#671)
Browse files Browse the repository at this point in the history
  • Loading branch information
PangXing authored Jun 18, 2023
1 parent 9c7357c commit b9b7e96
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 11 deletions.
2 changes: 2 additions & 0 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ data:
password: "123456"
# TODO It is recommended that the initialization of the subsequent system library be handled by arana internally
database: __arana_sys
weight: r10w10
parameters:
users:
- username: root
password: "123456"
Expand Down
2 changes: 2 additions & 0 deletions integration_test/config/db/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ data:
password: "123456"
# TODO It is recommended that the initialization of the subsequent system library be handled by arana internally
database: __arana_sys
weight: r10w10
parameters:
clusters:
- name: employees
type: mysql
Expand Down
2 changes: 2 additions & 0 deletions integration_test/config/db_tbl/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ data:
password: "123456"
# TODO It is recommended that the initialization of the subsequent system library be handled by arana internally
database: __arana_sys
weight: r10w10
parameters:
clusters:
- name: employees
type: mysql
Expand Down
9 changes: 9 additions & 0 deletions integration_test/config/db_tbl_rw/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ data:
password: "123456"
- username: arana
password: "123456"
sys_db:
host: arana-mysql
port: 3306
username: root
password: "123456"
# TODO It is recommended that the initialization of the subsequent system library be handled by arana internally
database: __arana_sys
weight: r10w10
parameters:

clusters:
- name: employees
Expand Down
2 changes: 2 additions & 0 deletions integration_test/config/tbl/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ data:
password: "123456"
# TODO It is recommended that the initialization of the subsequent system library be handled by arana internally
database: __arana_sys
weight: r10w10
parameters:
clusters:
- name: employees
type: mysql
Expand Down
1 change: 1 addition & 0 deletions pkg/config/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const (

const (
ConfigItemSpec = "spec"
ConfigItemSysDB = "sys_db"
ConfigItemUsers = "users"
ConfigItemClusters = "clusters"
ConfigItemShardingRule = "sharding_rule"
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/config_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ func (c *configReader) compositeConfiguration(loadFilter map[PathKey]string) *Te
}
}

if _, ok := loadFilter[c.pathInfo.DefaultConfigSysDBPath]; ok {
if val := c.holders[c.pathInfo.DefaultConfigSysDBPath].Load(); val != nil {
conf.SysDB = val.(*Tenant).SysDB
}
}

if _, ok := loadFilter[c.pathInfo.DefaultConfigDataNodesPath]; ok {
if val := c.holders[c.pathInfo.DefaultConfigDataNodesPath].Load(); val != nil {
conf.Nodes = val.(*Tenant).Nodes
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func Test_configReader_LoadAll(t *testing.T) {
}

data, _ := yaml.Marshal(nodes)
sysdb, _ := yaml.Marshal(config.NewEmptyTenant().SysDB)
users, _ := yaml.Marshal(config.NewEmptyTenant().Users)
shadow, _ := yaml.Marshal(config.NewEmptyTenant().ShadowRule)
sharding, _ := yaml.Marshal(config.NewEmptyTenant().ShardingRule)
Expand All @@ -311,6 +312,9 @@ func Test_configReader_LoadAll(t *testing.T) {
Do(func(interface{}) {
atomic.AddInt32(&callCnt, 1)
})
mockStoreOperator.EXPECT().Get(config.NewPathInfo("arana").DefaultConfigSysDBPath).
AnyTimes().
Return(sysdb, nil)
mockStoreOperator.EXPECT().Get(config.NewPathInfo("arana").DefaultConfigDataUsersPath).
AnyTimes().
Return(users, nil)
Expand Down Expand Up @@ -357,6 +361,7 @@ func Test_configReader_LoadAll(t *testing.T) {
mockStoreOperator.EXPECT().Watch(gomock.Any()).AnyTimes().Return(shareCh, nil)

nodes, _ := yaml.Marshal(config.NewEmptyTenant().Nodes)
sysdb, _ := yaml.Marshal(config.NewEmptyTenant().SysDB)
users, _ := yaml.Marshal(config.NewEmptyTenant().Users)
shadow, _ := yaml.Marshal(config.NewEmptyTenant().ShadowRule)
sharding, _ := yaml.Marshal(config.NewEmptyTenant().ShardingRule)
Expand All @@ -366,6 +371,9 @@ func Test_configReader_LoadAll(t *testing.T) {
mockStoreOperator.EXPECT().Get(config.NewPathInfo("arana").DefaultConfigDataNodesPath).
AnyTimes().
Return(nodes, nil)
mockStoreOperator.EXPECT().Get(config.NewPathInfo("arana").DefaultConfigSysDBPath).
AnyTimes().
Return(sysdb, nil)
mockStoreOperator.EXPECT().Get(config.NewPathInfo("arana").DefaultConfigDataUsersPath).
AnyTimes().
Return(users, nil)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func NewEmptyTenant() *Tenant {
Spec: Spec{
Metadata: map[string]interface{}{},
},
SysDB: nil,
Users: make([]*User, 0, 1),
DataSourceClusters: make([]*DataSourceCluster, 0, 1),
ShardingRule: new(ShardingRule),
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type PathInfo struct {
DefaultConfigSpecPath PathKey
DefaultTenantBaseConfigPath PathKey
DefaultConfigSysDBPath PathKey
DefaultConfigDataNodesPath PathKey
DefaultConfigDataUsersPath PathKey
DefaultConfigDataSourceClustersPath PathKey
Expand All @@ -45,6 +46,7 @@ func NewPathInfo(tenant string) *PathInfo {

p.DefaultTenantBaseConfigPath = PathKey(filepath.Join(string(DefaultRootPath), fmt.Sprintf("tenants/%s", tenant)))
p.DefaultConfigSpecPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "spec"))
p.DefaultConfigSysDBPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "sys_db"))
p.DefaultConfigDataNodesPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "nodes"))
p.DefaultConfigDataUsersPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "users"))
p.DefaultConfigDataSourceClustersPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "dataSourceClusters"))
Expand All @@ -63,6 +65,9 @@ func NewPathInfo(tenant string) *PathInfo {
p.DefaultConfigSpecPath: func(cfg *Tenant) interface{} {
return &cfg.Spec
},
p.DefaultConfigSysDBPath: func(cfg *Tenant) interface{} {
return &cfg.SysDB
},
p.DefaultConfigDataUsersPath: func(cfg *Tenant) interface{} {
return &cfg.Users
},
Expand All @@ -82,6 +87,7 @@ func NewPathInfo(tenant string) *PathInfo {

p.ConfigKeyMapping = map[PathKey]string{
p.DefaultConfigSpecPath: ConfigItemSpec,
p.DefaultConfigSysDBPath: ConfigItemSysDB,
p.DefaultConfigDataUsersPath: ConfigItemUsers,
p.DefaultConfigDataSourceClustersPath: ConfigItemClusters,
p.DefaultConfigDataShardingRulePath: ConfigItemShardingRule,
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/tenant/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func LoadSysDB(tenant string) (proto.DB, error) {
defer lock.RUnlock()

val, ok := _tenantSysDB[tenant]
if ok {
if !ok {
return nil, fmt.Errorf("cannot load sysdb: tenant=%s", tenant)
}

Expand Down
50 changes: 41 additions & 9 deletions pkg/runtime/transaction/trx_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
)

import (
Expand All @@ -40,6 +42,8 @@ var (
"start_time": {},
"update_time": {},
}
_initTxLogOnce sync.Once
_txLogCleanTimer *time.Timer
)

const (
Expand Down Expand Up @@ -71,14 +75,18 @@ type TxLogManager struct {
}

// init executes create __arana_tx_log table action
func (gm *TxLogManager) init() error {
ctx := context.Background()
res, _, err := gm.sysDB.Call(ctx, _initTxLog)
if err != nil {
return err
}
_, _ = res.RowsAffected()
return nil
func (gm *TxLogManager) Init(delay time.Duration) error {
var err error
_initTxLogOnce.Do(func() {
ctx := context.Background()
res, _, err := gm.sysDB.Call(ctx, _initTxLog)
if err != nil {
return
}
_, _ = res.RowsAffected()
_txLogCleanTimer = time.AfterFunc(delay, gm.runCleanTxLogTask)
})
return err
}

// AddOrUpdateTxLog Add or update transaction log
Expand Down Expand Up @@ -179,5 +187,29 @@ func (gm *TxLogManager) ScanTxLog(pageNo, pageSize uint64, conditions []Conditio
// partition table according to the day level or hour level.
// the execution of this task requires distributed task preemption based on the metadata DB
func (gm *TxLogManager) runCleanTxLogTask() {

var (
pageNo uint64
pageSize uint64 = 50
conditions = []Condition{
{
FiledName: "status",
Operation: Equal,
Value: runtime.TrxFinish,
},
}
)
var txLogs []TrxLog
for {
total, logs, err := gm.ScanTxLog(pageNo, pageSize, conditions)
if err != nil {
break
}
txLogs = append(txLogs, logs...)
if len(txLogs) >= int(total) {
break
}
}
for _, l := range txLogs {
gm.DeleteTxLog(l)
}
}
9 changes: 8 additions & 1 deletion pkg/runtime/transaction/trx_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package transaction
import (
"errors"
"sync"
"time"
)

import (
Expand All @@ -28,6 +29,7 @@ import (

var (
ErrorTrxManagerNotInitialize = errors.New("TrxManager not initialize")
DefaultCleanLogDelay = 1 * time.Hour
)

var (
Expand All @@ -45,11 +47,16 @@ func CreateTrxManager(tenant string) error {
}

sysDB, err := aranatenant.LoadSysDB(tenant)
if err == nil {
if err != nil {
return err
}

trxLog := &TxLogManager{sysDB: sysDB}
err = trxLog.Init(DefaultCleanLogDelay)
if err != nil {
return err
}

trxBottomMaker := &TxFaultDecisionExecutor{tm: trxLog}

trxMgrs[tenant] = &TrxManager{
Expand Down
9 changes: 9 additions & 0 deletions testdata/fake_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ data:
password: "123456"
- username: arana
password: "123456"
sys_db:
host: arana-mysql
port: 3306
username: root
password: "123456"
# TODO It is recommended that the initialization of the subsequent system library be handled by arana internally
database: __arana_sys
weight: r10w10
parameters:
clusters:
- name: employees
type: mysql
Expand Down

0 comments on commit b9b7e96

Please sign in to comment.