Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ruler: improve control over marshaling relabel.Config #4364

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql"
Expand All @@ -27,6 +26,7 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/ruler/util"
)

// RulesLimits is the one function we need from limits.Overrides, and
Expand All @@ -38,7 +38,7 @@ type RulesLimits interface {
RulerRemoteWriteURL(userID string) string
RulerRemoteWriteTimeout(userID string) time.Duration
RulerRemoteWriteHeaders(userID string) map[string]string
RulerRemoteWriteRelabelConfigs(userID string) []*relabel.Config
RulerRemoteWriteRelabelConfigs(userID string) []*util.RelabelConfig
RulerRemoteWriteQueueCapacity(userID string) int
RulerRemoteWriteQueueMinShards(userID string) int
RulerRemoteWriteQueueMaxShards(userID string) int
Expand Down
32 changes: 31 additions & 1 deletion pkg/ruler/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/weaveworks/common/user"
"gopkg.in/yaml.v2"

"github.com/grafana/loki/pkg/ruler/storage/cleaner"
"github.com/grafana/loki/pkg/ruler/storage/instance"
Expand Down Expand Up @@ -228,12 +230,17 @@ func (r *walRegistry) getTenantRemoteWriteConfig(tenant string, base RemoteWrite
return nil, fmt.Errorf("error parsing given remote-write URL: %w", err)
}

relabelConfigs, err := r.createRelabelConfigs(tenant)
if err != nil {
return nil, fmt.Errorf("failed to parse relabel configs: %w", err)
}

overrides := RemoteWriteConfig{
Client: config.RemoteWriteConfig{
URL: &promConfig.URL{u},
RemoteTimeout: model.Duration(r.overrides.RulerRemoteWriteTimeout(tenant)),
Headers: r.overrides.RulerRemoteWriteHeaders(tenant),
WriteRelabelConfigs: r.overrides.RulerRemoteWriteRelabelConfigs(tenant),
WriteRelabelConfigs: relabelConfigs,
Name: fmt.Sprintf("%s-rw", tenant),
SendExemplars: false,
// TODO(dannyk): configure HTTP client overrides
Expand Down Expand Up @@ -270,6 +277,29 @@ func (r *walRegistry) getTenantRemoteWriteConfig(tenant string, base RemoteWrite
return copy, nil
}

// createRelabelConfigs converts the util.RelabelConfig into relabel.Config to allow for
// more control over json/yaml unmarshaling
func (r *walRegistry) createRelabelConfigs(tenant string) ([]*relabel.Config, error) {
configs := r.overrides.RulerRemoteWriteRelabelConfigs(tenant)

var relabelConfigs []*relabel.Config
for _, config := range configs {
out, err := yaml.Marshal(config)
if err != nil {
return nil, err
}

var rc relabel.Config
if err = yaml.Unmarshal(out, &rc); err != nil {
return nil, err
}

relabelConfigs = append(relabelConfigs, &rc)
}

return relabelConfigs, nil
}

var errNotReady = errors.New("appender not ready")

type notReadyAppender struct{}
Expand Down
54 changes: 50 additions & 4 deletions pkg/ruler/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,23 @@ import (
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/ruler/storage/instance"
"github.com/grafana/loki/pkg/ruler/util"
"github.com/grafana/loki/pkg/validation"
)

const enabledRWTenant = "12345"
const disabledRWTenant = "54321"
const additionalHeadersRWTenant = "55443"
const customRelabelsTenant = "98765"
const badRelabelsTenant = "45677"

const defaultCapacity = 1000

func newFakeLimits() fakeLimits {
return fakeLimits{
limits: map[string]*validation.Limits{
enabledRWTenant: {
RulerRemoteWriteQueueCapacity: 987,
RulerRemoteWriteRelabelConfigs: []*relabel.Config{},
RulerRemoteWriteQueueCapacity: 987,
},
disabledRWTenant: {
RulerRemoteWriteDisabled: true,
Expand All @@ -48,6 +50,27 @@ func newFakeLimits() fakeLimits {
"Additional": "Header",
},
},
customRelabelsTenant: {
RulerRemoteWriteRelabelConfigs: []*util.RelabelConfig{
{
Regex: ".+:.+",
SourceLabels: []string{"__name__"},
Action: "drop",
},
{
Regex: "__cluster__",
Action: "labeldrop",
},
},
},
badRelabelsTenant: {
RulerRemoteWriteRelabelConfigs: []*util.RelabelConfig{
{
SourceLabels: []string{"__cluster__"},
Action: "labeldrop",
},
},
},
},
}
}
Expand Down Expand Up @@ -104,8 +127,6 @@ func TestTenantRemoteWriteConfigWithOverride(t *testing.T) {
assert.Len(t, tenantCfg.RemoteWrite, 1)
// but the tenant has an override for the queue capacity
assert.Equal(t, tenantCfg.RemoteWrite[0].QueueConfig.Capacity, 987)
// it should also override the default label configs (in this case by clearing them)
assert.Len(t, tenantCfg.RemoteWrite[0].WriteRelabelConfigs, 0)
}

func TestTenantRemoteWriteConfigWithoutOverride(t *testing.T) {
Expand Down Expand Up @@ -159,6 +180,31 @@ func TestTenantRemoteWriteHeaderOverride(t *testing.T) {
assert.Equal(t, tenantCfg.RemoteWrite[0].Headers[user.OrgIDHeaderName], enabledRWTenant)
}

func TestRelabelConfigOverrides(t *testing.T) {
walDir, err := createTempWALDir()
require.NoError(t, err)
reg := setupRegistry(t, walDir)
defer os.RemoveAll(walDir)

tenantCfg, err := reg.getTenantConfig(customRelabelsTenant)
require.NoError(t, err)

// it should also override the default label configs
assert.Len(t, tenantCfg.RemoteWrite[0].WriteRelabelConfigs, 2)
}

func TestRelabelConfigOverridesWithErrors(t *testing.T) {
walDir, err := createTempWALDir()
require.NoError(t, err)
reg := setupRegistry(t, walDir)
defer os.RemoveAll(walDir)

_, err = reg.getTenantConfig(badRelabelsTenant)

// ensure that relabel validation is being applied
require.EqualError(t, err, "failed to parse relabel configs: labeldrop action requires only 'regex', and no other fields")
}

func TestWALRegistryCreation(t *testing.T) {
overrides, err := validation.NewOverrides(validation.Limits{}, nil)
require.NoError(t, err)
Expand Down
22 changes: 22 additions & 0 deletions pkg/ruler/util/relabel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package util

// copy and modification of github.com/prometheus/prometheus/pkg/relabel/relabel.go
// reason: the custom types in github.com/prometheus/prometheus/pkg/relabel/relabel.go are difficult to unmarshal
type RelabelConfig struct {
// A list of labels from which values are taken and concatenated
// with the configured separator in order.
SourceLabels []string `yaml:"source_labels,flow,omitempty" json:"source_labels,omitempty"`
// Separator is the string between concatenated values from the source labels.
Separator string `yaml:"separator,omitempty" json:"separator,omitempty"`
// Regex against which the concatenation is matched.
Regex string `yaml:"regex,omitempty" json:"regex,omitempty"`
// Modulus to take of the hash of concatenated values from the source labels.
Modulus uint64 `yaml:"modulus,omitempty" json:"modulus,omitempty"`
// TargetLabel is the label to which the resulting string is written in a replacement.
// Regexp interpolation is allowed for the replace action.
TargetLabel string `yaml:"target_label,omitempty" json:"target_label,omitempty"`
// Replacement is the regex replacement pattern to be used.
Replacement string `yaml:"replacement,omitempty" json:"replacement,omitempty"`
// Action is the action to be performed for the relabeling.
Action string `yaml:"action,omitempty" json:"action,omitempty"`
}
30 changes: 15 additions & 15 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"golang.org/x/time/rate"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/ruler/util"
"github.com/grafana/loki/pkg/util/flagext"
)

Expand Down Expand Up @@ -83,19 +83,19 @@ type Limits struct {

// this field is the inversion of the general remote_write.enabled because the zero value of a boolean is false,
// and if it were ruler_remote_write_enabled, it would be impossible to know if the value was explicitly set or default
RulerRemoteWriteDisabled bool `yaml:"ruler_remote_write_disabled" json:"ruler_remote_write_disabled"`
RulerRemoteWriteURL string `yaml:"ruler_remote_write_url" json:"ruler_remote_write_url"`
RulerRemoteWriteTimeout time.Duration `yaml:"ruler_remote_write_timeout" json:"ruler_remote_write_timeout"`
RulerRemoteWriteHeaders map[string]string `yaml:"ruler_remote_write_headers" json:"ruler_remote_write_headers"`
RulerRemoteWriteRelabelConfigs []*relabel.Config `yaml:"ruler_remote_write_relabel_configs" json:"ruler_remote_write_relabel_configs"`
RulerRemoteWriteQueueCapacity int `yaml:"ruler_remote_write_queue_capacity" json:"ruler_remote_write_queue_capacity"`
RulerRemoteWriteQueueMinShards int `yaml:"ruler_remote_write_queue_min_shards" json:"ruler_remote_write_queue_min_shards"`
RulerRemoteWriteQueueMaxShards int `yaml:"ruler_remote_write_queue_max_shards" json:"ruler_remote_write_queue_max_shards"`
RulerRemoteWriteQueueMaxSamplesPerSend int `yaml:"ruler_remote_write_queue_max_samples_per_send" json:"ruler_remote_write_queue_max_samples_per_send"`
RulerRemoteWriteQueueBatchSendDeadline time.Duration `yaml:"ruler_remote_write_queue_batch_send_deadline" json:"ruler_remote_write_queue_batch_send_deadline"`
RulerRemoteWriteQueueMinBackoff time.Duration `yaml:"ruler_remote_write_queue_min_backoff" json:"ruler_remote_write_queue_min_backoff"`
RulerRemoteWriteQueueMaxBackoff time.Duration `yaml:"ruler_remote_write_queue_max_backoff" json:"ruler_remote_write_queue_max_backoff"`
RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"`
RulerRemoteWriteDisabled bool `yaml:"ruler_remote_write_disabled" json:"ruler_remote_write_disabled"`
RulerRemoteWriteURL string `yaml:"ruler_remote_write_url" json:"ruler_remote_write_url"`
RulerRemoteWriteTimeout time.Duration `yaml:"ruler_remote_write_timeout" json:"ruler_remote_write_timeout"`
RulerRemoteWriteHeaders map[string]string `yaml:"ruler_remote_write_headers" json:"ruler_remote_write_headers"`
RulerRemoteWriteRelabelConfigs []*util.RelabelConfig `yaml:"ruler_remote_write_relabel_configs" json:"ruler_remote_write_relabel_configs"`
RulerRemoteWriteQueueCapacity int `yaml:"ruler_remote_write_queue_capacity" json:"ruler_remote_write_queue_capacity"`
RulerRemoteWriteQueueMinShards int `yaml:"ruler_remote_write_queue_min_shards" json:"ruler_remote_write_queue_min_shards"`
RulerRemoteWriteQueueMaxShards int `yaml:"ruler_remote_write_queue_max_shards" json:"ruler_remote_write_queue_max_shards"`
RulerRemoteWriteQueueMaxSamplesPerSend int `yaml:"ruler_remote_write_queue_max_samples_per_send" json:"ruler_remote_write_queue_max_samples_per_send"`
RulerRemoteWriteQueueBatchSendDeadline time.Duration `yaml:"ruler_remote_write_queue_batch_send_deadline" json:"ruler_remote_write_queue_batch_send_deadline"`
RulerRemoteWriteQueueMinBackoff time.Duration `yaml:"ruler_remote_write_queue_min_backoff" json:"ruler_remote_write_queue_min_backoff"`
RulerRemoteWriteQueueMaxBackoff time.Duration `yaml:"ruler_remote_write_queue_max_backoff" json:"ruler_remote_write_queue_max_backoff"`
RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"`

// Global and per tenant retention
RetentionPeriod model.Duration `yaml:"retention_period" json:"retention_period"`
Expand Down Expand Up @@ -446,7 +446,7 @@ func (o *Overrides) RulerRemoteWriteHeaders(userID string) map[string]string {
}

// RulerRemoteWriteRelabelConfigs returns the write relabel configs to use in a remote-write for a given user.
func (o *Overrides) RulerRemoteWriteRelabelConfigs(userID string) []*relabel.Config {
func (o *Overrides) RulerRemoteWriteRelabelConfigs(userID string) []*util.RelabelConfig {
return o.getOverridesForUser(userID).RulerRemoteWriteRelabelConfigs
}

Expand Down