Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: support online reload for some config #396

Merged
merged 3 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/config/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ type FrontendNamespace struct {
}

type BackendNamespace struct {
Instances []string `yaml:"instances" json:"instances" toml:"instances"`
SelectorType string `yaml:"selector-type" json:"selector-type" toml:"selector-type"`
Security TLSConfig `yaml:"security" json:"security" toml:"security"`
Instances []string `yaml:"instances" json:"instances" toml:"instances"`
Security TLSConfig `yaml:"security" json:"security" toml:"security"`
//HealthCheck HealthCheck `yaml:"health-check" json:"health-check" toml:"health-check"`
}

Expand Down
3 changes: 1 addition & 2 deletions lib/config/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ var testNamespaceConfig = Namespace{
},
},
Backend: BackendNamespace{
Instances: []string{"127.0.0.1:4000", "127.0.0.1:4001"},
SelectorType: "random",
Instances: []string{"127.0.0.1:4000", "127.0.0.1:4001"},
Security: TLSConfig{
CA: "t",
Cert: "t",
Expand Down
10 changes: 3 additions & 7 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type KeepAlive struct {
}

type ProxyServerOnline struct {
RequireBackendTLS bool `yaml:"require-backend-tls,omitempty" toml:"require-backend-tls,omitempty" json:"require-backend-tls,omitempty"`
MaxConnections uint64 `yaml:"max-connections,omitempty" toml:"max-connections,omitempty" json:"max-connections,omitempty"`
ConnBufferSize int `yaml:"conn-buffer-size,omitempty" toml:"conn-buffer-size,omitempty" json:"conn-buffer-size,omitempty"`
FrontendKeepalive KeepAlive `yaml:"frontend-keepalive" toml:"frontend-keepalive" json:"frontend-keepalive"`
Expand All @@ -62,17 +63,12 @@ type ProxyServerOnline struct {
type ProxyServer struct {
Addr string `yaml:"addr,omitempty" toml:"addr,omitempty" json:"addr,omitempty"`
PDAddrs string `yaml:"pd-addrs,omitempty" toml:"pd-addrs,omitempty" json:"pd-addrs,omitempty"`
ServerVersion string `yaml:"server-version,omitempty" toml:"server-version,omitempty" json:"server-version,omitempty"`
RequireBackendTLS bool `yaml:"require-backend-tls,omitempty" toml:"require-backend-tls,omitempty" json:"require-backend-tls,omitempty"`
ProxyServerOnline `yaml:",inline" toml:",inline" json:",inline"`
}

type API struct {
Addr string `yaml:"addr,omitempty" toml:"addr,omitempty" json:"addr,omitempty"`
User string `yaml:"user,omitempty" toml:"user,omitempty" json:"user,omitempty"`
Password string `yaml:"password,omitempty" toml:"password,omitempty" json:"password,omitempty"`
EnableBasicAuth bool `yaml:"enable-basic-auth,omitempty" toml:"enable-basic-auth,omitempty" json:"enable-basic-auth,omitempty"`
ProxyProtocol string `yaml:"proxy-protocol,omitempty" toml:"proxy-protocol,omitempty" json:"proxy-protocol,omitempty"`
Addr string `yaml:"addr,omitempty" toml:"addr,omitempty" json:"addr,omitempty"`
ProxyProtocol string `yaml:"proxy-protocol,omitempty" toml:"proxy-protocol,omitempty" json:"proxy-protocol,omitempty"`
}

type Advance struct {
Expand Down
11 changes: 4 additions & 7 deletions lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ var testProxyConfig = Config{
IgnoreWrongNamespace: true,
},
Proxy: ProxyServer{
Addr: "0.0.0.0:4000",
PDAddrs: "127.0.0.1:4089",
RequireBackendTLS: true,
Addr: "0.0.0.0:4000",
PDAddrs: "127.0.0.1:4089",
ProxyServerOnline: ProxyServerOnline{
RequireBackendTLS: true,
MaxConnections: 1,
FrontendKeepalive: KeepAlive{Enabled: true},
ProxyProtocol: "v2",
Expand All @@ -30,10 +30,7 @@ var testProxyConfig = Config{
},
},
API: API{
Addr: "0.0.0.0:3080",
EnableBasicAuth: false,
User: "user",
Password: "pwd",
Addr: "0.0.0.0:3080",
},
Metrics: Metrics{
MetricsAddr: "127.0.0.1:9021",
Expand Down
85 changes: 57 additions & 28 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"time"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/systimemon"
"github.com/pingcap/tiproxy/lib/util/waitgroup"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -52,12 +53,12 @@ func NewMetricsManager() *MetricsManager {
var registerOnce = &sync.Once{}

// Init registers metrics and pushes metrics to prometheus.
func (mm *MetricsManager) Init(ctx context.Context, logger *zap.Logger, metricsAddr string, metricsInterval uint, proxyAddr string) {
func (mm *MetricsManager) Init(ctx context.Context, logger *zap.Logger, proxyAddr string, cfg config.Metrics, cfgch <-chan *config.Config) {
mm.logger = logger
registerOnce.Do(registerProxyMetrics)
ctx, mm.cancel = context.WithCancel(ctx)
mm.setupMonitor(ctx)
mm.pushMetric(ctx, metricsAddr, time.Duration(metricsInterval)*time.Second, proxyAddr)
mm.pushMetric(ctx, proxyAddr, cfg, cfgch)
}

// Close stops all goroutines.
Expand Down Expand Up @@ -89,17 +90,64 @@ func (mm *MetricsManager) setupMonitor(ctx context.Context) {
}

// pushMetric pushes metrics in background.
func (mm *MetricsManager) pushMetric(ctx context.Context, addr string, interval time.Duration, proxyAddr string) {
if interval == time.Duration(0) || len(addr) == 0 {
mm.logger.Info("disable Prometheus push client")
return
}
mm.logger.Info("start prometheus push client", zap.String("server addr", addr), zap.String("interval", interval.String()))
func (mm *MetricsManager) pushMetric(ctx context.Context, proxyAddr string, cfg config.Metrics, cfgch <-chan *config.Config) {
mm.wg.Run(func() {
prometheusPushClient(ctx, mm.logger, addr, interval, proxyAddr)
proxyInstance := instanceName(proxyAddr)
addr := cfg.MetricsAddr
interval := time.Duration(cfg.MetricsInterval) * time.Second
pusher := mm.buildPusher(addr, interval, proxyInstance)

for ctx.Err() == nil {
select {
case newCfg := <-cfgch:
if newCfg == nil {
return
}
interval = time.Duration(newCfg.Metrics.MetricsInterval) * time.Second
if addr != newCfg.Metrics.MetricsAddr {
addr = newCfg.Metrics.MetricsAddr
pusher = mm.buildPusher(addr, interval, proxyInstance)
}
default:
}

// Wait until the config is legal.
if interval == 0 || pusher == nil {
select {
case <-time.After(time.Second):
continue
case <-ctx.Done():
return
}
}

if err := pusher.Push(); err != nil {
mm.logger.Error("could not push metrics to prometheus pushgateway", zap.Error(err))
}
select {
case <-time.After(interval):
case <-ctx.Done():
return
}
}
})
}

func (mm *MetricsManager) buildPusher(addr string, interval time.Duration, proxyInstance string) *push.Pusher {
var pusher *push.Pusher
if len(addr) > 0 {
// Create a new pusher when the address changes.
mm.logger.Info("start prometheus push client", zap.String("server addr", addr), zap.Stringer("interval", interval))
pusher = push.New(addr, "tiproxy")
pusher = pusher.Gatherer(prometheus.DefaultGatherer)
pusher = pusher.Grouping("instance", proxyInstance)
} else {
mm.logger.Info("disable prometheus push client")
pusher = nil
}
return pusher
}

// registerProxyMetrics registers metrics.
func registerProxyMetrics() {
prometheus.DefaultRegisterer.Unregister(collectors.NewGoCollector())
Expand All @@ -122,25 +170,6 @@ func registerProxyMetrics() {
prometheus.MustRegister(MigrateDurationHistogram)
}

// prometheusPushClient pushes metrics to Prometheus Pushgateway.
func prometheusPushClient(ctx context.Context, logger *zap.Logger, addr string, interval time.Duration, proxyAddr string) {
job := "tiproxy"
pusher := push.New(addr, job)
pusher = pusher.Gatherer(prometheus.DefaultGatherer)
pusher = pusher.Grouping("instance", instanceName(proxyAddr))
for ctx.Err() == nil {
err := pusher.Push()
if err != nil {
logger.Error("could not push metrics to prometheus pushgateway", zap.String("err", err.Error()))
}
select {
case <-time.After(interval):
case <-ctx.Done():
return
}
}
}

func instanceName(proxyAddr string) string {
hostname, err := os.Hostname()
if err != nil {
Expand Down
81 changes: 49 additions & 32 deletions pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/logger"
"github.com/stretchr/testify/require"
)
Expand All @@ -21,64 +22,80 @@ import (
func TestPushMetrics(t *testing.T) {
proxyAddr := "0.0.0.0:6000"
labelName := fmt.Sprintf("%s_%s_maxprocs", ModuleProxy, LabelServer)
hostname, err := os.Hostname()
require.NoError(t, err)
expectedPath := fmt.Sprintf("/metrics/job/tiproxy/instance/%s_6000", hostname)
bodyCh := make(chan string)
pgwOK := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
bodyCh <- string(body)
require.Equal(t, expectedPath, r.URL.EscapedPath())
w.Header().Set("Content-Type", `text/plain; charset=utf-8`)
w.WriteHeader(http.StatusOK)
}),
)
defer pgwOK.Close()
bodyCh1, bodyCh2 := make(chan string), make(chan string)
pgwOK1, pgwOK2 := setupServer(t, bodyCh1), setupServer(t, bodyCh2)
log, _ := logger.CreateLoggerForTest(t)

tests := []struct {
metricsAddr string
metricsInterval uint
pushed bool
pushedCh chan string
}{
{
metricsAddr: pgwOK.URL,
metricsAddr: pgwOK1.URL,
metricsInterval: 1,
pushed: true,
pushedCh: bodyCh1,
},
{
metricsAddr: "",
metricsAddr: pgwOK1.URL,
metricsInterval: 0,
pushedCh: nil,
},
{
metricsAddr: pgwOK2.URL,
metricsInterval: 1,
pushed: false,
pushedCh: bodyCh2,
},
{
metricsAddr: pgwOK.URL,
metricsInterval: 0,
pushed: false,
metricsAddr: "",
metricsInterval: 1,
pushedCh: nil,
},
}
mm := NewMetricsManager()
cfgCh := make(chan *config.Config, 1)
mm.Init(context.Background(), log, proxyAddr, config.Metrics{}, cfgCh)
for _, tt := range tests {
for len(bodyCh) > 0 {
<-bodyCh
cfgCh <- &config.Config{
Metrics: config.Metrics{
MetricsAddr: tt.metricsAddr,
MetricsInterval: tt.metricsInterval,
},
}
mm := NewMetricsManager()
mm.Init(context.Background(), log, tt.metricsAddr, tt.metricsInterval, proxyAddr)
if tt.pushed {
if tt.pushedCh != nil {
select {
case body := <-bodyCh:
case body := <-tt.pushedCh:
require.Contains(t, body, labelName)
case <-time.After(2 * time.Second):
t.Fatal("not pushed")
}
} else {
select {
case <-bodyCh:
t.Fatal("pushed")
case <-bodyCh1:
t.Fatal("pushed 1")
case <-bodyCh2:
t.Fatal("pushed 2")
case <-time.After(2 * time.Second):
}
}
mm.Close()
}
mm.Close()
}

func setupServer(t *testing.T, bodyCh chan string) *httptest.Server {
hostname, err := os.Hostname()
require.NoError(t, err)
expectedPath := fmt.Sprintf("/metrics/job/tiproxy/instance/%s_6000", hostname)
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
bodyCh <- string(body)
require.Equal(t, expectedPath, r.URL.EscapedPath())
w.Header().Set("Content-Type", `text/plain; charset=utf-8`)
w.WriteHeader(http.StatusOK)
}),
)
t.Cleanup(server.Close)
return server
}
11 changes: 3 additions & 8 deletions pkg/proxy/backend/handshake_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,12 @@ type HandshakeHandler interface {
}

type DefaultHandshakeHandler struct {
nsManager *namespace.NamespaceManager
serverVersion string
nsManager *namespace.NamespaceManager
}

func NewDefaultHandshakeHandler(nsManager *namespace.NamespaceManager, serverVersion string) *DefaultHandshakeHandler {
func NewDefaultHandshakeHandler(nsManager *namespace.NamespaceManager) *DefaultHandshakeHandler {
return &DefaultHandshakeHandler{
nsManager: nsManager,
serverVersion: serverVersion,
nsManager: nsManager,
}
}

Expand Down Expand Up @@ -128,9 +126,6 @@ func (handler *DefaultHandshakeHandler) GetCapability() pnet.Capability {
}

func (handler *DefaultHandshakeHandler) GetServerVersion() string {
if len(handler.serverVersion) > 0 {
return handler.serverVersion
}
// TiProxy sends the server version before getting the router, so we don't know which router to get.
// Just get the default one.
if ns, ok := handler.nsManager.GetNamespace("default"); ok {
Expand Down
Loading
Loading