-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Loki: Implement initial phase of
limited_log_push_errors
(#9556)
**What this PR does / why we need it**: Adds an initial implementation of `limited_log_push_errors`. This initial implementation encompass: - Runtime per-tenant configuration - Simple per-tenant rate-limiting based on error message size Notable features that will be added in future phases: - Instead of a single error with the final string, give to the manager the list of all entries - Once it supports per-entry error, make the rate-limiting separated per-reason - Hash the entry error and avoid repeating errors by caching seen errors in memory
- Loading branch information
1 parent
c9a3ff5
commit 7d67b63
Showing
8 changed files
with
262 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package writefailures | ||
|
||
import ( | ||
"flag" | ||
|
||
"github.com/grafana/loki/pkg/util/flagext" | ||
) | ||
|
||
type Cfg struct { | ||
LogRate flagext.ByteSize `yaml:"rate" category:"experimental"` | ||
} | ||
|
||
// RegisterFlags registers distributor-related flags. | ||
func (cfg *Cfg) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) { | ||
_ = cfg.LogRate.Set("1KB") | ||
fs.Var(&cfg.LogRate, prefix+".rate", "Experimental and subject to change. Log volume allowed (per second). Default: 1KB.") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package writefailures | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/go-kit/log/level" | ||
"github.com/grafana/dskit/limiter" | ||
|
||
"github.com/grafana/loki/pkg/runtime" | ||
) | ||
|
||
type Manager struct { | ||
limiter *limiter.RateLimiter | ||
logger log.Logger | ||
tenantCfgs *runtime.TenantConfigs | ||
} | ||
|
||
func NewManager(logger log.Logger, cfg Cfg, tenants *runtime.TenantConfigs) *Manager { | ||
logger = log.With(logger, "path", "write", "insight", "true") | ||
|
||
strat := newStrategy(cfg.LogRate.Val(), float64(cfg.LogRate.Val())) | ||
|
||
return &Manager{ | ||
limiter: limiter.NewRateLimiter(strat, time.Minute), | ||
logger: logger, | ||
tenantCfgs: tenants, | ||
} | ||
} | ||
|
||
func (m *Manager) Log(tenantID string, err error) { | ||
if !m.tenantCfgs.LimitedLogPushErrors(tenantID) { | ||
return | ||
} | ||
|
||
errMsg := err.Error() | ||
if m.limiter.AllowN(time.Now(), tenantID, len(errMsg)) { | ||
level.Error(m.logger).Log("msg", "write operation failed", "err", errMsg) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
package writefailures | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/grafana/loki/pkg/runtime" | ||
"github.com/grafana/loki/pkg/util/flagext" | ||
) | ||
|
||
func TestWriteFailuresLogging(t *testing.T) { | ||
t.Run("it only logs for the configured tenants and is disabled by default", func(t *testing.T) { | ||
buf := bytes.NewBuffer(nil) | ||
logger := log.NewLogfmtLogger(buf) | ||
|
||
f := func(tenantID string) *runtime.Config { | ||
if tenantID == "good-tenant" { | ||
return &runtime.Config{ | ||
LimitedLogPushErrors: true, | ||
} | ||
} | ||
if tenantID == "bad-tenant" { | ||
return &runtime.Config{ | ||
LimitedLogPushErrors: false, | ||
} | ||
} | ||
return &runtime.Config{} | ||
} | ||
|
||
runtimeCfg, err := runtime.NewTenantConfigs(f) | ||
require.NoError(t, err) | ||
|
||
manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg) | ||
|
||
manager.Log("bad-tenant", fmt.Errorf("bad-tenant contains invalid entry")) | ||
manager.Log("good-tenant", fmt.Errorf("good-tenant contains invalid entry")) | ||
manager.Log("unknown-tenant", fmt.Errorf("unknown-tenant contains invalid entry")) | ||
|
||
content := buf.String() | ||
require.NotEmpty(t, content) | ||
require.Contains(t, content, "good-tenant") | ||
require.NotContains(t, content, "bad-tenant") | ||
require.NotContains(t, content, "unknown-tenant") | ||
}) | ||
} | ||
|
||
func TestWriteFailuresRateLimiting(t *testing.T) { | ||
buf := bytes.NewBuffer(nil) | ||
logger := log.NewLogfmtLogger(buf) | ||
|
||
f := func(tenantID string) *runtime.Config { | ||
return &runtime.Config{ | ||
LimitedLogPushErrors: true, | ||
} | ||
} | ||
runtimeCfg, err := runtime.NewTenantConfigs(f) | ||
require.NoError(t, err) | ||
|
||
t.Run("with zero rate limiting", func(t *testing.T) { | ||
manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(0)}, runtimeCfg) | ||
|
||
manager.Log("known-tenant", fmt.Errorf("known-tenant entry error")) | ||
|
||
content := buf.String() | ||
require.Empty(t, content) | ||
}) | ||
|
||
t.Run("bytes exceeded on single message", func(t *testing.T) { | ||
manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg) | ||
|
||
errorStr := strings.Builder{} | ||
for i := 0; i < 1001; i++ { | ||
errorStr.WriteRune('z') | ||
} | ||
|
||
manager.Log("known-tenant", fmt.Errorf(errorStr.String())) | ||
|
||
content := buf.String() | ||
require.Empty(t, content) | ||
}) | ||
|
||
t.Run("valid bytes", func(t *testing.T) { | ||
manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1002)}, runtimeCfg) | ||
|
||
errorStr := strings.Builder{} | ||
for i := 0; i < 1001; i++ { | ||
errorStr.WriteRune('z') | ||
} | ||
|
||
manager.Log("known-tenant", fmt.Errorf(errorStr.String())) | ||
|
||
content := buf.String() | ||
require.NotEmpty(t, content) | ||
require.Contains(t, content, errorStr.String()) | ||
}) | ||
|
||
t.Run("limit is reset after a second", func(t *testing.T) { | ||
manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg) | ||
|
||
errorStr1 := strings.Builder{} | ||
errorStr2 := strings.Builder{} | ||
errorStr3 := strings.Builder{} | ||
for i := 0; i < 999; i++ { | ||
errorStr1.WriteRune('z') | ||
errorStr2.WriteRune('w') | ||
errorStr2.WriteRune('y') | ||
} | ||
|
||
manager.Log("known-tenant", fmt.Errorf(errorStr1.String())) | ||
manager.Log("known-tenant", fmt.Errorf(errorStr2.String())) // more than 1KB/s | ||
time.Sleep(time.Second) | ||
manager.Log("known-tenant", fmt.Errorf(errorStr3.String())) | ||
|
||
content := buf.String() | ||
require.NotEmpty(t, content) | ||
require.Contains(t, content, errorStr1.String()) | ||
require.NotContains(t, content, errorStr2.String()) | ||
require.Contains(t, content, errorStr3.String()) | ||
}) | ||
|
||
t.Run("limit is per-tenant", func(t *testing.T) { | ||
runtimeCfg, err := runtime.NewTenantConfigs(f) | ||
require.NoError(t, err) | ||
manager := NewManager(logger, Cfg{LogRate: flagext.ByteSize(1000)}, runtimeCfg) | ||
|
||
errorStr1 := strings.Builder{} | ||
errorStr2 := strings.Builder{} | ||
errorStr3 := strings.Builder{} | ||
for i := 0; i < 998; i++ { | ||
errorStr1.WriteRune('z') | ||
errorStr2.WriteRune('w') | ||
errorStr3.WriteRune('y') | ||
} | ||
|
||
manager.Log("tenant1", fmt.Errorf("1%s", errorStr1.String())) | ||
manager.Log("tenant2", fmt.Errorf("2%s", errorStr1.String())) | ||
|
||
manager.Log("tenant1", fmt.Errorf("1%s", errorStr2.String())) // limit exceeded for tenant1, Str2 shouldn't be present. | ||
manager.Log("tenant3", fmt.Errorf("3%s", errorStr1.String())) // all fine with tenant3. | ||
|
||
time.Sleep(time.Second) | ||
manager.Log("tenant1", fmt.Errorf("1%s", errorStr3.String())) // tenant1 is fine again. | ||
manager.Log("tenant3", fmt.Errorf("3%s", errorStr1.String())) // all fine with tenant3. | ||
|
||
content := buf.String() | ||
require.NotEmpty(t, content) | ||
require.Contains(t, content, "1z") | ||
require.Contains(t, content, "2z") | ||
|
||
require.NotContains(t, content, "1w") // Str2 | ||
require.Contains(t, content, "3z") | ||
|
||
require.Contains(t, content, "1y") | ||
require.Contains(t, content, "3z") | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package writefailures | ||
|
||
type strategy struct { | ||
burst int | ||
rate float64 | ||
} | ||
|
||
func newStrategy(burst int, rate float64) *strategy { | ||
return &strategy{ | ||
burst: burst, | ||
rate: rate, | ||
} | ||
} | ||
|
||
func (s *strategy) Burst(tenantID string) int { | ||
return s.burst | ||
} | ||
|
||
func (s *strategy) Limit(tenantID string) float64 { | ||
return s.rate | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters