From 25c980669409cdb1d69c3f4d4f772609834015d6 Mon Sep 17 00:00:00 2001 From: LiuBo Date: Fri, 6 Dec 2024 16:47:52 +0800 Subject: [PATCH] [improvement] stats: add some logs and UT --- pkg/cnservice/server_query.go | 3 +- pkg/frontend/show_account.go | 5 +- pkg/frontend/test/engine_mock.go | 17 + pkg/vm/engine/disttae/engine.go | 13 +- pkg/vm/engine/disttae/logtail_consumer.go | 10 +- pkg/vm/engine/disttae/mo_table_stats.go | 449 +++++++++--------- pkg/vm/engine/disttae/stats.go | 76 ++- pkg/vm/engine/disttae/stats_test.go | 218 +++++++++ pkg/vm/engine/disttae/types.go | 2 + pkg/vm/engine/entire_engine.go | 10 + pkg/vm/engine/entire_engine_test.go | 10 + pkg/vm/engine/memoryengine/binded.go | 10 + pkg/vm/engine/memoryengine/engine.go | 10 + pkg/vm/engine/test/change_handle_test.go | 57 ++- pkg/vm/engine/test/disttae_engine_test.go | 18 +- pkg/vm/engine/test/mo_table_stats_test.go | 52 +- pkg/vm/engine/test/reader_test.go | 35 +- pkg/vm/engine/test/testutil/disttae_engine.go | 12 +- pkg/vm/engine/test/testutil/logtailserver.go | 7 +- pkg/vm/engine/test/testutil/tae_engine.go | 14 +- pkg/vm/engine/test/testutil/util.go | 24 +- pkg/vm/engine/test/workspace_test.go | 60 ++- pkg/vm/engine/types.go | 8 + 23 files changed, 773 insertions(+), 347 deletions(-) diff --git a/pkg/cnservice/server_query.go b/pkg/cnservice/server_query.go index 3e10bbafcf6b1..fc2f2fedde08d 100644 --- a/pkg/cnservice/server_query.go +++ b/pkg/cnservice/server_query.go @@ -158,7 +158,8 @@ func (s *service) handleFaultInjection(ctx context.Context, req *query.Request, } func (s *service) handleMoTableStats(ctx context.Context, req *query.Request, resp *query.Response, _ *morpc.Buffer) error { - ret := disttae.HandleMoTableStatsCtl(req.CtlMoTableStatsRequest.Cmd) + e := s.storeEngine.(*disttae.Engine) + ret := e.HandleMoTableStatsCtl(req.CtlMoTableStatsRequest.Cmd) resp.CtlMoTableStatsResponse = query.CtlMoTableStatsResponse{ Resp: ret, } diff --git a/pkg/frontend/show_account.go b/pkg/frontend/show_account.go index c058308bb1164..89434fcbc9818 100644 --- a/pkg/frontend/show_account.go +++ b/pkg/frontend/show_account.go @@ -326,6 +326,7 @@ func updateStorageUsageCache(usages *cmd_util.StorageUsageResp_V3) { func tryGetSizeFromMTS( ctx context.Context, + serviceID string, accIds [][]int64, ) (sizes map[int64]uint64, ok bool) { @@ -340,7 +341,7 @@ func tryGetSizeFromMTS( } } - vals, accs, err, ok = disttae.QueryTableStatsByAccounts( + vals, accs, err, ok = getPu(serviceID).StorageEngine.QueryTableStatsByAccounts( ctx, []int{disttae.TableStatsTableSize}, accs, @@ -385,7 +386,7 @@ func getAccountsStorageUsage( return } - sizes, ok := tryGetSizeFromMTS(ctx, accIds) + sizes, ok := tryGetSizeFromMTS(ctx, ses.service, accIds) if ok { for k, v := range sizes { if len(ret[k]) == 0 { diff --git a/pkg/frontend/test/engine_mock.go b/pkg/frontend/test/engine_mock.go index e1fa8ff6f8302..f65c9d15dc8a5 100644 --- a/pkg/frontend/test/engine_mock.go +++ b/pkg/frontend/test/engine_mock.go @@ -2072,6 +2072,23 @@ func (mr *MockEngineMockRecorder) PrefetchTableMeta(ctx, key interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrefetchTableMeta", reflect.TypeOf((*MockEngine)(nil).PrefetchTableMeta), ctx, key) } +// QueryTableStatsByAccounts mocks base method. +func (m *MockEngine) QueryTableStatsByAccounts(ctx context.Context, wantedStatsIdxes []int, accs []uint64, forceUpdate, resetUpdateTime bool) ([][]any, []uint64, error, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueryTableStatsByAccounts", ctx, wantedStatsIdxes, accs, forceUpdate, resetUpdateTime) + ret0, _ := ret[0].([][]any) + ret1, _ := ret[1].([]uint64) + ret2, _ := ret[2].(error) + ret3, _ := ret[3].(bool) + return ret0, ret1, ret2, ret3 +} + +// QueryTableStatsByAccounts indicates an expected call of QueryTableStatsByAccounts. +func (mr *MockEngineMockRecorder) QueryTableStatsByAccounts(ctx, wantedStatsIdxes, accs, forceUpdate, resetUpdateTime interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryTableStatsByAccounts", reflect.TypeOf((*MockEngine)(nil).QueryTableStatsByAccounts), ctx, wantedStatsIdxes, accs, forceUpdate, resetUpdateTime) +} + // Stats mocks base method. func (m *MockEngine) Stats(ctx context.Context, key statsinfo.StatsInfoKey, sync bool) *statsinfo.StatsInfo { m.ctrl.T.Helper() diff --git a/pkg/vm/engine/disttae/engine.go b/pkg/vm/engine/disttae/engine.go index ab44fc4913e75..53c63add94cfd 100644 --- a/pkg/vm/engine/disttae/engine.go +++ b/pkg/vm/engine/disttae/engine.go @@ -137,11 +137,22 @@ func New( e.pClient.LogtailRPCClientFactory = DefaultNewRpcStreamToTnLogTailService e.pClient.ctx = ctx - initMoTableStatsConfig(ctx, e) + err = initMoTableStatsConfig(ctx, e) + if err != nil { + panic(err) + } return e } +func (e *Engine) Close() error { + if e.gcPool != nil { + e.gcPool.Release() + } + e.dynamicCtx.Close() + return nil +} + func (e *Engine) fillDefaults() { if e.config.insertEntryMaxCount <= 0 { e.config.insertEntryMaxCount = InsertEntryThreshold diff --git a/pkg/vm/engine/disttae/logtail_consumer.go b/pkg/vm/engine/disttae/logtail_consumer.go index 3be32195f3629..3626a522d3b67 100644 --- a/pkg/vm/engine/disttae/logtail_consumer.go +++ b/pkg/vm/engine/disttae/logtail_consumer.go @@ -621,7 +621,12 @@ func (c *PushClient) receiveLogtails(ctx context.Context, e *Engine) { } // Wait for resuming logtail receiver. - <-c.resumeC + select { + case <-ctx.Done(): + return + + case <-c.resumeC: + } logutil.Infof("%s logtail receiver resumed", logTag) default: @@ -1835,6 +1840,9 @@ func (c *PushClient) createRoutineToConsumeLogTails( errHappen := false for { select { + case <-ctx.Done(): + return + case cmd := <-receiver.signalChan: if errHappen { continue diff --git a/pkg/vm/engine/disttae/mo_table_stats.go b/pkg/vm/engine/disttae/mo_table_stats.go index bba8b4d1f4bd7..9e5472755684d 100644 --- a/pkg/vm/engine/disttae/mo_table_stats.go +++ b/pkg/vm/engine/disttae/mo_table_stats.go @@ -256,11 +256,10 @@ func initMoTableStatsConfig( ctx context.Context, eng *Engine, ) (err error) { - - dynamicCtx.once.Do(func() { + eng.dynamicCtx.once.Do(func() { defer func() { - dynamicCtx.defaultConf = dynamicCtx.conf + eng.dynamicCtx.defaultConf = eng.dynamicCtx.conf if err != nil { logutil.Error(logHeader, zap.String("source", "init mo table stats config"), @@ -268,49 +267,49 @@ func initMoTableStatsConfig( } }() - dynamicCtx.de = eng + eng.dynamicCtx.de = eng - if dynamicCtx.alphaTaskPool, err = ants.NewPool( + if eng.dynamicCtx.alphaTaskPool, err = ants.NewPool( runtime.NumCPU(), ants.WithNonblocking(false)); err != nil { return } - dynamicCtx.conf = eng.config.statsConf + eng.dynamicCtx.conf = eng.config.statsConf - function.MoTableRowsSizeUseOldImpl.Store(dynamicCtx.conf.DisableStatsTask) + function.MoTableRowsSizeUseOldImpl.Store(eng.dynamicCtx.conf.DisableStatsTask) - dynamicCtx.executorPool = sync.Pool{ + eng.dynamicCtx.executorPool = sync.Pool{ New: func() interface{} { return eng.config.ieFactory() }, } - dynamicCtx.sqlOpts = ie.NewOptsBuilder().Database(catalog.MO_CATALOG).Internal(true).Finish() + eng.dynamicCtx.sqlOpts = ie.NewOptsBuilder().Database(catalog.MO_CATALOG).Internal(true).Finish() - if dynamicCtx.conf.GetTableListLimit <= 0 { - dynamicCtx.conf.GetTableListLimit = defaultGetTableListLimit + if eng.dynamicCtx.conf.GetTableListLimit <= 0 { + eng.dynamicCtx.conf.GetTableListLimit = defaultGetTableListLimit } - if dynamicCtx.conf.UpdateDuration <= 0 { - dynamicCtx.conf.UpdateDuration = defaultAlphaCycleDur + if eng.dynamicCtx.conf.UpdateDuration <= 0 { + eng.dynamicCtx.conf.UpdateDuration = defaultAlphaCycleDur } - if dynamicCtx.conf.CorrectionDuration <= 0 { - dynamicCtx.conf.CorrectionDuration = defaultGamaCycleDur + if eng.dynamicCtx.conf.CorrectionDuration <= 0 { + eng.dynamicCtx.conf.CorrectionDuration = defaultGamaCycleDur } - dynamicCtx.objIdsPool = sync.Pool{ + eng.dynamicCtx.objIdsPool = sync.Pool{ New: func() interface{} { objIds := make([]types.Objectid, 0) return &objIds }, } - dynamicCtx.tblQueue = make(chan tablePair, options.DefaultBlockMaxRows*2) + eng.dynamicCtx.tblQueue = make(chan tablePair, options.DefaultBlockMaxRows*2) - dynamicCtx.cleanDeletesQueue = make(chan struct{}) - dynamicCtx.updateForgottenQueue = make(chan struct{}) + eng.dynamicCtx.cleanDeletesQueue = make(chan struct{}) + eng.dynamicCtx.updateForgottenQueue = make(chan struct{}) // registerMoTableSizeRows { @@ -318,7 +317,7 @@ func initMoTableStatsConfig( context.Context, []uint64, []uint64, []uint64, engine.Engine, bool, bool) ([]uint64, error) { - return MTSTableSize + return eng.dynamicCtx.MTSTableSize } function.GetMoTableSizeFunc.Store(&ff1) @@ -326,18 +325,18 @@ func initMoTableStatsConfig( context.Context, []uint64, []uint64, []uint64, engine.Engine, bool, bool) ([]uint64, error) { - return MTSTableRows + return eng.dynamicCtx.MTSTableRows } function.GetMoTableRowsFunc.Store(&ff2) } - dynamicCtx.tableStock.tbls = make([]tablePair, 0, 1) + eng.dynamicCtx.tableStock.tbls = make([]tablePair, 0, 1) { - dynamicCtx.beta.executor = betaTask - dynamicCtx.gama.executor = gamaTask + eng.dynamicCtx.beta.executor = eng.dynamicCtx.betaTask + eng.dynamicCtx.gama.executor = eng.dynamicCtx.gamaTask - if dynamicCtx.beta.taskPool, err = ants.NewPool( + if eng.dynamicCtx.beta.taskPool, err = ants.NewPool( runtime.NumCPU(), ants.WithNonblocking(false), ants.WithPanicHandler(func(e interface{}) { @@ -348,7 +347,7 @@ func initMoTableStatsConfig( return } - if dynamicCtx.gama.taskPool, err = ants.NewPool( + if eng.dynamicCtx.gama.taskPool, err = ants.NewPool( runtime.NumCPU(), ants.WithNonblocking(false), ants.WithPanicHandler(func(e interface{}) { @@ -374,41 +373,46 @@ func initMoTableStatsConfig( go func() { defer func() { - dynamicCtx.Lock() + eng.dynamicCtx.Lock() task.running = false - dynamicCtx.Unlock() + eng.dynamicCtx.Unlock() }() // there should not have a deadline - taskCtx := turn2SysCtx(context.Background()) + taskCtx := turn2SysCtx(ctx) task.executor(taskCtx, eng.service, eng) }() } // beta task expect to be running on every cn. // gama task expect to be running only on one cn. - dynamicCtx.launchTask = func(name string) { - dynamicCtx.Lock() - defer dynamicCtx.Unlock() + eng.dynamicCtx.launchTask = func(name string) { + eng.dynamicCtx.Lock() + defer eng.dynamicCtx.Unlock() switch name { case gamaTaskName: - dynamicCtx.isMainRunner = true - launch("gama task", &dynamicCtx.gama) + eng.dynamicCtx.isMainRunner = true + launch("gama task", &eng.dynamicCtx.gama) case betaTaskName: - launch("beta task", &dynamicCtx.beta) + launch("beta task", &eng.dynamicCtx.beta) } } go func() { - ctx = turn2SysCtx(context.Background()) + taskCtx := turn2SysCtx(ctx) for { if eng.config.moServerStateChecker == nil || !eng.config.moServerStateChecker() { - time.Sleep(time.Second * 5) + select { + case <-taskCtx.Done(): + return + + case <-time.After(time.Second * 5): + } continue } - if initCronTask(ctx) { + if eng.dynamicCtx.initCronTask(taskCtx) { break } } @@ -418,7 +422,7 @@ func initMoTableStatsConfig( return err } -func initCronTask( +func (d *dynamicCtx) initCronTask( ctx context.Context, ) bool { // insert mo table stats task meta into sys_cron_task table. @@ -436,11 +440,11 @@ func initCronTask( zap.Error(err)) } - executeSQL(ctx, sql, "init cron task") + d.executeSQL(ctx, sql, "init cron task") } checkTask := func() bool { - sqlRet := executeSQL(ctx, + sqlRet := d.executeSQL(ctx, fmt.Sprintf(`select count(*) from %s.%s where task_metadata_id = '%s';`, catalog.MOTaskDB, "sys_cron_task", "mo_table_stats"), "check cron task") @@ -473,13 +477,11 @@ type taskState struct { launchTimes int } -var dynamicCtx struct { +type dynamicCtx struct { sync.RWMutex once sync.Once - moServerStateChecker func() bool - defaultConf MoTableStatsConfig conf MoTableStatsConfig @@ -488,7 +490,6 @@ var dynamicCtx struct { updateForgottenQueue chan struct{} de *Engine - service string objIdsPool sync.Pool tableStock struct { @@ -508,44 +509,56 @@ var dynamicCtx struct { sqlOpts ie.SessionOverrideOptions } -func LogDynamicCtx() string { - dynamicCtx.Lock() - defer dynamicCtx.Unlock() +func (d *dynamicCtx) LogDynamicCtx() string { + d.Lock() + defer d.Unlock() var buf bytes.Buffer buf.WriteString(fmt.Sprintf("gama: [running: %v; launched-time: %v]\n", - dynamicCtx.gama.running, - dynamicCtx.gama.launchTimes)) + d.gama.running, + d.gama.launchTimes)) buf.WriteString(fmt.Sprintf("beta: [running: %v; launched-time: %v]\n", - dynamicCtx.beta.running, - dynamicCtx.beta.launchTimes)) + d.beta.running, + d.beta.launchTimes)) buf.WriteString(fmt.Sprintf( "default-conf:[alpha-dur: %v; gama-dur: %v; limit: %v; force-update: %v; use-old-impl: %v; disable-task: %v]\n", - dynamicCtx.defaultConf.UpdateDuration, - dynamicCtx.defaultConf.CorrectionDuration, - dynamicCtx.defaultConf.GetTableListLimit, - dynamicCtx.defaultConf.ForceUpdate, - dynamicCtx.defaultConf.StatsUsingOldImpl, - dynamicCtx.defaultConf.DisableStatsTask)) + d.defaultConf.UpdateDuration, + d.defaultConf.CorrectionDuration, + d.defaultConf.GetTableListLimit, + d.defaultConf.ForceUpdate, + d.defaultConf.StatsUsingOldImpl, + d.defaultConf.DisableStatsTask)) buf.WriteString(fmt.Sprintf( "cur-conf:[alpha-dur: %v; gama-dur: %v; limit: %v; force-update: %v; use-old-impl: %v; disable-task: %v]\n", - dynamicCtx.conf.UpdateDuration, - dynamicCtx.conf.CorrectionDuration, - dynamicCtx.conf.GetTableListLimit, - dynamicCtx.conf.ForceUpdate, - dynamicCtx.conf.StatsUsingOldImpl, - dynamicCtx.conf.DisableStatsTask)) + d.conf.UpdateDuration, + d.conf.CorrectionDuration, + d.conf.GetTableListLimit, + d.conf.ForceUpdate, + d.conf.StatsUsingOldImpl, + d.conf.DisableStatsTask)) return buf.String() } +func (d *dynamicCtx) Close() { + if d.alphaTaskPool != nil { + d.alphaTaskPool.Release() + } + if d.beta.taskPool != nil { + d.beta.taskPool.Release() + } + if d.gama.taskPool != nil { + d.gama.taskPool.Release() + } +} + ////////////////// MoTableStats Interface ////////////////// -func HandleMoTableStatsCtl(cmd string) string { +func (d *dynamicCtx) HandleMoTableStatsCtl(cmd string) string { cmds := strings.Split(cmd, ":") if len(cmds) != 2 { @@ -565,36 +578,36 @@ func HandleMoTableStatsCtl(cmd string) string { switch typ { case "use_old_impl": - return setUseOldImpl(val == "true") + return d.setUseOldImpl(val == "true") case "force_update": - return setForceUpdate(val == "true") + return d.setForceUpdate(val == "true") case "move_on": - return setMoveOnTask(val == "true") + return d.setMoveOnTask(val == "true") case "restore_default_setting": - return restoreDefaultSetting(val == "true") + return d.restoreDefaultSetting(val == "true") case "echo_current_setting": - return echoCurrentSetting(val == "true") + return d.echoCurrentSetting(val == "true") case "recomputing": - return recomputing(val) + return d.recomputing(val) default: return "failed, cmd invalid" } } -func recomputing(para string) string { +func (d *dynamicCtx) recomputing(para string) string { { - dynamicCtx.Lock() - if !dynamicCtx.isMainRunner { - dynamicCtx.Unlock() + d.Lock() + if !d.isMainRunner { + d.Unlock() return "not main runner" } - dynamicCtx.Unlock() + d.Unlock() } var ( @@ -616,7 +629,7 @@ func recomputing(para string) string { } } - _, retAcc, err, ok = QueryTableStatsByAccounts( + _, retAcc, err, ok = d.QueryTableStatsByAccounts( context.Background(), nil, accIds, false, true) if !ok { @@ -637,11 +650,11 @@ func recomputing(para string) string { return buf.String() } -func checkMoveOnTask() bool { - dynamicCtx.Lock() - defer dynamicCtx.Unlock() +func (d *dynamicCtx) checkMoveOnTask() bool { + d.Lock() + defer d.Unlock() - disable := dynamicCtx.conf.DisableStatsTask + disable := d.conf.DisableStatsTask logutil.Info(logHeader, zap.String("source", "check move on"), @@ -650,44 +663,44 @@ func checkMoveOnTask() bool { return disable } -func echoCurrentSetting(ok bool) string { +func (d *dynamicCtx) echoCurrentSetting(ok bool) string { if !ok { return "noop" } - dynamicCtx.Lock() - defer dynamicCtx.Unlock() + d.Lock() + defer d.Unlock() return fmt.Sprintf("move_on(%v), use_old_impl(%v), force_update(%v)", - !dynamicCtx.conf.DisableStatsTask, - dynamicCtx.conf.StatsUsingOldImpl, - dynamicCtx.conf.ForceUpdate) + !d.conf.DisableStatsTask, + d.conf.StatsUsingOldImpl, + d.conf.ForceUpdate) } -func restoreDefaultSetting(ok bool) string { +func (d *dynamicCtx) restoreDefaultSetting(ok bool) string { if !ok { return "noop" } - dynamicCtx.Lock() - defer dynamicCtx.Unlock() + d.Lock() + defer d.Unlock() - dynamicCtx.conf = dynamicCtx.defaultConf - function.MoTableRowsSizeUseOldImpl.Store(dynamicCtx.conf.StatsUsingOldImpl) - function.MoTableRowsSizeForceUpdate.Store(dynamicCtx.conf.ForceUpdate) + d.conf = d.defaultConf + function.MoTableRowsSizeUseOldImpl.Store(d.conf.StatsUsingOldImpl) + function.MoTableRowsSizeForceUpdate.Store(d.conf.ForceUpdate) return fmt.Sprintf("move_on(%v), use_old_impl(%v), force_update(%v)", - !dynamicCtx.conf.DisableStatsTask, - dynamicCtx.conf.StatsUsingOldImpl, - dynamicCtx.conf.ForceUpdate) + !d.conf.DisableStatsTask, + d.conf.StatsUsingOldImpl, + d.conf.ForceUpdate) } -func setMoveOnTask(newVal bool) string { - dynamicCtx.Lock() - defer dynamicCtx.Unlock() +func (d *dynamicCtx) setMoveOnTask(newVal bool) string { + d.Lock() + defer d.Unlock() - oldState := !dynamicCtx.conf.DisableStatsTask - dynamicCtx.conf.DisableStatsTask = !newVal + oldState := !d.conf.DisableStatsTask + d.conf.DisableStatsTask = !newVal ret := fmt.Sprintf("move on: %v to %v", oldState, newVal) logutil.Info(logHeader, @@ -697,13 +710,13 @@ func setMoveOnTask(newVal bool) string { return ret } -func setUseOldImpl(newVal bool) string { - dynamicCtx.Lock() - defer dynamicCtx.Unlock() +func (d *dynamicCtx) setUseOldImpl(newVal bool) string { + d.Lock() + defer d.Unlock() - oldState := dynamicCtx.conf.StatsUsingOldImpl + oldState := d.conf.StatsUsingOldImpl function.MoTableRowsSizeUseOldImpl.Store(newVal) - dynamicCtx.conf.StatsUsingOldImpl = newVal + d.conf.StatsUsingOldImpl = newVal ret := fmt.Sprintf("use old impl: %v to %v", oldState, newVal) logutil.Info(logHeader, @@ -713,13 +726,13 @@ func setUseOldImpl(newVal bool) string { return ret } -func setForceUpdate(newVal bool) string { - dynamicCtx.Lock() - defer dynamicCtx.Unlock() +func (d *dynamicCtx) setForceUpdate(newVal bool) string { + d.Lock() + defer d.Unlock() - oldState := dynamicCtx.conf.ForceUpdate + oldState := d.conf.ForceUpdate function.MoTableRowsSizeForceUpdate.Store(newVal) - dynamicCtx.conf.ForceUpdate = newVal + d.conf.ForceUpdate = newVal ret := fmt.Sprintf("force update: %v to %v", oldState, newVal) logutil.Info(logHeader, @@ -729,9 +742,9 @@ func setForceUpdate(newVal bool) string { return ret } -func executeSQL(ctx context.Context, sql string, hint string) ie.InternalExecResult { - exec := dynamicCtx.executorPool.Get() - defer dynamicCtx.executorPool.Put(exec) +func (d *dynamicCtx) executeSQL(ctx context.Context, sql string, hint string) ie.InternalExecResult { + exec := d.executorPool.Get() + defer d.executorPool.Put(exec) var ( newCtx = ctx @@ -743,7 +756,7 @@ func executeSQL(ctx context.Context, sql string, hint string) ie.InternalExecRes defer cancel() } - ret := exec.(ie.InternalExecutor).Query(newCtx, sql, dynamicCtx.sqlOpts) + ret := exec.(ie.InternalExecutor).Query(newCtx, sql, d.sqlOpts) if ret.Error() != nil { logutil.Info(logHeader, zap.String("source", hint), @@ -766,7 +779,7 @@ func intsJoin(items []uint64, delimiter string) string { return builder.String() } -func forceUpdateQuery( +func (d *dynamicCtx) forceUpdateQuery( ctx context.Context, wantedStatsIdxes []int, accs, dbs, tbls []uint64, @@ -794,7 +807,7 @@ func forceUpdateQuery( intsJoin(dbs, ","), intsJoin(tbls, ",")) - sqlRet := executeSQL(ctx, sql, "force update query") + sqlRet := d.executeSQL(ctx, sql, "force update query") if sqlRet.Error() != nil { return nil, sqlRet.Error() } @@ -826,7 +839,7 @@ func forceUpdateQuery( } } - if err = getChangedTableList( + if err = d.getChangedTableList( ctx, eng.service, eng, accs, dbs, tbls, oldTS, &pairs, &to); err != nil { return } @@ -854,12 +867,12 @@ func forceUpdateQuery( } } - if err = alphaTask(ctx, eng.service, eng, pairs, + if err = d.alphaTask(ctx, eng.service, eng, pairs, fmt.Sprintf("forceUpdateQuery(reset_update=%v)", resetUpdateTime)); err != nil { return nil, err } - if statsVals, err = normalQuery( + if statsVals, err = d.normalQuery( ctx, wantedStatsIdxes, accs, dbs, tbls); err != nil { return nil, err @@ -868,7 +881,7 @@ func forceUpdateQuery( return } -func normalQuery( +func (d *dynamicCtx) normalQuery( ctx context.Context, wantedStatsIdxes []int, accs, dbs, tbls []uint64, @@ -885,7 +898,7 @@ func normalQuery( intsJoin(dbs, ","), intsJoin(tbls, ",")) - sqlRet := executeSQL(ctx, sql, "normal query") + sqlRet := d.executeSQL(ctx, sql, "normal query") if sqlRet.Error() != nil { return nil, sqlRet.Error() } @@ -944,7 +957,7 @@ func normalQuery( return statsVals, nil } -func QueryTableStatsByAccounts( +func (d *dynamicCtx) QueryTableStatsByAccounts( ctx context.Context, wantedStatsIdxes []int, accs []uint64, @@ -963,7 +976,7 @@ func QueryTableStatsByAccounts( sql := fmt.Sprintf(accumulateIdsByAccSQL, catalog.MO_CATALOG, catalog.MO_TABLES, intsJoin(accs, ",")) - sqlRet := executeSQL(newCtx, sql, "query table stats by accounts") + sqlRet := d.executeSQL(newCtx, sql, "query table stats by accounts") if err = sqlRet.Error(); err != nil { return } @@ -993,7 +1006,7 @@ func QueryTableStatsByAccounts( tbls = append(tbls, val.(uint64)) } - statsVals, err, ok = QueryTableStats( + statsVals, err, ok = d.QueryTableStats( newCtx, wantedStatsIdxes, accs2, dbs, tbls, forceUpdate, resetUpdateTime, nil) logutil.Info(logHeader, @@ -1009,7 +1022,7 @@ func QueryTableStatsByAccounts( return statsVals, accs2, err, ok } -func QueryTableStats( +func (d *dynamicCtx) QueryTableStats( ctx context.Context, wantedStatsIdxes []int, accs, dbs, tbls []uint64, @@ -1018,17 +1031,17 @@ func QueryTableStats( eng engine.Engine, ) (statsVals [][]any, err error, ok bool) { - dynamicCtx.Lock() - useOld := dynamicCtx.conf.StatsUsingOldImpl - dynamicCtx.Unlock() + d.Lock() + useOld := d.conf.StatsUsingOldImpl + d.Unlock() if useOld { return } if eng == nil { - dynamicCtx.Lock() - eng = dynamicCtx.de - dynamicCtx.Unlock() + d.Lock() + eng = d.de + d.Unlock() } var now = time.Now() @@ -1061,7 +1074,7 @@ func QueryTableStats( de = eng.(*Engine) } - statsVals, err = forceUpdateQuery( + statsVals, err = d.forceUpdateQuery( newCtx, wantedStatsIdxes, accs, dbs, tbls, resetUpdateTime, @@ -1069,12 +1082,12 @@ func QueryTableStats( return statsVals, err, err == nil } - statsVals, err = normalQuery(newCtx, wantedStatsIdxes, accs, dbs, tbls) + statsVals, err = d.normalQuery(newCtx, wantedStatsIdxes, accs, dbs, tbls) return statsVals, err, err == nil } -func MTSTableSize( +func (d *dynamicCtx) MTSTableSize( ctx context.Context, accs, dbs, tbls []uint64, eng engine.Engine, @@ -1082,7 +1095,7 @@ func MTSTableSize( resetUpdateTime bool, ) (sizes []uint64, err error) { - statsVals, err, _ := QueryTableStats( + statsVals, err, _ := d.QueryTableStats( ctx, []int{TableStatsTableSize}, accs, dbs, tbls, forceUpdate, resetUpdateTime, eng) @@ -1101,7 +1114,7 @@ func MTSTableSize( return } -func MTSTableRows( +func (d *dynamicCtx) MTSTableRows( ctx context.Context, accs, dbs, tbls []uint64, eng engine.Engine, @@ -1109,7 +1122,7 @@ func MTSTableRows( resetUpdateTime bool, ) (sizes []uint64, err error) { - statsVals, err, _ := QueryTableStats( + statsVals, err, _ := d.QueryTableStats( ctx, []int{TableStatsTableRows}, accs, dbs, tbls, forceUpdate, resetUpdateTime, eng) @@ -1255,25 +1268,25 @@ func GetMOTableStatsExecutor( sqlExecutor func() ie.InternalExecutor, ) func(ctx context.Context, task task.Task) error { return func(ctx context.Context, task task.Task) error { - return tableStatsExecutor(ctx, service, eng) + return eng.(*Engine).dynamicCtx.tableStatsExecutor(ctx, service, eng) } } func turn2SysCtx(ctx context.Context) context.Context { newCtx := ctx if val := ctx.Value(defines.TenantIDKey{}); val == nil || val.(uint32) != catalog.System_Account { - newCtx = context.WithValue(context.Background(), defines.TenantIDKey{}, catalog.System_Account) + newCtx = context.WithValue(ctx, defines.TenantIDKey{}, catalog.System_Account) } return newCtx } -func LaunchMTSTasksForUT() { - dynamicCtx.launchTask(gamaTaskName) - dynamicCtx.launchTask(betaTaskName) +func (d *dynamicCtx) LaunchMTSTasksForUT() { + d.launchTask(gamaTaskName) + d.launchTask(betaTaskName) } -func tableStatsExecutor( +func (d *dynamicCtx) tableStatsExecutor( ctx context.Context, service string, eng engine.Engine, @@ -1293,19 +1306,19 @@ func tableStatsExecutor( return newCtx.Err() case <-executeTicker.C: - if checkMoveOnTask() { + if d.checkMoveOnTask() { continue } - if err = prepare(newCtx, service, eng); err != nil { + if err = d.prepare(newCtx, service, eng); err != nil { return err } - dynamicCtx.Lock() - tbls := dynamicCtx.tableStock.tbls[:] - dynamicCtx.Unlock() + d.Lock() + tbls := d.tableStock.tbls[:] + d.Unlock() - if err = alphaTask( + if err = d.alphaTask( newCtx, service, eng, tbls, "main routine", @@ -1316,29 +1329,29 @@ func tableStatsExecutor( return err } - dynamicCtx.Lock() - executeTicker.Reset(dynamicCtx.conf.UpdateDuration) - dynamicCtx.tableStock.tbls = dynamicCtx.tableStock.tbls[:0] - dynamicCtx.Unlock() + d.Lock() + executeTicker.Reset(d.conf.UpdateDuration) + d.tableStock.tbls = d.tableStock.tbls[:0] + d.Unlock() } } } -func prepare( +func (d *dynamicCtx) prepare( ctx context.Context, service string, eng engine.Engine, ) (err error) { // gama task running only on a specified cn - dynamicCtx.launchTask(gamaTaskName) + d.launchTask(gamaTaskName) - dynamicCtx.Lock() - defer dynamicCtx.Unlock() + d.Lock() + defer d.Unlock() offsetTS := types.TS{} - for len(dynamicCtx.tableStock.tbls) == 0 { - accs, dbs, tbls, ts, err := getCandidates(ctx, service, eng, dynamicCtx.conf.GetTableListLimit, offsetTS) + for len(d.tableStock.tbls) == 0 { + accs, dbs, tbls, ts, err := d.getCandidates(ctx, service, eng, d.conf.GetTableListLimit, offsetTS) if err != nil { return err } @@ -1347,10 +1360,10 @@ func prepare( break } - err = getChangedTableList( + err = d.getChangedTableList( ctx, service, eng, accs, dbs, tbls, ts, - &dynamicCtx.tableStock.tbls, - &dynamicCtx.tableStock.newest) + &d.tableStock.tbls, + &d.tableStock.newest) if err != nil { return err @@ -1359,13 +1372,13 @@ func prepare( // in case of all candidates have been deleted. offsetTS = types.TimestampToTS(*ts[len(ts)-1]) - if len(dynamicCtx.tableStock.tbls) == 0 && offsetTS.IsEmpty() { + if len(d.tableStock.tbls) == 0 && offsetTS.IsEmpty() { // there exists a large number of deleted table which are new inserts logutil.Info(logHeader, zap.String("source", "prepare"), zap.String("info", "found new inserts deletes, force clean")) - NotifyCleanDeletes() + d.NotifyCleanDeletes() break } } @@ -1373,7 +1386,7 @@ func prepare( return err } -func alphaTask( +func (d *dynamicCtx) alphaTask( ctx context.Context, service string, eng engine.Engine, @@ -1386,7 +1399,7 @@ func alphaTask( } // maybe the task exited, need to launch a new one - dynamicCtx.launchTask(betaTaskName) + d.launchTask(betaTaskName) var ( errWaitToReceive = len(tbls) @@ -1444,7 +1457,7 @@ func alphaTask( case <-ticker.C: if len(tbls) == 0 { // all submitted - dynamicCtx.tblQueue <- tablePair{} + d.tblQueue <- tablePair{} ticker.Reset(time.Second) if enterWait { @@ -1476,7 +1489,7 @@ func alphaTask( submitted++ wg.Add(1) - err = dynamicCtx.alphaTaskPool.Submit(func() { + err = d.alphaTaskPool.Submit(func() { defer wg.Done() var err2 error @@ -1495,7 +1508,7 @@ func alphaTask( tbls[i].pState = pState tbls[i].errChan = errQueue - dynamicCtx.tblQueue <- tbls[i] + d.tblQueue <- tbls[i] }) if err != nil { @@ -1511,7 +1524,7 @@ func alphaTask( wg.Wait() // let beta know that a batch done - dynamicCtx.tblQueue <- tablePair{} + d.tblQueue <- tablePair{} dur := time.Since(start) // the longer the update takes, the longer we would pause, @@ -1524,7 +1537,7 @@ func alphaTask( } } -func betaTask( +func (d *dynamicCtx) betaTask( ctx context.Context, service string, eng engine.Engine, @@ -1550,13 +1563,13 @@ func betaTask( err = ctx.Err() return - case tbl := <-dynamicCtx.tblQueue: + case tbl := <-d.tblQueue: if !tbl.valid { // an alpha batch transmit done - bulkUpdateTableOnlyTS(ctx, service, onlyTSBat) + d.bulkUpdateTableOnlyTS(ctx, service, onlyTSBat) bulkWait.Wait() - _ = bulkUpdateTableStatsList(ctx, service, &slBat) + _ = d.bulkUpdateTableStatsList(ctx, service, &slBat) slBat.Clear() onlyTSBat = onlyTSBat[:0] @@ -1573,10 +1586,10 @@ func betaTask( } bulkWait.Add(1) - if err = dynamicCtx.beta.taskPool.Submit(func() { + if err = d.beta.taskPool.Submit(func() { defer bulkWait.Done() - sl, err2 := statsCalculateOp(ctx, service, de.fs, tbl.snapshot, tbl.pState) + sl, err2 := d.statsCalculateOp(ctx, service, de.fs, tbl.snapshot, tbl.pState) if err2 != nil { tbl.Done(err2) } else { @@ -1591,15 +1604,15 @@ func betaTask( } } -func NotifyCleanDeletes() { - dynamicCtx.cleanDeletesQueue <- struct{}{} +func (d *dynamicCtx) NotifyCleanDeletes() { + d.cleanDeletesQueue <- struct{}{} } -func NotifyUpdateForgotten() { - dynamicCtx.updateForgottenQueue <- struct{}{} +func (d *dynamicCtx) NotifyUpdateForgotten() { + d.updateForgottenQueue <- struct{}{} } -func gamaInsertNewTables( +func (d *dynamicCtx) gamaInsertNewTables( ctx context.Context, service string, eng engine.Engine, @@ -1616,7 +1629,7 @@ func gamaInsertNewTables( catalog.MO_CATALOG, catalog.MO_TABLE_STATS, ) - sqlRet = executeSQL(ctx, sql, "insert new table-0: get new tables") + sqlRet = d.executeSQL(ctx, sql, "insert new table-0: get new tables") if err = sqlRet.Error(); err != nil { return } @@ -1687,7 +1700,7 @@ func gamaInsertNewTables( catalog.MO_CATALOG, catalog.MO_TABLE_STATS, strings.Join(values, ",")) - sqlRet = executeSQL(ctx, sql, "insert new table-1: insert new tables") + sqlRet = d.executeSQL(ctx, sql, "insert new table-1: insert new tables") err = sqlRet.Error() } @@ -1727,7 +1740,7 @@ func decodeIdsFromMoTableStatsSqlRet( return } -func gamaUpdateForgotten( +func (d *dynamicCtx) gamaUpdateForgotten( ctx context.Context, service string, de *Engine, @@ -1756,7 +1769,7 @@ func gamaUpdateForgotten( }() sql := fmt.Sprintf(getNullStatsSQL, catalog.MO_CATALOG, catalog.MO_TABLE_STATS, limit) - sqlRet := executeSQL(ctx, sql, "gama task: get null stats list") + sqlRet := d.executeSQL(ctx, sql, "gama task: get null stats list") if err = sqlRet.Error(); err != nil { return } @@ -1777,7 +1790,7 @@ func gamaUpdateForgotten( tbls = append(tbls, tbl) } - if err = alphaTask( + if err = d.alphaTask( ctx, service, de, tbls, "gama opA"); err != nil { return } @@ -1786,7 +1799,7 @@ func gamaUpdateForgotten( v2.GamaTaskDurationHistogram.Observe(time.Since(now).Seconds()) } -func gamaCleanDeletes( +func (d *dynamicCtx) gamaCleanDeletes( ctx context.Context, de *Engine, ) { @@ -1823,7 +1836,7 @@ func gamaCleanDeletes( colName2[step], colName2[step], catalog.MO_CATALOG, catalog.MO_TABLE_STATS, colName2[step], options.DefaultBlockMaxRows) - sqlRet := executeSQL(ctx, sql, fmt.Sprintf("gama task-%d-0", step)) + sqlRet := d.executeSQL(ctx, sql, fmt.Sprintf("gama task-%d-0", step)) if sqlRet.Error() != nil { return } @@ -1836,7 +1849,7 @@ func gamaCleanDeletes( sql = fmt.Sprintf(getCheckAliveSQL, colName1[step], catalog.MO_CATALOG, tblName[step], colName1[step], intsJoin(ids, ",")) - sqlRet = executeSQL(ctx, sql, fmt.Sprintf("gama task-%d-1", step)) + sqlRet = d.executeSQL(ctx, sql, fmt.Sprintf("gama task-%d-1", step)) if err = sqlRet.Error(); err != nil { return } @@ -1862,7 +1875,7 @@ func gamaCleanDeletes( if len(ids) != 0 { sql = fmt.Sprintf(getDeleteFromStatsSQL, catalog.MO_CATALOG, catalog.MO_TABLE_STATS, colName2[step], intsJoin(ids, ",")) - sqlRet = executeSQL(ctx, sql, fmt.Sprintf("gama task-%d-2", step)) + sqlRet = d.executeSQL(ctx, sql, fmt.Sprintf("gama task-%d-2", step)) if err = sqlRet.Error(); err != nil { return } @@ -1880,7 +1893,7 @@ func gamaCleanDeletes( deleteByStep(2) // clean tables } -func gamaTask( +func (d *dynamicCtx) gamaTask( ctx context.Context, service string, eng engine.Engine, @@ -1890,10 +1903,10 @@ func gamaTask( de = eng.(*Engine) ) - dynamicCtx.Lock() - gamaDur := dynamicCtx.conf.CorrectionDuration - gamaLimit := max(dynamicCtx.conf.GetTableListLimit, 8192) - dynamicCtx.Unlock() + d.Lock() + gamaDur := d.conf.CorrectionDuration + gamaLimit := max(d.conf.GetTableListLimit, 8192) + d.Unlock() randDuration := func(n int) time.Duration { rnd := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -1914,33 +1927,33 @@ func gamaTask( return case <-tickerA.C: - dynamicCtx.gama.taskPool.Submit(func() { - gamaUpdateForgotten(ctx, service, de, gamaLimit) + d.gama.taskPool.Submit(func() { + d.gamaUpdateForgotten(ctx, service, de, gamaLimit) }) tickerA.Reset(randDuration(baseFactory)) - case <-dynamicCtx.updateForgottenQueue: - dynamicCtx.gama.taskPool.Submit(func() { - gamaUpdateForgotten(ctx, service, de, gamaLimit) + case <-d.updateForgottenQueue: + d.gama.taskPool.Submit(func() { + d.gamaUpdateForgotten(ctx, service, de, gamaLimit) }) tickerA.Reset(randDuration(baseFactory)) case <-tickerB.C: - dynamicCtx.gama.taskPool.Submit(func() { - gamaCleanDeletes(ctx, de) + d.gama.taskPool.Submit(func() { + d.gamaCleanDeletes(ctx, de) }) tickerB.Reset(randDuration(baseFactory)) - case <-dynamicCtx.cleanDeletesQueue: + case <-d.cleanDeletesQueue: // emergence, do clean now - dynamicCtx.gama.taskPool.Submit(func() { - gamaCleanDeletes(ctx, de) + d.gama.taskPool.Submit(func() { + d.gamaCleanDeletes(ctx, de) }) tickerB.Reset(randDuration(baseFactory)) case <-tickerC.C: - dynamicCtx.gama.taskPool.Submit(func() { - gamaInsertNewTables(ctx, service, de) + d.gama.taskPool.Submit(func() { + d.gamaInsertNewTables(ctx, service, de) }) // try insert table at [1, 5] min tickerC.Reset(randDuration(5)) @@ -1948,7 +1961,7 @@ func gamaTask( } } -func statsCalculateOp( +func (d *dynamicCtx) statsCalculateOp( ctx context.Context, service string, fs fileservice.FileService, @@ -1959,12 +1972,12 @@ func statsCalculateOp( bcs := betaCycleStash{ born: time.Now(), snapshot: snapshot, - dataObjIds: dynamicCtx.objIdsPool.Get().(*[]types.Objectid), + dataObjIds: d.objIdsPool.Get().(*[]types.Objectid), } defer func() { *bcs.dataObjIds = (*bcs.dataObjIds)[:0] - dynamicCtx.objIdsPool.Put(bcs.dataObjIds) + d.objIdsPool.Put(bcs.dataObjIds) }() if err = collectVisibleData(&bcs, pState); err != nil { @@ -1982,7 +1995,7 @@ func statsCalculateOp( return sl, nil } -func getCandidates( +func (d *dynamicCtx) getCandidates( ctx context.Context, service string, eng engine.Engine, @@ -2004,7 +2017,7 @@ func getCandidates( } sql = fmt.Sprintf(getNextReadyListSQL, catalog.MO_CATALOG, catalog.MO_TABLE_STATS, where, limit) - sqlRet = executeSQL(ctx, sql, "get next ready to update list") + sqlRet = d.executeSQL(ctx, sql, "get next ready to update list") if err = sqlRet.Error(); err != nil { return } @@ -2044,7 +2057,7 @@ func getCandidates( return } -func getChangedTableList( +func (d *dynamicCtx) getChangedTableList( ctx context.Context, service string, eng engine.Engine, @@ -2362,7 +2375,7 @@ func applyTombstones( return nil } -func bulkUpdateTableStatsList( +func (d *dynamicCtx) bulkUpdateTableStatsList( ctx context.Context, service string, bat *sync.Map, @@ -2416,11 +2429,11 @@ func bulkUpdateTableStatsList( catalog.MO_CATALOG, catalog.MO_TABLE_STATS, strings.Join(vals, ",")) - ret := executeSQL(ctx, sql, "bulk update table stats") + ret := d.executeSQL(ctx, sql, "bulk update table stats") return ret.Error() } -func bulkUpdateTableOnlyTS( +func (d *dynamicCtx) bulkUpdateTableOnlyTS( ctx context.Context, service string, tbls []tablePair, @@ -2453,7 +2466,7 @@ func bulkUpdateTableOnlyTS( catalog.MO_CATALOG, catalog.MO_TABLE_STATS, strings.Join(vals, ",")) - ret := executeSQL(ctx, sql, "bulk update only ts") + ret := d.executeSQL(ctx, sql, "bulk update only ts") for i := range tbls { tbls[i].Done(ret.Error()) diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go index 6f3d84de5356a..bba9ec494928c 100644 --- a/pkg/vm/engine/disttae/stats.go +++ b/pkg/vm/engine/disttae/stats.go @@ -42,6 +42,10 @@ var ( // MinUpdateInterval is the minimal interval to update stats info as it // is necessary to update stats every time. MinUpdateInterval = time.Second * 15 + + initCheckInterval = time.Millisecond * 10 + maxCheckInterval = time.Second * 5 + checkTimeout = time.Minute ) // waitKeeper is used to mark the table has finished waited, @@ -137,6 +141,20 @@ func WithUpdateWorkerFactor(f int) GlobalStatsOption { } } +// WithStatsUpdater set the update function to update stats info. +func WithStatsUpdater(f func(pb.StatsInfoKey, *pb.StatsInfo) bool) GlobalStatsOption { + return func(s *GlobalStats) { + s.statsUpdater = f + } +} + +// WithApproxObjectNumUpdater set the update function to update approx object num. +func WithApproxObjectNumUpdater(f func() int64) GlobalStatsOption { + return func(s *GlobalStats) { + s.approxObjectNumUpdater = f + } +} + // updateRecord records the update status of a key. type updateRecord struct { // inProgress indicates if the stats of a table is being updated. @@ -193,6 +211,12 @@ type GlobalStats struct { KeyRouter client.KeyRouter[pb.StatsInfoKey] concurrentExecutor ConcurrentExecutor + + // statsUpdate is the function which updates the stats info. + // If it is nil, set it to doUpdate. + statsUpdater func(pb.StatsInfoKey, *pb.StatsInfo) bool + // for test only currently. + approxObjectNumUpdater func() int64 } func NewGlobalStats( @@ -214,6 +238,9 @@ func NewGlobalStats( for _, opt := range opts { opt(s) } + if s.statsUpdater == nil { + s.statsUpdater = s.doUpdate + } s.concurrentExecutor = newConcurrentExecutor(runtime.GOMAXPROCS(0) * s.updateWorkerFactor * 4) s.concurrentExecutor.Run(ctx) go s.consumeWorker(ctx) @@ -462,11 +489,11 @@ func (gs *GlobalStats) waitLogtailUpdated(tid uint64) { // 1. context done // 2. interval checking, whose init interval is 10ms and max interval is 5s // 3. logtail update notify, to check if it is the required table. - initCheckInterval := time.Millisecond * 10 - maxCheckInterval := time.Second * 5 checkInterval := initCheckInterval timer := time.NewTimer(checkInterval) defer timer.Stop() + timeout := time.NewTimer(checkTimeout) + defer timeout.Stop() var done bool for { @@ -480,6 +507,10 @@ func (gs *GlobalStats) waitLogtailUpdated(tid uint64) { case <-gs.ctx.Done(): return + case <-timeout.C: + logutil.Warnf("wait logtail updated timeout, table ID: %d", tid) + timeout.Reset(checkTimeout) + case <-timer.C: if checkUpdated() { return @@ -581,35 +612,38 @@ func (gs *GlobalStats) updateTableStats(key pb.StatsInfoKey) { var updated bool stats := plan2.NewStatsInfo() - defer func() { - gs.mu.Lock() - defer gs.mu.Unlock() - - if updated { - gs.mu.statsInfoMap[key] = stats - gs.broadcastStats(key) - } else if _, ok := gs.mu.statsInfoMap[key]; !ok { - gs.mu.statsInfoMap[key] = nil - } + if gs.statsUpdater != nil { + updated = gs.statsUpdater(key, stats) + } - // Notify all the waiters to read the new stats info. - gs.mu.cond.Broadcast() + gs.mu.Lock() + defer gs.mu.Unlock() + if updated { + gs.mu.statsInfoMap[key] = stats + gs.broadcastStats(key) + } else if _, ok := gs.mu.statsInfoMap[key]; !ok { + gs.mu.statsInfoMap[key] = nil + } - gs.doneUpdate(key, updated) - }() + // Notify all the waiters to read the new stats info. + gs.mu.cond.Broadcast() + + gs.doneUpdate(key, updated) +} +func (gs *GlobalStats) doUpdate(key pb.StatsInfoKey, stats *pb.StatsInfo) bool { table := gs.engine.GetLatestCatalogCache().GetTableById(key.AccId, key.DatabaseID, key.TableID) // table or its definition is nil, means that the table is created but not committed yet. if table == nil || table.TableDef == nil { logutil.Errorf("cannot get table by ID %v", key) - return + return false } partitionState := gs.engine.GetOrCreateLatestPart(key.DatabaseID, key.TableID).Snapshot() approxObjectNum := int64(partitionState.ApproxDataObjectsNum()) - if approxObjectNum == 0 { + if gs.approxObjectNumUpdater == nil && approxObjectNum == 0 { // There are no objects flushed yet. - return + return false } // the time used to init stats info is not need to be too precise. @@ -624,10 +658,10 @@ func (gs *GlobalStats) updateTableStats(key pb.StatsInfoKey) { ) if err := UpdateStats(gs.ctx, req, gs.concurrentExecutor); err != nil { logutil.Errorf("failed to init stats info for table %v, err: %v", key, err) - return + return false } v2.StatsUpdateBlockCounter.Add(float64(stats.BlockNumber)) - updated = true + return true } func getMinMaxValueByFloat64(typ types.Type, buf []byte) float64 { diff --git a/pkg/vm/engine/disttae/stats_test.go b/pkg/vm/engine/disttae/stats_test.go index d4a8224792172..48d89fad6b2cc 100644 --- a/pkg/vm/engine/disttae/stats_test.go +++ b/pkg/vm/engine/disttae/stats_test.go @@ -16,20 +16,234 @@ package disttae import ( "context" + "math/rand" "sync" "sync/atomic" "testing" "time" "github.com/lni/goutils/leaktest" + "github.com/matrixorigin/matrixone/pkg/catalog" + "github.com/matrixorigin/matrixone/pkg/clusterservice" + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/common/runtime" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/lockservice" "github.com/matrixorigin/matrixone/pkg/pb/statsinfo" + plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" + "github.com/matrixorigin/matrixone/pkg/vm/engine/disttae/cache" "github.com/stretchr/testify/assert" ) +func TestGetStats(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gs := NewGlobalStats(ctx, nil, nil, + WithUpdateWorkerFactor(4), + WithStatsUpdater(func(key statsinfo.StatsInfoKey, info *statsinfo.StatsInfo) bool { + info.BlockNumber = 20 + return true + }), + ) + + tids := []uint64{2000, 2001, 2002} + go func() { + time.Sleep(time.Millisecond * 20) + for _, tid := range tids { + gs.notifyLogtailUpdate(tid) + } + }() + var wg sync.WaitGroup + wg.Add(100) + for i := 0; i < 100; i++ { + go func(j int) { + defer wg.Done() + rd := rand.New(rand.NewSource(time.Now().UnixNano())) + time.Sleep(time.Millisecond * time.Duration(10+rd.Intn(20))) + k := statsinfo.StatsInfoKey{ + DatabaseID: 1000, + TableID: tids[j%3], + } + info := gs.Get(ctx, k, true) + assert.NotNil(t, info) + assert.Equal(t, int64(20), info.BlockNumber) + }(i) + } + wg.Wait() +} + +func runTest( + t *testing.T, + test func(ctx context.Context, e *Engine), + opts ...GlobalStatsOption, +) { + defer leaktest.AfterTest(t)() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sid := "s1" + rt := runtime.DefaultRuntime() + runtime.SetupServiceBasedRuntime(sid, rt) + cluster := clusterservice.NewMOCluster( + sid, + nil, + time.Hour, + clusterservice.WithDisableRefresh(), + ) + defer cluster.Close() + rt.SetGlobalVariables(runtime.ClusterService, cluster) + lk := lockservice.NewLockService(lockservice.Config{ + ServiceID: sid, + }) + defer lk.Close() + rt.SetGlobalVariables(runtime.LockService, lk) + mp, err := mpool.NewMPool(sid, 1024*1024, 0) + catalog.SetupDefines(sid) + assert.NoError(t, err) + e := New( + ctx, + sid, + mp, + nil, + nil, + nil, + nil, + 4, + ) + for _, opt := range opts { + opt(e.globalStats) + } + defer e.Close() + test(ctx, e) +} + +func insertTable( + t *testing.T, + e *Engine, + did, tid uint64, + dname, tname string, +) (uint64, uint64) { + tbl := catalog.Table{ + AccountId: 0, + UserId: 0, + RoleId: 0, + DatabaseId: did, + DatabaseName: dname, + TableId: tid, + TableName: tname, + } + packer := types.NewPacker() + bat, err := catalog.GenCreateTableTuple(tbl, e.mp, packer) + assert.NoError(t, err) + _, err = fillRandomRowidAndZeroTs(bat, e.mp) + assert.NoError(t, err) + e.catalog.InsertTable(bat) + tableItem := e.catalog.GetTableByName(0, did, tname) + assert.NotNil(t, tableItem) + defs, err := catalog.GenColumnsFromDefs( + 0, + tname, + dname, + tid, + did, + catalog.GetDefines(e.service).MoDatabaseTableDefs, + ) + assert.NoError(t, err) + cache.InitTableItemWithColumns(tableItem, defs) + return tableItem.DatabaseId, tableItem.Id +} + +func TestUpdateStats(t *testing.T) { + t.Run("no table", func(t *testing.T) { + runTest(t, func(ctx context.Context, e *Engine) { + k := statsinfo.StatsInfoKey{ + DatabaseID: 1000, + TableID: 1001, + } + stats := plan2.NewStatsInfo() + updated := e.globalStats.doUpdate(k, stats) + assert.False(t, updated) + }) + }) + + t.Run("no obj", func(t *testing.T) { + runTest(t, func(ctx context.Context, e *Engine) { + did := uint64(1000) + dname := "test-db" + tid := uint64(1001) + tname := "test-table" + did1, tid1 := insertTable(t, e, did, tid, dname, tname) + assert.Equal(t, did, did1) + assert.Equal(t, tid, tid1) + k := statsinfo.StatsInfoKey{ + DatabaseID: did, + TableID: tid, + } + stats := plan2.NewStatsInfo() + updated := e.globalStats.doUpdate(k, stats) + assert.False(t, updated) + }) + }) + + t.Run("objs", func(t *testing.T) { + runTest(t, func(ctx context.Context, e *Engine) { + did := uint64(1000) + dname := "test-db" + tid := uint64(1001) + tname := "test-table" + did1, tid1 := insertTable(t, e, did, tid, dname, tname) + assert.Equal(t, did, did1) + assert.Equal(t, tid, tid1) + k := statsinfo.StatsInfoKey{ + DatabaseID: did, + TableID: tid, + } + stats := plan2.NewStatsInfo() + updated := e.globalStats.doUpdate(k, stats) + assert.True(t, updated) + }, WithApproxObjectNumUpdater(func() int64 { + return 10 + })) + }) +} + +func TestWaitLogtailUpdate(t *testing.T) { + origInitCheckInterval := initCheckInterval + origMaxCheckInterval := maxCheckInterval + origCheckTimeout := checkTimeout + defer func() { + initCheckInterval = origInitCheckInterval + maxCheckInterval = origMaxCheckInterval + checkTimeout = origCheckTimeout + leaktest.AfterTest(t)() + }() + initCheckInterval = time.Millisecond * 2 + maxCheckInterval = time.Millisecond * 10 + checkTimeout = time.Millisecond * 200 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gs := NewGlobalStats(ctx, nil, nil) + assert.NotNil(t, gs) + tid := uint64(2) + gs.waitLogtailUpdated(tid) + + tid = 200 + go func() { + time.Sleep(time.Millisecond * 100) + gs.notifyLogtailUpdate(tid) + }() + gs.waitLogtailUpdated(tid) +} + func TestGlobalStats_ShouldUpdate(t *testing.T) { defer leaktest.AfterTest(t)() t.Run("basic", func(t *testing.T) { + origMinUpdateInterval := MinUpdateInterval + defer func() { + MinUpdateInterval = origMinUpdateInterval + }() MinUpdateInterval = time.Millisecond * 10 ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -47,6 +261,10 @@ func TestGlobalStats_ShouldUpdate(t *testing.T) { }) t.Run("parallel", func(t *testing.T) { + origMinUpdateInterval := MinUpdateInterval + defer func() { + MinUpdateInterval = origMinUpdateInterval + }() MinUpdateInterval = time.Second * 10 ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/vm/engine/disttae/types.go b/pkg/vm/engine/disttae/types.go index 7877b29c503fc..34860960aa0c1 100644 --- a/pkg/vm/engine/disttae/types.go +++ b/pkg/vm/engine/disttae/types.go @@ -255,6 +255,8 @@ type Engine struct { moDatabaseCreatedTime *vector.Vector moTablesCreatedTime *vector.Vector moColumnsCreatedTime *vector.Vector + + dynamicCtx } func (e *Engine) SetService(svr string) { diff --git a/pkg/vm/engine/entire_engine.go b/pkg/vm/engine/entire_engine.go index c39352041457d..aef3aeb78bfdd 100644 --- a/pkg/vm/engine/entire_engine.go +++ b/pkg/vm/engine/entire_engine.go @@ -121,3 +121,13 @@ func (e *EntireEngine) GetMessageCenter() any { func (e *EntireEngine) GetService() string { return e.Engine.GetService() } + +func (e *EntireEngine) QueryTableStatsByAccounts( + ctx context.Context, + wantedStatsIdxes []int, + accs []uint64, + forceUpdate bool, + resetUpdateTime bool, +) (statsVals [][]any, retAcc []uint64, err error, ok bool) { + return e.Engine.QueryTableStatsByAccounts(ctx, wantedStatsIdxes, accs, forceUpdate, resetUpdateTime) +} diff --git a/pkg/vm/engine/entire_engine_test.go b/pkg/vm/engine/entire_engine_test.go index e6216f5904436..a782892410cec 100644 --- a/pkg/vm/engine/entire_engine_test.go +++ b/pkg/vm/engine/entire_engine_test.go @@ -358,6 +358,16 @@ func (e *testEngine) LatestLogtailAppliedTime() timestamp.Timestamp { return timestamp.Timestamp{} } +func (e *testEngine) QueryTableStatsByAccounts( + ctx context.Context, + wantedStatsIdxes []int, + accs []uint64, + forceUpdate bool, + resetUpdateTime bool, +) (statsVals [][]any, retAcc []uint64, err error, ok bool) { + return nil, nil, nil, false +} + func newtestOperator() *testOperator { return &testOperator{} } diff --git a/pkg/vm/engine/memoryengine/binded.go b/pkg/vm/engine/memoryengine/binded.go index f8a91f8fb32d8..3218e84c46e2d 100644 --- a/pkg/vm/engine/memoryengine/binded.go +++ b/pkg/vm/engine/memoryengine/binded.go @@ -123,3 +123,13 @@ func (b *BindedEngine) GetMessageCenter() any { func (b *BindedEngine) GetService() string { return b.engine.GetService() } + +func (b *BindedEngine) QueryTableStatsByAccounts( + ctx context.Context, + wantedStatsIdxes []int, + accs []uint64, + forceUpdate bool, + resetUpdateTime bool, +) (statsVals [][]any, retAcc []uint64, err error, ok bool) { + return nil, nil, nil, false +} diff --git a/pkg/vm/engine/memoryengine/engine.go b/pkg/vm/engine/memoryengine/engine.go index b1205ecdbf221..1a8056e748b3d 100644 --- a/pkg/vm/engine/memoryengine/engine.go +++ b/pkg/vm/engine/memoryengine/engine.go @@ -266,3 +266,13 @@ func getTNServices(cluster clusterservice.MOCluster) []metadata.TNService { func (e *Engine) GetMessageCenter() any { return nil } + +func (e *Engine) QueryTableStatsByAccounts( + ctx context.Context, + wantedStatsIdxes []int, + accs []uint64, + forceUpdate bool, + resetUpdateTime bool, +) (statsVals [][]any, retAcc []uint64, err error, ok bool) { + return nil, nil, nil, false +} diff --git a/pkg/vm/engine/test/change_handle_test.go b/pkg/vm/engine/test/change_handle_test.go index 42511c56d7fa1..318875d4fcefa 100644 --- a/pkg/vm/engine/test/change_handle_test.go +++ b/pkg/vm/engine/test/change_handle_test.go @@ -53,9 +53,9 @@ func TestChangesHandle1(t *testing.T) { databaseName = "db1" ) - ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, accountId) - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t) defer func() { @@ -68,6 +68,8 @@ func TestChangesHandle1(t *testing.T) { schema.Name = tableName bat := catalog2.MockBatch(schema, 10) + ctx, cancel = context.WithTimeout(ctx, time.Minute*5) + defer cancel() _, _, err := disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) txn, rel := testutil2.GetRelation(t, accountId, taeHandler.GetDB(), databaseName, tableName) @@ -141,9 +143,9 @@ func TestChangesHandle2(t *testing.T) { databaseName = "db1" ) - ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, accountId) - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t) defer func() { @@ -157,6 +159,8 @@ func TestChangesHandle2(t *testing.T) { bat := catalog2.MockBatch(schema, 10) mp := common.DebugAllocator + ctx, cancel = context.WithTimeout(ctx, time.Minute*5) + defer cancel() _, _, err := disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) txn, rel := testutil2.GetRelation(t, accountId, taeHandler.GetDB(), databaseName, tableName) @@ -254,9 +258,9 @@ func TestChangesHandle3(t *testing.T) { databaseName = "db1" ) - ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, accountId) - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t) defer func() { @@ -270,6 +274,8 @@ func TestChangesHandle3(t *testing.T) { bat := catalog2.MockBatch(schema, 163840) mp := common.DebugAllocator + ctx, cancel = context.WithTimeout(ctx, time.Minute*5) + defer cancel() _, _, err := disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) txn, rel := testutil2.GetRelation(t, accountId, taeHandler.GetDB(), databaseName, tableName) @@ -360,9 +366,8 @@ func TestChangesHandleForCNWrite(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -376,6 +381,8 @@ func TestChangesHandleForCNWrite(t *testing.T) { }() startTS := taeEngine.GetDB().TxnMgr.Now() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -465,9 +472,8 @@ func TestChangesHandle4(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -481,6 +487,8 @@ func TestChangesHandle4(t *testing.T) { }() startTS := taeEngine.GetDB().TxnMgr.Now() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -597,9 +605,9 @@ func TestChangesHandle5(t *testing.T) { databaseName = "db1" ) - ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, accountId) - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) opts := config.WithLongScanAndCKPOpts(nil) disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{TaeEngineOptions: opts}, t) @@ -613,6 +621,8 @@ func TestChangesHandle5(t *testing.T) { schema.Name = tableName bat := catalog2.MockBatch(schema, 10) + ctx, cancel = context.WithTimeout(ctx, time.Minute*5) + defer cancel() _, _, err := disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) txn, rel := testutil2.GetRelation(t, accountId, taeHandler.GetDB(), databaseName, tableName) @@ -695,9 +705,9 @@ func TestChangesHandle6(t *testing.T) { databaseName = "db1" ) - ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, accountId) - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) opts := config.WithLongScanAndCKPOpts(nil) disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{TaeEngineOptions: opts}, t) @@ -711,6 +721,8 @@ func TestChangesHandle6(t *testing.T) { schema.Name = tableName bat := catalog2.MockBatch(schema, 10) + ctx, cancel = context.WithTimeout(ctx, time.Minute*5) + defer cancel() _, _, err := disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) txn, rel := testutil2.GetRelation(t, accountId, taeHandler.GetDB(), databaseName, tableName) @@ -795,9 +807,9 @@ func TestChangesHandleStaleFiles1(t *testing.T) { databaseName = "db1" ) - ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, accountId) - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) opts := config.WithLongScanAndCKPOpts(nil) disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{TaeEngineOptions: opts}, t) @@ -811,6 +823,8 @@ func TestChangesHandleStaleFiles1(t *testing.T) { schema.Name = tableName bat := catalog2.MockBatch(schema, 10) + ctx, cancel = context.WithTimeout(ctx, time.Minute*5) + defer cancel() _, _, err := disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) txn, rel := testutil2.GetRelation(t, accountId, taeHandler.GetDB(), databaseName, tableName) @@ -871,9 +885,8 @@ func TestChangesHandleStaleFiles2(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -887,6 +900,8 @@ func TestChangesHandleStaleFiles2(t *testing.T) { }() startTS := taeEngine.GetDB().TxnMgr.Now() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -962,9 +977,9 @@ func TestChangesHandleStaleFiles5(t *testing.T) { databaseName = "db1" ) - ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, accountId) - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t) defer func() { @@ -978,6 +993,8 @@ func TestChangesHandleStaleFiles5(t *testing.T) { bat := catalog2.MockBatch(schema, 163840) mp := common.DebugAllocator + ctx, cancel = context.WithTimeout(ctx, time.Minute*5) + defer cancel() _, _, err := disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) txn, rel := testutil2.GetRelation(t, accountId, taeHandler.GetDB(), databaseName, tableName) diff --git a/pkg/vm/engine/test/disttae_engine_test.go b/pkg/vm/engine/test/disttae_engine_test.go index 82a244ac10d85..725a47399bca4 100644 --- a/pkg/vm/engine/test/disttae_engine_test.go +++ b/pkg/vm/engine/test/disttae_engine_test.go @@ -62,9 +62,9 @@ func Test_InsertRows(t *testing.T) { databaseName = "db1" ) - ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, accountId) - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t) defer func() { @@ -73,6 +73,8 @@ func Test_InsertRows(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute*5) + defer cancel() txn, err := disttaeEngine.NewTxnOperator(ctx, disttaeEngine.Now()) require.Nil(t, err) @@ -876,9 +878,9 @@ func TestShowDatabasesInRestoreTxn(t *testing.T) { func TestObjectStats1(t *testing.T) { catalog.SetupDefines("") - ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, catalog.System_Account) - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = context.WithValue(ctx, defines.TenantIDKey{}, catalog.System_Account) disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t) defer func() { @@ -891,6 +893,8 @@ func TestObjectStats1(t *testing.T) { testutil2.CreateRelationAndAppend(t, catalog.System_Account, taeHandler.GetDB(), "db", schema, bat, true) + ctx, cancel = context.WithTimeout(ctx, time.Minute*5) + defer cancel() txn, rel := testutil2.GetRelation(t, catalog.System_Account, taeHandler.GetDB(), "db", schema.Name) id := rel.GetMeta().(*catalog2.TableEntry).AsCommonID() appendableObjectID := testutil2.GetOneObject(rel).GetID() @@ -963,9 +967,9 @@ func TestObjectStats1(t *testing.T) { func TestObjectStats2(t *testing.T) { catalog.SetupDefines("") - ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, catalog.System_Account) - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ctx = context.WithValue(ctx, defines.TenantIDKey{}, catalog.System_Account) disttaeEngine, taeHandler, rpcAgent, _ := testutil.CreateEngines(ctx, testutil.TestOptions{}, t) defer func() { @@ -978,6 +982,8 @@ func TestObjectStats2(t *testing.T) { testutil2.CreateRelationAndAppend(t, catalog.System_Account, taeHandler.GetDB(), "db", schema, bat, true) + ctx, cancel = context.WithTimeout(ctx, time.Minute*5) + defer cancel() txn, rel := testutil2.GetRelation(t, catalog.System_Account, taeHandler.GetDB(), "db", schema.Name) id := rel.GetMeta().(*catalog2.TableEntry).AsCommonID() appendableObjectID := testutil2.GetOneObject(rel).GetID() diff --git a/pkg/vm/engine/test/mo_table_stats_test.go b/pkg/vm/engine/test/mo_table_stats_test.go index 217ef80651ab7..352bc33714dc1 100644 --- a/pkg/vm/engine/test/mo_table_stats_test.go +++ b/pkg/vm/engine/test/mo_table_stats_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/lni/goutils/leaktest" catalog2 "github.com/matrixorigin/matrixone/pkg/catalog" "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -37,6 +38,7 @@ import ( ) func TestMoTableStatsMoCtl(t *testing.T) { + defer leaktest.AfterTest(t)() var ( opts testutil.TestOptions tableName = "test1" @@ -59,13 +61,14 @@ func TestMoTableStatsMoCtl(t *testing.T) { exec := v.(executor.SQLExecutor) + e := p.D.Engine { - ret := disttae.HandleMoTableStatsCtl("restore_default_setting:true") + ret := e.HandleMoTableStatsCtl("restore_default_setting:true") require.Equal(t, "move_on(true), use_old_impl(false), force_update(false)", ret) } { - ret := disttae.HandleMoTableStatsCtl("move_on: false") + ret := e.HandleMoTableStatsCtl("move_on: false") require.Equal(t, "move on: true to false", ret) } @@ -92,7 +95,7 @@ func TestMoTableStatsMoCtl(t *testing.T) { } { - ret := disttae.HandleMoTableStatsCtl("force_update:true") + ret := e.HandleMoTableStatsCtl("force_update:true") require.Equal(t, "force update: false to true", ret) } @@ -109,7 +112,7 @@ func TestMoTableStatsMoCtl(t *testing.T) { } { - ret := disttae.HandleMoTableStatsCtl("use_old_impl:true") + ret := e.HandleMoTableStatsCtl("use_old_impl:true") require.Equal(t, "use old impl: false to true", ret) } @@ -125,18 +128,20 @@ func TestMoTableStatsMoCtl(t *testing.T) { } func TestMoTableStatsMoCtl2(t *testing.T) { + defer leaktest.AfterTest(t)() var opts testutil.TestOptions opts.TaeEngineOptions = config.WithLongScanAndCKPOpts(nil) p := testutil.InitEnginePack(opts, t) defer p.Close() - disttae.LaunchMTSTasksForUT() + e := p.D.Engine + e.LaunchMTSTasksForUT() - ret := disttae.HandleMoTableStatsCtl("recomputing:0") + ret := e.HandleMoTableStatsCtl("recomputing:0") fmt.Println(ret) - disttae.NotifyCleanDeletes() - disttae.NotifyUpdateForgotten() + e.NotifyCleanDeletes() + e.NotifyUpdateForgotten() schema := catalog.MockSchemaAll(3, 2) schema.Name = "test1" @@ -145,69 +150,70 @@ func TestMoTableStatsMoCtl2(t *testing.T) { _, rel := p.CreateDBAndTable(txnop, "db1", schema) require.NoError(t, txnop.Commit(p.Ctx)) - ret = disttae.HandleMoTableStatsCtl("restore_default_setting:true") + ret = e.HandleMoTableStatsCtl("restore_default_setting:true") require.Equal(t, "move_on(true), use_old_impl(false), force_update(false)", ret) dbId := rel.GetDBID(p.Ctx) tblId := rel.GetTableID(p.Ctx) - _, err, _ := disttae.QueryTableStats(context.Background(), + _, err, _ := e.QueryTableStats(context.Background(), []int{disttae.TableStatsTableRows}, []uint64{0}, []uint64{dbId}, []uint64{tblId}, false, false, nil) require.NotNil(t, err) - ret = disttae.HandleMoTableStatsCtl("use_old_impl:true") + ret = e.HandleMoTableStatsCtl("use_old_impl:true") require.Equal(t, "use old impl: false to true", ret) - _, err, _ = disttae.QueryTableStats(context.Background(), + _, err, _ = e.QueryTableStats(context.Background(), []int{disttae.TableStatsTableRows}, []uint64{0}, []uint64{dbId}, []uint64{tblId}, false, false, nil) require.NoError(t, err) - ret = disttae.HandleMoTableStatsCtl("use_old_impl:false") + ret = e.HandleMoTableStatsCtl("use_old_impl:false") require.Equal(t, "use old impl: true to false", ret) - ret = disttae.HandleMoTableStatsCtl("force_update:true") + ret = e.HandleMoTableStatsCtl("force_update:true") require.Equal(t, "force update: false to true", ret) - _, err, _ = disttae.QueryTableStats(context.Background(), + _, err, _ = e.QueryTableStats(context.Background(), []int{disttae.TableStatsTableRows}, []uint64{0}, []uint64{dbId}, []uint64{tblId}, true, false, nil) require.NotNil(t, err) - ret = disttae.HandleMoTableStatsCtl("move_on: false") + ret = e.HandleMoTableStatsCtl("move_on: false") require.Equal(t, "move on: true to false", ret) - _, err, _ = disttae.QueryTableStats(context.Background(), + _, err, _ = e.QueryTableStats(context.Background(), []int{disttae.TableStatsTableRows}, []uint64{0}, []uint64{dbId}, []uint64{tblId}, false, true, nil) require.NotNil(t, err) - ret = disttae.HandleMoTableStatsCtl("echo_current_setting:false") + ret = e.HandleMoTableStatsCtl("echo_current_setting:false") require.Equal(t, "noop", ret) - ret = disttae.HandleMoTableStatsCtl("echo_current_setting:true") + ret = e.HandleMoTableStatsCtl("echo_current_setting:true") require.Equal(t, "move_on(false), use_old_impl(false), force_update(true)", ret) { - ret = disttae.HandleMoTableStatsCtl("no_such_cmd:true") + ret = e.HandleMoTableStatsCtl("no_such_cmd:true") require.Equal(t, "failed, cmd invalid", ret) - ret = disttae.HandleMoTableStatsCtl("force_update:yes") + ret = e.HandleMoTableStatsCtl("force_update:yes") require.Equal(t, "failed, cmd invalid", ret) - ret = disttae.HandleMoTableStatsCtl("force_update:true:false") + ret = e.HandleMoTableStatsCtl("force_update:true:false") require.Equal(t, "invalid command", ret) } - fmt.Println(disttae.LogDynamicCtx()) + fmt.Println(e.LogDynamicCtx()) } func TestHandleGetChangedList(t *testing.T) { + defer leaktest.AfterTest(t)() var opts testutil.TestOptions opts.TaeEngineOptions = config.WithLongScanAndCKPOpts(nil) diff --git a/pkg/vm/engine/test/reader_test.go b/pkg/vm/engine/test/reader_test.go index 5392aad66a95d..f5917502a6b6d 100644 --- a/pkg/vm/engine/test/reader_test.go +++ b/pkg/vm/engine/test/reader_test.go @@ -72,9 +72,8 @@ func Test_ReaderCanReadRangesBlocksWithoutDeletes(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -87,6 +86,8 @@ func Test_ReaderCanReadRangesBlocksWithoutDeletes(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -180,9 +181,8 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -195,6 +195,8 @@ func TestReaderCanReadUncommittedInMemInsertAndDeletes(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -262,9 +264,8 @@ func Test_ReaderCanReadCommittedInMemInsertAndDeletes(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) // mock a schema with 4 columns and the 4th column as primary key @@ -289,6 +290,8 @@ func Test_ReaderCanReadCommittedInMemInsertAndDeletes(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -412,9 +415,8 @@ func Test_ShardingHandler(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) // mock a schema with 4 columns and the 4th column as primary key @@ -439,6 +441,8 @@ func Test_ShardingHandler(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -603,9 +607,8 @@ func Test_ShardingRemoteReader(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) // mock a schema with 4 columns and the 4th column as primary key @@ -629,6 +632,8 @@ func Test_ShardingRemoteReader(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -904,9 +909,8 @@ func Test_ShardingTableDelegate(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) // mock a schema with 4 columns and the 4th column as primary key @@ -930,6 +934,8 @@ func Test_ShardingTableDelegate(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -1072,9 +1078,8 @@ func Test_ShardingLocalReader(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) fault.Enable() @@ -1117,6 +1122,8 @@ func Test_ShardingLocalReader(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) diff --git a/pkg/vm/engine/test/testutil/disttae_engine.go b/pkg/vm/engine/test/testutil/disttae_engine.go index 3bef7e2c35993..59888627da003 100644 --- a/pkg/vm/engine/test/testutil/disttae_engine.go +++ b/pkg/vm/engine/test/testutil/disttae_engine.go @@ -65,6 +65,7 @@ type TestDisttaeEngine struct { ctx context.Context cancel context.CancelFunc txnClient client.TxnClient + queryClient qclient.QueryClient txnOperator client.TxnOperator timestampWaiter client.TimestampWaiter mp *mpool.MPool @@ -143,7 +144,7 @@ func NewTestDisttaeEngine( engineOpts = append(engineOpts, disttae.WithMoServerStateChecker(func() bool { return false })) catalog.SetupDefines("") - de.Engine = disttae.New(ctx, + de.Engine = disttae.New(de.ctx, "", de.mp, fs, @@ -167,7 +168,7 @@ func NewTestDisttaeEngine( } }() - op, err := de.txnClient.New(ctx, types.TS{}.ToTimestamp()) + op, err := de.txnClient.New(de.ctx, types.TS{}.ToTimestamp()) if err != nil { return nil, err } @@ -175,11 +176,12 @@ func NewTestDisttaeEngine( close(wait) de.txnOperator = op - if err = de.Engine.New(ctx, op); err != nil { + if err = de.Engine.New(de.ctx, op); err != nil { return nil, err } qc, _ := qclient.NewQueryClient("", morpc.Config{}) + de.queryClient = qc sqlExecutor := compile.NewSQLExecutor( "127.0.0.1:2000", de.Engine, @@ -207,7 +209,7 @@ func NewTestDisttaeEngine( setServerLevelParams(de) // InitLoTailPushModel presupposes that the internal sql executor has been initialized. - err = de.Engine.InitLogTailPushModel(ctx, de.timestampWaiter) + err = de.Engine.InitLogTailPushModel(de.ctx, de.timestampWaiter) //err = de.prevSubscribeSysTables(ctx, rpcAgent) return de, err } @@ -440,6 +442,8 @@ func (de *TestDisttaeEngine) Close(ctx context.Context) { close(de.logtailReceiver) de.cancel() de.wg.Wait() + de.Engine.Close() + de.queryClient.Close() if err := os.RemoveAll(de.rootDir); err != nil { logutil.Errorf("remove root dir failed (%s): %v", de.rootDir, err) diff --git a/pkg/vm/engine/test/testutil/logtailserver.go b/pkg/vm/engine/test/testutil/logtailserver.go index 2c8df36bab105..c01cb4c14d83b 100644 --- a/pkg/vm/engine/test/testutil/logtailserver.go +++ b/pkg/vm/engine/test/testutil/logtailserver.go @@ -36,7 +36,12 @@ func (cs *testClientSession) SessionCtx() context.Context { func (cs *testClientSession) Close() error { return nil } func (cs *testClientSession) Write(ctx context.Context, response morpc.Message) error { - cs.tailReceiveQueue <- response + select { + case <-ctx.Done(): + return ctx.Err() + + case cs.tailReceiveQueue <- response: + } return nil } diff --git a/pkg/vm/engine/test/testutil/tae_engine.go b/pkg/vm/engine/test/testutil/tae_engine.go index 4ed94e7be4ca1..fbfe7ba7211f2 100644 --- a/pkg/vm/engine/test/testutil/tae_engine.go +++ b/pkg/vm/engine/test/testutil/tae_engine.go @@ -61,8 +61,18 @@ func (ts *TestTxnStorage) Shard() metadata.TNShard { func (ts *TestTxnStorage) Start() error { return nil } func (ts *TestTxnStorage) Close(destroy bool) error { - err := ts.GetDB().Close() - return err + var firstErr error + if err := ts.GetDB().Close(); err != nil { + firstErr = err + } + if err := ts.logtailServer.Close(); err != nil { + if firstErr == nil { + firstErr = err + } + } + ts.txnHandler.GCManager.Stop() + blockio.Stop("") + return firstErr } func (ts *TestTxnStorage) Read(ctx context.Context, request *txn.TxnRequest, response *txn.TxnResponse) error { return nil diff --git a/pkg/vm/engine/test/testutil/util.go b/pkg/vm/engine/test/testutil/util.go index cda5e0984f5de..f2426512c37bc 100644 --- a/pkg/vm/engine/test/testutil/util.go +++ b/pkg/vm/engine/test/testutil/util.go @@ -96,6 +96,11 @@ func CreateEngines( panic("cannot find account id in ctx") } + _, ok := ctx.Deadline() + if ok { + panic("context should not have deadline") + } + var err error rpcAgent = NewMockLogtailAgent() @@ -254,26 +259,27 @@ type EnginePack struct { } func InitEnginePack(opts TestOptions, t *testing.T) *EnginePack { - ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, uint32(0)) - timeout := opts.Timeout - if timeout == 0 { - timeout = 5 * time.Minute - } - ctx, cancel := context.WithTimeoutCause(ctx, timeout, moerr.CauseInitEnginePack) + ctx, cancel := context.WithCancel(context.Background()) + ctx = context.WithValue(ctx, defines.TenantIDKey{}, uint32(0)) pack := &EnginePack{ - Ctx: ctx, t: t, cancelF: cancel, } - pack.D, pack.T, pack.R, pack.Mp = CreateEngines(pack.Ctx, opts, t, opts.DisttaeOptions...) + pack.D, pack.T, pack.R, pack.Mp = CreateEngines(ctx, opts, t, opts.DisttaeOptions...) + timeout := opts.Timeout + if timeout == 0 { + timeout = 5 * time.Minute + } + ctx, _ = context.WithTimeoutCause(ctx, timeout, moerr.CauseInitEnginePack) + pack.Ctx = ctx return pack } func (p *EnginePack) Close() { - p.cancelF() p.D.Close(p.Ctx) p.T.Close(true) p.R.Close() + p.cancelF() } func (p *EnginePack) StartCNTxn(opts ...client.TxnOption) client.TxnOperator { diff --git a/pkg/vm/engine/test/workspace_test.go b/pkg/vm/engine/test/workspace_test.go index e0036f8c31801..190e1eb8c7f06 100644 --- a/pkg/vm/engine/test/workspace_test.go +++ b/pkg/vm/engine/test/workspace_test.go @@ -66,9 +66,8 @@ func Test_BasicInsertDelete(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -85,6 +84,8 @@ func Test_BasicInsertDelete(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -153,9 +154,8 @@ func Test_BasicS3InsertDelete(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -174,6 +174,8 @@ func Test_BasicS3InsertDelete(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -288,9 +290,8 @@ func Test_MultiTxnInsertDelete(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -307,6 +308,8 @@ func Test_MultiTxnInsertDelete(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -443,9 +446,8 @@ func Test_MultiTxnS3InsertDelete(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -464,6 +466,8 @@ func Test_MultiTxnS3InsertDelete(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -597,9 +601,8 @@ func Test_MultiTxnS3Tombstones(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaEnhanced(2, primaryKeyIdx, 2) @@ -618,6 +621,8 @@ func Test_MultiTxnS3Tombstones(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -770,9 +775,8 @@ func Test_BasicRollbackStatement(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -789,6 +793,8 @@ func Test_BasicRollbackStatement(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -879,9 +885,8 @@ func Test_BasicRollbackStatementS3(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -900,6 +905,8 @@ func Test_BasicRollbackStatementS3(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -1079,9 +1086,8 @@ func Test_MultiTxnRollbackStatement(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -1098,6 +1104,8 @@ func Test_MultiTxnRollbackStatement(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -1249,9 +1257,8 @@ func Test_MultiTxnRollbackStatementS3(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) schema := catalog2.MockSchemaAll(4, primaryKeyIdx) @@ -1270,6 +1277,8 @@ func Test_MultiTxnRollbackStatementS3(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) @@ -1411,9 +1420,8 @@ func Test_DeleteUncommittedBlock(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) _ = colexec.NewServer(nil) @@ -1441,6 +1449,8 @@ func Test_DeleteUncommittedBlock(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) } @@ -1526,9 +1536,8 @@ func Test_BigDeleteWriteS3(t *testing.T) { disttaeEngine *testutil.TestDisttaeEngine ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, accountId) // mock a schema with 4 columns and the 4th column as primary key @@ -1552,6 +1561,8 @@ func Test_BigDeleteWriteS3(t *testing.T) { rpcAgent.Close() }() + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() _, _, err = disttaeEngine.CreateDatabaseAndTable(ctx, databaseName, tableName, schema) require.NoError(t, err) } @@ -1601,9 +1612,8 @@ func Test_CNTransferTombstoneObjects(t *testing.T) { databaseName = "db1" ) - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ctx = context.WithValue(ctx, defines.TenantIDKey{}, uint32(0)) opts.TaeEngineOptions = config.WithLongScanAndCKPOpts(nil) @@ -1614,6 +1624,8 @@ func Test_CNTransferTombstoneObjects(t *testing.T) { schema := catalog2.MockSchemaEnhanced(1, 0, 2) schema.Name = tableName + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() cnTxnOp := p.StartCNTxn() _, rel := p.CreateDBAndTable(cnTxnOp, databaseName, schema) require.NotNil(t, rel) diff --git a/pkg/vm/engine/types.go b/pkg/vm/engine/types.go index f4fac1ece9908..39aaccf6f2dd8 100644 --- a/pkg/vm/engine/types.go +++ b/pkg/vm/engine/types.go @@ -997,6 +997,14 @@ type Engine interface { GetService() string LatestLogtailAppliedTime() timestamp.Timestamp + + QueryTableStatsByAccounts( + ctx context.Context, + wantedStatsIdxes []int, + accs []uint64, + forceUpdate bool, + resetUpdateTime bool, + ) (statsVals [][]any, retAcc []uint64, err error, ok bool) } type VectorPool interface {