Skip to content

Commit

Permalink
Updates to rollups and tag map (#731)
Browse files Browse the repository at this point in the history
* Adding a parquet suffix for file out

* Allow negative rollup counts

* Adding a way to pass a function to tag map
  • Loading branch information
i3149 authored Aug 3, 2024
1 parent 1113e87 commit 08cb8a2
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 18 deletions.
7 changes: 6 additions & 1 deletion pkg/cat/jchf.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,12 @@ func (kc *KTranslate) flowToJCHF(ctx context.Context, dst *kt.JCHF, src *Flow, c
}
}
case model.Custom_value_Which_uint64Val:
dst.CustomBigInt[name] = int64(val.Uint64Val())
iv := int64(val.Uint64Val())
if tk, tv, ok := kc.tagMap.LookupTagValueBig(dst.CompanyId, iv, name); ok {
dst.CustomStr[tk] = tv
} else {
dst.CustomBigInt[name] = iv
}
case model.Custom_value_Which_strVal:
sv, _ := val.StrVal()
dst.CustomStr[name] = sv
Expand Down
67 changes: 53 additions & 14 deletions pkg/maps/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@ func init() {
flag.StringVar(&tags, "tag_map", "", "CSV file mapping tag ids to strings")
}

type tagfunc struct {
c string
f func(int64) string
}

type FileTagMapper struct {
logger.ContextL
tags map[uint32][2]string
tags map[uint32][2]string
funcs map[string]tagfunc
}

func NewFileTagMapper(log logger.Underlying, tagMapFilePath string) (*FileTagMapper, error) {
Expand All @@ -32,37 +38,70 @@ func NewFileTagMapper(log logger.Underlying, tagMapFilePath string) (*FileTagMap
defer f.Close()
scanner := bufio.NewScanner(f)

ftm := FileTagMapper{
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "fileMapper"}, log),
}

tm := map[uint32][2]string{}
funcs := map[string]tagfunc{}
for scanner.Scan() {
pts := strings.SplitN(scanner.Text(), ",", 4)
if len(pts) != 4 {
continue
}
ida, err := strconv.Atoi(pts[2])
if err != nil {
switch len(pts) {
case 1:
// Noop, just a blank line can skip.
case 3: // its a function.
switch pts[2] {
case "to_hex":
funcs[pts[0]] = tagfunc{
c: kt.FixupName(pts[1]),
f: func(in int64) string {
return strconv.FormatInt(in, 16)
},
}
default:
ftm.Errorf("Invalid function %v, skipping", pts)
}
case 4: // its a fixed mapping.
ida, err := strconv.Atoi(pts[2])
if err != nil {
continue
}

id := uint32(ida)
tm[id] = [2]string{kt.FixupName(pts[1]), kt.FixupName(pts[3])}
default: // its a mistake.
ftm.Errorf("Invalid line %v, skipping", pts)
continue
}

id := uint32(ida)
tm[id] = [2]string{kt.FixupName(pts[1]), kt.FixupName(pts[3])}
}

if err := scanner.Err(); err != nil {
return nil, err
}

ftm := FileTagMapper{
ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "fileMapper"}, log),
tags: tm,
}
ftm.Infof("Loaded %d tag mappings", len(tm))
ftm.tags = tm
ftm.funcs = funcs
ftm.Infof("Loaded %d tag mappings and %d functions", len(ftm.tags), len(ftm.funcs))

return &ftm, nil
}

func (ftm *FileTagMapper) LookupTagValue(cid kt.Cid, tagval uint32, colname string) (string, string, bool) {
if tf, ok := ftm.funcs[colname]; ok {
return tf.c, tf.f(int64(tagval)), ok
}
if tv, ok := ftm.tags[tagval]; ok {
return tv[0], tv[1], ok
}
return "", "", false
}

func (ftm *FileTagMapper) LookupTagValueBig(cid kt.Cid, tagval int64, colname string) (string, string, bool) {
if tf, ok := ftm.funcs[colname]; ok {
return tf.c, tf.f(tagval), ok
}
if tv, ok := ftm.tags[uint32(tagval)]; ok {
return tv[0], tv[1], ok
}
return "", "", false
}
51 changes: 51 additions & 0 deletions pkg/maps/file/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package file

import (
"io/ioutil"
"path/filepath"
"testing"

"github.com/kentik/ktranslate/pkg/eggs/logger"
lt "github.com/kentik/ktranslate/pkg/eggs/logger/testing"
"github.com/kentik/ktranslate/pkg/kt"

"github.com/stretchr/testify/assert"
)

var testTags = []byte(`
100324,C_MARKET_SRC,1344420230,BHN - Bakersfield
100323,C_MARKET_DST,1353464128,LCHTR - Michigan
100324,C_MARKET_SRC,1344420636,LCHTR - Los Angeles
100323,C_MARKET_DST,1344420636,LCHTR - Los Angeles
100323,C_MARKET_DST,1353464487,LCHTR - SLO
100323,C_MARKET_DST,1353464485,LCHTR - Louisiana
100323,C_MARKET_DST,1353465119,LCHTR - Nevada
101199,DST_SUBSCRIBER_ID,to_hex
`)

func TestGenMap(t *testing.T) {
assert := assert.New(t)
l := lt.NewTestContextL(logger.NilContext, t)

dir := t.TempDir()
tmpfn := filepath.Join(dir, "test_tags")
if err := ioutil.WriteFile(tmpfn, testTags, 0666); err != nil {
t.Fatal(err)
}

f, err := NewFileTagMapper(l.GetLogger().GetUnderlyingLogger(), tmpfn)
assert.NoError(err)

_, _, ok := f.LookupTagValue(kt.Cid(10), 0, "")
assert.False(ok)

k, v, ok := f.LookupTagValue(kt.Cid(10), 1344420636, "100323")
assert.True(ok)
assert.Equal(k, "c_market_dst")
assert.Equal(v, "lchtr_-_los_angeles")

k, v, ok = f.LookupTagValueBig(kt.Cid(10), int64(242124693101048), "101199")
assert.True(ok)
assert.Equal(k, "dst_subscriber_id")
assert.Equal(v, "dc360c52d9f8")
}
5 changes: 5 additions & 0 deletions pkg/maps/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (

type TagMapper interface {
LookupTagValue(kt.Cid, uint32, string) (string, string, bool)
LookupTagValueBig(kt.Cid, int64, string) (string, string, bool)
}

func LoadMapper(mtype Mapper, log logger.Underlying, tagMapFilePath string) (TagMapper, error) {
Expand All @@ -33,3 +34,7 @@ type NullType struct {
func (ntm *NullType) LookupTagValue(cid kt.Cid, tagval uint32, colname string) (string, string, bool) {
return "", "", false
}

func (ntm *NullType) LookupTagValueBig(cid kt.Cid, tagval int64, colname string) (string, string, bool) {
return "", "", false
}
4 changes: 2 additions & 2 deletions pkg/rollup/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (r *StatsRollup) Export() []Rollup {
}

sort.Sort(byValue(keys))
if len(keys) > r.config.TopK {
if r.config.TopK > 0 && len(keys) > r.config.TopK {
return keys[0:r.config.TopK]
}

Expand Down Expand Up @@ -303,7 +303,7 @@ func (r *StatsRollup) exportSum(sum map[string]uint64, count map[string]uint64,
}

sort.Sort(byValue(keys))
if len(keys) > r.config.TopK {
if r.config.TopK > 0 && len(keys) > r.config.TopK {
r.getTopkSum(keys, total, totalc, ot, provt, rc)
} else {
rc <- keys
Expand Down
2 changes: 1 addition & 1 deletion pkg/rollup/unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (r *UniqueRollup) exportUnique(uniques map[string]gohll.HLL, count map[stri
}

sort.Sort(byValue(keys))
if len(keys) > r.topK {
if r.config.TopK > 0 && len(keys) > r.topK {
r.getTopkUniques(keys, totalc, ot, provt, rc)
} else {
rc <- keys
Expand Down
2 changes: 2 additions & 0 deletions pkg/sinks/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func (s *FileSink) Init(ctx context.Context, format formats.Format, compression
switch format {
case formats.FORMAT_JSON, formats.FORMAT_JSON_FLAT, formats.FORMAT_NRM, formats.FORMAT_NR, formats.FORMAT_ELASTICSEARCH:
s.suffix = ".json"
case formats.FORMAT_PARQUET:
s.suffix = ".parquet"
}

// Set up a file first.
Expand Down

0 comments on commit 08cb8a2

Please sign in to comment.