diff --git a/execute/allocator.go b/execute/allocator.go index d176875a55..45f60b1d90 100644 --- a/execute/allocator.go +++ b/execute/allocator.go @@ -1,8 +1,6 @@ package execute import ( - arrowmem "github.com/apache/arrow/go/v7/arrow/memory" - "github.com/influxdata/flux/memory" ) @@ -158,14 +156,17 @@ func (a *Allocator) GrowFloats(slice []float64, n int) []float64 { return s } -// Strings makes a slice of String values. -func (a *Allocator) Strings(l, c int) []String { +// Strings makes a slice of string values. +// Only the string headers are accounted for. +func (a *Allocator) Strings(l, c int) []string { a.account(c, stringSize) - return make([]String, l, c) + return make([]string, l, c) } -// AppendStrings appends Strings to a slice. -func (a *Allocator) AppendStrings(slice []String, vs ...String) []String { +// AppendStrings appends strings to a slice. +// Only the string headers are accounted for. +func (a *Allocator) AppendStrings(slice []string, vs ...string) []string { + // TODO(nathanielc): Account for actual size of strings if cap(slice)-len(slice) >= len(vs) { return append(slice, vs...) } @@ -175,14 +176,14 @@ func (a *Allocator) AppendStrings(slice []String, vs ...String) []String { return s } -func (a *Allocator) GrowStrings(slice []String, n int) []String { +func (a *Allocator) GrowStrings(slice []string, n int) []string { newCap := len(slice) + n if newCap < cap(slice) { return slice[:newCap] } // grow capacity same way as built-in append newCap = newCap*3/2 + 1 - s := make([]String, len(slice)+n, newCap) + s := make([]string, len(slice)+n, newCap) copy(s, slice) diff := cap(s) - cap(slice) a.account(diff, stringSize) @@ -219,13 +220,3 @@ func (a *Allocator) GrowTimes(slice []Time, n int) []Time { a.account(diff, timeSize) return s } - -// String represents a string stored in some backing byte slice. -type String struct { - offset int - len int -} - -func (s String) Bytes(buf *arrowmem.Buffer) []byte { - return buf.Bytes()[s.offset : s.offset+s.len] -} diff --git a/execute/table.go b/execute/table.go index 9d0c93c1ab..82a0886981 100644 --- a/execute/table.go +++ b/execute/table.go @@ -1,14 +1,11 @@ package execute import ( - "bytes" "fmt" "sort" "sync/atomic" - arrowmem "github.com/apache/arrow/go/v7/arrow/memory" "github.com/google/go-cmp/cmp" - "github.com/influxdata/flux" "github.com/influxdata/flux/array" "github.com/influxdata/flux/arrow" @@ -298,9 +295,8 @@ func TablesEqual(left, right flux.Table, alloc memory.Allocator) (bool, error) { eq = cmp.Equal(leftBuffer.cols[j].(*floatColumnBuilder).data, rightBuffer.cols[j].(*floatColumnBuilder).data) case flux.TString: - eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder), - rightBuffer.cols[j].(*stringColumnBuilder), - cmp.Comparer(stringColumnBuilderEqual)) + eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder).data, + rightBuffer.cols[j].(*stringColumnBuilder).data) case flux.TTime: eq = cmp.Equal(leftBuffer.cols[j].(*timeColumnBuilder).data, rightBuffer.cols[j].(*timeColumnBuilder).data) @@ -328,27 +324,6 @@ func colsMatch(left, right []flux.ColMeta) bool { return true } -func stringColumnBuilderEqual(x, y *stringColumnBuilder) bool { - if x.Len() != y.Len() { - return false - } - for i := 0; i < x.Len(); i++ { - if x.IsNil(i) { - if !y.IsNil(i) { - return false - } - continue - } - if y.IsNil(i) { - return false - } - if !bytes.Equal(x.data[i].Bytes(x.buf), y.data[i].Bytes(y.buf)) { - return false - } - } - return true -} - // ColMap writes a mapping of builder index to cols index into colMap. // When colMap does not have enough capacity a new colMap is allocated. // The colMap is always returned @@ -623,7 +598,6 @@ func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error) { case flux.TString: b.cols = append(b.cols, &stringColumnBuilder{ columnBuilderBase: colBase, - buf: arrowmem.NewResizableBuffer(b.alloc.Allocator), }) if b.NRows() > 0 { if err := b.GrowStrings(newIdx, b.NRows()); err != nil { @@ -945,9 +919,8 @@ func (b *ColListTableBuilder) SetString(i int, j int, value string) error { if err := b.checkCol(j, flux.TString); err != nil { return err } - col := b.cols[j].(*stringColumnBuilder) - col.data[i] = col.makeString(value) - col.SetNil(i, false) + b.cols[j].(*stringColumnBuilder).data[i] = value + b.cols[j].SetNil(i, false) return nil } @@ -956,7 +929,7 @@ func (b *ColListTableBuilder) AppendString(j int, value string) error { return err } col := b.cols[j].(*stringColumnBuilder) - col.data = b.alloc.AppendStrings(col.data, col.makeString(value)) + col.data = b.alloc.AppendStrings(col.data, value) b.nrows = len(col.data) return nil } @@ -1179,6 +1152,11 @@ func (b *ColListTableBuilder) Floats(j int) []float64 { CheckColType(b.colMeta[j], flux.TFloat) return b.cols[j].(*floatColumnBuilder).data } +func (b *ColListTableBuilder) Strings(j int) []string { + meta := b.colMeta[j] + CheckColType(meta, flux.TString) + return b.cols[j].(*stringColumnBuilder).data +} func (b *ColListTableBuilder) Times(j int) []values.Time { CheckColType(b.colMeta[j], flux.TTime) return b.cols[j].(*timeColumnBuilder).data @@ -1202,9 +1180,7 @@ func (b *ColListTableBuilder) GetRow(row int) values.Object { case flux.TFloat: val = values.NewFloat(b.cols[j].(*floatColumnBuilder).data[row]) case flux.TString: - // TODO(mhilton): avoid a copy - col := b.cols[j].(*stringColumnBuilder) - val = values.NewString(string(col.data[row].Bytes(col.buf))) + val = values.NewString(b.cols[j].(*stringColumnBuilder).data[row]) case flux.TTime: val = values.NewTime(b.cols[j].(*timeColumnBuilder).data[row]) } @@ -1890,38 +1866,46 @@ func (c *stringColumn) Copy() column { type stringColumnBuilder struct { columnBuilderBase - data []String - - // buf contains a backing buffer containing the bytes of the - // strings. - buf *arrowmem.Buffer + data []string } func (c *stringColumnBuilder) Clear() { - c.buf.Release() - c.buf = arrowmem.NewResizableBuffer(c.alloc.Allocator) - c.data = c.data[:0] + c.data = c.data[0:0] } func (c *stringColumnBuilder) Release() { - c.buf.Release() c.alloc.Free(cap(c.data), stringSize) + c.data = nil } func (c *stringColumnBuilder) Copy() column { - builder := arrow.NewStringBuilder(c.alloc.Allocator) - builder.Reserve(len(c.data)) - builder.ReserveData(c.buf.Len()) - for i, v := range c.data { - if c.nils[i] { - builder.AppendNull() - continue + var data *array.String + if len(c.nils) > 0 { + b := arrow.NewStringBuilder(c.alloc.Allocator) + b.Reserve(len(c.data)) + sz := 0 + for i, v := range c.data { + if c.nils[i] { + continue + } + sz += len(v) } - builder.AppendBytes(v.Bytes(c.buf)) + b.ReserveData(sz) + for i, v := range c.data { + if c.nils[i] { + b.AppendNull() + continue + } + b.Append(v) + } + data = b.NewStringArray() + b.Release() + } else { + data = arrow.NewString(c.data, c.alloc.Allocator) } col := &stringColumn{ ColMeta: c.ColMeta, - data: builder.NewStringArray(), + data: data, } return col } @@ -1932,13 +1916,13 @@ func (c *stringColumnBuilder) Len() int { func (c *stringColumnBuilder) Equal(i, j int) bool { return c.EqualFunc(i, j, func(i, j int) bool { - return bytes.Equal(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf)) + return c.data[i] == c.data[j] }) } func (c *stringColumnBuilder) Less(i, j int) bool { return c.LessFunc(i, j, func(i, j int) bool { - return bytes.Compare(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf)) < 0 + return c.data[i] < c.data[j] }) } @@ -1947,16 +1931,6 @@ func (c *stringColumnBuilder) Swap(i, j int) { c.data[i], c.data[j] = c.data[j], c.data[i] } -func (c *stringColumnBuilder) makeString(s string) String { - offset := c.buf.Len() - c.buf.Resize(offset + len(s)) - copy(c.buf.Bytes()[offset:], s) - return String{ - offset: offset, - len: len(s), - } -} - type timeColumn struct { flux.ColMeta data *array.Int diff --git a/execute/table_test.go b/execute/table_test.go index 9996acdc5e..8311d0b05a 100644 --- a/execute/table_test.go +++ b/execute/table_test.go @@ -148,58 +148,6 @@ func TestTablesEqual(t *testing.T) { }, want: false, }, - { - name: "string values", - data0: &executetest.Table{ - ColMeta: []flux.ColMeta{ - {Label: "_time", Type: flux.TTime}, - {Label: "_value", Type: flux.TString}, - }, - Data: [][]interface{}{ - {execute.Time(1), "1"}, - {execute.Time(2), "2"}, - {execute.Time(3), "3"}, - }, - }, - data1: &executetest.Table{ - ColMeta: []flux.ColMeta{ - {Label: "_time", Type: flux.TTime}, - {Label: "_value", Type: flux.TString}, - }, - Data: [][]interface{}{ - {execute.Time(1), "1"}, - {execute.Time(2), "2"}, - {execute.Time(3), "3"}, - }, - }, - want: true, - }, - { - name: "string mismatch", - data0: &executetest.Table{ - ColMeta: []flux.ColMeta{ - {Label: "_time", Type: flux.TTime}, - {Label: "_value", Type: flux.TString}, - }, - Data: [][]interface{}{ - {execute.Time(1), "1"}, - {execute.Time(2), "2"}, - {execute.Time(3), "3"}, - }, - }, - data1: &executetest.Table{ - ColMeta: []flux.ColMeta{ - {Label: "_time", Type: flux.TTime}, - {Label: "_value", Type: flux.TString}, - }, - Data: [][]interface{}{ - {execute.Time(1), "1"}, - {execute.Time(2), "2"}, - {execute.Time(3), "4"}, - }, - }, - want: false, - }, } for _, tc := range testCases { tc := tc