Skip to content

Commit

Permalink
Merge branch 'master' into init-service
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Aug 21, 2023
2 parents 11ee189 + 346e771 commit 911be67
Show file tree
Hide file tree
Showing 30 changed files with 668 additions and 165 deletions.
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ else
BUILD_CGO_ENABLED := 1
endif

ifeq ($(FAILPOINT), 1)
BUILD_TAGS += with_fail
endif

ifeq ("$(WITH_RACE)", "1")
BUILD_FLAGS += -race
BUILD_CGO_ENABLED := 1
Expand Down Expand Up @@ -73,6 +77,11 @@ PD_SERVER_DEP += dashboard-ui
pd-server: ${PD_SERVER_DEP}
CGO_ENABLED=$(BUILD_CGO_ENABLED) go build $(BUILD_FLAGS) -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -tags "$(BUILD_TAGS)" -o $(BUILD_BIN_PATH)/pd-server cmd/pd-server/main.go

pd-server-failpoint:
@$(FAILPOINT_ENABLE)
FAILPOINT=1 $(MAKE) pd-server || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

pd-server-basic:
SWAGGER=0 DASHBOARD=0 $(MAKE) pd-server

Expand Down
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type Client interface {
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
// Also it may return nil if PD finds no Region for the key temporarily,
// Also, it may return nil if PD finds no Region for the key temporarily,
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
Expand All @@ -96,7 +96,7 @@ type Client interface {
GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error)
// ScanRegion gets a list of regions, starts from the region that contains key.
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
Expand All @@ -109,7 +109,7 @@ type Client interface {
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*metapb.Store, error)
// Update GC safe point. TiKV will check it and do GC themselves if necessary.
// UpdateGCSafePoint TiKV will check it and do GC themselves if necessary.
// If the given safePoint is less than the current one, it will not be updated.
// Returns the new safePoint after updating.
UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error)
Expand Down
2 changes: 1 addition & 1 deletion pkg/audit/audit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestLocalLogBackendUsingFile(t *testing.T) {
b, _ := os.ReadFile(fname)
output := strings.SplitN(string(b), "]", 4)
re.Equal(
fmt.Sprintf(" [\"audit log\"] [service-info=\"{ServiceLabel:, Method:HTTP/1.1/GET:/test, Component:anonymous, IP:, "+
fmt.Sprintf(" [\"audit log\"] [service-info=\"{ServiceLabel:, Method:HTTP/1.1/GET:/test, Component:anonymous, IP:, Port:, "+
"StartTime:%s, URLParam:{\\\"test\\\":[\\\"test\\\"]}, BodyParam:testBody}\"]\n",
time.Unix(info.StartTimeStamp, 0).String()),
output[3],
Expand Down
22 changes: 22 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
Expand Down Expand Up @@ -192,10 +193,13 @@ func (c *Config) validate() error {
// PersistConfig wraps all configurations that need to persist to storage and
// allows to access them safely.
type PersistConfig struct {
// Store the global configuration that is related to the scheduling.
clusterVersion unsafe.Pointer
schedule atomic.Value
replication atomic.Value
storeConfig atomic.Value
// Store the respective configurations for different schedulers.
schedulerConfig sync.Map
}

// NewPersistConfig creates a new PersistConfig instance.
Expand Down Expand Up @@ -253,6 +257,24 @@ func (o *PersistConfig) GetStoreConfig() *sc.StoreConfig {
return o.storeConfig.Load().(*sc.StoreConfig)
}

// SetSchedulerConfig sets the scheduler configuration with the given name.
func (o *PersistConfig) SetSchedulerConfig(name, data string) {
o.schedulerConfig.Store(name, data)
}

// RemoveSchedulerConfig removes the scheduler configuration with the given name.
func (o *PersistConfig) RemoveSchedulerConfig(name string) {
o.schedulerConfig.Delete(name)
}

// GetSchedulerConfig returns the scheduler configuration with the given name.
func (o *PersistConfig) GetSchedulerConfig(name string) string {
if v, ok := o.schedulerConfig.Load(name); ok {
return v.(string)
}
return ""
}

// GetMaxReplicas returns the max replicas.
func (o *PersistConfig) GetMaxReplicas() int {
return int(o.GetReplicationConfig().MaxReplicas)
Expand Down
90 changes: 66 additions & 24 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package config
import (
"context"
"encoding/json"
"strings"
"sync"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/log"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
Expand All @@ -34,11 +36,20 @@ type Watcher struct {
ctx context.Context
cancel context.CancelFunc

etcdClient *clientv3.Client
watcher *etcdutil.LoopWatcher
// configPath is the path of the configuration in etcd:
// - Key: /pd/{cluster_id}/config
// - Value: configuration JSON.
configPath string
// schedulerConfigPathPrefix is the path prefix of the scheduler configuration in etcd:
// - Key: /pd/{cluster_id}/scheduler_config/{scheduler_name}
// - Value: configuration JSON.
schedulerConfigPathPrefix string

etcdClient *clientv3.Client
configWatcher *etcdutil.LoopWatcher
schedulerConfigWatcher *etcdutil.LoopWatcher

*PersistConfig
// TODO: watch the scheduler config change.
}

type persistedConfig struct {
Expand All @@ -52,19 +63,30 @@ type persistedConfig struct {
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
// configPath is the path of the configuration in etcd:
// - Key: /pd/{cluster_id}/config
// - Value: configuration JSON.
configPath string,
clusterID uint64,
persistConfig *PersistConfig,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
cw := &Watcher{
ctx: ctx,
cancel: cancel,
etcdClient: etcdClient,
PersistConfig: persistConfig,
ctx: ctx,
cancel: cancel,
configPath: endpoint.ConfigPath(clusterID),
schedulerConfigPathPrefix: endpoint.SchedulerConfigPathPrefix(clusterID),
etcdClient: etcdClient,
PersistConfig: persistConfig,
}
err := cw.initializeConfigWatcher()
if err != nil {
return nil, err
}
err = cw.initializeSchedulerConfigWatcher()
if err != nil {
return nil, err
}
return cw, nil
}

func (cw *Watcher) initializeConfigWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
cfg := &persistedConfig{}
if err := json.Unmarshal(kv.Value, cfg); err != nil {
Expand All @@ -84,21 +106,41 @@ func NewWatcher(
postEventFn := func() error {
return nil
}
cw.watcher = etcdutil.NewLoopWatcher(
ctx,
&cw.wg,
etcdClient,
"scheduling-config-watcher",
configPath,
putFn,
deleteFn,
postEventFn,
cw.configWatcher = etcdutil.NewLoopWatcher(
cw.ctx, &cw.wg,
cw.etcdClient,
"scheduling-config-watcher", cw.configPath,
putFn, deleteFn, postEventFn,
)
cw.watcher.StartWatchLoop()
if err := cw.watcher.WaitLoad(); err != nil {
return nil, err
cw.configWatcher.StartWatchLoop()
return cw.configWatcher.WaitLoad()
}

func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
cw.SetSchedulerConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
return nil
}
return cw, nil
deleteFn := func(kv *mvccpb.KeyValue) error {
cw.RemoveSchedulerConfig(strings.TrimPrefix(string(kv.Key), prefixToTrim))
return nil
}
postEventFn := func() error {
return nil
}
cw.schedulerConfigWatcher = etcdutil.NewLoopWatcher(
cw.ctx, &cw.wg,
cw.etcdClient,
"scheduling-scheduler-config-watcher", cw.schedulerConfigPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
cw.schedulerConfigWatcher.StartWatchLoop()
return cw.schedulerConfigWatcher.WaitLoad()
}

// Close closes the watcher.
Expand Down
Loading

0 comments on commit 911be67

Please sign in to comment.