Skip to content

Commit

Permalink
router: refactor creation of backendFetcher (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Dec 2, 2022
1 parent b6599aa commit a853843
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 24 deletions.
8 changes: 7 additions & 1 deletion pkg/manager/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ func NewNamespaceManager() *NamespaceManager {
func (mgr *NamespaceManager) buildNamespace(cfg *config.Namespace) (*Namespace, error) {
logger := mgr.logger.With(zap.String("namespace", cfg.Namespace))

rt, err := router.NewScoreBasedRouter(logger.Named("router"), &cfg.Backend, mgr.client, mgr.httpCli)
var fetcher router.BackendFetcher
if mgr.client != nil {
fetcher = router.NewPDFetcher(mgr.client, logger.Named("be_fetcher"), router.NewDefaultHealthCheckConfig())
} else {
fetcher = router.NewStaticFetcher(cfg.Backend.Instances)
}
rt, err := router.NewScoreBasedRouter(logger.Named("router"), mgr.httpCli, fetcher)
if err != nil {
return nil, errors.Errorf("build router error: %w", err)
}
Expand Down
22 changes: 8 additions & 14 deletions pkg/manager/router/backend_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pingcap/TiProxy/lib/util/waitgroup"
pnet "github.com/pingcap/TiProxy/pkg/proxy/net"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -88,7 +87,8 @@ type HealthCheckConfig struct {
tombstoneThreshold time.Duration
}

func newDefaultHealthCheckConfig() *HealthCheckConfig {
// NewDefaultHealthCheckConfig creates a default HealthCheckConfig.
func NewDefaultHealthCheckConfig() *HealthCheckConfig {
return &HealthCheckConfig{
healthCheckInterval: healthCheckInterval,
healthCheckMaxRetries: healthCheckMaxRetries,
Expand Down Expand Up @@ -125,9 +125,9 @@ type BackendObserver struct {
}

// StartBackendObserver creates a BackendObserver and starts watching.
func StartBackendObserver(logger *zap.Logger, eventReceiver BackendEventReceiver, client *clientv3.Client, httpCli *http.Client,
config *HealthCheckConfig, staticAddrs []string, customBackendFetcher BackendFetcher) (*BackendObserver, error) {
bo, err := NewBackendObserver(logger, eventReceiver, client, httpCli, config, staticAddrs, customBackendFetcher)
func StartBackendObserver(logger *zap.Logger, eventReceiver BackendEventReceiver, httpCli *http.Client,
config *HealthCheckConfig, backendFetcher BackendFetcher) (*BackendObserver, error) {
bo, err := NewBackendObserver(logger, eventReceiver, httpCli, config, backendFetcher)
if err != nil {
return nil, err
}
Expand All @@ -136,8 +136,8 @@ func StartBackendObserver(logger *zap.Logger, eventReceiver BackendEventReceiver
}

// NewBackendObserver creates a BackendObserver.
func NewBackendObserver(logger *zap.Logger, eventReceiver BackendEventReceiver, client *clientv3.Client, httpCli *http.Client,
config *HealthCheckConfig, staticAddrs []string, customBackendFetcher BackendFetcher) (*BackendObserver, error) {
func NewBackendObserver(logger *zap.Logger, eventReceiver BackendEventReceiver, httpCli *http.Client,
config *HealthCheckConfig, backendFetcher BackendFetcher) (*BackendObserver, error) {
if httpCli == nil {
httpCli = http.DefaultClient
}
Expand All @@ -153,13 +153,7 @@ func NewBackendObserver(logger *zap.Logger, eventReceiver BackendEventReceiver,
httpTLS: httpTLS,
eventReceiver: eventReceiver,
}
if customBackendFetcher != nil {
bo.fetcher = customBackendFetcher
} else if client != nil {
bo.fetcher = NewPDFetcher(client, logger.Named("be_fetcher"), config)
} else {
bo.fetcher = NewStaticFetcher(staticAddrs)
}
bo.fetcher = backendFetcher
return bo, nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/manager/router/backend_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestExternalFetcher(t *testing.T) {
backendChan := make(chan map[string]BackendStatus, 1)
mer := newMockEventReceiver(backendChan)
fetcher := NewExternalFetcher(backendGetter)
bo, err := NewBackendObserver(logger.CreateLoggerForTest(t), mer, nil, nil, newHealthCheckConfigForTest(), nil, fetcher)
bo, err := NewBackendObserver(logger.CreateLoggerForTest(t), mer, nil, newHealthCheckConfigForTest(), fetcher)
require.NoError(t, err)
bo.Start()
defer bo.Close()
Expand All @@ -183,7 +183,8 @@ func runETCDTest(t *testing.T, f func(etcd *embed.Etcd, kv clientv3.KV, bo *Back
kv := clientv3.NewKV(client)
backendChan := make(chan map[string]BackendStatus, 1)
mer := newMockEventReceiver(backendChan)
bo, err := NewBackendObserver(logger.CreateLoggerForTest(t), mer, client, nil, newHealthCheckConfigForTest(), nil, nil)
fetcher := NewPDFetcher(client, logger.CreateLoggerForTest(t), newHealthCheckConfigForTest())
bo, err := NewBackendObserver(logger.CreateLoggerForTest(t), mer, nil, newHealthCheckConfigForTest(), fetcher)
require.NoError(t, err)
f(etcd, kv, bo, backendChan)
bo.Close()
Expand Down
6 changes: 2 additions & 4 deletions pkg/manager/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import (
"sync"
"time"

"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/errors"
"github.com/pingcap/TiProxy/lib/util/waitgroup"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -117,14 +115,14 @@ type ScoreBasedRouter struct {
}

// NewScoreBasedRouter creates a ScoreBasedRouter.
func NewScoreBasedRouter(logger *zap.Logger, cfg *config.BackendNamespace, client *clientv3.Client, httpCli *http.Client) (*ScoreBasedRouter, error) {
func NewScoreBasedRouter(logger *zap.Logger, httpCli *http.Client, fetcher BackendFetcher) (*ScoreBasedRouter, error) {
router := &ScoreBasedRouter{
logger: logger,
backends: list.New(),
}
router.Lock()
defer router.Unlock()
observer, err := StartBackendObserver(logger.Named("observer"), router, client, httpCli, newDefaultHealthCheckConfig(), cfg.Instances, nil)
observer, err := StartBackendObserver(logger.Named("observer"), router, httpCli, NewDefaultHealthCheckConfig(), fetcher)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/manager/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"testing"
"time"

"github.com/pingcap/TiProxy/lib/config"
"github.com/pingcap/TiProxy/lib/util/logger"
"github.com/pingcap/TiProxy/lib/util/waitgroup"
"github.com/pingcap/TiProxy/pkg/metrics"
Expand Down Expand Up @@ -520,12 +519,12 @@ func TestRebalanceCornerCase(t *testing.T) {

// Test all kinds of events occur concurrently.
func TestConcurrency(t *testing.T) {
cfg := &config.BackendNamespace{}
// Router.observer doesn't work because the etcd is always empty.
// We create other goroutines to change backends easily.
etcd := createEtcdServer(t, "127.0.0.1:0")
client := createEtcdClient(t, etcd)
router, err := NewScoreBasedRouter(logger.CreateLoggerForTest(t), cfg, client, nil)
fetcher := NewPDFetcher(client, logger.CreateLoggerForTest(t), newHealthCheckConfigForTest())
router, err := NewScoreBasedRouter(logger.CreateLoggerForTest(t), nil, fetcher)
require.NoError(t, err)

var wg waitgroup.WaitGroup
Expand Down

0 comments on commit a853843

Please sign in to comment.