
Commit

Refactor packages.
cube2222 committed Aug 8, 2023
1 parent d8b7312 commit e3c6869
Showing 8 changed files with 276 additions and 253 deletions.
18 changes: 2 additions & 16 deletions arrowexec/nodes/filter.go
@@ -3,11 +3,11 @@ package nodes
import (
"fmt"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/compute"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cube2222/octosql/arrowexec/execution"
"github.com/cube2222/octosql/arrowexec/nodes/helpers"
"golang.org/x/sync/errgroup"
)

@@ -74,7 +74,7 @@ func (f *RebatchingFilter) Run(ctx execution.Context, produce execution.ProduceF
g, _ := errgroup.WithContext(ctx.Context)
columns := record.Columns()
for i, column := range columns {
rewriter := MakeColumnRewriter(recordBuilder.Field(i), column)
rewriter := helpers.MakeColumnRewriter(recordBuilder.Field(i), column)
g.Go(func() error {
Rewrite(typedSelection, rewriter)
return nil
@@ -104,20 +104,6 @@ func (f *RebatchingFilter) Run(ctx execution.Context, produce execution.ProduceF
return nil
}

func MakeColumnRewriter(builder array.Builder, arr arrow.Array) func(rowIndex int) {
// TODO: Should this operate on row ranges instead of single rows? Would make low-selectivity workloads faster, as well as nested types.
switch builder.Type().ID() {
case arrow.INT64:
typedBuilder := builder.(*array.Int64Builder)
typedArr := arr.(*array.Int64)
return func(rowIndex int) {
typedBuilder.Append(typedArr.Value(rowIndex))
}
default:
panic(fmt.Errorf("unsupported type for filtering: %v", builder.Type().ID()))
}
}

func Rewrite(selection *array.Boolean, rewriteFunc func(rowIndex int)) {
for i := 0; i < selection.Len(); i++ {
if selection.Value(i) {
31 changes: 2 additions & 29 deletions arrowexec/nodes/group_by.go
@@ -9,7 +9,7 @@ import (
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/brentp/intintmap"
"github.com/cube2222/octosql/arrowexec/execution"
"github.com/segmentio/fasthash/fnv1a"
"github.com/cube2222/octosql/arrowexec/nodes/helpers"
)

type GroupBy struct {
@@ -36,7 +36,7 @@ func (g *GroupBy) Run(ctx execution.Context, produce execution.ProduceFunc) erro
}

if err := g.Source.Node.Run(ctx, func(ctx execution.ProduceContext, record execution.Record) error {
getKeyHash := MakeKeyHasher(record, g.KeyColumns)
getKeyHash := helpers.MakeKeyHasher(record, g.KeyColumns)

aggColumnConsumers := make([]func(entryIndex uint, rowIndex uint), len(aggregates))
for i := range aggColumnConsumers {
@@ -174,33 +174,6 @@ func (agg *SumInt) GetBatch(length int, offset int) arrow.Array {
return array.NewInt64Data(array.NewData(arrow.PrimitiveTypes.Int64, length, []*memory.Buffer{nil, agg.data}, nil, 0 /*TODO: Fixme*/, offset))
}

func MakeKeyHasher(record execution.Record, keyIndices []int) func(rowIndex uint) uint64 {
subHashers := make([]func(hash uint64, rowIndex uint) uint64, len(keyIndices))
for i, colIndex := range keyIndices {
switch record.Column(colIndex).DataType().ID() {
case arrow.INT64:
typedArr := record.Column(colIndex).(*array.Int64).Int64Values()
subHashers[i] = func(hash uint64, rowIndex uint) uint64 {
return fnv1a.AddUint64(hash, uint64(typedArr[rowIndex]))
}
case arrow.STRING:
typedArr := record.Column(colIndex).(*array.String)
subHashers[i] = func(hash uint64, rowIndex uint) uint64 {
return fnv1a.AddString64(hash, typedArr.Value(int(rowIndex)))
}
default:
panic("unsupported")
}
}
return func(rowIndex uint) uint64 {
hash := fnv1a.Init64
for _, hasher := range subHashers {
hash = hasher(hash, rowIndex)
}
return hash
}
}

func MakeKey(dt arrow.DataType) Key {
switch dt.ID() {
case arrow.INT64:
148 changes: 148 additions & 0 deletions arrowexec/nodes/hashtable/join_hashtable.go
@@ -0,0 +1,148 @@
package hashtable

import (
"runtime"
"sync"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/brentp/intintmap"
"github.com/cube2222/octosql/arrowexec/execution"
"github.com/cube2222/octosql/arrowexec/nodes/helpers"
"github.com/twotwotwo/sorts"
"golang.org/x/sync/errgroup"
)

type JoinHashTable struct {
hashStartIndices *intintmap.Map
hashes *array.Uint64
values execution.Record
}

// BuildHashTable groups the records by their key hashes, and returns them with an index of partition starts.
func BuildHashTable(records []execution.Record, indices []int) *JoinHashTable {
// TODO: Handle case where there are 0 records.
keyHashers := make([]func(rowIndex uint) uint64, len(records))
for i, record := range records {
keyHashers[i] = helpers.MakeKeyHasher(record, indices)
}

var overallRowCount int
for _, record := range records {
overallRowCount += int(record.NumRows())
}

hashPositionsOrdered := make([]hashRowPosition, overallRowCount)
i := 0
for recordIndex, record := range records {
numRows := int(record.NumRows())
for rowIndex := 0; rowIndex < numRows; rowIndex++ {
hash := keyHashers[recordIndex](uint(rowIndex))
hashPositionsOrdered[i] = hashRowPosition{
hash: hash,
recordIndex: recordIndex,
rowIndex: rowIndex,
}
i++
}
}

sorts.ByUint64(SortHashPosition(hashPositionsOrdered))

var wg sync.WaitGroup
wg.Add(2)

var hashIndex *intintmap.Map
go func() {
hashIndex = buildHashIndex(hashPositionsOrdered)
wg.Done()
}()

var hashesArray *array.Uint64
go func() {
hashesArray = buildHashesArray(overallRowCount, hashPositionsOrdered)
wg.Done()
}()

record := buildRecords(records, overallRowCount, hashPositionsOrdered)

wg.Wait()

return &JoinHashTable{
hashStartIndices: hashIndex,
hashes: hashesArray,
values: execution.Record{Record: record},
}
}

func buildHashIndex(hashPositionsOrdered []hashRowPosition) *intintmap.Map {
hashIndex := intintmap.New(1024, 0.6)
hashIndex.Put(int64(hashPositionsOrdered[0].hash), 0)
for i := 1; i < len(hashPositionsOrdered); i++ {
if hashPositionsOrdered[i].hash != hashPositionsOrdered[i-1].hash {
hashIndex.Put(int64(hashPositionsOrdered[i].hash), int64(i))
}
}
return hashIndex
}

type hashRowPosition struct {
hash uint64
recordIndex int
rowIndex int
}

func buildHashesArray(overallRowCount int, hashPositionsOrdered []hashRowPosition) *array.Uint64 {
hashesBuilder := array.NewUint64Builder(memory.NewGoAllocator()) // TODO: Get allocator from argument.
hashesBuilder.Reserve(overallRowCount)
for _, hashPosition := range hashPositionsOrdered {
hashesBuilder.UnsafeAppend(hashPosition.hash)
}
return hashesBuilder.NewUint64Array()
}

func buildRecords(records []execution.Record, overallRowCount int, hashPositionsOrdered []hashRowPosition) arrow.Record {
// TODO: Get allocator from argument.
recordBuilder := array.NewRecordBuilder(memory.NewGoAllocator(), records[0].Schema())
recordBuilder.Reserve(overallRowCount)

var g errgroup.Group
g.SetLimit(runtime.GOMAXPROCS(0))

columnCount := len(recordBuilder.Fields())
for columnIndex := 0; columnIndex < columnCount; columnIndex++ {
columnRewriters := make([]func(rowIndex int), len(records))
for recordIndex, record := range records {
columnRewriters[recordIndex] = helpers.MakeColumnRewriter(recordBuilder.Field(columnIndex), record.Column(columnIndex))
}

g.Go(func() error {
for _, hashPosition := range hashPositionsOrdered {
columnRewriters[hashPosition.recordIndex](hashPosition.rowIndex)
}
return nil
})
}
g.Wait()
record := recordBuilder.NewRecord()
return record
}

type SortHashPosition []hashRowPosition

func (h SortHashPosition) Len() int {
return len(h)
}

func (h SortHashPosition) Less(i, j int) bool {
return h[i].hash < h[j].hash
}

func (h SortHashPosition) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h SortHashPosition) Key(i int) uint64 {
return h[i].hash
}
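
The layout this file builds: all rows from all input records end up concatenated in key-hash order (SortHashPosition implements both sort.Interface and the Key method that twotwotwo/sorts needs for radix sorting), hashStartIndices maps each distinct hash to the first row of its run, and the hashes array bounds that run. The commit adds no probe method; a minimal sketch of one, assuming it lives in the hashtable package since the fields are unexported:

func (t *JoinHashTable) forEachMatchCandidate(hash uint64, f func(rowIndex int)) {
	start, ok := t.hashStartIndices.Get(int64(hash))
	if !ok {
		return // no rows with this hash
	}
	// Rows sharing a hash are contiguous after the sort, so scan until the hash changes.
	for i := int(start); i < t.hashes.Len() && t.hashes.Value(i) == hash; i++ {
		f(i) // row i of t.values is a candidate; callers must still compare actual keys to rule out hash collisions
	}
}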
67 changes: 67 additions & 0 deletions arrowexec/nodes/hashtable/join_hashtable_test.go
@@ -0,0 +1,67 @@
package hashtable

import (
"log"
"math/rand"
"testing"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/cube2222/octosql/arrowexec/execution"
)

func TestPartition(t *testing.T) {
schema := arrow.NewSchema([]arrow.Field{
{Name: "a", Type: arrow.PrimitiveTypes.Int64},
{Name: "b", Type: arrow.PrimitiveTypes.Int64},
{Name: "c", Type: arrow.PrimitiveTypes.Int64},
}, nil)

var records []execution.Record
recordBuilder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
for i := 0; i < 5; i++ {
for j := 0; j < 10; j++ {
recordBuilder.Field(0).(*array.Int64Builder).Append(int64(rand.Intn(5)))
recordBuilder.Field(1).(*array.Int64Builder).Append(int64(rand.Intn(2)))
recordBuilder.Field(2).(*array.Int64Builder).Append(int64(rand.Intn(7)))
}

records = append(records, execution.Record{
Record: recordBuilder.NewRecord(),
})
}

table := BuildHashTable(records, []int{0, 1})

log.Println("hashes:", table)
}

func BenchmarkPartitionIntegers(b *testing.B) {
b.StopTimer()
schema := arrow.NewSchema([]arrow.Field{
{Name: "a", Type: arrow.PrimitiveTypes.Int64},
{Name: "b", Type: arrow.PrimitiveTypes.Int64},
{Name: "c", Type: arrow.PrimitiveTypes.Int64},
}, nil)

var records []execution.Record
recordBuilder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
for i := 0; i < 128; i++ {
for j := 0; j < execution.IdealBatchSize; j++ {
recordBuilder.Field(0).(*array.Int64Builder).Append(int64(rand.Intn(1024 * 8)))
recordBuilder.Field(1).(*array.Int64Builder).Append(int64(rand.Intn(4)))
recordBuilder.Field(2).(*array.Int64Builder).Append(int64(rand.Intn(7)))
}

records = append(records, execution.Record{
Record: recordBuilder.NewRecord(),
})
}
b.StartTimer()

for i := 0; i < b.N; i++ {
_ = BuildHashTable(records, []int{0, 1}) // discard the result; the benchmark measures build time only
}
}
35 changes: 35 additions & 0 deletions arrowexec/nodes/helpers/key_hasher.go
@@ -0,0 +1,35 @@
package helpers

import (
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/cube2222/octosql/arrowexec/execution"
"github.com/segmentio/fasthash/fnv1a"
)

func MakeKeyHasher(record execution.Record, keyIndices []int) func(rowIndex uint) uint64 {
subHashers := make([]func(hash uint64, rowIndex uint) uint64, len(keyIndices))
for i, colIndex := range keyIndices {
switch record.Column(colIndex).DataType().ID() {
case arrow.INT64:
typedArr := record.Column(colIndex).(*array.Int64).Int64Values()
subHashers[i] = func(hash uint64, rowIndex uint) uint64 {
return fnv1a.AddUint64(hash, uint64(typedArr[rowIndex]))
}
case arrow.STRING:
typedArr := record.Column(colIndex).(*array.String)
subHashers[i] = func(hash uint64, rowIndex uint) uint64 {
return fnv1a.AddString64(hash, typedArr.Value(int(rowIndex)))
}
default:
panic("unsupported")
}
}
return func(rowIndex uint) uint64 {
hash := fnv1a.Init64
for _, hasher := range subHashers {
hash = hasher(hash, rowIndex)
}
return hash
}
}
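
MakeKeyHasher builds one FNV-1a sub-hasher per key column and folds them together per row, so callers get a single closure that hashes a row's whole key. A usage sketch (caller code assumed for illustration, not from this commit; numPartitions is a hypothetical parameter):

hashRow := helpers.MakeKeyHasher(record, []int{0, 1}) // key = columns 0 and 1
for row := uint(0); row < uint(record.NumRows()); row++ {
	partition := hashRow(row) % uint64(numPartitions) // numPartitions: hypothetical partition count
	// ... route the row to its partition ...
}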
22 changes: 22 additions & 0 deletions arrowexec/nodes/helpers/rewriter.go
@@ -0,0 +1,22 @@
package helpers

import (
"fmt"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
)

func MakeColumnRewriter(builder array.Builder, arr arrow.Array) func(rowIndex int) {
// TODO: Should this operate on row ranges instead of single rows? Would make low-selectivity workloads faster, as well as nested types.
switch builder.Type().ID() {
case arrow.INT64:
typedBuilder := builder.(*array.Int64Builder)
typedArr := arr.(*array.Int64)
return func(rowIndex int) {
typedBuilder.Append(typedArr.Value(rowIndex))
}
default:
panic(fmt.Errorf("unsupported type for filtering: %v", builder.Type().ID()))
}
}
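
MakeColumnRewriter returns a per-row append closure specialized to the column's concrete type — the pattern both RebatchingFilter and buildRecords above rely on to copy selected rows into a new batch. A usage sketch (caller code assumed for illustration, not from this commit; keep is a hypothetical selection predicate):

rewrite := helpers.MakeColumnRewriter(recordBuilder.Field(0), record.Column(0))
for row := 0; row < int(record.NumRows()); row++ {
	if keep(row) { // keep: hypothetical per-row predicate
		rewrite(row) // appends column 0's value at this row to the builder
	}
}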