diff --git a/array/array.go b/array/array.go index 44c689a788..94e2218054 100644 --- a/array/array.go +++ b/array/array.go @@ -2,9 +2,11 @@ package array import ( "strconv" + "sync/atomic" "github.com/apache/arrow/go/v7/arrow" "github.com/apache/arrow/go/v7/arrow/array" + arrowmem "github.com/apache/arrow/go/v7/arrow/memory" "github.com/influxdata/flux/codes" "github.com/influxdata/flux/internal/errors" "github.com/influxdata/flux/memory" @@ -102,9 +104,9 @@ type Builder interface { } type String struct { - value string length int data *array.Binary + value *stringValue } // NewStringFromBinaryArray creates an instance of String from @@ -162,12 +164,16 @@ func (a *String) Len() int { func (a *String) Retain() { if a.data != nil { a.data.Retain() + return } + a.value.Retain() } func (a *String) Release() { if a.data != nil { a.data.Release() + return } + a.value.Release() } func (a *String) Slice(i, j int) Array { if a.data != nil { @@ -177,27 +183,77 @@ func (a *String) Slice(i, j int) Array { data: array.NewBinaryData(data), } } + a.value.Retain() return &String{ value: a.value, length: j - i, } } -func (a *String) Value(i int) string { + +// ValueBytes returns a byte slice containing the value of this string +// at index i. This slice points to the contents of the data buffer and +// is only valid for the lifetime of the array. +func (a *String) ValueBytes(i int) []byte { if a.data != nil { - return a.data.ValueString(i) + return a.data.Value(i) } - return a.value + return a.value.Bytes() +} + +// Value returns a string copy of the value stored at index i. The +// returned value will outlive the array and is safe to use like any +// other go string. The memory backing the string will be allocated by +// the runtime, rather than any provided allocator. +func (a *String) Value(i int) string { + return string(a.ValueBytes(i)) } func (a *String) ValueLen(i int) int { if a.data != nil { return a.data.ValueLen(i) } - return len(a.value) + return a.value.Len() } func (a *String) IsConstant() bool { return a.data == nil } +type stringValue struct { + data []byte + + mem arrowmem.Allocator + rc int64 +} + +func (v *stringValue) Retain() { + if v == nil { + return + } + atomic.AddInt64(&v.rc, 1) +} + +func (v *stringValue) Release() { + if v == nil { + return + } + if atomic.AddInt64(&v.rc, -1) == 0 { + v.mem.Free(v.data) + } +} + +func (v *stringValue) Bytes() []byte { + if v == nil { + return nil + } + return v.data +} + +func (v *stringValue) Len() int { + if v == nil { + return 0 + } + return len(v.data) +} + type sliceable interface { Slice(i, j int) Array } diff --git a/array/array_test.go b/array/array_test.go index ddc34f8222..9f1ca21a2f 100644 --- a/array/array_test.go +++ b/array/array_test.go @@ -25,7 +25,7 @@ func TestString(t *testing.T) { b.Append("a") } }, - sz: 0, + sz: 1, want: []interface{}{ "a", "a", "a", "a", "a", "a", "a", "a", "a", "a", @@ -165,7 +165,7 @@ func TestStringBuilder_NewArray(t *testing.T) { } arr := b.NewArray() - mem.AssertSize(t, 0) + assert.Equal(t, 1, mem.CurrentAlloc(), "unexpected memory allocation.") arr.Release() mem.AssertSize(t, 0) diff --git a/array/binary.gen.go b/array/binary.gen.go index 83cd403b53..a2d14ae6a2 100644 --- a/array/binary.gen.go +++ b/array/binary.gen.go @@ -189,63 +189,6 @@ func FloatAddRConst(l *Float, r float64, mem memory.Allocator) (*Float, error) { return a, nil } -func StringAdd(l, r *String, mem memory.Allocator) (*String, error) { - n := l.Len() - if n != r.Len() { - return nil, errors.Newf(codes.Invalid, "vectors must have equal length for binary operations") - } - b := NewStringBuilder(mem) - b.Resize(n) - for i := 0; i < n; i++ { - if l.IsValid(i) && r.IsValid(i) { - - b.Append(l.Value(i) + r.Value(i)) - - } else { - b.AppendNull() - } - } - a := b.NewStringArray() - b.Release() - return a, nil -} - -func StringAddLConst(l string, r *String, mem memory.Allocator) (*String, error) { - n := r.Len() - b := NewStringBuilder(mem) - b.Resize(n) - for i := 0; i < n; i++ { - if r.IsValid(i) { - - b.Append(l + r.Value(i)) - - } else { - b.AppendNull() - } - } - a := b.NewStringArray() - b.Release() - return a, nil -} - -func StringAddRConst(l *String, r string, mem memory.Allocator) (*String, error) { - n := l.Len() - b := NewStringBuilder(mem) - b.Resize(n) - for i := 0; i < n; i++ { - if l.IsValid(i) { - - b.Append(l.Value(i) + r) - - } else { - b.AppendNull() - } - } - a := b.NewStringArray() - b.Release() - return a, nil -} - func IntSub(l, r *Int, mem memory.Allocator) (*Int, error) { n := l.Len() if n != r.Len() { diff --git a/array/binary.go b/array/binary.go index 812c116167..9ae9cde9a8 100644 --- a/array/binary.go +++ b/array/binary.go @@ -145,3 +145,70 @@ func OrConst(fixed *bool, arr *Boolean, mem memory.Allocator) (*Boolean, error) b.Release() return a, nil } + +func StringAdd(l, r *String, mem memory.Allocator) (*String, error) { + n := l.Len() + if n != r.Len() { + return nil, errors.Newf(codes.Invalid, "vectors must have equal length for binary operations") + } + b := NewStringBuilder(mem) + b.Resize(n) + for i := 0; i < n; i++ { + if l.IsValid(i) && r.IsValid(i) { + lb := l.ValueBytes(i) + rb := r.ValueBytes(i) + buf := make([]byte, len(lb)+len(rb)) + copy(buf, lb) + copy(buf[len(lb):], rb) + b.AppendBytes(buf) + + } else { + b.AppendNull() + } + } + a := b.NewStringArray() + b.Release() + return a, nil +} + +func StringAddLConst(l string, r *String, mem memory.Allocator) (*String, error) { + n := r.Len() + b := NewStringBuilder(mem) + b.Resize(n) + for i := 0; i < n; i++ { + if r.IsValid(i) { + rb := r.ValueBytes(i) + buf := make([]byte, len(l)+len(rb)) + copy(buf, l) + copy(buf[len(l):], rb) + b.AppendBytes(buf) + + } else { + b.AppendNull() + } + } + a := b.NewStringArray() + b.Release() + return a, nil +} + +func StringAddRConst(l *String, r string, mem memory.Allocator) (*String, error) { + n := l.Len() + b := NewStringBuilder(mem) + b.Resize(n) + for i := 0; i < n; i++ { + if l.IsValid(i) { + lb := l.ValueBytes(i) + buf := make([]byte, len(lb)+len(r)) + copy(buf, lb) + copy(buf[len(lb):], r) + b.AppendBytes(buf) + + } else { + b.AppendNull() + } + } + a := b.NewStringArray() + b.Release() + return a, nil +} diff --git a/array/binary.tmpldata b/array/binary.tmpldata index e142023353..29ca21219b 100644 --- a/array/binary.tmpldata +++ b/array/binary.tmpldata @@ -3,7 +3,7 @@ { "Name": "Add", "Op": "+", - "Types": ["Int", "Uint", "Float", "String"] + "Types": ["Int", "Uint", "Float"] }, { "Name": "Sub", diff --git a/array/builder.go b/array/builder.go index 42addd81fb..a495bc6e2b 100644 --- a/array/builder.go +++ b/array/builder.go @@ -1,6 +1,9 @@ package array import ( + "bytes" + "sync/atomic" + "github.com/apache/arrow/go/v7/arrow/array" "github.com/apache/arrow/go/v7/arrow/bitutil" "github.com/apache/arrow/go/v7/arrow/memory" @@ -9,11 +12,11 @@ import ( type StringBuilder struct { builder *array.BinaryBuilder mem memory.Allocator - value string + value *stringValue length int capacity int dataCapacity int - refCount int + refCount int64 } func NewStringBuilder(mem memory.Allocator) *StringBuilder { @@ -23,52 +26,49 @@ func NewStringBuilder(mem memory.Allocator) *StringBuilder { } } func (b *StringBuilder) init() { - if b.builder == nil { - if b.refCount <= 0 { - return - } - - builder := array.NewBinaryBuilder(b.mem, StringType) - if capacity := b.Cap(); capacity > 0 { - builder.Resize(capacity) - dataCapacity := len(b.value) * capacity - if dataCapacity < b.dataCapacity { - dataCapacity = b.dataCapacity - } - builder.ReserveData(dataCapacity) - } - if b.length > 0 { - for i := 0; i < b.length; i++ { - builder.AppendString(b.value) - } + if b.builder != nil { + return + } + builder := array.NewBinaryBuilder(b.mem, StringType) + if capacity := b.Cap(); capacity > 0 { + builder.Resize(capacity) + dataCapacity := b.value.Len() * capacity + if dataCapacity < b.dataCapacity { + dataCapacity = b.dataCapacity } - - for i := 1; i < b.refCount; i++ { - builder.Retain() + builder.ReserveData(dataCapacity) + } + if b.length > 0 { + for i := 0; i < b.length; i++ { + builder.Append(b.value.Bytes()) } - b.builder = builder } + b.builder = builder + b.value.Release() + b.value = nil } func (b *StringBuilder) reset() { b.builder = nil b.length = 0 + b.value = nil b.capacity = 0 b.dataCapacity = 0 - b.value = "" } func (b *StringBuilder) Retain() { - if b.builder != nil { - b.builder.Retain() - return - } - b.refCount++ + atomic.AddInt64(&b.refCount, 1) } func (b *StringBuilder) Release() { + if atomic.AddInt64(&b.refCount, -1) != 0 { + return + } if b.builder != nil { b.builder.Release() - return + b.builder = nil + } + if b.value != nil { + b.value.Release() + b.value = nil } - b.refCount-- } func (b *StringBuilder) Len() int { if b.builder != nil { @@ -93,15 +93,47 @@ func (b *StringBuilder) NullN() int { } return 0 } + +func (b *StringBuilder) AppendBytes(buf []byte) { + if b.builder == nil && (b.length == 0 || bytes.Equal(buf, b.value.Bytes())) { + if b.value == nil { + b.initValue(len(buf)) + copy(b.value.data, buf) + } + b.length++ + return + } + if b.value != nil { + b.init() + } + b.builder.Append(buf) +} + +// Append appends a string to the array being built. The input string +// will always be copied. func (b *StringBuilder) Append(v string) { - if b.builder == nil && (b.length == 0 || v == b.value) { - b.value = v + if b.builder == nil && (b.length == 0 || v == string(b.value.Bytes())) { + if b.value == nil { + b.initValue(len(v)) + copy(b.value.data, v) + } b.length++ return } - b.init() + if b.value != nil { + b.init() + } b.builder.AppendString(v) } + +func (b *StringBuilder) initValue(size int) { + b.value = &stringValue{ + data: b.mem.Allocate(size), + mem: b.mem, + rc: 1, + } +} + func (b *StringBuilder) AppendValues(v []string, valid []bool) { for i, val := range v { if len(valid) != 0 && valid[i] { @@ -166,7 +198,7 @@ func (b *StringBuilder) CopyValidValues(values *String, nullCheckArray Array) { nullOffset := nullCheckArray.Data().Offset() for i := 0; i < values.Len(); i++ { if isValid(nullBitMapBytes, nullOffset, i) { - b.Append(values.Value(i)) + b.AppendBytes(values.ValueBytes(i)) } } } diff --git a/array/repeat.go b/array/repeat.go index 61b9fe1828..526e560bef 100644 --- a/array/repeat.go +++ b/array/repeat.go @@ -3,8 +3,14 @@ package array import "github.com/apache/arrow/go/v7/arrow/memory" func StringRepeat(v string, n int, mem memory.Allocator) *String { + sv := stringValue{ + data: mem.Allocate(len(v)), + mem: mem, + rc: 1, + } + copy(sv.data, v) return &String{ - value: v, + value: &sv, length: n, } } diff --git a/array/repeat_test.go b/array/repeat_test.go index a6abf64e99..627f9a37e6 100644 --- a/array/repeat_test.go +++ b/array/repeat_test.go @@ -39,7 +39,7 @@ func TestRepeat(t *testing.T) { name: "String", t: flux.TString, v: values.NewString("a"), - sz: 0, // optimized away + sz: 1, // optimized to a single instance }, { name: "Boolean", diff --git a/execute/executetest/allocator.go b/execute/executetest/allocator.go index 93c15d76fb..64161cc876 100644 --- a/execute/executetest/allocator.go +++ b/execute/executetest/allocator.go @@ -1,7 +1,35 @@ package executetest import ( + arrowmem "github.com/apache/arrow/go/v7/arrow/memory" + "github.com/influxdata/flux/memory" ) -var UnlimitedAllocator = &memory.ResourceAllocator{} +var UnlimitedAllocator = &memory.ResourceAllocator{ + Allocator: Allocator{}, +} + +// Allocator is an allocator for use in test. When a buffer is freed the +// contents are overwritten with a predictable pattern to help detect +// use-after-free scenarios. +type Allocator struct{} + +func (Allocator) Allocate(size int) []byte { + return arrowmem.DefaultAllocator.Allocate(size) +} + +func (a Allocator) Reallocate(size int, b []byte) []byte { + b1 := a.Allocate(size) + copy(b1, b) + a.Free(b) + return b1 +} + +func (a Allocator) Free(b []byte) { + var pattern = [...]byte{0x00, 0x33, 0xcc, 0xff} + for i := range b { + b[i] = pattern[i%len(pattern)] + } + arrowmem.DefaultAllocator.Free(b) +} diff --git a/execute/executetest/transformation.go b/execute/executetest/transformation.go index e189c75547..374edbf10a 100644 --- a/execute/executetest/transformation.go +++ b/execute/executetest/transformation.go @@ -142,7 +142,7 @@ func ProcessTestHelper2( } }() - alloc := &memory.ResourceAllocator{} + alloc := UnlimitedAllocator store := NewDataStore() tx, d := create(RandomDatasetID(), alloc) d.SetTriggerSpec(plan.DefaultTriggerSpec) diff --git a/execute/format.go b/execute/format.go index de527297b5..08fe1b36f2 100644 --- a/execute/format.go +++ b/execute/format.go @@ -251,7 +251,7 @@ func (f *Formatter) valueBuf(i, j int, typ flux.ColType, cr flux.ColReader) []by } case flux.TString: if cr.Strings(j).IsValid(i) { - buf = []byte(cr.Strings(j).Value(i)) + buf = cr.Strings(j).ValueBytes(i) } case flux.TTime: if cr.Times(j).IsValid(i) { diff --git a/stdlib/experimental/diff.go b/stdlib/experimental/diff.go index b236f628d7..2a4e44e50f 100644 --- a/stdlib/experimental/diff.go +++ b/stdlib/experimental/diff.go @@ -381,7 +381,7 @@ func (d *diffSchema) appendRow(builders []array.Builder, which, i int) { case *array.UintBuilder: b.Append(arr.(*array.Uint).Value(i)) case *array.StringBuilder: - b.Append(arr.(*array.String).Value(i)) + b.AppendBytes(arr.(*array.String).ValueBytes(i)) case *array.BooleanBuilder: b.Append(arr.(*array.Boolean).Value(i)) default: diff --git a/stdlib/universe/group.go b/stdlib/universe/group.go index 6d24cfdaa7..bd99067b9c 100644 --- a/stdlib/universe/group.go +++ b/stdlib/universe/group.go @@ -427,7 +427,7 @@ func (t *groupTransformation) appendValueFromRow(b array.Builder, cr flux.ColRea if vs.IsNull(i) { b.AppendNull() } else { - b.Append(vs.Value(i)) + b.AppendBytes(vs.ValueBytes(i)) } case flux.TBool: b := b.(*array.BooleanBuilder) diff --git a/stdlib/universe/moving_average.go b/stdlib/universe/moving_average.go index eafaa47d50..2507959389 100644 --- a/stdlib/universe/moving_average.go +++ b/stdlib/universe/moving_average.go @@ -373,7 +373,7 @@ func (m *movingAverageState) forceValue() error { b.Append(arr.Value(arr.Len() - 1)) case *array.StringBuilder: arr := arr.(*array.String) - b.Append(arr.Value(arr.Len() - 1)) + b.AppendBytes(arr.ValueBytes(arr.Len() - 1)) case *array.BooleanBuilder: arr := arr.(*array.Boolean) b.Append(arr.Value(arr.Len() - 1))