Skip to content

Commit

Permalink
Merge #63770
Browse files Browse the repository at this point in the history
63770: colexec: add builtin json datatype r=jordanlewis a=jordanlewis

This commit adds a builtin json datatype to the colexec package. It's
implemented using the Bytes data structure, and lazily deserializes JSON
objects for processing.

There's an inefficiency here, which is that forming a JSON object costs
an allocation. A future commit can make a cheaper "lazy JSON" object
that doesn't cache or require up-front allocations.

Addresses: #42043.
Fixes: #49470.
Fixes: #49472.

Release note (performance improvement): improve the speed of JSON in the
vectorized execution engine

Co-authored-by: Jordan Lewis <jordanthelewis@gmail.com>
  • Loading branch information
craig[bot] and jordanlewis committed Apr 22, 2021
2 parents ad0a1be + df48f18 commit 5d75621
Show file tree
Hide file tree
Showing 96 changed files with 37,897 additions and 11,667 deletions.
3 changes: 3 additions & 0 deletions pkg/col/coldata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"batch.go",
"bytes.go",
"datum_vec.go",
"json.go",
"native_types.go",
"nulls.go",
"testutils.go",
Expand All @@ -16,9 +17,11 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/col/typeconv",
"//pkg/sql/colexecerror",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/duration",
"//pkg/util/json",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v2//:apd",
"@com_github_cockroachdb_errors//:errors",
Expand Down
8 changes: 4 additions & 4 deletions pkg/col/coldata/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func NewMemBatchWithCapacity(typs []*types.T, capacity int, factory ColumnFactor
b := NewMemBatchNoCols(typs, capacity).(*MemBatch)
for i, t := range typs {
b.b[i] = NewMemColumn(t, capacity, factory)
if b.b[i].CanonicalTypeFamily() == types.BytesFamily {
if b.b[i].IsBytesLike() {
b.bytesVecIdxs.Add(i)
}
}
Expand Down Expand Up @@ -257,14 +257,14 @@ func (m *MemBatch) SetLength(length int) {
maxIdx = m.sel[length-1]
}
for i, ok := m.bytesVecIdxs.Next(0); ok; i, ok = m.bytesVecIdxs.Next(i + 1) {
m.b[i].Bytes().UpdateOffsetsToBeNonDecreasing(maxIdx + 1)
UpdateOffsetsToBeNonDecreasing(m.b[i], maxIdx+1)
}
}
}

// AppendCol implements the Batch interface.
func (m *MemBatch) AppendCol(col Vec) {
if col.CanonicalTypeFamily() == types.BytesFamily {
if col.IsBytesLike() {
m.bytesVecIdxs.Add(len(m.b))
}
m.b = append(m.b, col)
Expand Down Expand Up @@ -324,7 +324,7 @@ func (m *MemBatch) ResetInternalBatch() {
}
}
for i, ok := m.bytesVecIdxs.Next(0); ok; i, ok = m.bytesVecIdxs.Next(i + 1) {
m.b[i].Bytes().Reset()
Reset(m.b[i])
}
}

Expand Down
80 changes: 73 additions & 7 deletions pkg/col/coldata/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"strings"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -122,13 +124,11 @@ func (b *Bytes) Get(i int) []byte {
return b.data[b.offsets[i]:b.offsets[i+1]]
}

// Set sets the ith []byte in Bytes. Overwriting a value that is not at the end
// of the Bytes is not allowed since it complicates memory movement to make/take
// away necessary space in the flat buffer. Note that a nil value will be
// "converted" into an empty byte slice.
func (b *Bytes) Set(i int, v []byte) {
// getAppendTo returns a byte slice that ith []byte value should be appended to.
// This method will panic if i is less than maximum previously Set index.
func (b *Bytes) getAppendTo(i int) []byte {
if b.isWindow {
panic("Set is called on a window into Bytes")
panic("getAppendTo is called on a window into Bytes")
}
if i < b.maxSetIndex {
panic(
Expand All @@ -144,7 +144,16 @@ func (b *Bytes) Set(i int, v []byte) {
// NULL values that are stored separately. In order to maintain the
// assumption of non-decreasing offsets, we need to backfill them.
b.maybeBackfillOffsets(i)
b.data = append(b.data[:b.offsets[i]], v...)
return b.data[:b.offsets[i]]
}

// Set sets the ith []byte in Bytes. Overwriting a value that is not at the end
// of the Bytes is not allowed since it complicates memory movement to make/take
// away necessary space in the flat buffer. Note that a nil value will be
// "converted" into an empty byte slice.
func (b *Bytes) Set(i int, v []byte) {
appendTo := b.getAppendTo(i)
b.data = append(appendTo, v...)
b.offsets[i+1] = int32(len(b.data))
b.maxSetIndex = i
}
Expand Down Expand Up @@ -444,3 +453,60 @@ func (b *Bytes) ToArrowSerializationFormat(n int) ([]byte, []int32) {
offsets := b.offsets[:n+1]
return data, offsets
}

// AssertOffsetsAreNonDecreasing calls the method of the same name on bytes-like
// vectors, panicking if not bytes-like.
func AssertOffsetsAreNonDecreasing(v Vec, n int) {
family := v.CanonicalTypeFamily()
switch family {
case types.BytesFamily:
v.Bytes().AssertOffsetsAreNonDecreasing(n)
case types.JsonFamily:
v.JSON().AssertOffsetsAreNonDecreasing(n)
default:
colexecerror.InternalError(errors.AssertionFailedf("unsupported type %s", family))
}
}

// UpdateOffsetsToBeNonDecreasing calls the method of the same name on bytes-like
// vectors, panicking if not bytes-like.
func UpdateOffsetsToBeNonDecreasing(v Vec, n int) {
family := v.CanonicalTypeFamily()
switch family {
case types.BytesFamily:
v.Bytes().UpdateOffsetsToBeNonDecreasing(n)
case types.JsonFamily:
v.JSON().UpdateOffsetsToBeNonDecreasing(n)
default:
colexecerror.InternalError(errors.AssertionFailedf("unsupported type %s", family))
}
}

// ProportionalSize calls the method of the same name on bytes-like
// vectors, panicking if not bytes-like.
func ProportionalSize(v Vec, length int64) uintptr {
family := v.CanonicalTypeFamily()
switch family {
case types.BytesFamily:
return v.Bytes().ProportionalSize(length)
case types.JsonFamily:
return v.JSON().ProportionalSize(length)
default:
colexecerror.InternalError(errors.AssertionFailedf("unsupported type %s", family))
}
return 0
}

// Reset calls the method of the same name on bytes-like
// vectors, panicking if not bytes-like.
func Reset(v Vec) {
family := v.CanonicalTypeFamily()
switch family {
case types.BytesFamily:
v.Bytes().Reset()
case types.JsonFamily:
v.JSON().Reset()
default:
colexecerror.InternalError(errors.AssertionFailedf("unsupported type %s", family))
}
}
116 changes: 116 additions & 0 deletions pkg/col/coldata/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package coldata

import (
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/util/json"
)

// JSONs is a representation of columnar JSON data. It's simply a wrapper around
// the flat Bytes structure. To pull a JSON out of the structure, we construct
// a new "encodedJSON" object from scratch on demand.
type JSONs struct {
Bytes
}

// NewJSONs returns a new JSONs presized to n elements.
func NewJSONs(n int) *JSONs {
return &JSONs{
Bytes: *NewBytes(n),
}
}

// Get returns the ith JSON in JSONs. Note that the returned JSON is
// unsafe for reuse if any write operation happens.
// NOTE: if ith element was never set in any way, the behavior of Get is
// undefined.
func (js *JSONs) Get(i int) json.JSON {
bytes := js.Bytes.Get(i)
if len(bytes) == 0 {
return json.NullJSONValue
}
ret, err := json.FromEncoding(bytes)
if err != nil {
colexecerror.ExpectedError(err)
}
return ret
}

// Set sets the ith JSON in JSONs. Overwriting a value that is not at the end
// of the JSONs is not allowed since it complicates memory movement to make/take
// away necessary space in the flat buffer.
func (js *JSONs) Set(i int, j json.JSON) {
b := &js.Bytes
appendTo := b.getAppendTo(i)
var err error
b.data, err = json.EncodeJSON(appendTo, j)
if err != nil {
colexecerror.ExpectedError(err)
}
b.offsets[i+1] = int32(len(b.data))
b.maxSetIndex = i
}

// Window creates a "window" into the receiver. It behaves similarly to
// Golang's slice, but the returned object is *not* allowed to be modified - it
// is read-only. Window is a lightweight operation that doesn't involve copying
// the underlying data.
func (js *JSONs) Window(start, end int) *JSONs {
return &JSONs{
Bytes: *js.Bytes.Window(start, end),
}
}

// CopySlice copies srcStartIdx inclusive and srcEndIdx exclusive []byte values
// from src into the receiver starting at destIdx. See Bytes.CopySlice.
func (js *JSONs) CopySlice(src *JSONs, destIdx, srcStartIdx, srcEndIdx int) {
js.Bytes.CopySlice(&src.Bytes, destIdx, srcStartIdx, srcEndIdx)
}

// AppendSlice appends srcStartIdx inclusive and srcEndIdx exclusive JSON
// values from src into the receiver starting at destIdx.
func (js *JSONs) AppendSlice(src *JSONs, destIdx, srcStartIdx, srcEndIdx int) {
js.Bytes.AppendSlice(&src.Bytes, destIdx, srcStartIdx, srcEndIdx)
}

// AppendVal appends the given JSON value to the end of the receiver.
func (js *JSONs) AppendVal(j json.JSON) {
if j == nil {
// A nil JSON indicates a NULL value in the column. We've got to insert a
// "zero value" which in this case means an empty byte slice.
js.Bytes.AppendVal(nil)
return
}
b := &js.Bytes
appendTo := b.getAppendTo(b.Len())
var err error
b.data, err = json.EncodeJSON(appendTo, j)
if err != nil {
colexecerror.ExpectedError(err)
}
b.maxSetIndex = b.Len()
b.offsets = append(b.offsets, int32(len(b.data)))
}

// String is used for debugging purposes.
func (js *JSONs) String() string {
var builder strings.Builder
for i := 0; i < js.Len(); i++ {
builder.WriteString(
fmt.Sprintf("%d: %s\n", i, js.Get(i)),
)
}
return builder.String()
}
15 changes: 15 additions & 0 deletions pkg/col/coldata/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,21 @@ func AssertEquivalentBatches(t testingT, expected, actual Batch) {
}
}
}
} else if canonicalTypeFamily == types.JsonFamily {
expectedJSON := expectedVec.JSON().Window(0, expected.Length())
resultJSON := actualVec.JSON().Window(0, actual.Length())
require.Equal(t, expectedJSON.Len(), resultJSON.Len())
for i := 0; i < expectedJSON.Len(); i++ {
if !expectedNulls.NullAt(i) {
cmp, err := expectedJSON.Get(i).Compare(resultJSON.Get(i))
if err != nil {
t.Fatal(err)
}
if cmp != 0 {
t.Fatalf("json mismatch at index %d:\nexpected:\n%s\nactual:\n%s", i, expectedJSON, resultJSON)
}
}
}
} else if canonicalTypeFamily == typeconv.DatumVecCanonicalTypeFamily {
expectedDatum := expectedVec.Datum().Slice(0 /* start */, expected.Length())
resultDatum := actualVec.Datum().Slice(0 /* start */, actual.Length())
Expand Down
Loading

0 comments on commit 5d75621

Please sign in to comment.