Skip to content

Commit

Permalink
feat: Add new Drain tokenizer that splits on most punctuation (#13143)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored Jun 7, 2024
1 parent 9981e9e commit 6a0fdd0
Show file tree
Hide file tree
Showing 9 changed files with 50,711 additions and 379 deletions.
1 change: 1 addition & 0 deletions pkg/logcli/output/loki.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package output
89 changes: 55 additions & 34 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ package drain
import (
"math"
"strconv"
"strings"
"unicode"
"unsafe"

"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -139,7 +141,7 @@ func DefaultConfig() *Config {
// MaxClusterDepth and SimTh, the less the chance that there will be
// "similar" clusters, but the greater the footprint.
SimTh: 0.3,
MaxChildren: 100,
MaxChildren: 15,
ParamString: `<_>`,
MaxClusters: 300,
}
Expand All @@ -156,22 +158,24 @@ func New(config *Config, metrics *Metrics) *Drain {
}

d := &Drain{
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
tokenizer: splittingTokenizer{}, // Default to this for now
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
tokenizer: newPunctuationTokenizer(),
maxAllowedLineLength: 3000,
}
return d
}

type Drain struct {
config *Config
rootNode *Node
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
tokenizer LineTokenizer
config *Config
rootNode *Node
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
tokenizer LineTokenizer
maxAllowedLineLength int
}

func (d *Drain) Clusters() []*LogCluster {
Expand All @@ -183,10 +187,14 @@ func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts
}

func (d *Drain) Train(content string, ts int64) *LogCluster {
return d.train(d.tokenizer.Tokenize(content), d.tokenizer.Join, ts)
if len(content) > d.maxAllowedLineLength {
return nil
}
tokens, state := d.tokenizer.Tokenize(content)
return d.train(tokens, state, ts)
}

func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64) *LogCluster {
func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster {
if len(tokens) < 4 {
return nil
}
Expand All @@ -196,11 +204,12 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
d.clustersCounter++
clusterID := d.clustersCounter
matchCluster = &LogCluster{
Tokens: tokens,
id: clusterID,
Size: 1,
Stringer: stringer,
Chunks: Chunks{},
Tokens: tokens,
TokenState: state,
id: clusterID,
Size: 1,
Stringer: d.tokenizer.Join,
Chunks: Chunks{},
}
matchCluster.append(model.TimeFromUnixNano(ts))
d.idToCluster.Set(clusterID, matchCluster)
Expand All @@ -219,15 +228,16 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
}

func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
tokens := deduplicatePlaceholders(d.tokenizer.Tokenize(content), d.config.ParamString)
tokens, state := d.tokenizer.Tokenize(content)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true)
// Match no existing log cluster
if matchCluster == nil {
d.clustersCounter++
clusterID := d.clustersCounter
matchCluster = &LogCluster{
Tokens: tokens,
id: clusterID,
Tokens: tokens,
TokenState: state,
id: clusterID,
}
d.idToCluster.Set(clusterID, matchCluster)
d.addSeqToPrefixTree(d.rootNode, matchCluster)
Expand All @@ -241,24 +251,33 @@ func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample)
return matchCluster
}

func deduplicatePlaceholders(tokens []string, param string) []string {
if len(tokens) < 2 {
return tokens
func deduplicatePlaceholders(line string, placeholder string) string {
first := strings.Index(line, "<_><_>")
if first == -1 {
return line
}
i := 1
for k := 1; k < len(tokens); k++ {
if tokens[k] != param || tokens[k] != tokens[k-1] {
if i != k {
tokens[i] = tokens[k]
builder := make([]byte, 0, len(line))
low := 0
for i := first; i < len(line)-5; i++ {
if line[i:i+len(placeholder)] == placeholder {
high := i + 3
for ; high < len(line)-2; high += 3 {
if line[high:high+len(placeholder)] != placeholder {
break
}
}
i++
builder = append(builder, line[low:i+len(placeholder)]...)
low = high
i = high
}
}
return tokens[:i]
builder = append(builder, line[low:]...)

return unsafe.String(unsafe.SliceData(builder), len(builder))
}

func (d *Drain) PatternString(c *LogCluster) string {
s := d.tokenizer.Join(deduplicatePlaceholders(c.Tokens, d.config.ParamString))
s := deduplicatePlaceholders(d.tokenizer.Join(c.Tokens, c.TokenState), d.config.ParamString)
if s == d.config.ParamString {
return ""
}
Expand All @@ -271,7 +290,7 @@ func (d *Drain) Delete(cluster *LogCluster) {

// Match against an already existing cluster. Match shall be perfect (sim_th=1.0). New cluster will not be created as a result of this call, nor any cluster modifications.
func (d *Drain) Match(content string) *LogCluster {
contentTokens := d.tokenizer.Tokenize(content)
contentTokens, _ := d.tokenizer.Tokenize(content)
matchCluster := d.treeSearch(d.rootNode, contentTokens, 1.0, true)
return matchCluster
}
Expand Down Expand Up @@ -413,6 +432,7 @@ func (d *Drain) addSeqToPrefixTree(rootNode *Node, cluster *LogCluster) {
// if token not matched in this layer of existing tree.
if _, ok = curNode.keyToChildNode[token]; !ok {
if !d.hasNumbers(token) {
// Numbers in token: Prioritize the param string path
if _, ok = curNode.keyToChildNode[d.config.ParamString]; ok {
if len(curNode.keyToChildNode) < d.config.MaxChildren {
newNode := createNode()
Expand All @@ -435,6 +455,7 @@ func (d *Drain) addSeqToPrefixTree(rootNode *Node, cluster *LogCluster) {
}
}
} else {
// No numbers, use the key as-is to traverse
if _, ok = curNode.keyToChildNode[d.config.ParamString]; !ok {
newNode := createNode()
curNode.keyToChildNode[d.config.ParamString] = newNode
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/drain/drain_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
drain := New(DefaultConfig(), nil)
for _, line := range lines {
drain := New(DefaultConfig(), nil)
drain.Train(line, 0)
}
}
Expand Down
Loading

0 comments on commit 6a0fdd0

Please sign in to comment.