Skip to content

Commit

Permalink
*: workload repository init code (pingcap#1098)
Browse files Browse the repository at this point in the history
* domain: add repository worker

Signed-off-by: xhe <xw897002528@gmail.com>

* sessionctx, domain: add function gate

Signed-off-by: xhe <xw897002528@gmail.com>

* repository: refine context usage

Signed-off-by: xhe <xw897002528@gmail.com>

* repository: add recover and session getter

Signed-off-by: xhe <xw897002528@gmail.com>

* sessionctx: hide variable

Signed-off-by: xhe <xw897002528@gmail.com>

* fix bazel

Signed-off-by: xhe <xw897002528@gmail.com>

* fix check

Signed-off-by: xhe <xw897002528@gmail.com>

* repository: refine owner management

Signed-off-by: xhe <xw897002528@gmail.com>

* repository: free memref

Signed-off-by: xhe <xw897002528@gmail.com>

* fix check

Signed-off-by: xhe <xw897002528@gmail.com>

---------

Signed-off-by: xhe <xw897002528@gmail.com>
  • Loading branch information
xhebox authored and wddevries committed Nov 8, 2024
1 parent bfd7da3 commit fe892e3
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"//pkg/domain/globalconfigsync",
"//pkg/domain/infosync",
"//pkg/domain/metrics",
"//pkg/domain/repository",
"//pkg/errno",
"//pkg/infoschema",
"//pkg/infoschema/metrics",
Expand Down
2 changes: 2 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/domain/globalconfigsync"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/domain/repository"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/infoschema"
infoschema_metrics "github.com/pingcap/tidb/pkg/infoschema/metrics"
Expand Down Expand Up @@ -194,6 +195,7 @@ type Domain struct {
}

logBackupAdvancer *daemon.OwnerDaemon
repositoryWorker *repository.Worker
historicalStatsWorker *HistoricalStatsWorker
ttlJobManager atomic.Pointer[ttlworker.JobManager]
runawayManager *runaway.Manager
Expand Down
35 changes: 35 additions & 0 deletions pkg/domain/domain_sysvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"strconv"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/domain/repository"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
Expand All @@ -39,6 +41,9 @@ func (do *Domain) initDomainSysVars() {
variable.SetExternalTimestamp = do.setExternalTimestamp
variable.GetExternalTimestamp = do.getExternalTimestamp

variable.SetRepositoryDest = do.setRepositoryDest
variable.ValidateRepositoryDest = repository.ValidateDest

setGlobalResourceControlFunc := do.setGlobalResourceControl
variable.SetGlobalResourceControl.Store(&setGlobalResourceControlFunc)
variable.SetLowResolutionTSOUpdateInterval = do.setLowResolutionTSOUpdateInterval
Expand Down Expand Up @@ -149,3 +154,33 @@ func (do *Domain) changeSchemaCacheSize(ctx context.Context, size uint64) error
do.infoCache.Data.SetCacheCapacity(size)
return nil
}

func (do *Domain) setRepositoryDest(ctx context.Context, dst string) error {
switch {
case dst == "table":
return do.startRepositoryWorker(ctx)
default:
return do.stopRepositoryWorker()
}
}

func (do *Domain) startRepositoryWorker(ctx context.Context) error {
if do.repositoryWorker == nil {
if do.etcdClient == nil {
log.Warn("etcd client not provided, no repositoryWorker.")
return nil
}
do.repositoryWorker = repository.NewWorker(do.etcdClient, do.GetGlobalVar, do.newOwnerManager, do.SysSessionPool(), do.exit)
}
do.repositoryWorker.Start(context.Background())
return nil
}

func (do *Domain) stopRepositoryWorker() error {
if do.repositoryWorker == nil {
return nil
}
do.repositoryWorker.Stop()
do.repositoryWorker = nil
return nil
}
22 changes: 22 additions & 0 deletions pkg/domain/repository/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "repository",
srcs = [
"const.go",
"dest.go",
"table.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/pkg/domain/repository",
visibility = ["//visibility:public"],
deps = [
"//pkg/owner",
"//pkg/sessionctx",
"//pkg/util",
"//pkg/util/logutil",
"@com_github_ngaut_pools//:pools",
"@io_etcd_go_etcd_client_v3//:client",
"@org_uber_go_zap//:zap",
],
)
20 changes: 20 additions & 0 deletions pkg/domain/repository/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package repository

const (
ownerKey = "/tidb/repository/owner"
promptKey = "repository"
)
23 changes: 23 additions & 0 deletions pkg/domain/repository/dest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package repository

import "strings"

// ValidateDest will validate destination url, and normalize it.
func ValidateDest(orig string) (string, error) {
// validate S3 URL, etc...
return strings.ToLower(orig), nil
}
26 changes: 26 additions & 0 deletions pkg/domain/repository/table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package repository

import (
"github.com/pingcap/tidb/sessionctx"
)

func createTable(_ sessionctx.Context) {
}

func checkTableExists(_ sessionctx.Context) bool {
return true
}
165 changes: 165 additions & 0 deletions pkg/domain/repository/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package repository

import (
"context"
"time"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type sessionPool interface {
Get() (pools.Resource, error)
Put(resource pools.Resource)
}

// Worker is the main struct for repository.
type Worker struct {
etcdClient *clientv3.Client
exit chan struct{}
sesspool sessionPool
cancel context.CancelFunc
getGlobalVar func(string) (string, error)
newOwner func(string, string) owner.Manager
owner owner.Manager
wg *util.WaitGroupEnhancedWrapper
}

// NewWorker creates a new repository worker.
func NewWorker(etcdClient *clientv3.Client,
getGlobalVar func(string) (string, error),
newOwner func(string, string) owner.Manager,
sesspool sessionPool, exit chan struct{}) *Worker {
w := &Worker{
etcdClient: etcdClient,
exit: exit,
getGlobalVar: getGlobalVar,
sesspool: sesspool,
newOwner: newOwner,
wg: util.NewWaitGroupEnhancedWrapper("repository", exit, false),
}
return w
}

func (w *Worker) startSnapshot(ctx context.Context) func() {
return func() {
ticker := time.NewTicker(time.Minute)
for {
select {
case <-w.exit:
return
case <-ctx.Done():
return
case <-ticker.C:
// snapshot thread
}
}
}
}

func (w *Worker) startSample(ctx context.Context) func() {
return func() {
ticker := time.NewTicker(time.Minute)
for {
select {
case <-w.exit:
return
case <-ctx.Done():
return
case <-ticker.C:
// sample thread
}
}
}
}

func (w *Worker) getSessionWithRetry() pools.Resource {
for {
_sessctx, err := w.sesspool.Get()
if err != nil {
logutil.BgLogger().Warn("can not init session for repository")
time.Sleep(time.Second)
continue
}
return _sessctx
}
}

func (w *Worker) start(ctx context.Context) func() {
return func() {
w.owner = w.newOwner(ownerKey, promptKey)
_sessctx := w.getSessionWithRetry()
defer w.sesspool.Put(_sessctx)

sess := _sessctx.(sessionctx.Context)
ticker := time.NewTicker(time.Second)
for {
select {
case <-w.exit:
return
case <-ctx.Done():
return
case <-ticker.C:
if w.owner.IsOwner() {
// create table if not exist
createTable(sess)
}
// check if table exists
if checkTableExists(sess) {
w.wg.RunWithRecover(w.startSample(ctx), func(err interface{}) {
logutil.BgLogger().Info("sample panic", zap.Any("err", err), zap.Stack("stack"))
}, "sample")
w.wg.RunWithRecover(w.startSnapshot(ctx), func(err interface{}) {
logutil.BgLogger().Info("snapshot panic", zap.Any("err", err), zap.Stack("stack"))
}, "snapshot")
return
}
}
}
}
}

// Start will start the worker.
func (w *Worker) Start(ctx context.Context) {
if w.cancel != nil {
return
}

nctx, cancel := context.WithCancel(context.Background())
w.cancel = cancel
w.wg.RunWithRecover(w.start(nctx), func(err interface{}) {
logutil.BgLogger().Info("prestart panic", zap.Any("err", err), zap.Stack("stack"))
}, "prestart")
}

// Stop will stop the worker.
func (w *Worker) Stop() {
if w.owner != nil {
w.owner.Cancel()
w.owner = nil
}
if w.cancel != nil {
w.cancel()
w.cancel = nil
}
w.wg.Wait()
}
10 changes: 10 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,16 @@ var defaultSysVars = []*SysVar{
}
return normalizedValue, nil
}},
/*
{Scope: ScopeGlobal, Name: TiDBWorkloadRepositoryDest, Type: TypeStr, Value: "",
SetGlobal: func(ctx context.Context, s *SessionVars, val string) error {
return SetRepositoryDest(ctx, val)
},
Validation: func(_ *SessionVars, norm, _ string, _ ScopeFlag) (string, error) {
return ValidateRepositoryDest(norm)
},
},
*/
{Scope: ScopeGlobal, Name: ValidatePasswordEnable, Value: Off, Type: TypeBool},
{Scope: ScopeGlobal, Name: ValidatePasswordPolicy, Value: "MEDIUM", Type: TypeEnum, PossibleValues: []string{"LOW", "MEDIUM", "STRONG"}},
{Scope: ScopeGlobal, Name: ValidatePasswordCheckUserName, Value: On, Type: TypeBool},
Expand Down
7 changes: 7 additions & 0 deletions pkg/sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ const (
// TiDBRetryLimit is the maximum number of retries when committing a transaction.
TiDBRetryLimit = "tidb_retry_limit"

// TiDBWorkloadRepositoryDest is the destination of workload repository.
TiDBWorkloadRepositoryDest = "tidb_workload_repository_dest"

// TiDBDisableTxnAutoRetry disables transaction auto retry.
// Deprecated: This variable is deprecated, please do not use this variable.
TiDBDisableTxnAutoRetry = "tidb_disable_txn_auto_retry"
Expand Down Expand Up @@ -1703,6 +1706,10 @@ var (
DisableDDL func() error = nil
// SwitchFastCreateTable is the func registered by DDL to switch fast create table.
SwitchFastCreateTable func(val bool) error
// SetRepositoryDest is the func registered by domain to noticy repository worker in this instance.
SetRepositoryDest func(context.Context, string) error = nil
// ValidateRepositoryDest is the func registered by domain to noticy repository worker in this instance.
ValidateRepositoryDest func(string) (string, error) = nil
// SetExternalTimestamp is the func registered by staleread to set externaltimestamp in pd
SetExternalTimestamp func(ctx context.Context, ts uint64) error
// GetExternalTimestamp is the func registered by staleread to get externaltimestamp from pd
Expand Down

0 comments on commit fe892e3

Please sign in to comment.