Skip to content

Commit

Permalink
GH-38503: [Go][Parquet] Make the arrow column writer internal (#38727)
Browse files Browse the repository at this point in the history
This makes it so the Arrow column writer is not exported from the `pqarrow` package.  This follows up on comments from #38581.
* Closes: #38503

Authored-by: Tim Schaub <tim@planet.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
tschaub authored Nov 15, 2023
1 parent 41e45fe commit 1e7175d
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 64 deletions.
26 changes: 11 additions & 15 deletions go/parquet/pqarrow/encode_arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,25 @@ func nullableRoot(manifest *SchemaManifest, field *SchemaField) bool {
return nullable
}

// ArrowColumnWriter is a convenience object for easily writing arrow data to a specific
// arrowColumnWriter is a convenience object for easily writing arrow data to a specific
// set of columns in a parquet file. Since a single arrow array can itself be a nested type
// consisting of multiple columns of data, this will write to all of the appropriate leaves in
// the parquet file, allowing easy writing of nested columns.
type ArrowColumnWriter struct {
type arrowColumnWriter struct {
builders []*multipathLevelBuilder
leafCount int
colIdx int
rgw file.RowGroupWriter
}

// NewArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns,
// newArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns,
// and the provided schema manifest to determine the paths for writing the columns.
//
// Using an arrow column writer is a convenience to avoid having to process the arrow array yourself
// and determine the correct definition and repetition levels manually.
func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (ArrowColumnWriter, error) {
func newArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (arrowColumnWriter, error) {
if data.Len() == 0 {
return ArrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
return arrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
}

var (
Expand All @@ -109,7 +109,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
}

if absPos >= int64(data.Len()) {
return ArrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array")
return arrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array")
}

leafCount := calcLeafCount(data.DataType())
Expand All @@ -120,7 +120,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch

schemaField, err := manifest.GetColumnField(leafColIdx)
if err != nil {
return ArrowColumnWriter{}, err
return arrowColumnWriter{}, err
}
isNullable = nullableRoot(manifest, schemaField)

Expand All @@ -138,10 +138,10 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
if arrToWrite.Len() > 0 {
bldr, err := newMultipathLevelBuilder(arrToWrite, isNullable)
if err != nil {
return ArrowColumnWriter{}, nil
return arrowColumnWriter{}, nil
}
if leafCount != bldr.leafCount() {
return ArrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leafcount: %d - %d", leafCount, bldr.leafCount())
return arrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leafcount: %d - %d", leafCount, bldr.leafCount())
}
builders = append(builders, bldr)
}
Expand All @@ -153,14 +153,10 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
values += chunkWriteSize
}

return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
return arrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
}

func (acw *ArrowColumnWriter) LeafCount() int {
return acw.leafCount
}

func (acw *ArrowColumnWriter) Write(ctx context.Context) error {
func (acw *arrowColumnWriter) Write(ctx context.Context) error {
arrCtx := arrowCtxFromContext(ctx)
for leafIdx := 0; leafIdx < acw.leafCount; leafIdx++ {
var (
Expand Down
80 changes: 32 additions & 48 deletions go/parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,24 @@ func TestWriteArrowCols(t *testing.T) {
tbl := makeDateTimeTypesTable(mem, false, false)
defer tbl.Release()

psc, err := pqarrow.ToParquet(tbl.Schema(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
require.NoError(t, err)

manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
require.NoError(t, err)

sink := encoding.NewBufferWriter(0, mem)
defer sink.Release()
writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4))))

srgw := writer.AppendRowGroup()
ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil)
fileWriter, err := pqarrow.NewFileWriter(
tbl.Schema(),
sink,
parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4)),
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)),
)
require.NoError(t, err)

colIdx := 0
fileWriter.NewRowGroup()
for i := int64(0); i < tbl.NumCols(); i++ {
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
colChunk := tbl.Column(int(i)).Data()
err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx = colIdx + acw.LeafCount()
}
require.NoError(t, srgw.Close())
require.NoError(t, writer.Close())
require.NoError(t, fileWriter.Close())

expected := makeDateTimeTypesTable(mem, true, false)
defer expected.Release()
Expand Down Expand Up @@ -235,31 +231,24 @@ func TestWriteArrowInt96(t *testing.T) {
tbl := makeDateTimeTypesTable(mem, false, false)
defer tbl.Release()

props := pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), pqarrow.WithAllocator(mem))

psc, err := pqarrow.ToParquet(tbl.Schema(), nil, props)
require.NoError(t, err)

manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
require.NoError(t, err)

sink := encoding.NewBufferWriter(0, mem)
defer sink.Release()

writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(parquet.NewWriterProperties(parquet.WithAllocator(mem))))

srgw := writer.AppendRowGroup()
ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
fileWriter, err := pqarrow.NewFileWriter(
tbl.Schema(),
sink,
parquet.NewWriterProperties(parquet.WithAllocator(mem)),
pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), pqarrow.WithAllocator(mem)),
)
require.NoError(t, err)

colIdx := 0
fileWriter.NewRowGroup()
for i := int64(0); i < tbl.NumCols(); i++ {
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
colChunk := tbl.Column(int(i)).Data()
err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx += acw.LeafCount()
}
require.NoError(t, srgw.Close())
require.NoError(t, writer.Close())
require.NoError(t, fileWriter.Close())

expected := makeDateTimeTypesTable(mem, false, false)
defer expected.Release()
Expand Down Expand Up @@ -296,33 +285,28 @@ func TestWriteArrowInt96(t *testing.T) {
func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, rowGroupSize int64, props pqarrow.ArrowWriterProperties) *memory.Buffer {
sink := encoding.NewBufferWriter(0, mem)
defer sink.Release()
wrprops := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))
psc, err := pqarrow.ToParquet(tbl.Schema(), wrprops, props)
require.NoError(t, err)

manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
fileWriter, err := pqarrow.NewFileWriter(
tbl.Schema(),
sink,
parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0)),
props,
)
require.NoError(t, err)

writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(wrprops))
ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)

offset := int64(0)
for offset < tbl.NumRows() {
sz := utils.Min(rowGroupSize, tbl.NumRows()-offset)
srgw := writer.AppendRowGroup()
colIdx := 0
fileWriter.NewRowGroup()
for i := 0; i < int(tbl.NumCols()); i++ {
col := tbl.Column(i)
acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, colIdx)
colChunk := tbl.Column(i).Data()
err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx = colIdx + acw.LeafCount()
}
srgw.Close()
offset += sz
}
writer.Close()

require.NoError(t, fileWriter.Close())
return sink.Finish()
}

Expand Down
2 changes: 1 addition & 1 deletion go/parquet/pqarrow/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (fw *FileWriter) Close() error {
// building of writing columns to a file via arrow data without needing to already have
// a record or table.
func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error {
acw, err := NewArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx)
acw, err := newArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx)
if err != nil {
return err
}
Expand Down

0 comments on commit 1e7175d

Please sign in to comment.