feat: Apply patterns line length limit to json message key #14296

Merged (1 commit) on Oct 2, 2024
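
In short: the MaxAllowedLineLength check moves out of Drain.Train and into each tokenizer, so for JSON logs the limit applies to the extracted message value rather than the raw line. The guard placement before and after, quoted from the diff below:

// Before, in Drain.Train, the raw line was rejected up front:
//
//	if len(content) > d.config.MaxAllowedLineLength {
//		return nil
//	}
//
// After, every Tokenize implementation enforces the limit itself, e.g. in
// punctuationTokenizer:
//
//	if len(line) > p.maxLineLength {
//		return nil, nil
//	}
//
// The jsonTokenizer runs the same check against the extracted message value
// (foundLine), which is what the PR title refers to.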
9 changes: 3 additions & 6 deletions pkg/pattern/drain/drain.go
@@ -153,11 +153,11 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
 	var tokenizer LineTokenizer
 	switch format {
 	case FormatJSON:
-		tokenizer = newJSONTokenizer(config.ParamString)
+		tokenizer = newJSONTokenizer(config.ParamString, config.MaxAllowedLineLength)
 	case FormatLogfmt:
-		tokenizer = newLogfmtTokenizer(config.ParamString)
+		tokenizer = newLogfmtTokenizer(config.ParamString, config.MaxAllowedLineLength)
 	default:
-		tokenizer = newPunctuationTokenizer()
+		tokenizer = newPunctuationTokenizer(config.MaxAllowedLineLength)
 	}
 
 	d.idToCluster = createLogClusterCache(config.MaxClusters, func(int, *LogCluster) {
@@ -206,9 +206,6 @@ func (d *Drain) Train(content string, ts int64) *LogCluster {
 	if !d.limiter.Allow() {
 		return nil
 	}
-	if len(content) > d.config.MaxAllowedLineLength {
-		return nil
-	}
 	d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state)
 	return d.train(d.tokens, d.state, ts)
 }
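
With the guard gone from Train, a rejected line now surfaces as nil tokens from Tokenize. A minimal usage sketch, assuming a nil *Metrics is accepted by New and using a made-up log line (DefaultConfig, FormatJSON, New, and Train are names from this diff):

	// Sketch only, not code from the PR.
	d := New(DefaultConfig(), FormatJSON, nil)

	// For the JSON tokenizer the limit now applies to the "msg" value: a
	// short message inside an oversized JSON envelope can still train a
	// pattern, while an oversized message value makes the tokenizer return
	// nil tokens, so the line is effectively dropped.
	cluster := d.Train(`{"msg":"connection refused","payload":"…"}`, 0)
	_ = cluster // nil if the tokenizer rejected the line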
39 changes: 29 additions & 10 deletions pkg/pattern/drain/line_tokenizer.go
@@ -37,9 +37,10 @@ func (spacesTokenizer) Clone(tokens []string, _ interface{}) ([]string, interface{}) {
 type punctuationTokenizer struct {
 	includeDelimiters [128]rune
 	excludeDelimiters [128]rune
+	maxLineLength     int
 }
 
-func newPunctuationTokenizer() *punctuationTokenizer {
+func newPunctuationTokenizer(maxLineLength int) *punctuationTokenizer {
 	var included [128]rune
 	var excluded [128]rune
 	included['='] = 1
@@ -51,10 +52,15 @@ func newPunctuationTokenizer() *punctuationTokenizer {
 	return &punctuationTokenizer{
 		includeDelimiters: included,
 		excludeDelimiters: excluded,
+		maxLineLength:     maxLineLength,
 	}
 }
 
 func (p *punctuationTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
+	if len(line) > p.maxLineLength {
+		return nil, nil
+	}
+
 	if cap(tokens) == 0 {
 		tokens = make([]string, 0, 128)
 	}
@@ -190,18 +196,24 @@ func (splittingTokenizer) Clone(tokens []string, state interface{}) ([]string, interface{}) {
 }
 
 type logfmtTokenizer struct {
-	dec        *logfmt.Decoder
-	varReplace string
+	dec           *logfmt.Decoder
+	varReplace    string
+	maxLineLength int
 }
 
-func newLogfmtTokenizer(varReplace string) *logfmtTokenizer {
+func newLogfmtTokenizer(varReplace string, maxLineLength int) *logfmtTokenizer {
 	return &logfmtTokenizer{
-		dec:        logfmt.NewDecoder(nil),
-		varReplace: varReplace,
+		dec:           logfmt.NewDecoder(nil),
+		varReplace:    varReplace,
+		maxLineLength: maxLineLength,
 	}
 }
 
 func (t *logfmtTokenizer) Tokenize(line string, tokens []string, _ interface{}) ([]string, interface{}) {
+	if len(line) > t.maxLineLength {
+		return nil, nil
+	}
+
 	if cap(tokens) == 0 {
 		tokens = make([]string, 0, 64)
 	}
@@ -251,11 +263,12 @@ func (t *logfmtTokenizer) Clone(tokens []string, _ interface{}) ([]string, interface{}) {
 
 type jsonTokenizer struct {
 	*punctuationTokenizer
-	varReplace string
+	varReplace    string
+	maxLineLength int
 }
 
-func newJSONTokenizer(varReplace string) *jsonTokenizer {
-	return &jsonTokenizer{newPunctuationTokenizer(), varReplace}
+func newJSONTokenizer(varReplace string, maxLineLength int) *jsonTokenizer {
+	return &jsonTokenizer{newPunctuationTokenizer(maxLineLength), varReplace, maxLineLength}
 }
 
 func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
@@ -272,7 +285,13 @@ func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
 		return nil, nil
 	}
 
-	return t.punctuationTokenizer.Tokenize(unsafeString(found), tokens, state)
+	foundLine := unsafeString(found)
+
+	if len(foundLine) > t.maxLineLength {
+		return nil, nil
+	}
+
+	return t.punctuationTokenizer.Tokenize(foundLine, tokens, state)
 }
 
 func (t *jsonTokenizer) Join(tokens []string, state interface{}) string {
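
A quick check of the new JSON behavior could look like the sketch below (not part of the PR: the limit of 64 and both sample lines are made up, and "msg" is assumed to be among the message keys the tokenizer extracts):

package drain

import (
	"strings"
	"testing"

	"github.com/stretchr/testify/require"
)

// Sketch: the JSON tokenizer's limit applies to the extracted message value,
// not to the raw line length.
func TestJSONTokenizerLimitSketch(t *testing.T) {
	tok := newJSONTokenizer("<_>", 64) // hypothetical placeholder and limit

	// Oversized "msg" value: rejected even though the JSON parses fine.
	long := `{"msg":"` + strings.Repeat("a", 65) + `"}`
	got, _ := tok.Tokenize(long, nil, nil)
	require.Nil(t, got)

	// Short "msg" value inside a long envelope: still tokenized, because only
	// the extracted value is measured against the limit.
	envelope := `{"msg":"hello world","payload":"` + strings.Repeat("x", 500) + `"}`
	got, _ = tok.Tokenize(envelope, nil, nil)
	require.NotNil(t, got)
}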
35 changes: 27 additions & 8 deletions pkg/pattern/drain/line_tokenizer_test.go
@@ -115,6 +115,14 @@ var testCases = []TestCase{
 			typeSplitting:   {`!@£$%^&*()`},
 		},
 	},
+	{
+		name: "line length greater than max allowed length",
+		line: `09:17:38.033366 ▶ INFO  route ops sending to dest https://graphite-cortex-ops-blocks-us-east4.grafana.net/graphite/metrics:  service_is_carbon-relay-ng.instance_is_carbon-relay-ng-c665b7b-j2trk.mtype_is_counter.dest_is_https_graphite-cortex-ops-blocks-us-east4_grafana_netgraphitemetrics.unit_is_Metric.action_is_drop.reason_is_queue_full 0 1717060658 userid invalid`,
+		want: map[string][]string{
+			typePunctuation: []string(nil),
+			typeSplitting:   {`09:`, `17:`, `38.033366`, `▶`, `INFO`, ``, `route`, `ops`, `sending`, `to`, `dest`, `https:`, `//graphite-cortex-ops-blocks-us-east4.grafana.net/graphite/metrics:`, ``, `service_is_carbon-relay-ng.instance_is_carbon-relay-ng-c665b7b-j2trk.mtype_is_counter.dest_is_https_graphite-cortex-ops-blocks-us-east4_grafana_netgraphitemetrics.unit_is_Metric.action_is_drop.reason_is_queue_full`, `0`, `1717060658`, `userid`, `invalid`},
+		},
+	},
 }
 
 func TestTokenizer_Tokenize(t *testing.T) {
@@ -124,7 +132,7 @@ func TestTokenizer_Tokenize(t *testing.T) {
 	}{
 		{
 			name:      typePunctuation,
-			tokenizer: newPunctuationTokenizer(),
+			tokenizer: newPunctuationTokenizer(360),
 		},
 		{
 			name: typeSplitting,
@@ -149,7 +157,7 @@ func TestTokenizer_TokenizeAndJoin(t *testing.T) {
 	}{
 		{
 			name:      typePunctuation,
-			tokenizer: newPunctuationTokenizer(),
+			tokenizer: newPunctuationTokenizer(DefaultConfig().MaxAllowedLineLength),
 		},
 		{
 			name: typeSplitting,
@@ -168,7 +176,7 @@
 }
 
 func BenchmarkSplittingTokenizer(b *testing.B) {
-	tokenizer := newPunctuationTokenizer()
+	tokenizer := newPunctuationTokenizer(DefaultConfig().MaxAllowedLineLength)
 
 	for _, tt := range testCases {
 		tc := tt
@@ -213,9 +221,13 @@ func TestLogFmtTokenizer(t *testing.T) {
 			line: `logger=sqlstore.metrics traceID=c933fefbe893411d3be8e1648d6bcf37 t=2024-07-10T16:00:15.564896897Z level=debug msg="query finished" status=success elapsedtime=1.324305ms <REDACTED> error=null`,
 			want: []string{"logger", "sqlstore.metrics", "traceID", "<_>", "t", "<_>", "level", "debug", "msg", "query finished", "status", "success", "elapsedtime", "1.324305ms", "<REDACTED>", "", "error", "null"},
 		},
+		{
+			line: `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095 ip=127.0.0.1 userid=1234456`,
+			want: []string(nil),
+		},
 	}
 
-	tokenizer := newLogfmtTokenizer(param)
+	tokenizer := newLogfmtTokenizer(param, 250)
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -268,7 +280,7 @@ func TestLogFmtTokenizerJoin(t *testing.T) {
 		},
 	}
 
-	tokenizer := newLogfmtTokenizer("")
+	tokenizer := newLogfmtTokenizer("", DefaultConfig().MaxAllowedLineLength)
 
 	for _, tt := range tests {
 		t.Run("", func(t *testing.T) {
@@ -306,16 +318,23 @@ func TestJsonTokenizer(t *testing.T) {
 			want:    []string{"successfully", "discovered", "15", "agent", "IP", "addresses"},
 			pattern: "<_>successfully discovered 15 agent IP addresses<_>",
 		},
+		{
+			line:    `{"msg":{"actor":{"alternateId":"foo@grafana.com","displayName":"Foo bar","id":"dq23","type":"User"},"authenticationContext":{"authenticationStep":0,"externalSessionId":"123d"},"client":{"device":"Computer","geographicalContext":{"city":"Berlin","country":"DE","state":"Land Berlin"},"ipAddress":"0.0.0.0","userAgent":{"browser":"CHROME","os":"Mac OS X","rawUserAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"},"zone":"null"},"debugContext":{"debugData":{"authMethodFirstEnrollment":"123","authMethodFirstType":"foo","authMethodFirstVerificationTime":"2024-07-02T11:28:03.219Z","authMethodSecondEnrollment":"var","authMethodSecondType":"ddd","authMethodSecondVerificationTime":"2024-07-03T06:59:09.151Z","authnRequestId":"1","dtHash":"1","logOnlySecurityData":"{\"risk\":{\"level\":\"LOW\"},\"behaviors\":{\"New Geo-Location\":\"NEGATIVE\",\"New Device\":\"NEGATIVE\",\"New IP\":\"NEGATIVE\",\"New State\":\"NEGATIVE\",\"New Country\":\"NEGATIVE\",\"Velocity\":\"NEGATIVE\",\"New City\":\"NEGATIVE\"}}","requestId":"1","threatSuspected":"false","url":"/foo?"}},"displayMessage":"Evaluation of sign-on policy","eventType":"policy.evaluate_sign_on","legacyEventType":"app.oauth2.token.grant.refresh_token_success","outcome":{"reason":"Sign-on policy evaluation resulted in AUTHENTICATED","result":"ALLOW"},"published":"2024-07-03T09:19:59.973Z","request":{"ipChain":[{"geographicalContext":{"city":"Berlin","country":"Germany","geolocation":{"lat":52.5363,"lon":13.4169},"postalCode":"10435","state":"Land Berlin"},"ip":"95.90.234.241","version":"V4"}]},"securityContext":{"asNumber":3209,"asOrg":"kabel deutschland breitband customer 19","domain":"kabel-deutschland.de","isProxy":false,"isp":"vodafone gmbh"},"severity":"INFO","target":[{"alternateId":"Salesforce.com","detailEntry":{"signOnModeEvaluationResult":"AUTHENTICATED","signOnModeType":"SAML_2_0"},"displayName":"Salesforce.com","id":"0oa5sfmj3hz0mTgoW357","type":"AppInstance"},{"alternateId":"unknown","detailEntry":{"policyRuleFactorMode":"2FA"},"displayName":"Catch-all Rule","id":"1","type":"Rule"}],"transaction":{"detail":{},"id":"1","type":"WEB"},"context":[{"repo":{"id":27826205,"name":"hermanwahyudi/selenium","url":"https://api.github.com/repos/hermanwahyudi/selenium"},"payload":{"push_id":536863976,"size":1,"distinct_size":0,"ref":"refs/heads/master","head":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","before":"20b10e3a605bd177efff62f1130943774ac07bf3","commits":[{"sha":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","author":{"email":"2bb20d8a71fb7adbc1d6239cc9ff4130f26819dc@gmail.com","name":"Herman"},"message":"Update README.md","distinct":false,"url":"https://api.github.com/repos/hermanwahyudi/selenium/commits/1b58dd4c4e14ea9cf5212b981774bd448a266c3c"}]}},{"repo":{"id":27826205,"name":"hermanwahyudi/selenium","url":"https://api.github.com/repos/hermanwahyudi/selenium"},"payload":{"push_id":536863976,"size":1,"distinct_size":0,"ref":"refs/heads/master","head":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","before":"20b10e3a605bd177efff62f1130943774ac07bf3","commits":[{"sha":"1b58dd4c4e14ea9cf5212b981774bd448a266c3c","author":{"email":"2bb20d8a71fb7adbc1d6239cc9ff4130f26819dc@gmail.com","name":"Herman"},"message":"Update README.md","distinct":false,"url":"https://api.github.com/repos/hermanwahyudi/selenium/commits/1b58dd4c4e14ea9cf5212b981774bd448a266c3c"}]}}],"uuid":"1","version":"0"},"level":"info","type":"received event","time":"2024-07-03T09:19:59Z"}`,
+			want:    []string(nil),
+			pattern: "",
+		},
 	}
 
-	tokenizer := newJSONTokenizer(param)
+	tokenizer := newJSONTokenizer(param, DefaultConfig().MaxAllowedLineLength)
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			got, state := tokenizer.Tokenize(tt.line, nil, nil)
 			require.Equal(t, tt.want, got)
-			pattern := tokenizer.Join(got, state)
-			require.Equal(t, tt.pattern, pattern)
+			if len(got) == len(tt.want) && len(tt.want) != 0 {
+				pattern := tokenizer.Join(got, state)
+				require.Equal(t, tt.pattern, pattern)
+			}
 		})
 	}
 }
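
The logfmt side can be sketched the same way (same caveats and imports as the sketch above; 250 mirrors the limit passed to newLogfmtTokenizer in TestLogFmtTokenizer, and the sample lines are made up):

// Sketch, not part of the PR: the logfmt tokenizer measures the raw line.
func TestLogfmtTokenizerLimitSketch(t *testing.T) {
	tok := newLogfmtTokenizer("<_>", 250)

	long := `msg="` + strings.Repeat("a", 300) + `"`
	got, _ := tok.Tokenize(long, nil, nil)
	require.Nil(t, got) // 306 bytes > 250, so the line is skipped

	got, _ = tok.Tokenize(`level=info msg="ready"`, nil, nil)
	require.NotNil(t, got)
}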