Skip to content

Commit

Permalink
fix(influxd): update xxhash, avoid stringtoslicebyte in cache (#578) (#…
Browse files Browse the repository at this point in the history
…25622)

* fix(influxd): update xxhash, avoid stringtoslicebyte in cache (#578)

* fix(influxd): update xxhash, avoid stringtoslicebyte in cache

This commit does 3 things:

* it updates xxhash from v1 to v2; v2 includes a assembly arm version of
  Sum64
* it changes the cache storer to write with a string key instead of a
  byte slice. The cache only reads the key which WriteMulti already has
as a string so we can avoid a host of allocations when converting back
and forth from immutable strings to mutable byte slices. This includes
updating the cache ring and ring partition to write with a string key
* it updates the xxhash for finding the cache ring partition to use
Sum64String which uses unsafe pointers to directly use a string as a
byte slice since it only reads the string. Note: this now uses an
assembly version because of the v2 xxhash update. Go 1.22 included new
compiler ability to recognize calls of Method([]byte(myString)) and not
make a copy but from looking at the call sites, I'm not sure the
compiler would recognize it as the conversion to a byte slice was
happening several calls earlier.

That's what this change set does. If we are uncomfortable with any of
these, we can do fewer of them (for example, not upgrade xxhash; and/or
not use the specialized Sum64String, etc).

For the performance issue in maz-rr, I see converting string keys to
byte slices taking between 3-5% of cpu usage on both the primary and
secondary. So while this pr doesn't address directly the increased cpu
usage on the secondary, it makes cpu usage less on both which still
feels like a win. I believe these changes are easier to review that
switching to a byte slice pool that is likely needed in other places as
the compiler provides nearly all of the correctness checks we need (we
are relying also on xxhash v2 being correct).

* helps #550

* chore: fix tests/lint

* chore: don't use assembly version; should inline

This 2 line change causes xxhash to use a purego Sum64 implementation
which allows the compiler to see that Sum64 only read the byte slice
input which them means is can skip the string to byte slice allocation
and since it can skip that, it should inline all the calls to
getPartitionStringKey and Sum64 avoiding 1 call to Sum64String which
isn't inlined.

* chore: update ci build file

the ci build doesn't use the make file!!!

* chore: revert "chore: update ci build file"

This reverts commit 94be66fde03e0bbe18004aab25c0e19051406de2.

* chore: revert "chore: don't use assembly version; should inline"

This reverts commit 67d8d06c02e17e91ba643a2991e30a49308a5283.

(cherry picked from commit 1d334c679ca025645ed93518b7832ae676499cd2)

* feat: need to update go sum

---------

Co-authored-by: Phil Bracikowski <13472206+philjb@users.noreply.github.com>
  • Loading branch information
devanbenz and philjb authored Dec 5, 2024
1 parent 514e247 commit 06ab224
Show file tree
Hide file tree
Showing 12 changed files with 30 additions and 29 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3
github.com/benbjohnson/tmpl v1.0.0
github.com/buger/jsonparser v1.1.1
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/davecgh/go-spew v1.1.1
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
github.com/dustin/go-humanize v1.0.1
Expand Down Expand Up @@ -130,7 +130,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/containerd v1.6.18 // indirect
github.com/danieljoos/wincred v1.2.1 // indirect
github.com/deepmap/oapi-codegen v1.6.0 // indirect
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ github.com/NYTimes/gziphandler v1.0.1 h1:iLrQrdwjDd52kHDA5op2UBJFjmOb9g+7scBan4R
github.com/NYTimes/gziphandler v1.0.1/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63nhn5WAunQHLTznkw5W8b1Xc0dNjp83s=
github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RoaringBitmap/roaring v0.4.16 h1:NholfewybRLOwACgfqfzn/N5xa6keKNs4fP00t0cwLo=
github.com/RoaringBitmap/roaring v0.4.16/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w=
Expand Down Expand Up @@ -225,7 +224,6 @@ github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH
github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4=
github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down Expand Up @@ -895,7 +893,6 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloom/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"fmt"
"math"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
)

// Filter represents a bloom filter.
Expand Down
2 changes: 1 addition & 1 deletion pkg/estimator/hll/hll.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"sort"
"unsafe"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/influxdata/influxdb/v2/pkg/estimator"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/rhh/rhh.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sort"
"time"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down
2 changes: 1 addition & 1 deletion task/backend/scheduler/treescheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/benbjohnson/clock"
"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/google/btree"
)

Expand Down
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (e *entry) InfluxQLType() (influxql.DataType, error) {
// storer is the interface that descibes a cache's store.
type storer interface {
entry(key []byte) *entry // Get an entry by its key.
write(key []byte, values Values) (bool, error) // Write an entry to the store.
write(key string, values Values) (bool, error) // Write an entry to the store.
remove(key []byte) // Remove an entry from the store.
keys(sorted bool) [][]byte // Return an optionally sorted slice of entry keys.
apply(f func([]byte, *entry) error) error // Apply f to all entries in the store in parallel.
Expand Down Expand Up @@ -340,7 +340,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
// We'll optimistically set size here, and then decrement it for write errors.
c.increaseSize(addedSize)
for k, v := range values {
newKey, err := store.write([]byte(k), v)
newKey, err := store.write(k, v)
if err != nil {
// The write failed, hold onto the error and adjust the size delta.
werr = err
Expand Down Expand Up @@ -797,7 +797,7 @@ func valueType(v Value) byte {
type emptyStore struct{}

func (e emptyStore) entry(key []byte) *entry { return nil }
func (e emptyStore) write(key []byte, values Values) (bool, error) { return false, nil }
func (e emptyStore) write(key string, values Values) (bool, error) { return false, nil }
func (e emptyStore) remove(key []byte) {}
func (e emptyStore) keys(sorted bool) [][]byte { return nil }
func (e emptyStore) apply(f func([]byte, *entry) error) error { return nil }
Expand Down
9 changes: 4 additions & 5 deletions tsdb/engine/tsm1/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tsm1

import (
"bytes"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -80,8 +79,8 @@ func TestCache_WriteMulti_Stats(t *testing.T) {
c.init()
c.store = ms

ms.writef = func(key []byte, v Values) (bool, error) {
if bytes.Equal(key, []byte("foo")) {
ms.writef = func(key string, v Values) (bool, error) {
if key == "foo" {
return false, errors.New("write failed")
}
return true, nil
Expand Down Expand Up @@ -811,7 +810,7 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) {
// Cache's storer implementation.
type TestStore struct {
entryf func(key []byte) *entry
writef func(key []byte, values Values) (bool, error)
writef func(key string, values Values) (bool, error)
removef func(key []byte)
keysf func(sorted bool) [][]byte
applyf func(f func([]byte, *entry) error) error
Expand All @@ -823,7 +822,7 @@ type TestStore struct {

func NewTestStore() *TestStore { return &TestStore{} }
func (s *TestStore) entry(key []byte) *entry { return s.entryf(key) }
func (s *TestStore) write(key []byte, values Values) (bool, error) { return s.writef(key, values) }
func (s *TestStore) write(key string, values Values) (bool, error) { return s.writef(key, values) }
func (s *TestStore) remove(key []byte) { s.removef(key) }
func (s *TestStore) keys(sorted bool) [][]byte { return s.keysf(sorted) }
func (s *TestStore) apply(f func([]byte, *entry) error) error { return s.applyf(f) }
Expand Down
20 changes: 13 additions & 7 deletions tsdb/engine/tsm1/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"sync"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/influxdata/influxdb/v2/pkg/bytesutil"
)

Expand Down Expand Up @@ -80,6 +80,12 @@ func (r *ring) getPartition(key []byte) *partition {
return r.partitions[int(xxhash.Sum64(key)%uint64(len(r.partitions)))]
}

// getPartition retrieves the hash ring partition associated with the provided
// key, as a string, which can be faster if you already have a string as this is read only
func (r *ring) getPartitionStringKey(key string) *partition {
return r.partitions[int(xxhash.Sum64String(key)%uint64(len(r.partitions)))]
}

// entry returns the entry for the given key.
// entry is safe for use by multiple goroutines.
func (r *ring) entry(key []byte) *entry {
Expand All @@ -89,8 +95,8 @@ func (r *ring) entry(key []byte) *entry {
// write writes values to the entry in the ring's partition associated with key.
// If no entry exists for the key then one will be created.
// write is safe for use by multiple goroutines.
func (r *ring) write(key []byte, values Values) (bool, error) {
return r.getPartition(key).write(key, values)
func (r *ring) write(key string, values Values) (bool, error) {
return r.getPartitionStringKey(key).write(key, values)
}

// remove deletes the entry for the given key.
Expand Down Expand Up @@ -218,9 +224,9 @@ func (p *partition) entry(key []byte) *entry {
// write writes the values to the entry in the partition, creating the entry
// if it does not exist.
// write is safe for use by multiple goroutines.
func (p *partition) write(key []byte, values Values) (bool, error) {
func (p *partition) write(key string, values Values) (bool, error) {
p.mu.RLock()
e := p.store[string(key)]
e := p.store[key]
p.mu.RUnlock()
if e != nil {
// Hot path.
Expand All @@ -231,7 +237,7 @@ func (p *partition) write(key []byte, values Values) (bool, error) {
defer p.mu.Unlock()

// Check again.
if e = p.store[string(key)]; e != nil {
if e = p.store[key]; e != nil {
return false, e.add(values)
}

Expand All @@ -241,7 +247,7 @@ func (p *partition) write(key []byte, values Values) (bool, error) {
return false, err
}

p.store[string(key)] = e
p.store[key] = e
return true, nil
}

Expand Down
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var strSliceRes [][]byte
func benchmarkRingkeys(b *testing.B, r *ring, keys int) {
// Add some keys
for i := 0; i < keys; i++ {
r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), Values([]Value{
r.write(fmt.Sprintf("cpu,host=server-%d value=1", i), Values([]Value{
IntegerValue{
unixnano: 1,
value: int64(i),
Expand All @@ -77,7 +77,7 @@ func benchmarkRingGetPartition(b *testing.B, r *ring, keys int) {
// Add some keys
for i := 0; i < keys; i++ {
vals[i] = []byte(fmt.Sprintf("cpu,host=server-%d field1=value1,field2=value2,field4=value4,field5=value5,field6=value6,field7=value7,field8=value1,field9=value2,field10=value4,field11=value5,field12=value6,field13=value7", i))
r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), Values([]Value{
r.write(fmt.Sprintf("cpu,host=server-%d value=1", i), Values([]Value{
IntegerValue{
unixnano: 1,
value: int64(i),
Expand Down Expand Up @@ -109,7 +109,7 @@ func benchmarkRingWrite(b *testing.B, r *ring, n int) {
go func() {
defer wg.Done()
for j := 0; j < n; j++ {
if _, err := r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", j)), Values{}); err != nil {
if _, err := r.write(fmt.Sprintf("cpu,host=server-%d value=1", j), Values{}); err != nil {
errC <- err
}
}
Expand Down
2 changes: 1 addition & 1 deletion tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"time"
"unsafe"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/estimator"
"github.com/influxdata/influxdb/v2/pkg/estimator/hll"
Expand Down
2 changes: 1 addition & 1 deletion tsdb/series_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"sort"
"sync"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/binaryutil"
"github.com/influxdata/influxdb/v2/pkg/limiter"
Expand Down

0 comments on commit 06ab224

Please sign in to comment.