feat: Configurable list of json fields to mine patterns #14528

Merged: 5 commits, Oct 21, 2024
1 change: 1 addition & 0 deletions pkg/loki/modules.go
@@ -638,6 +638,7 @@ func (t *Loki) initPatternIngester() (_ services.Service, err error) {
 	t.Cfg.Pattern.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
 	t.PatternIngester, err = pattern.New(
 		t.Cfg.Pattern,
+		t.Overrides,
 		t.PatternRingClient,
 		t.Cfg.MetricsNamespace,
 		prometheus.DefaultRegisterer,
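
For context, t.Overrides is Loki's per-tenant limits object, and it is what must satisfy the new Limits interfaces introduced below. A hypothetical sketch of the getter it would need; the lookup helper getOverridesForUser and the struct field are assumptions about the overrides implementation, not code shown in this PR:

// Hypothetical: how a per-tenant Overrides type could satisfy the new
// interface. Only the method name is taken from this diff; the body is
// an assumption.
func (o *Overrides) PatternIngesterTokenizableJSONFields(userID string) []string {
	return o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFields
}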
9 changes: 7 additions & 2 deletions pkg/pattern/drain/drain.go
@@ -47,6 +47,10 @@ type Config struct {
 	MaxAllowedLineLength int
 }
 
+type Limits interface {
+	PatternIngesterTokenizableJSONFields(userID string) []string
+}
+
 func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache {
 	if maxSize == 0 {
 		maxSize = math.MaxInt
@@ -135,7 +139,7 @@ func DefaultConfig() *Config {
 	}
 }
 
-func New(config *Config, format string, metrics *Metrics) *Drain {
+func New(tenantID string, config *Config, limits Limits, format string, metrics *Metrics) *Drain {
 	if config.LogClusterDepth < 3 {
 		panic("depth argument must be at least 3")
 	}
@@ -153,7 +157,8 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
 	var tokenizer LineTokenizer
 	switch format {
 	case FormatJSON:
-		tokenizer = newJSONTokenizer(config.ParamString, config.MaxAllowedLineLength)
+		fieldsToTokenize := limits.PatternIngesterTokenizableJSONFields(tenantID)
+		tokenizer = newJSONTokenizer(config.ParamString, config.MaxAllowedLineLength, fieldsToTokenize)
 	case FormatLogfmt:
 		tokenizer = newLogfmtTokenizer(config.ParamString, config.MaxAllowedLineLength)
 	default:
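
To see the new construction path end to end, here is a minimal, self-contained sketch of calling the updated constructor. staticLimits is hypothetical, and Train's signature and LogCluster's String method are assumed from the surrounding package rather than shown in this diff:

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/pattern/drain"
)

// staticLimits is a hypothetical drain.Limits implementation that returns
// the same field list for every tenant; a real deployment would resolve
// per-tenant overrides instead.
type staticLimits struct{}

func (staticLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
	return []string{"msg", "message"}
}

func main() {
	// The tenant ID and limits are now required up front so the JSON
	// tokenizer can be built with that tenant's field list.
	d := drain.New("tenant-a", drain.DefaultConfig(), staticLimits{}, drain.FormatJSON, nil)
	cluster := d.Train(`{"msg":"connect to 10.0.0.1 failed"}`, 0)
	fmt.Println(cluster.String())
}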
2 changes: 1 addition & 1 deletion pkg/pattern/drain/drain_benchmark_test.go
@@ -35,7 +35,7 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
 		line := scanner.Text()
 		lines = append(lines, line)
 	}
-	drain := New(DefaultConfig(), DetectLogFormat(lines[0]), nil)
+	drain := New("", DefaultConfig(), &fakeLimits{}, DetectLogFormat(lines[0]), nil)
 
 	b.ReportAllocs()
 	b.ResetTimer()
45 changes: 28 additions & 17 deletions pkg/pattern/drain/drain_test.go
@@ -14,20 +14,23 @@ import (
 	"github.com/grafana/loki/v3/pkg/logql/log/pattern"
 )
 
+const (
+	testTenant = "fake"
+)
+
 func TestDrain_TrainExtractsPatterns(t *testing.T) {
 	t.Parallel()
 
 	// Set this so the test will print the patterns found, in string slice format for easy copy-paste
 	outputPatternsForTestUpdate := false
 
 	tests := []struct {
 		drain     *Drain
 		inputFile string
 		patterns  []string
 		format    string
 	}{
 		{
-			drain:     New(DefaultConfig(), "", nil),
+			drain:     New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputFile: `testdata/agent-logfmt.txt`,
 			format:    FormatLogfmt,
 			patterns: []string{
@@ -56,7 +59,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), "", nil),
+			drain:     New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputFile: `testdata/ingester-logfmt.txt`,
 			format:    FormatLogfmt,
 			patterns: []string{
@@ -66,7 +69,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), "", nil),
+			drain:     New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputFile: `testdata/drone-json.txt`,
 			format:    FormatJSON,
 			patterns: []string{
@@ -79,7 +82,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), "", nil),
+			drain:     New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputFile: "testdata/distributor-logfmt.txt",
 			format:    FormatLogfmt,
 			patterns: []string{
@@ -91,7 +94,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), "", nil),
+			drain:     New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputFile: "testdata/journald.txt",
 			format:    FormatUnknown,
 			patterns: []string{
@@ -211,7 +214,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), "", nil),
+			drain:     New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputFile: "testdata/kafka.txt",
 			format:    FormatUnknown,
 			patterns: []string{
@@ -232,7 +235,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), "", nil),
+			drain:     New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputFile: "testdata/kubernetes.txt",
 			format:    FormatUnknown,
 			patterns: []string{
@@ -273,15 +276,15 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), "", nil),
+			drain:     New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputFile: "testdata/vault.txt",
 			format:    FormatUnknown,
 			patterns: []string{
 				`<_> [INFO] expiration: revoked lease: lease_id=<_>`,
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), "", nil),
+			drain:     New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputFile: "testdata/calico.txt",
 			format:    FormatUnknown,
 			patterns: []string{
@@ -374,7 +377,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), "", nil),
+			drain:     New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputFile: "testdata/grafana-ruler.txt",
 			format:    FormatLogfmt,
 			patterns: []string{
@@ -470,7 +473,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
 	}{
 		{
 			name:  "should extract patterns that all lines match",
-			drain: New(DefaultConfig(), "", nil),
+			drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputLines: []string{
 				"test 1 test test",
 				"test 2 test test",
@@ -480,7 +483,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
 		},
 		{
 			name:  "should extract patterns that match if line ends with newlines",
-			drain: New(DefaultConfig(), "", nil),
+			drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputLines: []string{
 				`test 1 test test
`,
@@ -494,7 +497,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
 		},
 		{
 			name:  "should extract patterns that match if line ends with empty space",
-			drain: New(DefaultConfig(), "", nil),
+			drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputLines: []string{
 				`test 1 test test `,
 				`test 2 test test `,
@@ -504,7 +507,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
 		},
 		{
 			name:  "should extract patterns that match if line starts with empty space",
-			drain: New(DefaultConfig(), "", nil),
+			drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputLines: []string{
 				` test 1 test test`,
 				` test 2 test test`,
@@ -514,7 +517,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
 		},
 		{
 			name:  "Scheduler patterns are matchable",
-			drain: New(DefaultConfig(), "", nil),
+			drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputLines: []string{
 				`ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
 				`ts=2024-05-30T12:50:36.350575929Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
@@ -611,7 +614,7 @@ func TestDrain_PruneTreeClearsOldBranches(t *testing.T) {
 	}{
 		{
 			name:  "should prune old branches",
-			drain: New(DefaultConfig(), "", nil),
+			drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
 			inputLines: []string{
 				"test test test A",
 				"test test test B",
@@ -665,3 +668,11 @@ func countNodes(node *Node) int {
 	}
 	return total
 }
+
+type fakeLimits struct {
+	Limits
+}
+
+func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
+	return []string{"log", "message", "msg", "msg_", "_msg", "content"}
+}
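
A note on the test double: embedding the Limits interface in the fakeLimits struct lets it satisfy the interface without implementing every method (any unimplemented method would panic at runtime via the nil embedded value), while the single override pins the exact field list the production code previously hard-coded.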
16 changes: 11 additions & 5 deletions pkg/pattern/drain/line_tokenizer.go
@@ -263,17 +263,23 @@ func (t *logfmtTokenizer) Clone(tokens []string, _ interface{}) ([]string, interface{}) {
 
 type jsonTokenizer struct {
 	*punctuationTokenizer
-	varReplace    string
-	maxLineLength int
+	varReplace       string
+	maxLineLength    int
+	fieldsToTokenize []string
 }
 
-func newJSONTokenizer(varReplace string, maxLineLength int) *jsonTokenizer {
-	return &jsonTokenizer{newPunctuationTokenizer(maxLineLength), varReplace, maxLineLength}
+func newJSONTokenizer(varReplace string, maxLineLength int, fieldsToTokenize []string) *jsonTokenizer {
+	return &jsonTokenizer{
+		punctuationTokenizer: newPunctuationTokenizer(maxLineLength),
+		varReplace:           varReplace,
+		maxLineLength:        maxLineLength,
+		fieldsToTokenize:     fieldsToTokenize,
+	}
 }
 
 func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
 	var found []byte
-	for _, key := range []string{"log", "message", "msg", "msg_", "_msg", "content"} {
+	for _, key := range t.fieldsToTokenize {
 		msg, ty, _, err := jsonparser.Get(unsafeBytes(line), key)
 		if err == nil && ty == jsonparser.String {
 			found = msg
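
The field probe above relies on github.com/buger/jsonparser. A standalone sketch of the same lookup loop, keeping the first field that holds a string value; the log line and field list here are illustrative:

package main

import (
	"fmt"

	"github.com/buger/jsonparser"
)

func main() {
	line := []byte(`{"level":"info","msg":"connect to 10.0.0.1 failed"}`)

	// Probe candidate fields in order and stop at the first string match,
	// mirroring the probing part of jsonTokenizer.Tokenize.
	for _, key := range []string{"log", "message", "msg"} {
		value, ty, _, err := jsonparser.Get(line, key)
		if err == nil && ty == jsonparser.String {
			fmt.Printf("tokenize %q (from field %q)\n", value, key)
			break
		}
	}
}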
3 changes: 2 additions & 1 deletion pkg/pattern/drain/line_tokenizer_test.go
@@ -325,7 +325,8 @@ func TestJsonTokenizer(t *testing.T) {
 		},
 	}
 
-	tokenizer := newJSONTokenizer(param, DefaultConfig().MaxAllowedLineLength)
+	fieldsToTokenize := []string{"log", "message", "msg", "msg_", "_msg", "content"}
+	tokenizer := newJSONTokenizer(param, DefaultConfig().MaxAllowedLineLength, fieldsToTokenize)
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/pattern/flush_test.go
@@ -41,7 +41,7 @@ func TestSweepInstance(t *testing.T) {
 		ring: fakeRing,
 	}
 
-	ing, err := New(defaultIngesterTestConfig(t), ringClient, "foo", nil, log.NewNopLogger())
+	ing, err := New(defaultIngesterTestConfig(t), &fakeLimits{}, ringClient, "foo", nil, log.NewNopLogger())
 	require.NoError(t, err)
 	defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
 	err = services.StartAndAwaitRunning(context.Background(), ing)
8 changes: 8 additions & 0 deletions pkg/pattern/ingester.go
@@ -148,6 +148,10 @@ func (cfg *Config) Validate() error {
 	return cfg.LifecyclerConfig.Validate()
 }
 
+type Limits interface {
+	drain.Limits
+}
+
 type Ingester struct {
 	services.Service
 	lifecycler *ring.Lifecycler
@@ -156,6 +160,7 @@ type Ingester struct {
 	lifecyclerWatcher *services.FailureWatcher
 
 	cfg        Config
+	limits     Limits
 	registerer prometheus.Registerer
 	logger     log.Logger
 
@@ -175,6 +180,7 @@ type Ingester struct {
 
 func New(
 	cfg Config,
+	limits Limits,
 	ringClient RingClient,
 	metricsNamespace string,
 	registerer prometheus.Registerer,
@@ -189,6 +195,7 @@ func New(
 
 	i := &Ingester{
 		cfg:        cfg,
+		limits:     limits,
 		ringClient: ringClient,
 		logger:     log.With(logger, "component", "pattern-ingester"),
 		registerer: registerer,
@@ -416,6 +423,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { //nolint:revive
 		i.logger,
 		i.metrics,
 		i.drainCfg,
+		i.limits,
 		i.ringClient,
 		i.lifecycler.ID,
 		writer,
10 changes: 10 additions & 0 deletions pkg/pattern/ingester_test.go
@@ -54,6 +54,7 @@ func TestInstancePushQuery(t *testing.T) {
 		log.NewNopLogger(),
 		newIngesterMetrics(nil, "test"),
 		drain.DefaultConfig(),
+		&fakeLimits{},
 		ringClient,
 		ingesterID,
 		mockWriter,
@@ -141,6 +142,7 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
 		log.NewNopLogger(),
 		newIngesterMetrics(nil, "test"),
 		drain.DefaultConfig(),
+		&fakeLimits{},
 		ringClient,
 		ingesterID,
 		mockWriter,
@@ -336,3 +338,11 @@ func (m *mockEntryWriter) WriteEntry(ts time.Time, entry string, lbls labels.Labels) {
 func (m *mockEntryWriter) Stop() {
 	_ = m.Called()
 }
+
+type fakeLimits struct {
+	Limits
+}
+
+func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
+	return []string{"log", "message", "msg", "msg_", "_msg", "content"}
+}
25 changes: 14 additions & 11 deletions pkg/pattern/instance.go
@@ -32,16 +32,17 @@ const indexShards = 32
 
 // instance is a tenant instance of the pattern ingester.
 type instance struct {
-	instanceID string
-	buf        []byte             // buffer used to compute fps.
-	mapper     *ingester.FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free
-	streams    *streamsMap
-	index      *index.BitPrefixInvertedIndex
-	logger     log.Logger
-	metrics    *ingesterMetrics
-	drainCfg   *drain.Config
-	ringClient RingClient
-	ingesterID string
+	instanceID  string
+	buf         []byte             // buffer used to compute fps.
+	mapper      *ingester.FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free
+	streams     *streamsMap
+	index       *index.BitPrefixInvertedIndex
+	logger      log.Logger
+	metrics     *ingesterMetrics
+	drainCfg    *drain.Config
+	drainLimits drain.Limits
+	ringClient  RingClient
+	ingesterID  string
 
 	aggMetricsLock             sync.Mutex
 	aggMetricsByStreamAndLevel map[string]map[string]*aggregatedMetrics
@@ -59,6 +60,7 @@ func newInstance(
 	logger log.Logger,
 	metrics *ingesterMetrics,
 	drainCfg *drain.Config,
+	drainLimits drain.Limits,
 	ringClient RingClient,
 	ingesterID string,
 	writer aggregation.EntryWriter,
@@ -75,6 +77,7 @@ func newInstance(
 		index:                      index,
 		metrics:                    metrics,
 		drainCfg:                   drainCfg,
+		drainLimits:                drainLimits,
 		ringClient:                 ringClient,
 		ingesterID:                 ingesterID,
 		aggMetricsByStreamAndLevel: make(map[string]map[string]*aggregatedMetrics),
@@ -220,7 +223,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream
 	fp := i.getHashForLabels(labels)
 	sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
 	firstEntryLine := pushReqStream.Entries[0].Line
-	s, err := newStream(fp, sortedLabels, i.metrics, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID, i.drainCfg)
+	s, err := newStream(fp, sortedLabels, i.metrics, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID, i.drainCfg, i.drainLimits)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create stream: %w", err)
 	}