diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index c825b70cb7cfd..acd730b088508 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -482,6 +482,13 @@ rate_store: # If enabled, detailed logs and spans will be emitted. # CLI flag: -distributor.rate-store.debug [debug: | default = false] + +# Experimental. Customize the logging of write failures. +write_failures_logging: + # Experimental and subject to change. Log volume allowed (per second). + # Default: 1KB. + # CLI flag: -distributor.write-failures-logging.rate + [rate: | default = 1KB] ``` ### querier diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 28ad3184f362a..4382f129fe770 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -34,6 +34,7 @@ import ( "github.com/grafana/loki/pkg/analytics" "github.com/grafana/loki/pkg/distributor/clientpool" "github.com/grafana/loki/pkg/distributor/shardstreams" + "github.com/grafana/loki/pkg/distributor/writefailures" "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" @@ -63,13 +64,18 @@ type Config struct { // For testing. factory ring_client.PoolFactory `yaml:"-"` + // RateStore customizes the rate storing used by stream sharding. RateStore RateStoreConfig `yaml:"rate_store"` + + // WriteFailuresLoggingCfg customizes write failures logging behavior. + WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Experimental. Customize the logging of write failures."` } // RegisterFlags registers distributor-related flags. 
func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { cfg.DistributorRing.RegisterFlags(fs) cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs) + cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs) } // RateStore manages the ingestion rate of streams, populated by data fetched from ingesters. @@ -105,6 +111,10 @@ type Distributor struct { // Per-user rate limiter. ingestionRateLimiter *limiter.RateLimiter labelCache *lru.Cache + + // Push failures rate limiter. + writeFailuresManager *writefailures.Manager + // metrics ingesterAppends *prometheus.CounterVec ingesterAppendFailures *prometheus.CounterVec @@ -184,6 +194,7 @@ func New( Name: "stream_sharding_count", Help: "Total number of times the distributor has sharded streams", }), + writeFailuresManager: writefailures.NewManager(util_log.Logger, cfg.WriteFailuresLogging, configs), } if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index 4ed669eb4b688..d4dec900a929c 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -34,6 +34,8 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { "err", err, ) } + d.writeFailuresManager.Log(tenantID, err) + http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -60,6 +62,8 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { return } + d.writeFailuresManager.Log(tenantID, err) + resp, ok := httpgrpc.HTTPResponseFromError(err) if ok { body := string(resp.Body) diff --git a/pkg/distributor/writefailures/cfg.go b/pkg/distributor/writefailures/cfg.go new file mode 100644 index 0000000000000..7a5ce02e0cb9c --- /dev/null +++ b/pkg/distributor/writefailures/cfg.go @@ -0,0 +1,17 @@ +package writefailures + +import ( + "flag" + + "github.com/grafana/loki/pkg/util/flagext" +) + +type Cfg struct { + LogRate flagext.ByteSize `yaml:"rate" category:"experimental"` +} + +// 
RegisterFlagsWithPrefix registers the write-failures-logging flags, prefixing each flag name with the given prefix. +func (cfg *Cfg) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) { + _ = cfg.LogRate.Set("1KB") + fs.Var(&cfg.LogRate, prefix+".rate", "Experimental and subject to change. Log volume allowed (per second). Default: 1KB.") +} diff --git a/pkg/distributor/writefailures/manager.go b/pkg/distributor/writefailures/manager.go new file mode 100644 index 0000000000000..a7da5d392101e --- /dev/null +++ b/pkg/distributor/writefailures/manager.go @@ -0,0 +1,40 @@ +package writefailures + +import ( + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/limiter" + + "github.com/grafana/loki/pkg/runtime" +) + +type Manager struct { + limiter *limiter.RateLimiter + logger log.Logger + tenantCfgs *runtime.TenantConfigs +} + +func NewManager(logger log.Logger, cfg Cfg, tenants *runtime.TenantConfigs) *Manager { + logger = log.With(logger, "path", "write", "insight", "true") + + strat := newStrategy(cfg.LogRate.Val(), float64(cfg.LogRate.Val())) + + return &Manager{ + limiter: limiter.NewRateLimiter(strat, time.Minute), + logger: logger, + tenantCfgs: tenants, + } +} + +func (m *Manager) Log(tenantID string, err error) { + if !m.tenantCfgs.LimitedLogPushErrors(tenantID) { + return + } + + errMsg := err.Error() + if m.limiter.AllowN(time.Now(), tenantID, len(errMsg)) { + level.Error(m.logger).Log("msg", "write operation failed", "err", errMsg) + } +} diff --git a/pkg/distributor/writefailures/manager_test.go b/pkg/distributor/writefailures/manager_test.go new file mode 100644 index 0000000000000..1a70d4df01e2c --- /dev/null +++ b/pkg/distributor/writefailures/manager_test.go @@ -0,0 +1,162 @@ +package writefailures + +import ( + "bytes" + "fmt" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/runtime" + "github.com/grafana/loki/pkg/util/flagext" +) + +func TestWriteFailuresLogging(t *testing.T) { + 
t.Run("it only logs for the configured tenants and is disabled by default", func(t *testing.T) { + buf := bytes.NewBuffer(nil) + logger := log.NewLogfmtLogger(buf) + + f := func(tenantID string) *runtime.Config { + if tenantID == "good-tenant" { + return &runtime.Config{ + LimitedLogPushErrors: true, + } + } + if tenantID == "bad-tenant" { + return &runtime.Config{ + LimitedLogPushErrors: false, + } + } + return &runtime.Config{} + } + + runtimeCfg, err := runtime.NewTenantConfigs(f) + require.NoError(t, err) + + manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg) + + manager.Log("bad-tenant", fmt.Errorf("bad-tenant contains invalid entry")) + manager.Log("good-tenant", fmt.Errorf("good-tenant contains invalid entry")) + manager.Log("unknown-tenant", fmt.Errorf("unknown-tenant contains invalid entry")) + + content := buf.String() + require.NotEmpty(t, content) + require.Contains(t, content, "good-tenant") + require.NotContains(t, content, "bad-tenant") + require.NotContains(t, content, "unknown-tenant") + }) +} + +func TestWriteFailuresRateLimiting(t *testing.T) { + buf := bytes.NewBuffer(nil) + logger := log.NewLogfmtLogger(buf) + + f := func(tenantID string) *runtime.Config { + return &runtime.Config{ + LimitedLogPushErrors: true, + } + } + runtimeCfg, err := runtime.NewTenantConfigs(f) + require.NoError(t, err) + + t.Run("with zero rate limiting", func(t *testing.T) { + manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(0)}, runtimeCfg) + + manager.Log("known-tenant", fmt.Errorf("known-tenant entry error")) + + content := buf.String() + require.Empty(t, content) + }) + + t.Run("bytes exceeded on single message", func(t *testing.T) { + manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg) + + errorStr := strings.Builder{} + for i := 0; i < 1001; i++ { + errorStr.WriteRune('z') + } + + manager.Log("known-tenant", fmt.Errorf(errorStr.String())) + + content := buf.String() + require.Empty(t, content) + 
}) + + t.Run("valid bytes", func(t *testing.T) { + manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1002)}, runtimeCfg) + + errorStr := strings.Builder{} + for i := 0; i < 1001; i++ { + errorStr.WriteRune('z') + } + + manager.Log("known-tenant", fmt.Errorf(errorStr.String())) + + content := buf.String() + require.NotEmpty(t, content) + require.Contains(t, content, errorStr.String()) + }) + + t.Run("limit is reset after a second", func(t *testing.T) { + manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg) + + errorStr1 := strings.Builder{} + errorStr2 := strings.Builder{} + errorStr3 := strings.Builder{} + for i := 0; i < 999; i++ { + errorStr1.WriteRune('z') + errorStr2.WriteRune('w') + errorStr3.WriteRune('y') + } + + manager.Log("known-tenant", fmt.Errorf(errorStr1.String())) + manager.Log("known-tenant", fmt.Errorf(errorStr2.String())) // more than 1KB/s + time.Sleep(time.Second) + manager.Log("known-tenant", fmt.Errorf(errorStr3.String())) + + content := buf.String() + require.NotEmpty(t, content) + require.Contains(t, content, errorStr1.String()) + require.NotContains(t, content, errorStr2.String()) + require.Contains(t, content, errorStr3.String()) + }) + + t.Run("limit is per-tenant", func(t *testing.T) { + runtimeCfg, err := runtime.NewTenantConfigs(f) + require.NoError(t, err) + manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg) + + errorStr1 := strings.Builder{} + errorStr2 := strings.Builder{} + errorStr3 := strings.Builder{} + for i := 0; i < 998; i++ { + errorStr1.WriteRune('z') + errorStr2.WriteRune('w') + errorStr3.WriteRune('y') + } + + manager.Log("tenant1", fmt.Errorf("1%s", errorStr1.String())) + manager.Log("tenant2", fmt.Errorf("2%s", errorStr1.String())) + + manager.Log("tenant1", fmt.Errorf("1%s", errorStr2.String())) // limit exceeded for tenant1, Str2 shouldn't be present. + manager.Log("tenant3", fmt.Errorf("3%s", errorStr1.String())) // all fine with tenant3. 
+ + time.Sleep(time.Second) + manager.Log("tenant1", fmt.Errorf("1%s", errorStr3.String())) // tenant1 is fine again. + manager.Log("tenant3", fmt.Errorf("3%s", errorStr1.String())) // all fine with tenant3. + + content := buf.String() + require.NotEmpty(t, content) + require.Contains(t, content, "1z") + require.Contains(t, content, "2z") + + require.NotContains(t, content, "1w") // Str2 + require.Contains(t, content, "3z") + + require.Contains(t, content, "1y") + require.Contains(t, content, "3z") + }) +} diff --git a/pkg/distributor/writefailures/strategy.go b/pkg/distributor/writefailures/strategy.go new file mode 100644 index 0000000000000..09e382497f7e3 --- /dev/null +++ b/pkg/distributor/writefailures/strategy.go @@ -0,0 +1,21 @@ +package writefailures + +type strategy struct { + burst int + rate float64 +} + +func newStrategy(burst int, rate float64) *strategy { + return &strategy{ + burst: burst, + rate: rate, + } +} + +func (s *strategy) Burst(tenantID string) int { + return s.burst +} + +func (s *strategy) Limit(tenantID string) float64 { + return s.rate +} diff --git a/pkg/runtime/config.go b/pkg/runtime/config.go index 86926cc714a16..9795277fbf137 100644 --- a/pkg/runtime/config.go +++ b/pkg/runtime/config.go @@ -6,7 +6,6 @@ type Config struct { LogPushRequestStreams bool `yaml:"log_push_request_streams"` // LimitedLogPushErrors is to be implemented and will allow logging push failures at a controlled pace. - // TODO(dylanguedes): implement it. LimitedLogPushErrors bool `yaml:"limited_log_push_errors"` }