Fix goroutine leak on config reload

Promxy is leaking goroutines on config reloads, leading to OOM kills in
the long run.

Two issues:
 - oldState ServerGroups are not cancelled if the appenders didn't change
 - Prometheus discovery target managers never close SyncCh, blocking the ServerGroup Sync() goroutine forever (see the sketch below)
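
To make the second issue concrete, here is a minimal, self-contained Go sketch (hypothetical names, not promxy code): a goroutine that ranges over a sync channel which is never closed can never exit, so every config reload that starts a replacement goroutine leaves the previous one behind.

package main

import (
	"fmt"
	"runtime"
	"time"
)

// sync mimics the pre-fix ServerGroup.Sync(): it ranges over syncCh, which the
// discovery target manager never closes, so the loop has no way to observe
// that its ServerGroup was replaced by a config reload and blocks forever.
func sync(syncCh <-chan string) {
	for update := range syncCh {
		_ = update // apply the discovered targets
	}
}

func main() {
	for reload := 0; reload < 5; reload++ {
		go sync(make(chan string)) // each simulated reload leaks one goroutine
	}
	time.Sleep(100 * time.Millisecond)
	fmt.Println("goroutines:", runtime.NumGoroutine()) // grows with every reload
}

Repeated over many reloads in a long-lived process, the goroutine count (and the memory each one pins) only grows, which is what eventually triggers the OOM kill.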
Charles-Antoine Mathieu authored and jacksontj committed Mar 8, 2023
1 parent 5d5e654 commit 8238d3a
Showing 2 changed files with 115 additions and 110 deletions.
4 changes: 2 additions & 2 deletions pkg/proxystorage/proxy.go
@@ -63,7 +63,7 @@ func (p *proxyStorageState) Cancel(n *proxyStorageState) {
 			sg.Cancel()
 		}
 	}
-	// We call close if the new one is nil, or if the appanders don't match
+	// We call close if the new one is nil, or if the appenders don't match
 	if n == nil || p.appender != n.appender {
 		if p.appenderCloser != nil {
 			p.appenderCloser()
@@ -147,7 +147,7 @@ func (p *ProxyStorage) ApplyConfig(c *proxyconfig.Config) error {
 
 	newState.Ready()         // Wait for the newstate to be ready
 	p.state.Store(newState)  // Store the new state
-	if oldState != nil && oldState.appender != newState.appender {
+	if oldState != nil {
 		oldState.Cancel(newState) // Cancel the old one
 	}
 
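
The second hunk is the first fix: before this change, ApplyConfig skipped Cancel entirely whenever the appender was unchanged, so the old state's ServerGroups were never cancelled; now the old state is always cancelled, and Cancel itself (first hunk) decides whether the appender can be closed. A rough sketch of that reload pattern, using simplified hypothetical types rather than promxy's real ones:

package main

import "context"

// Simplified, hypothetical types -- only the shape of the fix, not promxy's
// actual proxyStorageState.
type appender struct{}

type storageState struct {
	cancelServerGroups context.CancelFunc // stops this state's ServerGroup goroutines
	appender           *appender
	appenderCloser     func()
}

// Cancel always releases the server groups; the appender is closed only when
// the new state is not reusing it.
func (p *storageState) Cancel(n *storageState) {
	p.cancelServerGroups()
	if n == nil || p.appender != n.appender {
		if p.appenderCloser != nil {
			p.appenderCloser()
		}
	}
}

// applyConfig mirrors the corrected reload path: the old state is cancelled
// unconditionally (the pre-fix code skipped this whenever the appender matched).
func applyConfig(old, next *storageState) *storageState {
	if old != nil {
		old.Cancel(next)
	}
	return next
}

func main() {
	shared := &appender{}
	_, cancelOld := context.WithCancel(context.Background())
	old := &storageState{cancelServerGroups: cancelOld, appender: shared}
	_, cancelNew := context.WithCancel(context.Background())
	next := &storageState{cancelServerGroups: cancelNew, appender: shared}

	// Reload: the old server groups are cancelled, but the shared appender stays open.
	_ = applyConfig(old, next)
}

The comparison inside Cancel matters because the appender may be shared between the old and new state, while the server groups are always per-state and therefore always safe to tear down.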
221 changes: 113 additions & 108 deletions pkg/servergroup/servergroup.go
@@ -108,143 +108,148 @@ func (s *ServerGroup) RoundTrip(r *http.Request) (*http.Response, error) {
 func (s *ServerGroup) Sync() {
 	syncCh := s.targetManager.SyncCh()
 
-SYNC_LOOP:
-	for targetGroupMap := range syncCh {
-		logrus.Debug("Updating targets from discovery manager")
-		targets := make([]string, 0)
-		apiClients := make([]promclient.API, 0)
-
-		for _, targetGroupList := range targetGroupMap {
-			for _, targetGroup := range targetGroupList {
-				for _, target := range targetGroup.Targets {
-
-					lbls := make([]labels.Label, 0, len(target)+len(targetGroup.Labels)+2)
-
-					for ln, lv := range target {
-						lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
-					}
-
-					for ln, lv := range targetGroup.Labels {
-						if _, ok := target[ln]; !ok {
-							lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
-						}
-					}
-
-					lbls = append(lbls, labels.Label{Name: model.SchemeLabel, Value: string(s.Cfg.Scheme)})
-					lbls = append(lbls, labels.Label{Name: PathPrefixLabel, Value: string(s.Cfg.PathPrefix)})
-
-					lset := labels.New(lbls...)
-
-					logrus.Tracef("Potential target pre-relabel: %v", lset)
-					lset = relabel.Process(lset, s.Cfg.RelabelConfigs...)
-					logrus.Tracef("Potential target post-relabel: %v", lset)
-					// Check if the target was dropped, if so we skip it
-					if len(lset) == 0 {
-						continue
-					}
-
-					// If there is no address, then we can't use this set of targets
-					if v := lset.Get(model.AddressLabel); v == "" {
-						logrus.Errorf("Discovery target is missing address label: %v", lset)
-						continue SYNC_LOOP
-					}
-
-					u := &url.URL{
-						Scheme: lset.Get(model.SchemeLabel),
-						Host:   lset.Get(model.AddressLabel),
-						Path:   lset.Get(PathPrefixLabel),
-					}
-
-					targets = append(targets, u.Host)
-
-					client, err := api.NewClient(api.Config{Address: u.String(), RoundTripper: s})
-					if err != nil {
-						panic(err) // TODO: shouldn't be possible? If this happens I guess we log and skip?
-					}
-
-					if len(s.Cfg.QueryParams) > 0 {
-						client = promclient.NewClientArgsWrap(client, s.Cfg.QueryParams)
-					}
-
-					var apiClient promclient.API
-					apiClient = &promclient.PromAPIV1{v1.NewAPI(client)}
-
-					if s.Cfg.RemoteRead {
-						u.Path = path.Join(u.Path, s.Cfg.RemoteReadPath)
-						cfg := &remote.ClientConfig{
-							URL:              &config_util.URL{u},
-							HTTPClientConfig: s.Cfg.HTTPConfig.HTTPConfig,
-							Timeout:          model.Duration(time.Minute * 2),
-						}
-						remoteStorageClient, err := remote.NewReadClient("foo", cfg)
-						if err != nil {
-							panic(err)
-						}
-
-						apiClient = &promclient.PromAPIRemoteRead{apiClient, remoteStorageClient}
-					}
-
-					// Optionally add time range layers
-					if s.Cfg.AbsoluteTimeRangeConfig != nil {
-						apiClient = &promclient.AbsoluteTimeFilter{
-							API:      apiClient,
-							Start:    s.Cfg.AbsoluteTimeRangeConfig.Start,
-							End:      s.Cfg.AbsoluteTimeRangeConfig.End,
-							Truncate: s.Cfg.AbsoluteTimeRangeConfig.Truncate,
-						}
-					}
-
-					if s.Cfg.RelativeTimeRangeConfig != nil {
-						apiClient = &promclient.RelativeTimeFilter{
-							API:      apiClient,
-							Start:    s.Cfg.RelativeTimeRangeConfig.Start,
-							End:      s.Cfg.RelativeTimeRangeConfig.End,
-							Truncate: s.Cfg.RelativeTimeRangeConfig.Truncate,
-						}
-					}
-
-					// We remove all private labels after we set the target entry
-					modelLabelSet := make(model.LabelSet, len(lset))
-					for _, lbl := range lset {
-						if !strings.HasPrefix(string(lbl.Name), model.ReservedLabelPrefix) {
-							modelLabelSet[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
-						}
-					}
-
-					// Add labels
-					apiClient = &promclient.AddLabelClient{apiClient, modelLabelSet.Merge(s.Cfg.Labels)}
-
-					// If debug logging is enabled, wrap the client with a debugAPI client
-					// Since these are called in the reverse order of what we add, we want
-					// to make sure that this is the last wrap of the client
-					if logrus.GetLevel() >= logrus.DebugLevel {
-						apiClient = &promclient.DebugAPI{apiClient, u.String()}
-					}
-
-					apiClients = append(apiClients, apiClient)
-				}
-			}
-		}
-
-		apiClientMetricFunc := func(i int, api, status string, took float64) {
-			serverGroupSummary.WithLabelValues(targets[i], api, status).Observe(took)
-		}
-
-		logrus.Debugf("Updating targets from discovery manager: %v", targets)
-		newState := &ServerGroupState{
-			Targets:   targets,
-			apiClient: promclient.NewMultiAPI(apiClients, s.Cfg.GetAntiAffinity(), apiClientMetricFunc, 1),
-		}
-
-		if s.Cfg.IgnoreError {
-			newState.apiClient = &promclient.IgnoreErrorAPI{newState.apiClient}
-		}
-
-		s.state.Store(newState)
-
-		if !s.loaded {
-			s.loaded = true
-			close(s.Ready)
-		}
+SyncLoop:
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+		case targetGroupMap := <-syncCh:
+			logrus.Debug("Updating targets from discovery manager")
+			targets := make([]string, 0)
+			apiClients := make([]promclient.API, 0)
+
+			for _, targetGroupList := range targetGroupMap {
+				for _, targetGroup := range targetGroupList {
+					for _, target := range targetGroup.Targets {
+
+						lbls := make([]labels.Label, 0, len(target)+len(targetGroup.Labels)+2)
+
+						for ln, lv := range target {
+							lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
+						}
+
+						for ln, lv := range targetGroup.Labels {
+							if _, ok := target[ln]; !ok {
+								lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
+							}
+						}
+
+						lbls = append(lbls, labels.Label{Name: model.SchemeLabel, Value: string(s.Cfg.Scheme)})
+						lbls = append(lbls, labels.Label{Name: PathPrefixLabel, Value: string(s.Cfg.PathPrefix)})
+
+						lset := labels.New(lbls...)
+
+						logrus.Tracef("Potential target pre-relabel: %v", lset)
+						lset = relabel.Process(lset, s.Cfg.RelabelConfigs...)
+						logrus.Tracef("Potential target post-relabel: %v", lset)
+						// Check if the target was dropped, if so we skip it
+						if len(lset) == 0 {
+							continue
+						}
+
+						// If there is no address, then we can't use this set of targets
+						if v := lset.Get(model.AddressLabel); v == "" {
+							logrus.Errorf("Discovery target is missing address label: %v", lset)
+							continue SyncLoop
+						}
+
+						u := &url.URL{
+							Scheme: lset.Get(model.SchemeLabel),
+							Host:   lset.Get(model.AddressLabel),
+							Path:   lset.Get(PathPrefixLabel),
+						}
+
+						targets = append(targets, u.Host)
+
+						client, err := api.NewClient(api.Config{Address: u.String(), RoundTripper: s})
+						if err != nil {
+							panic(err) // TODO: shouldn't be possible? If this happens I guess we log and skip?
+						}
+
+						if len(s.Cfg.QueryParams) > 0 {
+							client = promclient.NewClientArgsWrap(client, s.Cfg.QueryParams)
+						}
+
+						var apiClient promclient.API
+						apiClient = &promclient.PromAPIV1{v1.NewAPI(client)}
+
+						if s.Cfg.RemoteRead {
+							u.Path = path.Join(u.Path, s.Cfg.RemoteReadPath)
+							cfg := &remote.ClientConfig{
+								URL:              &config_util.URL{u},
+								HTTPClientConfig: s.Cfg.HTTPConfig.HTTPConfig,
+								Timeout:          model.Duration(time.Minute * 2),
+							}
+							remoteStorageClient, err := remote.NewReadClient("foo", cfg)
+							if err != nil {
+								panic(err)
+							}
+
+							apiClient = &promclient.PromAPIRemoteRead{apiClient, remoteStorageClient}
+						}
+
+						// Optionally add time range layers
+						if s.Cfg.AbsoluteTimeRangeConfig != nil {
+							apiClient = &promclient.AbsoluteTimeFilter{
+								API:      apiClient,
+								Start:    s.Cfg.AbsoluteTimeRangeConfig.Start,
+								End:      s.Cfg.AbsoluteTimeRangeConfig.End,
+								Truncate: s.Cfg.AbsoluteTimeRangeConfig.Truncate,
+							}
+						}
+
+						if s.Cfg.RelativeTimeRangeConfig != nil {
+							apiClient = &promclient.RelativeTimeFilter{
+								API:      apiClient,
+								Start:    s.Cfg.RelativeTimeRangeConfig.Start,
+								End:      s.Cfg.RelativeTimeRangeConfig.End,
+								Truncate: s.Cfg.RelativeTimeRangeConfig.Truncate,
+							}
+						}
+
+						// We remove all private labels after we set the target entry
+						modelLabelSet := make(model.LabelSet, len(lset))
+						for _, lbl := range lset {
+							if !strings.HasPrefix(string(lbl.Name), model.ReservedLabelPrefix) {
+								modelLabelSet[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
+							}
+						}
+
+						// Add labels
+						apiClient = &promclient.AddLabelClient{apiClient, modelLabelSet.Merge(s.Cfg.Labels)}
+
+						// If debug logging is enabled, wrap the client with a debugAPI client
+						// Since these are called in the reverse order of what we add, we want
+						// to make sure that this is the last wrap of the client
+						if logrus.GetLevel() >= logrus.DebugLevel {
+							apiClient = &promclient.DebugAPI{apiClient, u.String()}
+						}
+
+						apiClients = append(apiClients, apiClient)
+					}
+				}
+			}
+
+			apiClientMetricFunc := func(i int, api, status string, took float64) {
+				serverGroupSummary.WithLabelValues(targets[i], api, status).Observe(took)
+			}
+
+			logrus.Debugf("Updating targets from discovery manager: %v", targets)
+			newState := &ServerGroupState{
+				Targets:   targets,
+				apiClient: promclient.NewMultiAPI(apiClients, s.Cfg.GetAntiAffinity(), apiClientMetricFunc, 1),
+			}
+
+			if s.Cfg.IgnoreError {
+				newState.apiClient = &promclient.IgnoreErrorAPI{newState.apiClient}
+			}
+
+			s.state.Store(newState)
+
+			if !s.loaded {
+				s.loaded = true
+				close(s.Ready)
+			}
+		}
 	}
 }
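
For completeness, a stripped-down sketch of the pattern the reworked Sync() loop relies on (hypothetical wiring, not the actual ServerGroup plumbing): selecting on a context alongside the never-closed sync channel gives the reload path a way to stop the goroutine, assuming ServerGroup.Cancel cancels that context.

package main

import (
	"context"
	"fmt"
	"time"
)

// syncLoop is the shape of the fixed loop: it still receives from a channel
// that is never closed, but it also watches ctx, so cancelling ctx (assumed to
// be what ServerGroup.Cancel does on reload) lets the goroutine return.
func syncLoop(ctx context.Context, syncCh <-chan string) {
	for {
		select {
		case <-ctx.Done():
			return // reload or shutdown: exit instead of leaking
		case update := <-syncCh:
			fmt.Println("applying targets:", update)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	syncCh := make(chan string)

	go syncLoop(ctx, syncCh)
	syncCh <- "prometheus-0:9090" // normal operation: an update is applied

	cancel() // a config reload replaces this server group
	time.Sleep(50 * time.Millisecond)
}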
