Ingestion performance tuning #87

Merged · 16 commits · Aug 21, 2023
6 changes: 3 additions & 3 deletions ingestor/metrics/handler.go
@@ -19,13 +19,13 @@ import (
)

var (
-	bytesBufPool = pool.NewGeneric(50, func(sz int) interface{} {
+	bytesBufPool = pool.NewGeneric(1000, func(sz int) interface{} {
return bytes.NewBuffer(make([]byte, 0, sz))
})

-	bytesPool = pool.NewBytes(50)
+	bytesPool = pool.NewBytes(1000)

-	writeReqPool = pool.NewGeneric(50, func(sz int) interface{} {
+	writeReqPool = pool.NewGeneric(1000, func(sz int) interface{} {
return prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, 0, sz),
}
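Raising the pool caps from 50 to 1000 lets concurrent ingestion keep reusing buffers instead of falling through to fresh allocations once the small pools drain. A minimal sketch of the pooled-buffer pattern using the standard library's sync.Pool (the pkg/pool API in the diff additionally bounds capacity and takes a size hint on Get):

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool reuses byte buffers across requests; every Get that finds a
// pooled buffer avoids one allocation and the GC pressure it causes.
var bufPool = sync.Pool{
	New: func() interface{} {
		return bytes.NewBuffer(make([]byte, 0, 8*1024))
	},
}

func handle(payload []byte) int {
	buf := bufPool.Get().(*bytes.Buffer)
	defer func() {
		buf.Reset() // hand back a clean buffer for the next caller
		bufPool.Put(buf)
	}()
	buf.Write(payload) // stand-in for the real encode work
	return buf.Len()
}

func main() {
	fmt.Println(handle([]byte("sample")))
}
```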
24 changes: 14 additions & 10 deletions ingestor/storage/store.go
@@ -22,9 +22,10 @@ import (
)

var (
-	csvWriterPool = pool.NewGeneric(1024, func(sz int) interface{} {
+	csvWriterPool = pool.NewGeneric(1000, func(sz int) interface{} {
return transform.NewCSVWriter(bytes.NewBuffer(make([]byte, 0, sz)), nil)
})
+	bytesPool = pool.NewBytes(1000)
)

type Store interface {
@@ -130,10 +131,13 @@ func (s *LocalStore) newWAL(ctx context.Context, prefix string) (*wal.WAL, error
}

func (s *LocalStore) GetWAL(ctx context.Context, labels []prompb.Label) (*wal.WAL, error) {
-	key := seriesKey(labels)
+	b := bytesPool.Get(256)
+	defer bytesPool.Put(b)
+
+	key := seriesKey(b[:0], labels)

s.mu.RLock()
-	wal := s.wals[key]
+	wal := s.wals[string(key)]
s.mu.RUnlock()

if wal != nil {
@@ -143,19 +147,19 @@ func (s *LocalStore) GetWAL(ctx context.Context, labels []prompb.Label) (*wal.WA
s.mu.Lock()
defer s.mu.Unlock()

-	wal = s.wals[key]
+	wal = s.wals[string(key)]
if wal != nil {
return wal, nil
}

prefix := key

var err error
-	wal, err = s.newWAL(ctx, prefix)
+	wal, err = s.newWAL(ctx, string(prefix))
if err != nil {
return nil, err
}
-	s.wals[key] = wal
+	s.wals[string(key)] = wal

return wal, nil
}
@@ -169,7 +173,7 @@ func (s *LocalStore) WALCount() int {
func (s *LocalStore) WriteTimeSeries(ctx context.Context, ts []prompb.TimeSeries) error {
enc := csvWriterPool.Get(8 * 1024).(*transform.CSVWriter)
defer csvWriterPool.Put(enc)
-	enc.SetColumns(s.opts.LiftedColumns)
+	enc.InitColumns(s.opts.LiftedColumns)

for _, v := range ts {
wal, err := s.GetWAL(ctx, v.Labels)
@@ -236,12 +240,12 @@ func (s *LocalStore) Import(filename string, body io.ReadCloser) (int, error) {
return int(n), nil
}

-func seriesKey(labels []prompb.Label) string {
+func seriesKey(dst []byte, labels []prompb.Label) []byte {
for _, v := range labels {
if bytes.Equal(v.Name, []byte("__name__")) {
// return fmt.Sprintf("%s%d", string(transform.Normalize(v.Value)), int(atomic.AddUint64(&idx, 1))%2)
-			return string(transform.Normalize(v.Value))
+			return transform.AppendNormalize(dst, v.Value)
}
}
-	return string(transform.Normalize(labels[0].Value))
+	return transform.AppendNormalize(dst, labels[0].Value)
}
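Two allocation fixes meet here: seriesKey appends the normalized name into a pooled []byte instead of building a string per request, and every lookup indexes the map as s.wals[string(key)], a form the Go compiler special-cases so the []byte-to-string conversion inside a map index does not allocate. A small stdlib-only sketch of that idiom (names are illustrative):

```go
package main

import "fmt"

func main() {
	wals := map[string]int{"CpuUsage": 1}
	key := []byte("CpuUsage") // built in a pooled buffer in the PR

	// No allocation: the compiler recognizes string(key) in the index
	// expression and reads the map without materializing a string.
	if v, ok := wals[string(key)]; ok {
		fmt.Println(v)
	}

	// The conversion only pays for a copy where it must, on insert.
	wals[string(key)] = 2
}
```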
2 changes: 1 addition & 1 deletion ingestor/storage/store_test.go
@@ -97,7 +97,7 @@ func TestLocalStore_WriteTimeSeries(t *testing.T) {
require.NoError(t, err)
b, err := io.ReadAll(r)
require.NoError(t, err)
-	require.Equal(t, "1970-01-01T00:00:00Z,-414304664621325809,{},0.000000000\n", string(b))
+	require.Equal(t, "1970-01-01T00:00:00Z,-414304664621325809,\"{}\",0.000000000\n", string(b))
}

func TestStore_SkipNonCSV(t *testing.T) {
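The expected row gains quotes around the labels field because the new writer quotes that column unconditionally instead of letting encoding/csv decide per field. Unconditional quoting is still standard CSV and skips scanning the field for commas and quotes; a quick stdlib-only check that both forms decode identically:

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

func main() {
	// The two expected rows from the test, before and after the change:
	// the labels field {} is unquoted before, always quoted after.
	rows := []string{
		`1970-01-01T00:00:00Z,-414304664621325809,{},0.000000000`,
		`1970-01-01T00:00:00Z,-414304664621325809,"{}",0.000000000`,
	}
	for _, row := range rows {
		rec, err := csv.NewReader(strings.NewReader(row)).Read()
		if err != nil {
			panic(err)
		}
		fmt.Println(rec) // both print [1970-01-01T00:00:00Z -414304664621325809 {} 0.000000000]
	}
}
```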
137 changes: 97 additions & 40 deletions ingestor/transform/csv.go
@@ -9,30 +9,43 @@ import (
	"strings"
	"time"
	"unicode"
+	"unicode/utf8"

logsv1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/collector/logs/v1"
"github.com/Azure/adx-mon/pkg/prompb"
"github.com/cespare/xxhash"
fflib "github.com/pquerna/ffjson/fflib/v1"

+	adxcsv "github.com/Azure/adx-mon/pkg/csv"
)

type CSVWriter struct {
-	w       *bytes.Buffer
-	buf     *strings.Builder
-	enc     *csv.Writer
-	columns []string
+	w   *bytes.Buffer
+	buf *strings.Builder
+	enc *csv.Writer
+
+	labelsBuf   *bytes.Buffer
+	seriesIdBuf *bytes.Buffer
+	line        []byte
+	columns     [][]byte
+	fields      []string
}

// NewCSVWriter returns a new CSVWriter that writes to the given buffer. The columns, if specified, are
// label keys that will be promoted to columns.
func NewCSVWriter(w *bytes.Buffer, columns []string) *CSVWriter {
writer := &CSVWriter{
-		w:       w,
-		buf:     &strings.Builder{},
-		enc:     csv.NewWriter(w),
-		columns: columns,
+		w:           w,
+		buf:         &strings.Builder{},
+		seriesIdBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
+		labelsBuf:   bytes.NewBuffer(make([]byte, 0, 1024)),
+		enc:         csv.NewWriter(w),
+		line:        make([]byte, 0, 4096),
+		columns:     make([][]byte, 0, len(columns)),
+		fields:      make([]string, 0, 4+len(columns)),
	}
-	writer.SetColumns(columns)
+
+	writer.InitColumns(columns)
return writer
}

@@ -49,17 +62,19 @@ func (w *CSVWriter) MarshalCSV(t interface{}) error {
}

func (w *CSVWriter) marshalTS(ts prompb.TimeSeries) error {
-	buf := w.buf
+	buf := w.labelsBuf
buf.Reset()

-	seriesIdHasher := xxhash.New()
+	seriesIdBuf := w.seriesIdBuf
+	seriesIdBuf.Reset()

var j int

// Marshal the labels as JSON and avoid allocations since this code is in the hot path.
buf.WriteByte('{')
for _, v := range ts.Labels {
-		seriesIdHasher.Write(v.Name)
-		seriesIdHasher.Write(v.Value)
+		w.seriesIdBuf.Write(v.Name)
+		w.seriesIdBuf.Write(v.Value)

// Drop the __name__ label since it is implied that the name of the CSV file is the name of the metric.
if bytes.Equal(v.Name, []byte("__name__")) {
@@ -71,7 +86,7 @@ func (w *CSVWriter) marshalTS(ts prompb.TimeSeries) error {
// any matches.
var skip bool
for j < len(w.columns) {
-			cmp := bytes.Compare([]byte(strings.ToLower(w.columns[j])), bytes.ToLower(v.Name))
+			cmp := compareLower(w.columns[j], v.Name)
			// The lifted column is less than the current label, so move to the next column and check again.
if cmp < 0 {
j++
@@ -91,7 +106,7 @@ func (w *CSVWriter) marshalTS(ts prompb.TimeSeries) error {
continue
}

-		if buf.String()[buf.Len()-1] != '{' {
+		if buf.Bytes()[buf.Len()-1] != '{' {
buf.WriteByte(',')
}
fflib.WriteJson(buf, v.Name)
@@ -100,49 +115,52 @@ func (w *CSVWriter) marshalTS(ts prompb.TimeSeries) error {
}

buf.WriteByte('}')
-	seriesId := seriesIdHasher.Sum64()
+	seriesId := xxhash.Sum64(seriesIdBuf.Bytes())

-	fields := make([]string, 0, 4)
	for _, v := range ts.Samples {
-		fields = fields[:0]
+		line := w.line[:0]

// Timestamp
-		tm := time.Unix(v.Timestamp/1000, (v.Timestamp%1000)*int64(time.Millisecond)).UTC().Format(time.RFC3339Nano)
-		fields = append(fields, tm)
+		line = time.Unix(v.Timestamp/1000, (v.Timestamp%1000)*int64(time.Millisecond)).UTC().AppendFormat(line, time.RFC3339Nano)

// seriesID
-		fields = append(fields, strconv.FormatInt(int64(seriesId), 10))
+		line = append(line, ',')
+		line = strconv.AppendInt(line, int64(seriesId), 10)

if len(w.columns) > 0 {
var i, j int
for i < len(ts.Labels) && j < len(w.columns) {
-				cmp := bytes.Compare(bytes.ToLower(ts.Labels[i].Name), []byte(strings.ToLower(w.columns[j])))
+				cmp := compareLower(ts.Labels[i].Name, w.columns[j])
if cmp == 0 {
-					fields = append(fields, string(ts.Labels[i].Value))
+					line = adxcsv.Append(line, ts.Labels[i].Value)
j++
i++
} else if cmp > 0 {
j++
-					fields = append(fields, "")
+					line = append(line, ',')
} else {
i++
}
}
}

// labels
-		fields = append(fields, buf.String())
+		line = adxcsv.AppendQuoted(line, buf.Bytes())
+		line = append(line, ',')

// Values
-		fields = append(fields, strconv.FormatFloat(v.Value, 'f', 9, 64))
+		line = strconv.AppendFloat(line, v.Value, 'f', 9, 64)
+
+		// End of line
+		line = adxcsv.AppendNewLine(line)

-		if err := w.enc.Write(fields); err != nil {
+		if n, err := w.w.Write(line); err != nil {
			return err
+		} else if n != len(line) {
+			return errors.New("short write")
		}
}

-	w.enc.Flush()
-	return w.enc.Error()
+	return nil
}

func (w *CSVWriter) marshalLog(log *logsv1.ExportLogsServiceRequest) error {
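The sample loop above no longer goes through encoding/csv: each row is built in one reusable []byte with the standard library's Append* helpers (plus the adx-mon adxcsv helpers for quoting and the trailing newline), so formatting a sample creates no intermediate strings. A stdlib-only sketch of the same row-building technique, with plain appends standing in for the adxcsv helpers:

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// appendRow appends one sample row to line. Every field uses an Append*
// variant (time.Time.AppendFormat, strconv.AppendInt/AppendFloat), which
// write into the supplied slice instead of returning a fresh string the
// way Format/FormatInt/FormatFloat do.
func appendRow(line []byte, tsMillis, seriesID int64, value float64) []byte {
	line = time.Unix(tsMillis/1000, (tsMillis%1000)*int64(time.Millisecond)).
		UTC().AppendFormat(line, time.RFC3339Nano)
	line = append(line, ',')
	line = strconv.AppendInt(line, seriesID, 10)
	line = append(line, ',')
	line = strconv.AppendFloat(line, value, 'f', 9, 64)
	return append(line, '\n')
}

func main() {
	line := make([]byte, 0, 256) // reused across rows in the real writer
	line = appendRow(line[:0], 0, -414304664621325809, 0)
	fmt.Print(string(line)) // 1970-01-01T00:00:00Z,-414304664621325809,0.000000000
}
```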
@@ -222,25 +240,36 @@ func (w *CSVWriter) Bytes() []byte {
return w.w.Bytes()
}

-func (w *CSVWriter) SetColumns(columns []string) {
-	sortLower := make([]string, len(columns))
+// InitColumns initializes the labels that will be promoted to columns in the CSV file. This can be done
+// once on the *Writer and subsequent calls are no-ops.
+func (w *CSVWriter) InitColumns(columns []string) {
+	if len(w.columns) > 0 {
+		return
+	}
+
+	sortLower := make([][]byte, len(columns))
	for i, v := range columns {
-		sortLower[i] = strings.ToLower(v)
+		sortLower[i] = []byte(strings.ToLower(v))
	}
-	sort.Strings(sortLower)
+	sort.Slice(sortLower, func(i, j int) bool {
+		return bytes.Compare(sortLower[i], sortLower[j]) < 0
+	})
w.columns = sortLower
}

// Normalize converts a metrics name to a ProperCase table name
func Normalize(s []byte) []byte {
-	var b bytes.Buffer
-	for i := 0; i < len(s); i++ {
+	return AppendNormalize(make([]byte, 0, len(s)), s)
+}
+
+// AppendNormalize converts a metrics name to a ProperCase table name and appends it to dst.
+func AppendNormalize(dst, s []byte) []byte {
+	for i := 0; i < len(s); i++ {
// Skip _, but capitalize the first letter after an _
if s[i] == '_' {
if i+1 < len(s) {
if s[i+1] >= 'a' && s[i+1] <= 'z' {
-					b.Write([]byte{byte(unicode.ToUpper(rune(s[i+1])))})
+					dst = append(dst, byte(unicode.ToUpper(rune(s[i+1]))))
i += 1
continue
}
@@ -250,10 +279,38 @@ func Normalize(s []byte) []byte {

// Capitalize the first letter
if i == 0 {
-			b.Write([]byte{byte(unicode.ToUpper(rune(s[i])))})
+			dst = append(dst, byte(unicode.ToUpper(rune(s[i]))))
continue
}
-		b.WriteByte(s[i])
+		dst = append(dst, s[i])
	}
-	return b.Bytes()
+	return dst
}
+
+func compareLower(sa, sb []byte) int {
+	for {
+		rb, nb := utf8.DecodeRune(sb)
+		ra, na := utf8.DecodeRune(sa)
+
+		if na == 0 && nb > 0 {
+			return -1
+		} else if na > 0 && nb == 0 {
+			return 1
+		} else if na == 0 && nb == 0 {
+			return 0
+		}
+
+		rb = unicode.ToLower(rb)
+		ra = unicode.ToLower(ra)
+
+		if ra < rb {
+			return -1
+		} else if ra > rb {
+			return 1
+		}
+
+		// Trim rune from the beginning of each string.
+		sa = sa[na:]
+		sb = sb[nb:]
+	}
+}
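Two supporting changes round this out: the per-series hash switches from an xxhash.Digest per call to accumulating label bytes in a reused buffer and hashing once with xxhash.Sum64, and compareLower provides a case-insensitive bytes.Compare that lowers runes in place instead of allocating lowered copies via bytes.ToLower/strings.ToLower. A sketch of the hashing side (label values and buffer size are illustrative):

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash"
)

func main() {
	labels := [][2][]byte{
		{[]byte("__name__"), []byte("cpu_usage")},
		{[]byte("host"), []byte("node-1")},
	}

	// Before: d := xxhash.New(); d.Write(...); d.Sum64() allocates one
	// Digest per series. After: append label bytes into a reused buffer
	// and hash it in one shot.
	buf := make([]byte, 0, 1024) // w.seriesIdBuf in the diff, reset per series
	for _, l := range labels {
		buf = append(buf, l[0]...)
		buf = append(buf, l[1]...)
	}
	fmt.Println(xxhash.Sum64(buf))
}
```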
27 changes: 25 additions & 2 deletions ingestor/transform/csv_test.go
@@ -58,6 +58,28 @@ func TestMarshalCSV(t *testing.T) {

}

+func TestCompareLower(t *testing.T) {
+	for _, tc := range []struct {
+		a, b     []byte
+		expected int
+	}{
+		{[]byte("a"), []byte("b"), -1},
+		{[]byte("b"), []byte("a"), 1},
+		{[]byte("a"), []byte("a"), 0},
+		{[]byte("A"), []byte("a"), 0},
+		{[]byte("a"), []byte("A"), 0},
+		{[]byte("A"), []byte("A"), 0},
+		{[]byte("a"), []byte("aa"), -1},
+		{[]byte("aa"), []byte("a"), 1},
+		{[]byte("aa"), []byte("aa"), 0},
+		{[]byte("aa"), []byte("ab"), -1},
+		{[]byte("ab"), []byte("aa"), 1},
+		{[]byte("ab"), []byte("ab"), 0},
+	} {
+		require.Equal(t, tc.expected, compareLower(tc.a, tc.b), "%s %s", tc.a, tc.b)
+	}
+}

func BenchmarkMarshalCSV(b *testing.B) {
ts := prompb.TimeSeries{
Labels: []prompb.Label{
@@ -95,12 +117,13 @@ func BenchmarkMarshalCSV(b *testing.B) {
},
}

-	var buf bytes.Buffer
-	w := NewCSVWriter(&buf, nil)
+	buf := bytes.NewBuffer(make([]byte, 0, 64*1024))
+	w := NewCSVWriter(buf, []string{"region", "Hostname", "bar"})
b.ResetTimer()

for i := 0; i < b.N; i++ {
w.MarshalCSV(ts)
+		buf.Reset()
}
}

Expand Down
Loading