diff --git a/pkg/proxystorage/proxy.go b/pkg/proxystorage/proxy.go index 49be12352..21ee3681a 100644 --- a/pkg/proxystorage/proxy.go +++ b/pkg/proxystorage/proxy.go @@ -61,7 +61,7 @@ func (p *proxyStorageState) Cancel(n *proxyStorageState) { sg.Cancel() } } - // We call close if the new one is nil, or if the appanders don't match + // We call close if the new one is nil, or if the appenders don't match if n == nil || p.appender != n.appender { if p.appenderCloser != nil { p.appenderCloser() @@ -145,7 +145,7 @@ func (p *ProxyStorage) ApplyConfig(c *proxyconfig.Config) error { newState.Ready() // Wait for the newstate to be ready p.state.Store(newState) // Store the new state - if oldState != nil && oldState.appender != newState.appender { + if oldState != nil { oldState.Cancel(newState) // Cancel the old one } diff --git a/pkg/servergroup/servergroup.go b/pkg/servergroup/servergroup.go index 1b5339b5d..148673edb 100644 --- a/pkg/servergroup/servergroup.go +++ b/pkg/servergroup/servergroup.go @@ -108,143 +108,148 @@ func (s *ServerGroup) RoundTrip(r *http.Request) (*http.Response, error) { func (s *ServerGroup) Sync() { syncCh := s.targetManager.SyncCh() -SYNC_LOOP: - for targetGroupMap := range syncCh { - logrus.Debug("Updating targets from discovery manager") - targets := make([]string, 0) - apiClients := make([]promclient.API, 0) - - for _, targetGroupList := range targetGroupMap { - for _, targetGroup := range targetGroupList { - for _, target := range targetGroup.Targets { - - lbls := make([]labels.Label, 0, len(target)+len(targetGroup.Labels)+2) - - for ln, lv := range target { - lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)}) - } - - for ln, lv := range targetGroup.Labels { - if _, ok := target[ln]; !ok { +SyncLoop: + for { + select { + case <-s.ctx.Done(): + return + case targetGroupMap := <-syncCh: + logrus.Debug("Updating targets from discovery manager") + targets := make([]string, 0) + apiClients := make([]promclient.API, 0) + + for _, targetGroupList := range targetGroupMap { + for _, targetGroup := range targetGroupList { + for _, target := range targetGroup.Targets { + + lbls := make([]labels.Label, 0, len(target)+len(targetGroup.Labels)+2) + + for ln, lv := range target { lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)}) } - } - - lbls = append(lbls, labels.Label{Name: model.SchemeLabel, Value: string(s.Cfg.Scheme)}) - lbls = append(lbls, labels.Label{Name: PathPrefixLabel, Value: string(s.Cfg.PathPrefix)}) - lset := labels.New(lbls...) - - logrus.Tracef("Potential target pre-relabel: %v", lset) - lset = relabel.Process(lset, s.Cfg.RelabelConfigs...) - logrus.Tracef("Potential target post-relabel: %v", lset) - // Check if the target was dropped, if so we skip it - if len(lset) == 0 { - continue - } + for ln, lv := range targetGroup.Labels { + if _, ok := target[ln]; !ok { + lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)}) + } + } - // If there is no address, then we can't use this set of targets - if v := lset.Get(model.AddressLabel); v == "" { - logrus.Errorf("Discovery target is missing address label: %v", lset) - continue SYNC_LOOP - } + lbls = append(lbls, labels.Label{Name: model.SchemeLabel, Value: string(s.Cfg.Scheme)}) + lbls = append(lbls, labels.Label{Name: PathPrefixLabel, Value: string(s.Cfg.PathPrefix)}) - u := &url.URL{ - Scheme: lset.Get(model.SchemeLabel), - Host: lset.Get(model.AddressLabel), - Path: lset.Get(PathPrefixLabel), - } + lset := labels.New(lbls...) - targets = append(targets, u.Host) + logrus.Tracef("Potential target pre-relabel: %v", lset) + lset = relabel.Process(lset, s.Cfg.RelabelConfigs...) + logrus.Tracef("Potential target post-relabel: %v", lset) + // Check if the target was dropped, if so we skip it + if len(lset) == 0 { + continue + } - client, err := api.NewClient(api.Config{Address: u.String(), RoundTripper: s}) - if err != nil { - panic(err) // TODO: shouldn't be possible? If this happens I guess we log and skip? - } + // If there is no address, then we can't use this set of targets + if v := lset.Get(model.AddressLabel); v == "" { + logrus.Errorf("Discovery target is missing address label: %v", lset) + continue SyncLoop + } - if len(s.Cfg.QueryParams) > 0 { - client = promclient.NewClientArgsWrap(client, s.Cfg.QueryParams) - } + u := &url.URL{ + Scheme: lset.Get(model.SchemeLabel), + Host: lset.Get(model.AddressLabel), + Path: lset.Get(PathPrefixLabel), + } - var apiClient promclient.API - apiClient = &promclient.PromAPIV1{v1.NewAPI(client)} + targets = append(targets, u.Host) - if s.Cfg.RemoteRead { - u.Path = path.Join(u.Path, s.Cfg.RemoteReadPath) - cfg := &remote.ClientConfig{ - URL: &config_util.URL{u}, - HTTPClientConfig: s.Cfg.HTTPConfig.HTTPConfig, - Timeout: model.Duration(time.Minute * 2), - } - remoteStorageClient, err := remote.NewReadClient("foo", cfg) + client, err := api.NewClient(api.Config{Address: u.String(), RoundTripper: s}) if err != nil { - panic(err) + panic(err) // TODO: shouldn't be possible? If this happens I guess we log and skip? } - apiClient = &promclient.PromAPIRemoteRead{apiClient, remoteStorageClient} - } + if len(s.Cfg.QueryParams) > 0 { + client = promclient.NewClientArgsWrap(client, s.Cfg.QueryParams) + } - // Optionally add time range layers - if s.Cfg.AbsoluteTimeRangeConfig != nil { - apiClient = &promclient.AbsoluteTimeFilter{ - API: apiClient, - Start: s.Cfg.AbsoluteTimeRangeConfig.Start, - End: s.Cfg.AbsoluteTimeRangeConfig.End, - Truncate: s.Cfg.AbsoluteTimeRangeConfig.Truncate, + var apiClient promclient.API + apiClient = &promclient.PromAPIV1{v1.NewAPI(client)} + + if s.Cfg.RemoteRead { + u.Path = path.Join(u.Path, s.Cfg.RemoteReadPath) + cfg := &remote.ClientConfig{ + URL: &config_util.URL{u}, + HTTPClientConfig: s.Cfg.HTTPConfig.HTTPConfig, + Timeout: model.Duration(time.Minute * 2), + } + remoteStorageClient, err := remote.NewReadClient("foo", cfg) + if err != nil { + panic(err) + } + + apiClient = &promclient.PromAPIRemoteRead{apiClient, remoteStorageClient} } - } - if s.Cfg.RelativeTimeRangeConfig != nil { - apiClient = &promclient.RelativeTimeFilter{ - API: apiClient, - Start: s.Cfg.RelativeTimeRangeConfig.Start, - End: s.Cfg.RelativeTimeRangeConfig.End, - Truncate: s.Cfg.RelativeTimeRangeConfig.Truncate, + // Optionally add time range layers + if s.Cfg.AbsoluteTimeRangeConfig != nil { + apiClient = &promclient.AbsoluteTimeFilter{ + API: apiClient, + Start: s.Cfg.AbsoluteTimeRangeConfig.Start, + End: s.Cfg.AbsoluteTimeRangeConfig.End, + Truncate: s.Cfg.AbsoluteTimeRangeConfig.Truncate, + } } - } - // We remove all private labels after we set the target entry - modelLabelSet := make(model.LabelSet, len(lset)) - for _, lbl := range lset { - if !strings.HasPrefix(string(lbl.Name), model.ReservedLabelPrefix) { - modelLabelSet[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + if s.Cfg.RelativeTimeRangeConfig != nil { + apiClient = &promclient.RelativeTimeFilter{ + API: apiClient, + Start: s.Cfg.RelativeTimeRangeConfig.Start, + End: s.Cfg.RelativeTimeRangeConfig.End, + Truncate: s.Cfg.RelativeTimeRangeConfig.Truncate, + } } - } - // Add labels - apiClient = &promclient.AddLabelClient{apiClient, modelLabelSet.Merge(s.Cfg.Labels)} + // We remove all private labels after we set the target entry + modelLabelSet := make(model.LabelSet, len(lset)) + for _, lbl := range lset { + if !strings.HasPrefix(string(lbl.Name), model.ReservedLabelPrefix) { + modelLabelSet[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + } + } - // If debug logging is enabled, wrap the client with a debugAPI client - // Since these are called in the reverse order of what we add, we want - // to make sure that this is the last wrap of the client - if logrus.GetLevel() >= logrus.DebugLevel { - apiClient = &promclient.DebugAPI{apiClient, u.String()} - } + // Add labels + apiClient = &promclient.AddLabelClient{apiClient, modelLabelSet.Merge(s.Cfg.Labels)} + + // If debug logging is enabled, wrap the client with a debugAPI client + // Since these are called in the reverse order of what we add, we want + // to make sure that this is the last wrap of the client + if logrus.GetLevel() >= logrus.DebugLevel { + apiClient = &promclient.DebugAPI{apiClient, u.String()} + } - apiClients = append(apiClients, apiClient) + apiClients = append(apiClients, apiClient) + } } } - } - apiClientMetricFunc := func(i int, api, status string, took float64) { - serverGroupSummary.WithLabelValues(targets[i], api, status).Observe(took) - } + apiClientMetricFunc := func(i int, api, status string, took float64) { + serverGroupSummary.WithLabelValues(targets[i], api, status).Observe(took) + } - logrus.Debugf("Updating targets from discovery manager: %v", targets) - newState := &ServerGroupState{ - Targets: targets, - apiClient: promclient.NewMultiAPI(apiClients, s.Cfg.GetAntiAffinity(), apiClientMetricFunc, 1), - } + logrus.Debugf("Updating targets from discovery manager: %v", targets) + newState := &ServerGroupState{ + Targets: targets, + apiClient: promclient.NewMultiAPI(apiClients, s.Cfg.GetAntiAffinity(), apiClientMetricFunc, 1), + } - if s.Cfg.IgnoreError { - newState.apiClient = &promclient.IgnoreErrorAPI{newState.apiClient} - } + if s.Cfg.IgnoreError { + newState.apiClient = &promclient.IgnoreErrorAPI{newState.apiClient} + } - s.state.Store(newState) + s.state.Store(newState) - if !s.loaded { - s.loaded = true - close(s.Ready) + if !s.loaded { + s.loaded = true + close(s.Ready) + } } } }