Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(influxd): update xxhash, avoid stringtoslicebyte in cache (#578)
Browse files Browse the repository at this point in the history
* fix(influxd): update xxhash, avoid stringtoslicebyte in cache

This commit does 3 things:

* it updates xxhash from v1 to v2; v2 includes a assembly arm version of
  Sum64
* it changes the cache storer to write with a string key instead of a
  byte slice. The cache only reads the key which WriteMulti already has
as a string so we can avoid a host of allocations when converting back
and forth from immutable strings to mutable byte slices. This includes
updating the cache ring and ring partition to write with a string key
* it updates the xxhash for finding the cache ring partition to use
Sum64String which uses unsafe pointers to directly use a string as a
byte slice since it only reads the string. Note: this now uses an
assembly version because of the v2 xxhash update. Go 1.22 included new
compiler ability to recognize calls of Method([]byte(myString)) and not
make a copy but from looking at the call sites, I'm not sure the
compiler would recognize it as the conversion to a byte slice was
happening several calls earlier.

That's what this change set does. If we are uncomfortable with any of
these, we can do fewer of them (for example, not upgrade xxhash; and/or
not use the specialized Sum64String, etc).

For the performance issue in maz-rr, I see converting string keys to
byte slices taking between 3-5% of cpu usage on both the primary and
secondary. So while this pr doesn't address directly the increased cpu
usage on the secondary, it makes cpu usage less on both which still
feels like a win. I believe these changes are easier to review that
switching to a byte slice pool that is likely needed in other places as
the compiler provides nearly all of the correctness checks we need (we
are relying also on xxhash v2 being correct).

* helps #550

* chore: fix tests/lint

* chore: don't use assembly version; should inline

This 2 line change causes xxhash to use a purego Sum64 implementation
which allows the compiler to see that Sum64 only read the byte slice
input which them means is can skip the string to byte slice allocation
and since it can skip that, it should inline all the calls to
getPartitionStringKey and Sum64 avoiding 1 call to Sum64String which
isn't inlined.

* chore: update ci build file

the ci build doesn't use the make file!!!

* chore: revert "chore: update ci build file"

This reverts commit 94be66fde03e0bbe18004aab25c0e19051406de2.

* chore: revert "chore: don't use assembly version; should inline"

This reverts commit 67d8d06c02e17e91ba643a2991e30a49308a5283.

(cherry picked from commit 1d334c679ca025645ed93518b7832ae676499cd2)
philjb authored and devanbenz committed Dec 5, 2024
1 parent 514e247 commit 9735329
Showing 12 changed files with 71 additions and 78 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ require (
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3
github.com/benbjohnson/tmpl v1.0.0
github.com/buger/jsonparser v1.1.1
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/davecgh/go-spew v1.1.1
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
github.com/dustin/go-humanize v1.0.1
@@ -130,7 +130,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/containerd v1.6.18 // indirect
github.com/danieljoos/wincred v1.2.1 // indirect
github.com/deepmap/oapi-codegen v1.6.0 // indirect
93 changes: 41 additions & 52 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/bloom/bloom.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ import (
"fmt"
"math"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
)

// Filter represents a bloom filter.
2 changes: 1 addition & 1 deletion pkg/estimator/hll/hll.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ import (
"sort"
"unsafe"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/influxdata/influxdb/v2/pkg/estimator"
)

2 changes: 1 addition & 1 deletion pkg/rhh/rhh.go
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ import (
"sort"
"time"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/prometheus/client_golang/prometheus"
)

2 changes: 1 addition & 1 deletion task/backend/scheduler/treescheduler.go
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ import (
"time"

"github.com/benbjohnson/clock"
"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/google/btree"
)

6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
@@ -147,7 +147,7 @@ func (e *entry) InfluxQLType() (influxql.DataType, error) {
// storer is the interface that descibes a cache's store.
type storer interface {
entry(key []byte) *entry // Get an entry by its key.
write(key []byte, values Values) (bool, error) // Write an entry to the store.
write(key string, values Values) (bool, error) // Write an entry to the store.
remove(key []byte) // Remove an entry from the store.
keys(sorted bool) [][]byte // Return an optionally sorted slice of entry keys.
apply(f func([]byte, *entry) error) error // Apply f to all entries in the store in parallel.
@@ -340,7 +340,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
// We'll optimistically set size here, and then decrement it for write errors.
c.increaseSize(addedSize)
for k, v := range values {
newKey, err := store.write([]byte(k), v)
newKey, err := store.write(k, v)
if err != nil {
// The write failed, hold onto the error and adjust the size delta.
werr = err
@@ -797,7 +797,7 @@ func valueType(v Value) byte {
type emptyStore struct{}

func (e emptyStore) entry(key []byte) *entry { return nil }
func (e emptyStore) write(key []byte, values Values) (bool, error) { return false, nil }
func (e emptyStore) write(key string, values Values) (bool, error) { return false, nil }
func (e emptyStore) remove(key []byte) {}
func (e emptyStore) keys(sorted bool) [][]byte { return nil }
func (e emptyStore) apply(f func([]byte, *entry) error) error { return nil }
9 changes: 4 additions & 5 deletions tsdb/engine/tsm1/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tsm1

import (
"bytes"
"errors"
"fmt"
"math"
@@ -80,8 +79,8 @@ func TestCache_WriteMulti_Stats(t *testing.T) {
c.init()
c.store = ms

ms.writef = func(key []byte, v Values) (bool, error) {
if bytes.Equal(key, []byte("foo")) {
ms.writef = func(key string, v Values) (bool, error) {
if key == "foo" {
return false, errors.New("write failed")
}
return true, nil
@@ -811,7 +810,7 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) {
// Cache's storer implementation.
type TestStore struct {
entryf func(key []byte) *entry
writef func(key []byte, values Values) (bool, error)
writef func(key string, values Values) (bool, error)
removef func(key []byte)
keysf func(sorted bool) [][]byte
applyf func(f func([]byte, *entry) error) error
@@ -823,7 +822,7 @@ type TestStore struct {

func NewTestStore() *TestStore { return &TestStore{} }
func (s *TestStore) entry(key []byte) *entry { return s.entryf(key) }
func (s *TestStore) write(key []byte, values Values) (bool, error) { return s.writef(key, values) }
func (s *TestStore) write(key string, values Values) (bool, error) { return s.writef(key, values) }
func (s *TestStore) remove(key []byte) { s.removef(key) }
func (s *TestStore) keys(sorted bool) [][]byte { return s.keysf(sorted) }
func (s *TestStore) apply(f func([]byte, *entry) error) error { return s.applyf(f) }
20 changes: 13 additions & 7 deletions tsdb/engine/tsm1/ring.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ import (
"fmt"
"sync"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/influxdata/influxdb/v2/pkg/bytesutil"
)

@@ -80,6 +80,12 @@ func (r *ring) getPartition(key []byte) *partition {
return r.partitions[int(xxhash.Sum64(key)%uint64(len(r.partitions)))]
}

// getPartition retrieves the hash ring partition associated with the provided
// key, as a string, which can be faster if you already have a string as this is read only
func (r *ring) getPartitionStringKey(key string) *partition {
return r.partitions[int(xxhash.Sum64String(key)%uint64(len(r.partitions)))]
}

// entry returns the entry for the given key.
// entry is safe for use by multiple goroutines.
func (r *ring) entry(key []byte) *entry {
@@ -89,8 +95,8 @@ func (r *ring) entry(key []byte) *entry {
// write writes values to the entry in the ring's partition associated with key.
// If no entry exists for the key then one will be created.
// write is safe for use by multiple goroutines.
func (r *ring) write(key []byte, values Values) (bool, error) {
return r.getPartition(key).write(key, values)
func (r *ring) write(key string, values Values) (bool, error) {
return r.getPartitionStringKey(key).write(key, values)
}

// remove deletes the entry for the given key.
@@ -218,9 +224,9 @@ func (p *partition) entry(key []byte) *entry {
// write writes the values to the entry in the partition, creating the entry
// if it does not exist.
// write is safe for use by multiple goroutines.
func (p *partition) write(key []byte, values Values) (bool, error) {
func (p *partition) write(key string, values Values) (bool, error) {
p.mu.RLock()
e := p.store[string(key)]
e := p.store[key]
p.mu.RUnlock()
if e != nil {
// Hot path.
@@ -231,7 +237,7 @@ func (p *partition) write(key []byte, values Values) (bool, error) {
defer p.mu.Unlock()

// Check again.
if e = p.store[string(key)]; e != nil {
if e = p.store[key]; e != nil {
return false, e.add(values)
}

@@ -241,7 +247,7 @@ func (p *partition) write(key []byte, values Values) (bool, error) {
return false, err
}

p.store[string(key)] = e
p.store[key] = e
return true, nil
}

6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/ring_test.go
Original file line number Diff line number Diff line change
@@ -51,7 +51,7 @@ var strSliceRes [][]byte
func benchmarkRingkeys(b *testing.B, r *ring, keys int) {
// Add some keys
for i := 0; i < keys; i++ {
r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), Values([]Value{
r.write(fmt.Sprintf("cpu,host=server-%d value=1", i), Values([]Value{
IntegerValue{
unixnano: 1,
value: int64(i),
@@ -77,7 +77,7 @@ func benchmarkRingGetPartition(b *testing.B, r *ring, keys int) {
// Add some keys
for i := 0; i < keys; i++ {
vals[i] = []byte(fmt.Sprintf("cpu,host=server-%d field1=value1,field2=value2,field4=value4,field5=value5,field6=value6,field7=value7,field8=value1,field9=value2,field10=value4,field11=value5,field12=value6,field13=value7", i))
r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), Values([]Value{
r.write(fmt.Sprintf("cpu,host=server-%d value=1", i), Values([]Value{
IntegerValue{
unixnano: 1,
value: int64(i),
@@ -109,7 +109,7 @@ func benchmarkRingWrite(b *testing.B, r *ring, n int) {
go func() {
defer wg.Done()
for j := 0; j < n; j++ {
if _, err := r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", j)), Values{}); err != nil {
if _, err := r.write(fmt.Sprintf("cpu,host=server-%d value=1", j), Values{}); err != nil {
errC <- err
}
}
2 changes: 1 addition & 1 deletion tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ import (
"time"
"unsafe"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/estimator"
"github.com/influxdata/influxdb/v2/pkg/estimator/hll"
2 changes: 1 addition & 1 deletion tsdb/series_file.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ import (
"sort"
"sync"

"github.com/cespare/xxhash"
"github.com/cespare/xxhash/v2"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/binaryutil"
"github.com/influxdata/influxdb/v2/pkg/limiter"

0 comments on commit 9735329

Please sign in to comment.