-
Notifications
You must be signed in to change notification settings - Fork 1
/
arrow3.go
81 lines (68 loc) · 1.84 KB
/
arrow3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package arrow3
import (
"context"
"errors"
"io"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/apache/arrow/go/v17/parquet"
"github.com/apache/arrow/go/v17/parquet/schema"
"google.golang.org/protobuf/proto"
)
type Schema[T proto.Message] struct {
msg *message
}
func New[T proto.Message](mem memory.Allocator) (schema *Schema[T], err error) {
defer func() {
e := recover()
if e != nil {
switch x := e.(type) {
case error:
err = x
case string:
err = errors.New(x)
default:
panic(x)
}
}
}()
var a T
b := build(a.ProtoReflect())
b.build(mem)
schema = &Schema[T]{msg: b}
return
}
// Append appends protobuf value to the schema builder.This method is not safe
// for concurrent use.
func (s *Schema[T]) Append(value T) {
s.msg.append(value.ProtoReflect())
}
// NewRecord returns buffered builder value as an arrow.Record. The builder is
// reset and can be reused to build new records.
func (s *Schema[T]) NewRecord() arrow.Record {
return s.msg.NewRecord()
}
// Parquet returns schema as parquet schema
func (s *Schema[T]) Parquet() *schema.Schema {
return s.msg.Parquet()
}
// Parquet returns schema as arrow schema
func (s *Schema[T]) Schema() *arrow.Schema {
return s.msg.schema
}
func (s *Schema[T]) Read(ctx context.Context, r parquet.ReaderAtSeeker, columns []int) (arrow.Record, error) {
return s.msg.Read(ctx, r, columns)
}
func (s *Schema[T]) WriteParquet(w io.Writer) error {
return s.msg.WriteParquet(w)
}
func (s *Schema[T]) WriteParquetRecords(w io.Writer, records ...arrow.Record) error {
return s.msg.WriteParquetRecords(w, records...)
}
// Proto decodes rows and returns them as proto messages.
func (s *Schema[T]) Proto(r arrow.Record, rows []int) []T {
return unmarshal[T](s.msg.root, r, rows)
}
func (s *Schema[T]) Release() {
s.msg.builder.Release()
}