Skip to content

Commit

Permalink
util/parquet: add option to write kv metadata to files
Browse files Browse the repository at this point in the history
This change adds an option to the writer which allows the caller
to write arbitrary kv metadata to parquet files. This is useful
for testing purposes.

Informs: cockroachdb#99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None
  • Loading branch information
jayshrivastava committed Jun 21, 2023
1 parent d3144f6 commit 7d6fd4c
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 47 deletions.
1 change: 1 addition & 0 deletions pkg/util/parquet/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"@com_github_apache_arrow_go_v11//parquet",
"@com_github_apache_arrow_go_v11//parquet/compress",
"@com_github_apache_arrow_go_v11//parquet/file",
"@com_github_apache_arrow_go_v11//parquet/metadata",
"@com_github_apache_arrow_go_v11//parquet/schema",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
Expand Down
28 changes: 25 additions & 3 deletions pkg/util/parquet/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/apache/arrow/go/v11/parquet"
"github.com/apache/arrow/go/v11/parquet/compress"
"github.com/apache/arrow/go/v11/parquet/file"
"github.com/apache/arrow/go/v11/parquet/metadata"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)
Expand All @@ -24,6 +25,9 @@ type config struct {
maxRowGroupLength int64
version parquet.Version
compression compress.Compression

// Arbitrary kv metadata.
metadata metadata.KeyValueMetadata
}

// An Option is a configurable setting for the Writer.
Expand Down Expand Up @@ -72,6 +76,19 @@ func WithCompressionCodec(compression CompressionCodec) Option {
}
}

// WithMetadata returns an Option that attaches the given key-value pairs
// to the parquet file's metadata, where any reader can later retrieve them.
func WithMetadata(m map[string]string) Option {
	return func(c *config) error {
		for key, value := range m {
			if appendErr := c.metadata.Append(key, value); appendErr != nil {
				return appendErr
			}
		}
		return nil
	}
}

var allowedVersions = map[string]parquet.Version{
"v1.0": parquet.V1_0,
"v2.4": parquet.V1_0,
Expand Down Expand Up @@ -117,6 +134,7 @@ type Writer struct {

ba *batchAlloc

// The current number of rows written to the row group writer.
currentRowGroupSize int64
currentRowGroupWriter file.BufferedRowGroupWriter
}
Expand All @@ -131,6 +149,7 @@ func NewWriter(sch *SchemaDefinition, sink io.Writer, opts ...Option) (*Writer,
maxRowGroupLength: parquet.DefaultMaxRowGroupLen,
version: parquet.V2_6,
compression: compress.Codecs.Uncompressed,
metadata: metadata.KeyValueMetadata{},
}
for _, opt := range opts {
err := opt.apply(&cfg)
Expand All @@ -139,10 +158,13 @@ func NewWriter(sch *SchemaDefinition, sink io.Writer, opts ...Option) (*Writer,
}
}

parquetOpts := []parquet.WriterProperty{parquet.WithCreatedBy("cockroachdb"),
parquet.WithVersion(cfg.version), parquet.WithCompression(cfg.compression)}
parquetOpts := []parquet.WriterProperty{
parquet.WithCreatedBy("cockroachdb"),
parquet.WithVersion(cfg.version),
parquet.WithCompression(cfg.compression),
}
props := parquet.NewWriterProperties(parquetOpts...)
writer := file.NewParquetWriter(sink, sch.schema.Root(), file.WithWriterProps(props))
writer := file.NewParquetWriter(sink, sch.schema.Root(), file.WithWriterProps(props), file.WithWriteMetadata(cfg.metadata))

return &Writer{
sch: sch,
Expand Down
98 changes: 54 additions & 44 deletions pkg/util/parquet/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,67 +516,77 @@ func TestInvalidWriterUsage(t *testing.T) {
}

// TestVersions asserts that the parquet version recorded in the file
// metadata matches the version selected via the WithVersion option, and
// that an unsupported version string is rejected at writer construction.
func TestVersions(t *testing.T) {
	for version := range allowedVersions {
		opt := WithVersion(version)
		optionsTest(t, opt, func(t *testing.T, reader *file.Reader) {
			require.Equal(t, reader.MetaData().Version(), allowedVersions[version])
		})
	}

	// An unrecognized version string should fail fast in NewWriter.
	schemaDef, err := NewSchema([]string{}, []*types.T{})
	require.NoError(t, err)
	buf := bytes.Buffer{}
	_, err = NewWriter(schemaDef, &buf, WithVersion("invalid"))
	require.Error(t, err)
}

// TestCompressionCodecs verifies that, for every supported compression
// codec, the codec selected via WithCompressionCodec is recorded in the
// written column chunk metadata.
func TestCompressionCodecs(t *testing.T) {
	for codec, expected := range compressionCodecToParquet {
		expected := expected
		verify := func(t *testing.T, reader *file.Reader) {
			colChunk, err := reader.MetaData().RowGroup(0).ColumnChunk(0)
			require.NoError(t, err)
			require.Equal(t, colChunk.Compression(), expected)
		}
		optionsTest(t, WithCompressionCodec(codec), verify)
	}
}

// TestMetadata tests writing arbitrary kv metadata to parquet files and
// reading it back.
func TestMetadata(t *testing.T) {
	meta := map[string]string{
		"testKey1": "testValue1",
		"testKey2": "testValue2",
	}
	opt := WithMetadata(meta)
	optionsTest(t, opt, func(t *testing.T, reader *file.Reader) {
		kvMeta := reader.MetaData().KeyValueMetadata()
		for key, expected := range meta {
			val := kvMeta.FindValue(key)
			// Guard the pointer we are about to dereference, rather than
			// the result of a second, redundant FindValue call.
			require.NotNil(t, val, "missing metadata key %s", key)
			require.Equal(t, expected, *val)
		}
	})
}

// optionsTest can be used to assert the behavior of an Option. It creates a
// writer using the supplied Option and writes a parquet file with sample
// data. Then it calls the provided test function with a reader for the file
// and subsequently closes the reader.
func optionsTest(t *testing.T, opt Option, testFn func(t *testing.T, reader *file.Reader)) {
	schemaDef, err := NewSchema([]string{"a"}, []*types.T{types.Int})
	require.NoError(t, err)

	fileName := "OptionsTest.parquet"
	f, err := os.CreateTemp("", fileName)
	require.NoError(t, err)

	writer, err := NewWriter(schemaDef, f, opt)
	require.NoError(t, err)

	// Write a single sample row so row-group/column metadata exists.
	err = writer.AddRow([]tree.Datum{tree.NewDInt(0)})
	require.NoError(t, err)

	err = writer.Close()
	require.NoError(t, err)

	// Reopen the file for reading; CreateTemp's handle was consumed by the
	// writer.
	f, err = os.Open(f.Name())
	require.NoError(t, err)

	reader, err := file.NewParquetReader(f)
	require.NoError(t, err)

	testFn(t, reader)

	err = reader.Close()
	require.NoError(t, err)
}

0 comments on commit 7d6fd4c

Please sign in to comment.