Skip to content

Commit

Permalink
feat: Configurable list of json fields to mine patterns (#14528)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Oct 21, 2024
1 parent f568dda commit 7050897
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 57 deletions.
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ func (t *Loki) initPatternIngester() (_ services.Service, err error) {
t.Cfg.Pattern.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
t.PatternIngester, err = pattern.New(
t.Cfg.Pattern,
t.Overrides,
t.PatternRingClient,
t.Cfg.MetricsNamespace,
prometheus.DefaultRegisterer,
Expand Down
9 changes: 7 additions & 2 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type Config struct {
MaxAllowedLineLength int
}

type Limits interface {
PatternIngesterTokenizableJSONFields(userID string) []string
}

func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache {
if maxSize == 0 {
maxSize = math.MaxInt
Expand Down Expand Up @@ -135,7 +139,7 @@ func DefaultConfig() *Config {
}
}

func New(config *Config, format string, metrics *Metrics) *Drain {
func New(tenantID string, config *Config, limits Limits, format string, metrics *Metrics) *Drain {
if config.LogClusterDepth < 3 {
panic("depth argument must be at least 3")
}
Expand All @@ -153,7 +157,8 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
var tokenizer LineTokenizer
switch format {
case FormatJSON:
tokenizer = newJSONTokenizer(config.ParamString, config.MaxAllowedLineLength)
fieldsToTokenize := limits.PatternIngesterTokenizableJSONFields(tenantID)
tokenizer = newJSONTokenizer(config.ParamString, config.MaxAllowedLineLength, fieldsToTokenize)
case FormatLogfmt:
tokenizer = newLogfmtTokenizer(config.ParamString, config.MaxAllowedLineLength)
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/drain/drain_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
line := scanner.Text()
lines = append(lines, line)
}
drain := New(DefaultConfig(), DetectLogFormat(lines[0]), nil)
drain := New("", DefaultConfig(), &fakeLimits{}, DetectLogFormat(lines[0]), nil)

b.ReportAllocs()
b.ResetTimer()
Expand Down
45 changes: 28 additions & 17 deletions pkg/pattern/drain/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,23 @@ import (
"github.com/grafana/loki/v3/pkg/logql/log/pattern"
)

const (
testTenant = "fake"
)

func TestDrain_TrainExtractsPatterns(t *testing.T) {
t.Parallel()

// Set this so the test will print the patterns found, in string slice format for easy copy-paste
outputPatternsForTestUpdate := false

tests := []struct {
drain *Drain
inputFile string
patterns []string
format string
}{
{
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputFile: `testdata/agent-logfmt.txt`,
format: FormatLogfmt,
patterns: []string{
Expand Down Expand Up @@ -56,7 +59,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputFile: `testdata/ingester-logfmt.txt`,
format: FormatLogfmt,
patterns: []string{
Expand All @@ -66,7 +69,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputFile: `testdata/drone-json.txt`,
format: FormatJSON,
patterns: []string{
Expand All @@ -79,7 +82,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputFile: "testdata/distributor-logfmt.txt",
format: FormatLogfmt,
patterns: []string{
Expand All @@ -91,7 +94,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputFile: "testdata/journald.txt",
format: FormatUnknown,
patterns: []string{
Expand Down Expand Up @@ -211,7 +214,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputFile: "testdata/kafka.txt",
format: FormatUnknown,
patterns: []string{
Expand All @@ -232,7 +235,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputFile: "testdata/kubernetes.txt",
format: FormatUnknown,
patterns: []string{
Expand Down Expand Up @@ -273,15 +276,15 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputFile: "testdata/vault.txt",
format: FormatUnknown,
patterns: []string{
`<_> [INFO] expiration: revoked lease: lease_id=<_>`,
},
},
{
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputFile: "testdata/calico.txt",
format: FormatUnknown,
patterns: []string{
Expand Down Expand Up @@ -374,7 +377,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputFile: "testdata/grafana-ruler.txt",
format: FormatLogfmt,
patterns: []string{
Expand Down Expand Up @@ -470,7 +473,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
}{
{
name: "should extract patterns that all lines match",
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputLines: []string{
"test 1 test test",
"test 2 test test",
Expand All @@ -480,7 +483,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "should extract patterns that match if line ends with newlines",
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputLines: []string{
`test 1 test test
`,
Expand All @@ -494,7 +497,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "should extract patterns that match if line ends with empty space",
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputLines: []string{
`test 1 test test `,
`test 2 test test `,
Expand All @@ -504,7 +507,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "should extract patterns that match if line starts with empty space",
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputLines: []string{
` test 1 test test`,
` test 2 test test`,
Expand All @@ -514,7 +517,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "Scheduler patterns are matchable",
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputLines: []string{
`ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
`ts=2024-05-30T12:50:36.350575929Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
Expand Down Expand Up @@ -611,7 +614,7 @@ func TestDrain_PruneTreeClearsOldBranches(t *testing.T) {
}{
{
name: "should prune old branches",
drain: New(DefaultConfig(), "", nil),
drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil),
inputLines: []string{
"test test test A",
"test test test B",
Expand Down Expand Up @@ -665,3 +668,11 @@ func countNodes(node *Node) int {
}
return total
}

type fakeLimits struct {
Limits
}

func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
return []string{"log", "message", "msg", "msg_", "_msg", "content"}
}
16 changes: 11 additions & 5 deletions pkg/pattern/drain/line_tokenizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,17 +263,23 @@ func (t *logfmtTokenizer) Clone(tokens []string, _ interface{}) ([]string, inter

type jsonTokenizer struct {
*punctuationTokenizer
varReplace string
maxLineLength int
varReplace string
maxLineLength int
fieldsToTokenize []string
}

func newJSONTokenizer(varReplace string, maxLineLength int) *jsonTokenizer {
return &jsonTokenizer{newPunctuationTokenizer(maxLineLength), varReplace, maxLineLength}
func newJSONTokenizer(varReplace string, maxLineLength int, fieldsToTokenize []string) *jsonTokenizer {
return &jsonTokenizer{
punctuationTokenizer: newPunctuationTokenizer(maxLineLength),
varReplace: varReplace,
maxLineLength: maxLineLength,
fieldsToTokenize: fieldsToTokenize,
}
}

func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
var found []byte
for _, key := range []string{"log", "message", "msg", "msg_", "_msg", "content"} {
for _, key := range t.fieldsToTokenize {
msg, ty, _, err := jsonparser.Get(unsafeBytes(line), key)
if err == nil && ty == jsonparser.String {
found = msg
Expand Down
3 changes: 2 additions & 1 deletion pkg/pattern/drain/line_tokenizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ func TestJsonTokenizer(t *testing.T) {
},
}

tokenizer := newJSONTokenizer(param, DefaultConfig().MaxAllowedLineLength)
fieldsToTokenize := []string{"log", "message", "msg", "msg_", "_msg", "content"}
tokenizer := newJSONTokenizer(param, DefaultConfig().MaxAllowedLineLength, fieldsToTokenize)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestSweepInstance(t *testing.T) {
ring: fakeRing,
}

ing, err := New(defaultIngesterTestConfig(t), ringClient, "foo", nil, log.NewNopLogger())
ing, err := New(defaultIngesterTestConfig(t), &fakeLimits{}, ringClient, "foo", nil, log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
err = services.StartAndAwaitRunning(context.Background(), ing)
Expand Down
8 changes: 8 additions & 0 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func (cfg *Config) Validate() error {
return cfg.LifecyclerConfig.Validate()
}

type Limits interface {
drain.Limits
}

type Ingester struct {
services.Service
lifecycler *ring.Lifecycler
Expand All @@ -156,6 +160,7 @@ type Ingester struct {
lifecyclerWatcher *services.FailureWatcher

cfg Config
limits Limits
registerer prometheus.Registerer
logger log.Logger

Expand All @@ -175,6 +180,7 @@ type Ingester struct {

func New(
cfg Config,
limits Limits,
ringClient RingClient,
metricsNamespace string,
registerer prometheus.Registerer,
Expand All @@ -189,6 +195,7 @@ func New(

i := &Ingester{
cfg: cfg,
limits: limits,
ringClient: ringClient,
logger: log.With(logger, "component", "pattern-ingester"),
registerer: registerer,
Expand Down Expand Up @@ -416,6 +423,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
i.logger,
i.metrics,
i.drainCfg,
i.limits,
i.ringClient,
i.lifecycler.ID,
writer,
Expand Down
10 changes: 10 additions & 0 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestInstancePushQuery(t *testing.T) {
log.NewNopLogger(),
newIngesterMetrics(nil, "test"),
drain.DefaultConfig(),
&fakeLimits{},
ringClient,
ingesterID,
mockWriter,
Expand Down Expand Up @@ -141,6 +142,7 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
log.NewNopLogger(),
newIngesterMetrics(nil, "test"),
drain.DefaultConfig(),
&fakeLimits{},
ringClient,
ingesterID,
mockWriter,
Expand Down Expand Up @@ -336,3 +338,11 @@ func (m *mockEntryWriter) WriteEntry(ts time.Time, entry string, lbls labels.Lab
func (m *mockEntryWriter) Stop() {
_ = m.Called()
}

type fakeLimits struct {
Limits
}

func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
return []string{"log", "message", "msg", "msg_", "_msg", "content"}
}
25 changes: 14 additions & 11 deletions pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,17 @@ const indexShards = 32

// instance is a tenant instance of the pattern ingester.
type instance struct {
instanceID string
buf []byte // buffer used to compute fps.
mapper *ingester.FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free
streams *streamsMap
index *index.BitPrefixInvertedIndex
logger log.Logger
metrics *ingesterMetrics
drainCfg *drain.Config
ringClient RingClient
ingesterID string
instanceID string
buf []byte // buffer used to compute fps.
mapper *ingester.FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free
streams *streamsMap
index *index.BitPrefixInvertedIndex
logger log.Logger
metrics *ingesterMetrics
drainCfg *drain.Config
drainLimits drain.Limits
ringClient RingClient
ingesterID string

aggMetricsLock sync.Mutex
aggMetricsByStreamAndLevel map[string]map[string]*aggregatedMetrics
Expand All @@ -59,6 +60,7 @@ func newInstance(
logger log.Logger,
metrics *ingesterMetrics,
drainCfg *drain.Config,
drainLimits drain.Limits,
ringClient RingClient,
ingesterID string,
writer aggregation.EntryWriter,
Expand All @@ -75,6 +77,7 @@ func newInstance(
index: index,
metrics: metrics,
drainCfg: drainCfg,
drainLimits: drainLimits,
ringClient: ringClient,
ingesterID: ingesterID,
aggMetricsByStreamAndLevel: make(map[string]map[string]*aggregatedMetrics),
Expand Down Expand Up @@ -220,7 +223,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
firstEntryLine := pushReqStream.Entries[0].Line
s, err := newStream(fp, sortedLabels, i.metrics, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID, i.drainCfg)
s, err := newStream(fp, sortedLabels, i.metrics, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID, i.drainCfg, i.drainLimits)
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
Expand Down
Loading

0 comments on commit 7050897

Please sign in to comment.