Skip to content

Commit

Permalink
util/parquet: create parquet writer library
Browse files Browse the repository at this point in the history
This change implements a `Writer` struct in the new `util/parquet` package.
This `Writer` takes datum rows and writes them to the `io.Writer` sink
using a configurable parquet version (defaults to v2.6).

The package implements several features internally required to write in the parquet format:
- schema creation
- row group / column page management
- encoding/decoding of CRDB datums to parquet datums
Currently, the writer only supports types found in the TPCC workload, namely INT, DECIMAL, STRING
UUID, TIMESTAMP and BOOL.

This change also adds a benchmark and tests which verify the correctness of the
writer and test utils for reading datums from parquet files.

Informs: #99028
Epic: None
Release note: None
  • Loading branch information
jayshrivastava committed Apr 3, 2023
1 parent fae192b commit 6b9b940
Show file tree
Hide file tree
Showing 11 changed files with 1,181 additions and 1 deletion.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ ALL_TESTS = [
"//pkg/util/netutil/addr:addr_test",
"//pkg/util/netutil:netutil_test",
"//pkg/util/optional:optional_test",
"//pkg/util/parquet:parquet_test",
"//pkg/util/pprofutil:pprofutil_test",
"//pkg/util/pretty:pretty_test",
"//pkg/util/protoutil:protoutil_test",
Expand Down Expand Up @@ -2204,6 +2205,8 @@ GO_TARGETS = [
"//pkg/util/netutil:netutil_test",
"//pkg/util/optional:optional",
"//pkg/util/optional:optional_test",
"//pkg/util/parquet:parquet",
"//pkg/util/parquet:parquet_test",
"//pkg/util/pprofutil:pprofutil",
"//pkg/util/pprofutil:pprofutil_test",
"//pkg/util/pretty:pretty",
Expand Down Expand Up @@ -3292,6 +3295,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/netutil:get_x_data",
"//pkg/util/netutil/addr:get_x_data",
"//pkg/util/optional:get_x_data",
"//pkg/util/parquet:get_x_data",
"//pkg/util/pprofutil:get_x_data",
"//pkg/util/pretty:get_x_data",
"//pkg/util/protoutil:get_x_data",
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_apache_arrow_go_v11//parquet",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/sem/tree/datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,16 @@ func MustBeDDecimal(e Expr) DDecimal {
panic(errors.AssertionFailedf("expected *DDecimal, found %T", e))
}

// AsDDecimal attempts to retrieve a DDecimal from an Expr, returning a DDecimal and
// a flag signifying whether the assertion was successful.
func AsDDecimal(e Expr) (*DDecimal, bool) {
switch t := e.(type) {
case *DDecimal:
return t, true
}
return nil, false
}

// ParseDDecimal parses and returns the *DDecimal Datum value represented by the
// provided string, or an error if parsing is unsuccessful.
func ParseDDecimal(s string) (*DDecimal, error) {
Expand Down
50 changes: 50 additions & 0 deletions pkg/util/parquet/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "parquet",
srcs = [
"decoders.go",
"schema.go",
"testutils.go",
"write_functions.go",
"writer.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/util/parquet",
visibility = ["//visibility:public"],
deps = [
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/uuid",
"@com_github_apache_arrow_go_v11//parquet",
"@com_github_apache_arrow_go_v11//parquet/file",
"@com_github_apache_arrow_go_v11//parquet/schema",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)

go_test(
name = "parquet_test",
srcs = [
"writer_bench_test.go",
"writer_test.go",
],
args = ["-test.timeout=295s"],
embed = [":parquet"],
deps = [
"//pkg/sql/randgen",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_apache_arrow_go_v11//parquet/file",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
104 changes: 104 additions & 0 deletions pkg/util/parquet/decoders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package parquet

import (
"time"

"github.com/apache/arrow/go/v11/parquet"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

// decoder is used to store typedDecoders of various types in the same
// schema definition.
type decoder interface{}

type typedDecoder[T parquetDatatypes] interface {
decoder
decode(v T) (tree.Datum, error)
}

func decode[T parquetDatatypes](dec decoder, v T) (tree.Datum, error) {
td, ok := dec.(typedDecoder[T])
if !ok {
return nil, errors.AssertionFailedf("expected typedDecoder[%T], but found %T", v, dec)
}
return td.decode(v)
}

type boolDecoder struct{}

func (boolDecoder) decode(v bool) (tree.Datum, error) {
return tree.MakeDBool(tree.DBool(v)), nil
}

type stringDecoder struct{}

func (stringDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return tree.NewDString(string(v)), nil
}

type int64Decoder struct{}

func (int64Decoder) decode(v int64) (tree.Datum, error) {
return tree.NewDInt(tree.DInt(v)), nil
}

type int32Decoder struct{}

func (int32Decoder) decode(v int32) (tree.Datum, error) {
return tree.NewDInt(tree.DInt(v)), nil
}

type decimalDecoder struct{}

func (decimalDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
return tree.ParseDDecimal(string(v))
}

type timestampDecoder struct{}

func (timestampDecoder) decode(v parquet.ByteArray) (tree.Datum, error) {
dtStr := string(v)
d, dependsOnCtx, err := tree.ParseDTimestamp(nil, dtStr, time.Microsecond)
if dependsOnCtx {
return nil, errors.New("TimestampTZ depends on context")
}
if err != nil {
return nil, err
}
// Converts the timezone from "loc(+0000)" to "UTC", which are equivalent.
d.Time = d.Time.UTC()
return d, nil
}

type uUIDDecoder struct{}

func (uUIDDecoder) decode(v parquet.FixedLenByteArray) (tree.Datum, error) {
uid, err := uuid.FromBytes(v)
if err != nil {
return nil, err
}
return tree.NewDUuid(tree.DUuid{UUID: uid}), nil
}

// Defeat the linter's unused lint errors.
func init() {
var _, _ = boolDecoder{}.decode(false)
var _, _ = stringDecoder{}.decode(parquet.ByteArray{})
var _, _ = int32Decoder{}.decode(0)
var _, _ = int64Decoder{}.decode(0)
var _, _ = decimalDecoder{}.decode(parquet.ByteArray{})
var _, _ = timestampDecoder{}.decode(parquet.ByteArray{})
var _, _ = uUIDDecoder{}.decode(parquet.FixedLenByteArray{})
}
Loading

0 comments on commit 6b9b940

Please sign in to comment.