chore: remove Persister[].SortingColumns method (#3383)
- remove Persister[].SortingColumns, which is used only in tests
- remove index argument from Deconstruct
- remove index return value from Reconstruct
korniltsev authored Jun 26, 2024
1 parent b62930e commit 0d7e66a
Showing 11 changed files with 92 additions and 129 deletions.
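For reference, the net change to the Persister interface (from pkg/phlaredb/schemas/v1/read_writer.go, full diff below):

// Before: persisters threaded a row index through both directions and
// advertised sorting metadata.
type Persister[T any] interface {
	PersisterName
	Schema() *parquet.Schema
	Deconstruct(parquet.Row, uint64, T) parquet.Row
	Reconstruct(parquet.Row) (uint64, T, error)
	SortingColumns() parquet.SortingOption
}

// After: a persister only maps T to and from a parquet.Row.
type Persister[T any] interface {
	PersisterName
	Schema() *parquet.Schema
	Deconstruct(parquet.Row, T) parquet.Row
	Reconstruct(parquet.Row) (T, error)
}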
2 changes: 1 addition & 1 deletion pkg/phlaredb/block_querier_symbols.go
@@ -278,7 +278,7 @@ func (r *inMemoryparquetReader[M, P]) readRG(dst []M, rg parquet.RowGroup) (err
 	n, err := rr.ReadRows(buf)
 	if n > 0 {
 		for _, row := range buf[:n] {
-			_, v, err := r.persister.Reconstruct(row)
+			v, err := r.persister.Reconstruct(row)
 			if err != nil {
 				return err
 			}
8 changes: 3 additions & 5 deletions pkg/phlaredb/schemas/v1/functions.go
@@ -14,9 +14,7 @@ func (*FunctionPersister) Name() string { return "functions" }
 
 func (*FunctionPersister) Schema() *parquet.Schema { return functionsSchema }
 
-func (*FunctionPersister) SortingColumns() parquet.SortingOption { return parquet.SortingColumns() }
-
-func (*FunctionPersister) Deconstruct(row parquet.Row, _ uint64, fn *InMemoryFunction) parquet.Row {
+func (*FunctionPersister) Deconstruct(row parquet.Row, fn *InMemoryFunction) parquet.Row {
 	if cap(row) < 5 {
 		row = make(parquet.Row, 0, 5)
 	}
@@ -29,15 +27,15 @@ func (*FunctionPersister) Deconstruct(row parquet.Row, _ uint64, fn *InMemoryFun
 	return row
 }
 
-func (*FunctionPersister) Reconstruct(row parquet.Row) (uint64, *InMemoryFunction, error) {
+func (*FunctionPersister) Reconstruct(row parquet.Row) (*InMemoryFunction, error) {
 	loc := InMemoryFunction{
 		Id:         row[0].Uint64(),
 		Name:       row[1].Uint32(),
 		SystemName: row[2].Uint32(),
 		Filename:   row[3].Uint32(),
 		StartLine:  row[4].Uint32(),
 	}
-	return 0, &loc, nil
+	return &loc, nil
 }
 
 type InMemoryFunction struct {
8 changes: 3 additions & 5 deletions pkg/phlaredb/schemas/v1/locations.go
@@ -14,9 +14,7 @@ func (*LocationPersister) Name() string { return "locations" }
 
 func (*LocationPersister) Schema() *parquet.Schema { return locationsSchema }
 
-func (*LocationPersister) SortingColumns() parquet.SortingOption { return parquet.SortingColumns() }
-
-func (*LocationPersister) Deconstruct(row parquet.Row, _ uint64, loc *InMemoryLocation) parquet.Row {
+func (*LocationPersister) Deconstruct(row parquet.Row, loc *InMemoryLocation) parquet.Row {
 	var (
 		col    = -1
 		newCol = func() int {
@@ -61,7 +59,7 @@ func (*LocationPersister) Deconstruct(row parquet.Row, _ uint64, loc *InMemoryLo
 	return row
 }
 
-func (*LocationPersister) Reconstruct(row parquet.Row) (uint64, *InMemoryLocation, error) {
+func (*LocationPersister) Reconstruct(row parquet.Row) (*InMemoryLocation, error) {
 	loc := InMemoryLocation{
 		Id:        row[0].Uint64(),
 		MappingId: uint32(row[1].Uint64()),
@@ -76,7 +74,7 @@ func (*LocationPersister) Reconstruct(row parquet.Row) (uint64, *InMemoryLocatio
 	for i, v := range lines[len(lines)/2:] {
 		loc.Line[i].Line = int32(v.Uint64())
 	}
-	return 0, &loc, nil
+	return &loc, nil
 }
 
 type InMemoryLocation struct {
8 changes: 3 additions & 5 deletions pkg/phlaredb/schemas/v1/mappings.go
@@ -14,9 +14,7 @@ func (*MappingPersister) Name() string { return "mappings" }
 
 func (*MappingPersister) Schema() *parquet.Schema { return mappingsSchema }
 
-func (*MappingPersister) SortingColumns() parquet.SortingOption { return parquet.SortingColumns() }
-
-func (*MappingPersister) Deconstruct(row parquet.Row, _ uint64, m *InMemoryMapping) parquet.Row {
+func (*MappingPersister) Deconstruct(row parquet.Row, m *InMemoryMapping) parquet.Row {
 	if cap(row) < 10 {
 		row = make(parquet.Row, 0, 10)
 	}
@@ -34,7 +32,7 @@ func (*MappingPersister) Deconstruct(row parquet.Row, _ uint64, m *InMemoryMappi
 	return row
 }
 
-func (*MappingPersister) Reconstruct(row parquet.Row) (uint64, *InMemoryMapping, error) {
+func (*MappingPersister) Reconstruct(row parquet.Row) (*InMemoryMapping, error) {
 	mapping := InMemoryMapping{
 		Id:          row[0].Uint64(),
 		MemoryStart: row[1].Uint64(),
@@ -47,7 +45,7 @@ func (*MappingPersister) Reconstruct(row parquet.Row) (uint64, *InMemoryMapping,
 		HasLineNumbers:  row[8].Boolean(),
 		HasInlineFrames: row[9].Boolean(),
 	}
-	return 0, &mapping, nil
+	return &mapping, nil
 }
 
 type InMemoryMapping struct {
16 changes: 4 additions & 12 deletions pkg/phlaredb/schemas/v1/profiles.go
@@ -219,25 +219,17 @@ func (*ProfilePersister) Schema() *parquet.Schema {
 	return ProfilesSchema
 }
 
-func (*ProfilePersister) SortingColumns() parquet.SortingOption {
-	return parquet.SortingColumns(
-		parquet.Ascending("SeriesIndex"),
-		parquet.Ascending("TimeNanos"),
-		parquet.Ascending("Samples", "list", "element", "StacktraceID"),
-	)
-}
-
-func (*ProfilePersister) Deconstruct(row parquet.Row, id uint64, s *Profile) parquet.Row {
+func (*ProfilePersister) Deconstruct(row parquet.Row, s *Profile) parquet.Row {
 	row = ProfilesSchema.Deconstruct(row, s)
 	return row
 }
 
-func (*ProfilePersister) Reconstruct(row parquet.Row) (id uint64, s *Profile, err error) {
+func (*ProfilePersister) Reconstruct(row parquet.Row) (s *Profile, err error) {
 	var profile Profile
 	if err := ProfilesSchema.Reconstruct(&profile, row); err != nil {
-		return 0, nil, err
+		return nil, err
 	}
-	return 0, &profile, nil
+	return &profile, nil
 }
 
 type SliceRowReader[T any] struct {
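SortingColumns was only consumed by the ReadWriter test helper (see read_writer.go below), so a writer that still wants sorted row groups now has to pass the options at the call site. A minimal sketch reusing the columns from the deleted method; the helper name and the github.com/grafana/pyroscope import path are assumptions, not part of this commit:

package main

import (
	"github.com/parquet-go/parquet-go"

	v1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" // assumed module path
)

// sortedProfileBuffer is a hypothetical caller: the sorting options move
// verbatim from the deleted ProfilePersister.SortingColumns into the
// parquet.NewBuffer call.
func sortedProfileBuffer() *parquet.Buffer {
	var persister v1.ProfilePersister
	return parquet.NewBuffer(
		persister.Schema(),
		parquet.SortingRowGroupConfig(parquet.SortingColumns(
			parquet.Ascending("SeriesIndex"),
			parquet.Ascending("TimeNanos"),
			parquet.Ascending("Samples", "list", "element", "StacktraceID"),
		)),
	)
}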
61 changes: 2 additions & 59 deletions pkg/phlaredb/schemas/v1/read_writer.go
@@ -1,9 +1,6 @@
 package v1
 
 import (
-	"io"
-	"sort"
-
 	"github.com/parquet-go/parquet-go"
 )
 
@@ -14,60 +11,6 @@ type PersisterName interface {
 type Persister[T any] interface {
 	PersisterName
 	Schema() *parquet.Schema
-	Deconstruct(parquet.Row, uint64, T) parquet.Row
-	Reconstruct(parquet.Row) (uint64, T, error)
-	SortingColumns() parquet.SortingOption
-}
-
-type ReadWriter[T any, P Persister[T]] struct{}
-
-func (*ReadWriter[T, P]) WriteParquetFile(file io.Writer, elements []T) error {
-	var (
-		persister P
-		rows      = make([]parquet.Row, len(elements))
-	)
-
-	buffer := parquet.NewBuffer(persister.Schema(), parquet.SortingRowGroupConfig(persister.SortingColumns()))
-
-	for pos := range rows {
-		rows[pos] = persister.Deconstruct(rows[pos], uint64(pos), elements[pos])
-	}
-
-	if _, err := buffer.WriteRows(rows); err != nil {
-		return err
-	}
-	sort.Sort(buffer)
-
-	writer := parquet.NewWriter(file, persister.Schema())
-	if _, err := parquet.CopyRows(writer, buffer.Rows()); err != nil {
-		return err
-	}
-
-	return writer.Close()
-}
-
-func (*ReadWriter[T, P]) ReadParquetFile(file io.ReaderAt) ([]T, error) {
-	var (
-		persister P
-		reader    = parquet.NewReader(file, persister.Schema())
-	)
-	defer reader.Close()
-
-	rows := make([]parquet.Row, reader.NumRows())
-	if _, err := reader.ReadRows(rows); err != nil {
-		return nil, err
-	}
-
-	var (
-		elements = make([]T, reader.NumRows())
-		err      error
-	)
-	for pos := range elements {
-		_, elements[pos], err = persister.Reconstruct(rows[pos])
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	return elements, nil
+	Deconstruct(parquet.Row, T) parquet.Row
+	Reconstruct(parquet.Row) (T, error)
 }
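With the index and sorting gone, an implementation is just a name, a schema, and the two row-mapping methods. A hypothetical single-column persister as a sketch; namePersister and its schema are invented for illustration, only the method set comes from the interface above:

package main

import (
	"fmt"

	"github.com/parquet-go/parquet-go"

	v1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" // assumed module path
)

// namePersister is an invented example type: one required string column.
type namePersister struct{}

var nameSchema = parquet.NewSchema("names", parquet.Group{
	"Name": parquet.String(),
})

func (namePersister) Name() string            { return "names" }
func (namePersister) Schema() *parquet.Schema { return nameSchema }

// Deconstruct flattens a value into a one-column row; no index parameter.
func (namePersister) Deconstruct(row parquet.Row, s string) parquet.Row {
	row = row[:0]
	return append(row, parquet.ByteArrayValue([]byte(s)).Level(0, 0, 0))
}

// Reconstruct is the inverse; no index return value.
func (namePersister) Reconstruct(row parquet.Row) (string, error) {
	return row[0].String(), nil
}

// Compile-time check against the slimmed-down interface.
var _ v1.Persister[string] = namePersister{}

func main() {
	var p namePersister
	row := p.Deconstruct(nil, "hello")
	s, _ := p.Reconstruct(row)
	fmt.Println(s) // hello
}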
88 changes: 66 additions & 22 deletions pkg/phlaredb/schemas/v1/schema_test.go
@@ -2,6 +2,7 @@ package v1
 
 import (
 	"bytes"
+	"io"
 	"strings"
 	"testing"
 
@@ -256,21 +257,17 @@ func (*pprofLocationPersister) Name() string { return "locations" }
 
 func (*pprofLocationPersister) Schema() *parquet.Schema { return protoLocationsSchema }
 
-func (*pprofLocationPersister) SortingColumns() parquet.SortingOption {
-	return parquet.SortingColumns()
-}
-
-func (*pprofLocationPersister) Deconstruct(row parquet.Row, _ uint64, loc *profilev1.Location) parquet.Row {
+func (*pprofLocationPersister) Deconstruct(row parquet.Row, loc *profilev1.Location) parquet.Row {
 	row = protoLocationsSchema.Deconstruct(row, loc)
 	return row
 }
 
-func (*pprofLocationPersister) Reconstruct(row parquet.Row) (uint64, *profilev1.Location, error) {
+func (*pprofLocationPersister) Reconstruct(row parquet.Row) (*profilev1.Location, error) {
 	var loc profilev1.Location
 	if err := protoLocationsSchema.Reconstruct(&loc, row); err != nil {
-		return 0, nil, err
+		return nil, err
 	}
-	return 0, &loc, nil
+	return &loc, nil
 }
 
 func TestFunctionsRoundTrip(t *testing.T) {
@@ -329,21 +326,17 @@ func (*pprofFunctionPersister) Name() string { return "functions" }
 
 func (*pprofFunctionPersister) Schema() *parquet.Schema { return protoFunctionSchema }
 
-func (*pprofFunctionPersister) SortingColumns() parquet.SortingOption {
-	return parquet.SortingColumns()
-}
-
-func (*pprofFunctionPersister) Deconstruct(row parquet.Row, _ uint64, loc *profilev1.Function) parquet.Row {
+func (*pprofFunctionPersister) Deconstruct(row parquet.Row, loc *profilev1.Function) parquet.Row {
 	row = protoFunctionSchema.Deconstruct(row, loc)
 	return row
 }
 
-func (*pprofFunctionPersister) Reconstruct(row parquet.Row) (uint64, *profilev1.Function, error) {
+func (*pprofFunctionPersister) Reconstruct(row parquet.Row) (*profilev1.Function, error) {
 	var fn profilev1.Function
 	if err := protoFunctionSchema.Reconstruct(&fn, row); err != nil {
-		return 0, nil, err
+		return nil, err
 	}
-	return 0, &fn, nil
+	return &fn, nil
 }
 
 func TestMappingsRoundTrip(t *testing.T) {
@@ -422,17 +415,68 @@ func (*pprofMappingPersister) Name() string { return "mappings" }
 
 func (*pprofMappingPersister) Schema() *parquet.Schema { return protoMappingSchema }
 
-func (*pprofMappingPersister) SortingColumns() parquet.SortingOption { return parquet.SortingColumns() }
-
-func (*pprofMappingPersister) Deconstruct(row parquet.Row, _ uint64, loc *profilev1.Mapping) parquet.Row {
+func (*pprofMappingPersister) Deconstruct(row parquet.Row, loc *profilev1.Mapping) parquet.Row {
 	row = protoMappingSchema.Deconstruct(row, loc)
 	return row
 }
 
-func (*pprofMappingPersister) Reconstruct(row parquet.Row) (uint64, *profilev1.Mapping, error) {
+func (*pprofMappingPersister) Reconstruct(row parquet.Row) (*profilev1.Mapping, error) {
 	var m profilev1.Mapping
 	if err := protoMappingSchema.Reconstruct(&m, row); err != nil {
-		return 0, nil, err
+		return nil, err
 	}
-	return 0, &m, nil
+	return &m, nil
 }
+
+type ReadWriter[T any, P Persister[T]] struct {
+}
+
+func (r *ReadWriter[T, P]) WriteParquetFile(file io.Writer, elements []T) error {
+	var (
+		persister P
+		rows      = make([]parquet.Row, len(elements))
+	)
+
+	buffer := parquet.NewBuffer(persister.Schema())
+
+	for pos := range rows {
+		rows[pos] = persister.Deconstruct(rows[pos], elements[pos])
+	}
+
+	if _, err := buffer.WriteRows(rows); err != nil {
+		return err
+	}
+
+	writer := parquet.NewWriter(file, persister.Schema())
+	if _, err := parquet.CopyRows(writer, buffer.Rows()); err != nil {
+		return err
+	}
+
+	return writer.Close()
+}
+
+func (*ReadWriter[T, P]) ReadParquetFile(file io.ReaderAt) ([]T, error) {
+	var (
+		persister P
+		reader    = parquet.NewReader(file, persister.Schema())
+	)
+	defer reader.Close()
+
+	rows := make([]parquet.Row, reader.NumRows())
+	if _, err := reader.ReadRows(rows); err != nil {
+		return nil, err
+	}
+
+	var (
+		elements = make([]T, reader.NumRows())
+		err      error
+	)
+	for pos := range elements {
+		elements[pos], err = persister.Reconstruct(rows[pos])
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return elements, nil
+}
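A sketch of how the relocated helper might be exercised from this file; the test itself is not part of the commit, and the testify require assertions are an assumption:

// Hypothetical round-trip test; bytes.Buffer stands in for a real file.
func TestReadWriterRoundTrip_Sketch(t *testing.T) {
	var buf bytes.Buffer
	rw := &ReadWriter[string, *StringPersister]{}

	in := []string{"foo", "bar", "baz"}
	require.NoError(t, rw.WriteParquetFile(&buf, in)) // Deconstruct, no index

	out, err := rw.ReadParquetFile(bytes.NewReader(buf.Bytes())) // Reconstruct, no index
	require.NoError(t, err)
	require.Equal(t, in, out)
}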
16 changes: 4 additions & 12 deletions pkg/phlaredb/schemas/v1/stacktraces.go
@@ -30,25 +30,17 @@ func (*StacktracePersister) Schema() *parquet.Schema {
 	return stacktracesSchema
 }
 
-func (*StacktracePersister) SortingColumns() parquet.SortingOption {
-	return parquet.SortingColumns(
-		parquet.Ascending("ID"),
-		parquet.Ascending("LocationIDs", "list", "element"),
-	)
-}
-
-func (*StacktracePersister) Deconstruct(row parquet.Row, id uint64, s *Stacktrace) parquet.Row {
+func (*StacktracePersister) Deconstruct(row parquet.Row, s *Stacktrace) parquet.Row {
 	var stored storedStacktrace
-	stored.ID = id
 	stored.LocationIDs = s.LocationIDs
 	row = stacktracesSchema.Deconstruct(row, &stored)
 	return row
 }
 
-func (*StacktracePersister) Reconstruct(row parquet.Row) (id uint64, s *Stacktrace, err error) {
+func (*StacktracePersister) Reconstruct(row parquet.Row) (s *Stacktrace, err error) {
 	var stored storedStacktrace
 	if err := stacktracesSchema.Reconstruct(&stored, row); err != nil {
-		return 0, nil, err
+		return nil, err
 	}
-	return stored.ID, &Stacktrace{LocationIDs: stored.LocationIDs}, nil
+	return &Stacktrace{LocationIDs: stored.LocationIDs}, nil
 }
10 changes: 4 additions & 6 deletions pkg/phlaredb/schemas/v1/strings.go
@@ -17,18 +17,16 @@ func (*StringPersister) Name() string { return "strings" }
 
 func (*StringPersister) Schema() *parquet.Schema { return stringsSchema }
 
-func (*StringPersister) SortingColumns() parquet.SortingOption { return parquet.SortingColumns() }
-
-func (*StringPersister) Deconstruct(row parquet.Row, id uint64, s string) parquet.Row {
+func (*StringPersister) Deconstruct(row parquet.Row, s string) parquet.Row {
 	if cap(row) < 2 {
 		row = make(parquet.Row, 0, 2)
 	}
 	row = row[:0]
-	row = append(row, parquet.Int64Value(int64(id)).Level(0, 0, 0))
+	row = append(row, parquet.Int64Value(int64(0)).Level(0, 0, 0))
 	row = append(row, parquet.ByteArrayValue([]byte(s)).Level(0, 0, 1))
 	return row
 }
 
-func (*StringPersister) Reconstruct(row parquet.Row) (id uint64, s string, err error) {
-	return 0, row[1].String(), nil
+func (*StringPersister) Reconstruct(row parquet.Row) (s string, err error) {
+	return row[1].String(), nil
 }
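The first column of this table now always holds 0 on write and is skipped on read, so a string's identity is evidently just its row position. A hypothetical reader in the same package (stringsByIndex is invented for illustration, not part of this commit):

// stringsByIndex rebuilds the table as an ordered slice; the slice index
// plays the role the stored ID used to.
func stringsByIndex(rows []parquet.Row) ([]string, error) {
	var persister StringPersister
	out := make([]string, len(rows))
	for i, row := range rows {
		s, err := persister.Reconstruct(row)
		if err != nil {
			return nil, err
		}
		out[i] = s
	}
	return out, nil
}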
2 changes: 1 addition & 1 deletion pkg/phlaredb/symdb/block_reader.go
@@ -479,7 +479,7 @@ func (t *parquetTableRange[M, P]) readRows(dst []M, buf []parquet.Row, rows parq
 		if i == len(dst) {
 			return nil
 		}
-		_, v, err := t.persister.Reconstruct(row)
+		v, err := t.persister.Reconstruct(row)
 		if err != nil {
 			return err
 		}