Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util/parquet: add support for all types #101946

Merged
merged 1 commit into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 76 additions & 2 deletions pkg/sql/sem/tree/datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,20 @@ func MustBeDFloat(e Expr) DFloat {
panic(errors.AssertionFailedf("expected *DFloat, found %T", e))
}

// AsDFloat attempts to retrieve a DFloat from an Expr, returning a DFloat and
// a flag signifying whether the assertion was successful. The function should
// be used instead of direct type assertions wherever a *DFloat wrapped by a
// *DOidWrapper is possible.
func AsDFloat(e Expr) (*DFloat, bool) {
switch t := e.(type) {
case *DFloat:
return t, true
case *DOidWrapper:
return AsDFloat(t.Wrapped)
}
return nil, false
}

// NewDFloat is a helper routine to create a *DFloat initialized from its
// argument.
func NewDFloat(d DFloat) *DFloat {
Expand Down Expand Up @@ -1407,6 +1421,20 @@ func NewDCollatedString(
return &d, nil
}

// AsDCollatedString attempts to retrieve a DString from an Expr, returning a AsDCollatedString and
// a flag signifying whether the assertion was successful. The function should
// be used instead of direct type assertions wherever a *DCollatedString wrapped by a
// *DOidWrapper is possible.
func AsDCollatedString(e Expr) (DCollatedString, bool) {
switch t := e.(type) {
case *DCollatedString:
return *t, true
case *DOidWrapper:
return AsDCollatedString(t.Wrapped)
}
return DCollatedString{}, false
}

// AmbiguousFormat implements the Datum interface.
func (*DCollatedString) AmbiguousFormat() bool { return false }

Expand Down Expand Up @@ -2286,6 +2314,20 @@ func MakeDTime(t timeofday.TimeOfDay) *DTime {
return &d
}

// AsDTime attempts to retrieve a DTime from an Expr, returning a DTimestamp and
// a flag signifying whether the assertion was successful. The function should
// be used instead of direct type assertions wherever a *DTime wrapped by a
// *DOidWrapper is possible.
func AsDTime(e Expr) (DTime, bool) {
switch t := e.(type) {
case *DTime:
return *t, true
case *DOidWrapper:
return AsDTime(t.Wrapped)
}
return DTime(timeofday.FromInt(0)), false
}

// ParseDTime parses and returns the *DTime Datum value represented by the
// provided string, or an error if parsing is unsuccessful.
//
Expand Down Expand Up @@ -2434,6 +2476,20 @@ func NewDTimeTZFromLocation(t timeofday.TimeOfDay, loc *time.Location) *DTimeTZ
return &DTimeTZ{timetz.MakeTimeTZFromLocation(t, loc)}
}

// AsDTimeTZ attempts to retrieve a DTimeTZ from an Expr, returning a DTimeTZ and
// a flag signifying whether the assertion was successful. The function should
// be used instead of direct type assertions wherever a *DTimeTZ wrapped by a
// *DOidWrapper is possible.
func AsDTimeTZ(e Expr) (DTimeTZ, bool) {
switch t := e.(type) {
case *DTimeTZ:
return *t, true
case *DOidWrapper:
return AsDTimeTZ(t.Wrapped)
}
return DTimeTZ{}, false
}

// ParseDTimeTZ parses and returns the *DTime Datum value represented by the
// provided string, or an error if parsing is unsuccessful.
//
Expand Down Expand Up @@ -3069,12 +3125,16 @@ type DInterval struct {
duration.Duration
}

// AsDInterval attempts to retrieve a DInterval from an Expr, panicking if the
// assertion fails.
// AsDInterval attempts to retrieve a DInterval from an Expr, returning a DInterval and
// a flag signifying whether the assertion was successful. The function should
// be used instead of direct type assertions wherever a *DInterval wrapped by a
// *DOidWrapper is possible.
func AsDInterval(e Expr) (*DInterval, bool) {
switch t := e.(type) {
case *DInterval:
return t, true
case *DOidWrapper:
return AsDInterval(t.Wrapped)
}
return nil, false
}
Expand Down Expand Up @@ -5017,6 +5077,20 @@ func NewDEnum(e DEnum) *DEnum {
return &e
}

// AsDEnum attempts to retrieve a DEnum from an Expr, returning a DEnum and
// a flag signifying whether the assertion was successful. The function should
// // be used instead of direct type assertions wherever a *DEnum wrapped by a
// // *DOidWrapper is possible.
func AsDEnum(e Expr) (*DEnum, bool) {
switch t := e.(type) {
case *DEnum:
return t, true
case *DOidWrapper:
return AsDEnum(t.Wrapped)
}
return nil, false
}

// MakeDEnumFromPhysicalRepresentation creates a DEnum of the input type
// and the input physical representation.
func MakeDEnumFromPhysicalRepresentation(typ *types.T, rep []byte) (DEnum, error) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/util/parquet/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/util/parquet",
visibility = ["//visibility:public"],
deps = [
"//pkg/geo",
"//pkg/geo/geopb",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/catid",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/bitarray",
"//pkg/util/duration",
"//pkg/util/encoding",
"//pkg/util/timeofday",
"//pkg/util/uuid",
"@com_github_apache_arrow_go_v11//parquet",
"@com_github_apache_arrow_go_v11//parquet/file",
Expand All @@ -38,10 +45,15 @@ go_test(
args = ["-test.timeout=295s"],
embed = [":parquet"],
deps = [
"//pkg/geo",
"//pkg/sql/randgen",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/bitarray",
"//pkg/util/duration",
"//pkg/util/ipaddr",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/uuid",
"@com_github_apache_arrow_go_v11//parquet/file",
"@com_github_stretchr_testify//require",
Expand Down
164 changes: 163 additions & 1 deletion pkg/util/parquet/decoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@ import (
"time"

"github.com/apache/arrow/go/v11/parquet"
"github.com/cockroachdb/cockroach/pkg/geo"
"github.com/cockroachdb/cockroach/pkg/geo/geopb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/bitarray"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/timeofday"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)

// decoder is used to store typedDecoders of various types in the same
Expand Down Expand Up @@ -72,7 +78,23 @@ func (timestampDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
dtStr := string(v)
d, dependsOnCtx, err := tree.ParseDTimestamp(nil, dtStr, time.Microsecond)
if dependsOnCtx {
return nil, errors.New("TimestampTZ depends on context")
return nil, errors.Newf("decoding timestamp %s depends on context", v)
}
if err != nil {
return nil, err
}
// Converts the timezone from "loc(+0000)" to "UTC", which are equivalent.
d.Time = d.Time.UTC()
return d, nil
}

type timestampTZDecoder struct{}

func (timestampTZDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
dtStr := string(v)
d, dependsOnCtx, err := tree.ParseDTimestampTZ(nil, dtStr, time.Microsecond)
if dependsOnCtx {
return nil, errors.Newf("decoding timestampTZ %s depends on context", v)
}
if err != nil {
return nil, err
Expand All @@ -92,6 +114,128 @@ func (uUIDDecoder) decode(v parquet.FixedLenByteArray) (tree.Datum, error) {
return tree.NewDUuid(tree.DUuid{UUID: uid}), nil
}

type iNetDecoder struct{}

func (iNetDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return tree.ParseDIPAddrFromINetString(string(v))
}

type jsonDecoder struct{}

func (jsonDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return tree.ParseDJSON(string(v))
}

type bitDecoder struct{}

func (bitDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
ba, err := bitarray.Parse(string(v))
if err != nil {
return nil, err
}
return &tree.DBitArray{BitArray: ba}, err
}

type bytesDecoder struct{}

func (bytesDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return tree.NewDBytes(tree.DBytes(v)), nil
}

type enumDecoder struct{}

func (ed enumDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return &tree.DEnum{
LogicalRep: string(v),
}, nil
}

type dateDecoder struct{}

func (dateDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
d, dependCtx, err := tree.ParseDDate(nil, string(v))
if dependCtx {
return nil, errors.Newf("decoding date %s depends on context", v)
}
return d, err
}

type box2DDecoder struct{}

func (box2DDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
b, err := geo.ParseCartesianBoundingBox(string(v))
if err != nil {
return nil, err
}
return tree.NewDBox2D(b), nil
}

type geographyDecoder struct{}

func (geographyDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
g, err := geo.ParseGeographyFromEWKB(geopb.EWKB(v))
if err != nil {
return nil, err
}
return &tree.DGeography{Geography: g}, nil
}

type geometryDecoder struct{}

func (geometryDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
g, err := geo.ParseGeometryFromEWKB(geopb.EWKB(v))
if err != nil {
return nil, err
}
return &tree.DGeometry{Geometry: g}, nil
}

type intervalDecoder struct{}

func (intervalDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return tree.ParseDInterval(duration.IntervalStyle_ISO_8601, string(v))
}

type timeDecoder struct{}

func (timeDecoder) decode(v int64) (tree.Datum, error) {
return tree.MakeDTime(timeofday.TimeOfDay(v)), nil
}

type timeTZDecoder struct{}

func (timeTZDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
d, dependsOnCtx, err := tree.ParseDTimeTZ(nil, string(v), time.Microsecond)
if dependsOnCtx {
return nil, errors.Newf("parsed timeTZ %s depends on context", v)
}
return d, err
}

type float32Decoder struct{}

func (float32Decoder) decode(v float32) (tree.Datum, error) {
return tree.NewDFloat(tree.DFloat(v)), nil
}

type float64Decoder struct{}

func (float64Decoder) decode(v float64) (tree.Datum, error) {
return tree.NewDFloat(tree.DFloat(v)), nil
}

type oidDecoder struct{}

func (oidDecoder) decode(v int32) (tree.Datum, error) {
return tree.NewDOid(oid.Oid(v)), nil
}

type collatedStringDecoder struct{}

func (collatedStringDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return &tree.DCollatedString{Contents: string(v)}, nil
}

// Defeat the linter's unused lint errors.
func init() {
var _, _ = boolDecoder{}.decode(false)
Expand All @@ -100,5 +244,23 @@ func init() {
var _, _ = int64Decoder{}.decode(0)
var _, _ = decimalDecoder{}.decode(parquet.ByteArray{})
var _, _ = timestampDecoder{}.decode(parquet.ByteArray{})
var _, _ = timestampTZDecoder{}.decode(parquet.ByteArray{})
var _, _ = uUIDDecoder{}.decode(parquet.FixedLenByteArray{})
var _, _ = iNetDecoder{}.decode(parquet.ByteArray{})
var _, _ = jsonDecoder{}.decode(parquet.ByteArray{})
var _, _ = bitDecoder{}.decode(parquet.ByteArray{})
var _, _ = bytesDecoder{}.decode(parquet.ByteArray{})
var _, _ = enumDecoder{}.decode(parquet.ByteArray{})
var _, _ = dateDecoder{}.decode(parquet.ByteArray{})
var _, _ = box2DDecoder{}.decode(parquet.ByteArray{})
var _, _ = box2DDecoder{}.decode(parquet.ByteArray{})
var _, _ = geographyDecoder{}.decode(parquet.ByteArray{})
var _, _ = geometryDecoder{}.decode(parquet.ByteArray{})
var _, _ = intervalDecoder{}.decode(parquet.ByteArray{})
var _, _ = timeDecoder{}.decode(0)
var _, _ = timeTZDecoder{}.decode(parquet.ByteArray{})
var _, _ = float64Decoder{}.decode(0.0)
var _, _ = float32Decoder{}.decode(0.0)
var _, _ = oidDecoder{}.decode(0)
var _, _ = collatedStringDecoder{}.decode(parquet.ByteArray{})
}
Loading