Skip to content

Commit

Permalink
Refactor.
Browse files Browse the repository at this point in the history
  • Loading branch information
cube2222 committed Aug 9, 2023
1 parent c24d628 commit c0bce86
Show file tree
Hide file tree
Showing 7 changed files with 9 additions and 9 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion arrowexec/nodes/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/apache/arrow/go/v13/arrow/compute"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cube2222/octosql/arrowexec/execution"
"github.com/cube2222/octosql/arrowexec/nodes/helpers"
"github.com/cube2222/octosql/arrowexec/helpers"
"golang.org/x/sync/errgroup"
)

Expand Down
2 changes: 1 addition & 1 deletion arrowexec/nodes/group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/brentp/intintmap"
"github.com/cube2222/octosql/arrowexec/execution"
"github.com/cube2222/octosql/arrowexec/nodes/helpers"
"github.com/cube2222/octosql/arrowexec/helpers"
)

type GroupBy struct {
Expand Down
14 changes: 7 additions & 7 deletions arrowexec/nodes/hashtable/join_hashtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/brentp/intintmap"
"github.com/cube2222/octosql/arrowexec/execution"
"github.com/cube2222/octosql/arrowexec/nodes/helpers"
helpers2 "github.com/cube2222/octosql/arrowexec/helpers"
"github.com/twotwotwo/sorts"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -47,7 +47,7 @@ func buildJoinTablePartitions(records []execution.Record, keyIndices []int) []Jo
// TODO: Handle case where there are 0 records.
keyHashers := make([]func(rowIndex uint) uint64, len(records))
for i, record := range records {
keyHashers[i] = helpers.MakeKeyHasher(record, keyIndices)
keyHashers[i] = helpers2.MakeKeyHasher(record, keyIndices)
}

var overallRowCount int
Expand Down Expand Up @@ -141,7 +141,7 @@ func buildRecords(records []execution.Record, hashPositionsOrdered []hashRowPosi
for columnIndex := 0; columnIndex < columnCount; columnIndex++ {
columnRewriters := make([]func(rowIndex int), len(records))
for recordIndex, record := range records {
columnRewriters[recordIndex] = helpers.MakeColumnRewriter(recordBuilder.Field(columnIndex), record.Column(columnIndex))
columnRewriters[recordIndex] = helpers2.MakeColumnRewriter(recordBuilder.Field(columnIndex), record.Column(columnIndex))
}

g.Go(func() error {
Expand Down Expand Up @@ -185,11 +185,11 @@ func (t *JoinTable) JoinWithRecord(record execution.Record, produce func(executi
}
outSchema := arrow.NewSchema(outFields, nil)

recordKeyHasher := helpers.MakeKeyHasher(record, t.joinedKeyIndices)
recordKeyHasher := helpers2.MakeKeyHasher(record, t.joinedKeyIndices)

partitionKeyEqualityCheckers := make([]func(joinedRowIndex int, tableRowIndex int) bool, len(t.partitions))
for partitionIndex := range t.partitions {
partitionKeyEqualityCheckers[partitionIndex] = helpers.MakeKeyEqualityChecker(record, t.partitions[partitionIndex].values, t.keyIndices, t.joinedKeyIndices)
partitionKeyEqualityCheckers[partitionIndex] = helpers2.MakeKeyEqualityChecker(record, t.partitions[partitionIndex].values, t.keyIndices, t.joinedKeyIndices)
}

recordBuilder := array.NewRecordBuilder(memory.NewGoAllocator(), outSchema)
Expand Down Expand Up @@ -247,12 +247,12 @@ func (t *JoinTable) makeRecordRewriterForPartition(joinedRecord execution.Record

joinedRecordColumnRewriters := make([]func(rowIndex int), len(joinedRecord.Columns()))
for columnIndex := range joinedRecord.Columns() {
joinedRecordColumnRewriters[columnIndex] = helpers.MakeColumnRewriter(recordBuilder.Field(joinedRecordColumnOffset+columnIndex), joinedRecord.Column(columnIndex))
joinedRecordColumnRewriters[columnIndex] = helpers2.MakeColumnRewriter(recordBuilder.Field(joinedRecordColumnOffset+columnIndex), joinedRecord.Column(columnIndex))
}

tableColumnRewriters := make([]func(rowIndex int), len(partition.values.Columns()))
for columnIndex := range partition.values.Columns() {
tableColumnRewriters[columnIndex] = helpers.MakeColumnRewriter(recordBuilder.Field(tableColumnOffset+columnIndex), partition.values.Column(columnIndex))
tableColumnRewriters[columnIndex] = helpers2.MakeColumnRewriter(recordBuilder.Field(tableColumnOffset+columnIndex), partition.values.Column(columnIndex))
}

return func(joinedRowIndex int, tableRowIndex int) {
Expand Down

0 comments on commit c0bce86

Please sign in to comment.