Skip to content

Commit

Permalink
cherrypick: fix ut failed about race (#17108)
Browse files Browse the repository at this point in the history
1,解决data race
session取tenantInfo
tennantInfo内部都加了mutex.
全局变量globalAicm,globalRtMgr用原子变量包装。

2,ut 时序问题

打开client连接加了重试3次。会出现server 没启动成功,client就开始连接的情况。
将关闭client连接移动到 server 关闭之后。避免server还在发数据,client连接就断开的错误。

Approved by: @qingxinhome, @sukki37
  • Loading branch information
daviszhen authored Jun 24, 2024
1 parent db96a35 commit fa926aa
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 26 deletions.
4 changes: 2 additions & 2 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4298,7 +4298,7 @@ func postDropSuspendAccount(
}
var nodes []string
currTenant := ses.GetTenantInfo().GetTenant()
currUser := ses.GetTenantInfo().User
currUser := ses.GetTenantInfo().GetUser()
labels := clusterservice.NewSelector().SelectByLabel(
map[string]string{"account": accountName}, clusterservice.Contain)
sysTenant := isSysTenant(currTenant)
Expand Down Expand Up @@ -9124,7 +9124,7 @@ func doRevokePrivilegeImplicitly(ctx context.Context, ses *Session, stmt tree.St
}

func doSetGlobalSystemVariable(ctx context.Context, ses *Session, varName string, varValue interface{}) (err error) {
accountId := uint64(ses.GetTenantInfo().TenantID)
accountId := uint64(ses.GetTenantInfo().GetTenantID())
accountName := ses.GetTenantName()
varName = strings.ToLower(varName)
bh := ses.GetBackgroundExec(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/authenticate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6104,7 +6104,7 @@ func TestDoSetSecondaryRoleAll(t *testing.T) {
bh.sql2result["commit;"] = nil
bh.sql2result["rollback;"] = nil

sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().UserID)
sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().GetUserID())
mrs := newMrsForPasswordOfUser([][]interface{}{
{"6", "role5"},
})
Expand Down Expand Up @@ -6145,7 +6145,7 @@ func TestDoSetSecondaryRoleAll(t *testing.T) {
bh.sql2result["commit;"] = nil
bh.sql2result["rollback;"] = nil

sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().UserID)
sql := getSqlForgetUserRolesExpectPublicRole(publicRoleID, ses.GetTenantInfo().GetUserID())
mrs := newMrsForPasswordOfUser([][]interface{}{})
bh.sql2result[sql] = mrs

Expand Down
8 changes: 3 additions & 5 deletions pkg/frontend/back_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/plan"

"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/sql/compile"

"github.com/matrixorigin/matrixone/pkg/sql/parsers"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect/mysql"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
Expand Down Expand Up @@ -212,7 +214,7 @@ func doComQueryInBack(backSes *backSession, execCtx *ExecCtx,
getGlobalPu().QueryClient,
getGlobalPu().HAKeeperClient,
getGlobalPu().UdfService,
getGlobalAic())
getGlobalAicm())
proc.Id = backSes.getNextProcessId()
proc.Lim.Size = getGlobalPu().SV.ProcessLimitationSize
proc.Lim.BatchRows = getGlobalPu().SV.ProcessLimitationBatchRows
Expand Down Expand Up @@ -704,10 +706,6 @@ func (backSes *backSession) getNextProcessId() string {
func (backSes *backSession) cleanCache() {
}

func (backSes *backSession) GetUpstream() FeSession {
return backSes.upstream
}

func (backSes *backSession) getCNLabels() map[string]string {
return backSes.label
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func setGlobalAicm(aicm *defines.AutoIncrCacheManager) {
globalAicm.Store(aicm)
}

func getGlobalAic() *defines.AutoIncrCacheManager {
func getGlobalAicm() *defines.AutoIncrCacheManager {
if globalAicm.Load() != nil {
return globalAicm.Load().(*defines.AutoIncrCacheManager)
}
Expand Down
16 changes: 4 additions & 12 deletions pkg/frontend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"github.com/matrixorigin/matrixone/pkg/logutil"

"github.com/google/uuid"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -35,7 +37,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/pb/status"
Expand Down Expand Up @@ -237,7 +238,7 @@ type Session struct {
}

func (ses *Session) InitSystemVariables(ctx context.Context) (err error) {
if ses.gSysVars, err = GSysVarsMgr.Get(ses.GetTenantInfo().TenantID, ses, ctx); err != nil {
if ses.gSysVars, err = GSysVarsMgr.Get(ses.GetTenantInfo().GetTenantID(), ses, ctx); err != nil {
return
}
ses.sesSysVars = ses.gSysVars.Clone()
Expand Down Expand Up @@ -532,7 +533,7 @@ func NewSession(connCtx context.Context, proto MysqlRrWr, mp *mpool.MPool) *Sess
getGlobalPu().QueryClient,
getGlobalPu().HAKeeperClient,
getGlobalPu().UdfService,
getGlobalAic())
getGlobalAicm())

ses.proc.Lim.Size = getGlobalPu().SV.ProcessLimitationSize
ses.proc.Lim.BatchRows = getGlobalPu().SV.ProcessLimitationBatchRows
Expand Down Expand Up @@ -983,15 +984,6 @@ func (ses *Session) GetTxnInfo() string {
return meta.DebugString()
}

func (ses *Session) GetDatabaseName() string {
return ses.GetResponser().GetStr(DBNAME)
}

func (ses *Session) SetDatabaseName(db string) {
ses.GetResponser().SetStr(DBNAME, db)
ses.GetTxnCompileCtx().SetDatabase(db)
}

func (ses *Session) DatabaseNameIsEmpty() bool {
return len(ses.GetDatabaseName()) == 0
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/frontend/show_account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/parsers"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_getSqlForAccountInfo(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/frontend/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ func (th *TxnHandler) createTxnOpUnsafe(execCtx *ExecCtx) error {
connectionID = execCtx.resper.GetU32(CONNID)
}
if execCtx.ses.GetTenantInfo() != nil {
accountID = execCtx.ses.GetTenantInfo().TenantID
userName = execCtx.ses.GetTenantInfo().User
accountID = execCtx.ses.GetTenantInfo().GetTenantID()
userName = execCtx.ses.GetTenantInfo().GetUser()
}
sessionInfo := execCtx.ses.GetDebugString()
opts = append(opts,
Expand Down

0 comments on commit fa926aa

Please sign in to comment.