Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix goroutine leak on config reload #508

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/proxystorage/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (p *proxyStorageState) Cancel(n *proxyStorageState) {
sg.Cancel()
}
}
// We call close if the new one is nil, or if the appanders don't match
// We call close if the new one is nil, or if the appenders don't match
if n == nil || p.appender != n.appender {
if p.appenderCloser != nil {
p.appenderCloser()
Expand Down Expand Up @@ -145,7 +145,7 @@ func (p *ProxyStorage) ApplyConfig(c *proxyconfig.Config) error {

newState.Ready() // Wait for the newstate to be ready
p.state.Store(newState) // Store the new state
if oldState != nil && oldState.appender != newState.appender {
if oldState != nil {
oldState.Cancel(newState) // Cancel the old one
}

Expand Down
221 changes: 113 additions & 108 deletions pkg/servergroup/servergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,143 +108,148 @@ func (s *ServerGroup) RoundTrip(r *http.Request) (*http.Response, error) {
func (s *ServerGroup) Sync() {
syncCh := s.targetManager.SyncCh()

SYNC_LOOP:
for targetGroupMap := range syncCh {
logrus.Debug("Updating targets from discovery manager")
targets := make([]string, 0)
apiClients := make([]promclient.API, 0)

for _, targetGroupList := range targetGroupMap {
for _, targetGroup := range targetGroupList {
for _, target := range targetGroup.Targets {

lbls := make([]labels.Label, 0, len(target)+len(targetGroup.Labels)+2)

for ln, lv := range target {
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
}

for ln, lv := range targetGroup.Labels {
if _, ok := target[ln]; !ok {
SyncLoop:
for {
select {
case <-s.ctx.Done():
return
case targetGroupMap := <-syncCh:
logrus.Debug("Updating targets from discovery manager")
targets := make([]string, 0)
apiClients := make([]promclient.API, 0)

for _, targetGroupList := range targetGroupMap {
for _, targetGroup := range targetGroupList {
for _, target := range targetGroup.Targets {

lbls := make([]labels.Label, 0, len(target)+len(targetGroup.Labels)+2)

for ln, lv := range target {
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
}
}

lbls = append(lbls, labels.Label{Name: model.SchemeLabel, Value: string(s.Cfg.Scheme)})
lbls = append(lbls, labels.Label{Name: PathPrefixLabel, Value: string(s.Cfg.PathPrefix)})

lset := labels.New(lbls...)

logrus.Tracef("Potential target pre-relabel: %v", lset)
lset = relabel.Process(lset, s.Cfg.RelabelConfigs...)
logrus.Tracef("Potential target post-relabel: %v", lset)
// Check if the target was dropped, if so we skip it
if len(lset) == 0 {
continue
}
for ln, lv := range targetGroup.Labels {
if _, ok := target[ln]; !ok {
lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
}
}

// If there is no address, then we can't use this set of targets
if v := lset.Get(model.AddressLabel); v == "" {
logrus.Errorf("Discovery target is missing address label: %v", lset)
continue SYNC_LOOP
}
lbls = append(lbls, labels.Label{Name: model.SchemeLabel, Value: string(s.Cfg.Scheme)})
lbls = append(lbls, labels.Label{Name: PathPrefixLabel, Value: string(s.Cfg.PathPrefix)})

u := &url.URL{
Scheme: lset.Get(model.SchemeLabel),
Host: lset.Get(model.AddressLabel),
Path: lset.Get(PathPrefixLabel),
}
lset := labels.New(lbls...)

targets = append(targets, u.Host)
logrus.Tracef("Potential target pre-relabel: %v", lset)
lset = relabel.Process(lset, s.Cfg.RelabelConfigs...)
logrus.Tracef("Potential target post-relabel: %v", lset)
// Check if the target was dropped, if so we skip it
if len(lset) == 0 {
continue
}

client, err := api.NewClient(api.Config{Address: u.String(), RoundTripper: s})
if err != nil {
panic(err) // TODO: shouldn't be possible? If this happens I guess we log and skip?
}
// If there is no address, then we can't use this set of targets
if v := lset.Get(model.AddressLabel); v == "" {
logrus.Errorf("Discovery target is missing address label: %v", lset)
continue SyncLoop
}

if len(s.Cfg.QueryParams) > 0 {
client = promclient.NewClientArgsWrap(client, s.Cfg.QueryParams)
}
u := &url.URL{
Scheme: lset.Get(model.SchemeLabel),
Host: lset.Get(model.AddressLabel),
Path: lset.Get(PathPrefixLabel),
}

var apiClient promclient.API
apiClient = &promclient.PromAPIV1{v1.NewAPI(client)}
targets = append(targets, u.Host)

if s.Cfg.RemoteRead {
u.Path = path.Join(u.Path, s.Cfg.RemoteReadPath)
cfg := &remote.ClientConfig{
URL: &config_util.URL{u},
HTTPClientConfig: s.Cfg.HTTPConfig.HTTPConfig,
Timeout: model.Duration(time.Minute * 2),
}
remoteStorageClient, err := remote.NewReadClient("foo", cfg)
client, err := api.NewClient(api.Config{Address: u.String(), RoundTripper: s})
if err != nil {
panic(err)
panic(err) // TODO: shouldn't be possible? If this happens I guess we log and skip?
}

apiClient = &promclient.PromAPIRemoteRead{apiClient, remoteStorageClient}
}
if len(s.Cfg.QueryParams) > 0 {
client = promclient.NewClientArgsWrap(client, s.Cfg.QueryParams)
}

// Optionally add time range layers
if s.Cfg.AbsoluteTimeRangeConfig != nil {
apiClient = &promclient.AbsoluteTimeFilter{
API: apiClient,
Start: s.Cfg.AbsoluteTimeRangeConfig.Start,
End: s.Cfg.AbsoluteTimeRangeConfig.End,
Truncate: s.Cfg.AbsoluteTimeRangeConfig.Truncate,
var apiClient promclient.API
apiClient = &promclient.PromAPIV1{v1.NewAPI(client)}

if s.Cfg.RemoteRead {
u.Path = path.Join(u.Path, s.Cfg.RemoteReadPath)
cfg := &remote.ClientConfig{
URL: &config_util.URL{u},
HTTPClientConfig: s.Cfg.HTTPConfig.HTTPConfig,
Timeout: model.Duration(time.Minute * 2),
}
remoteStorageClient, err := remote.NewReadClient("foo", cfg)
if err != nil {
panic(err)
}

apiClient = &promclient.PromAPIRemoteRead{apiClient, remoteStorageClient}
}
}

if s.Cfg.RelativeTimeRangeConfig != nil {
apiClient = &promclient.RelativeTimeFilter{
API: apiClient,
Start: s.Cfg.RelativeTimeRangeConfig.Start,
End: s.Cfg.RelativeTimeRangeConfig.End,
Truncate: s.Cfg.RelativeTimeRangeConfig.Truncate,
// Optionally add time range layers
if s.Cfg.AbsoluteTimeRangeConfig != nil {
apiClient = &promclient.AbsoluteTimeFilter{
API: apiClient,
Start: s.Cfg.AbsoluteTimeRangeConfig.Start,
End: s.Cfg.AbsoluteTimeRangeConfig.End,
Truncate: s.Cfg.AbsoluteTimeRangeConfig.Truncate,
}
}
}

// We remove all private labels after we set the target entry
modelLabelSet := make(model.LabelSet, len(lset))
for _, lbl := range lset {
if !strings.HasPrefix(string(lbl.Name), model.ReservedLabelPrefix) {
modelLabelSet[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
if s.Cfg.RelativeTimeRangeConfig != nil {
apiClient = &promclient.RelativeTimeFilter{
API: apiClient,
Start: s.Cfg.RelativeTimeRangeConfig.Start,
End: s.Cfg.RelativeTimeRangeConfig.End,
Truncate: s.Cfg.RelativeTimeRangeConfig.Truncate,
}
}
}

// Add labels
apiClient = &promclient.AddLabelClient{apiClient, modelLabelSet.Merge(s.Cfg.Labels)}
// We remove all private labels after we set the target entry
modelLabelSet := make(model.LabelSet, len(lset))
for _, lbl := range lset {
if !strings.HasPrefix(string(lbl.Name), model.ReservedLabelPrefix) {
modelLabelSet[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}
}

// If debug logging is enabled, wrap the client with a debugAPI client
// Since these are called in the reverse order of what we add, we want
// to make sure that this is the last wrap of the client
if logrus.GetLevel() >= logrus.DebugLevel {
apiClient = &promclient.DebugAPI{apiClient, u.String()}
}
// Add labels
apiClient = &promclient.AddLabelClient{apiClient, modelLabelSet.Merge(s.Cfg.Labels)}

// If debug logging is enabled, wrap the client with a debugAPI client
// Since these are called in the reverse order of what we add, we want
// to make sure that this is the last wrap of the client
if logrus.GetLevel() >= logrus.DebugLevel {
apiClient = &promclient.DebugAPI{apiClient, u.String()}
}

apiClients = append(apiClients, apiClient)
apiClients = append(apiClients, apiClient)
}
}
}
}

apiClientMetricFunc := func(i int, api, status string, took float64) {
serverGroupSummary.WithLabelValues(targets[i], api, status).Observe(took)
}
apiClientMetricFunc := func(i int, api, status string, took float64) {
serverGroupSummary.WithLabelValues(targets[i], api, status).Observe(took)
}

logrus.Debugf("Updating targets from discovery manager: %v", targets)
newState := &ServerGroupState{
Targets: targets,
apiClient: promclient.NewMultiAPI(apiClients, s.Cfg.GetAntiAffinity(), apiClientMetricFunc, 1),
}
logrus.Debugf("Updating targets from discovery manager: %v", targets)
newState := &ServerGroupState{
Targets: targets,
apiClient: promclient.NewMultiAPI(apiClients, s.Cfg.GetAntiAffinity(), apiClientMetricFunc, 1),
}

if s.Cfg.IgnoreError {
newState.apiClient = &promclient.IgnoreErrorAPI{newState.apiClient}
}
if s.Cfg.IgnoreError {
newState.apiClient = &promclient.IgnoreErrorAPI{newState.apiClient}
}

s.state.Store(newState)
s.state.Store(newState)

if !s.loaded {
s.loaded = true
close(s.Ready)
if !s.loaded {
s.loaded = true
close(s.Ready)
}
}
}
}
Expand Down