Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
adranwit committed Jun 24, 2024
2 parents 83d6c25 + 02c4a6f commit 07d69ad
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 35 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1140,8 +1140,6 @@ github.com/viant/xlsy v0.3.0 h1:j/cADAd41XcxfAGSyPUZmEOHbVe2dcjdVI6JiTKA6ws=
github.com/viant/xlsy v0.3.0/go.mod h1:RajfF9HkL/PfIxRCvZSubpNlpdMUNDKYZp8C1o3vF4Q=
github.com/viant/xmlify v0.1.1-0.20231127181625-8a6b48ceea12 h1:j7I0D1M5lvmc5dxXzSyx99GSfYiilrSc1hnmFFL+jrg=
github.com/viant/xmlify v0.1.1-0.20231127181625-8a6b48ceea12/go.mod h1:w25+umH6nthlQ8ACT3K2/YJOLlbTXKLQXkdqFs6ky9s=
github.com/viant/xreflect v0.6.2-0.20240129222322-972307391f16 h1:bToK8gxp1Lu4pq/bv18rE0zMvEB5krKKyuLMk2SsWgE=
github.com/viant/xreflect v0.6.2-0.20240129222322-972307391f16/go.mod h1:BwI+lqFjhKv2Vn4E0Jt6nvbwcFOWrM6H+sOMOX3JiU4=
github.com/viant/xreflect v0.6.2 h1:PzpiTHHMwqMV2ScDJph+pMkk+JvuXFZFj6xwnM/E6sc=
github.com/viant/xreflect v0.6.2/go.mod h1:BwI+lqFjhKv2Vn4E0Jt6nvbwcFOWrM6H+sOMOX3JiU4=
github.com/viant/xunsafe v0.9.3-0.20240530173106-69808f27713b h1:m4Jj76Wq0E1uHannlo8OSrawrNDXlJjOkGQvW6bVzI8=
Expand Down
18 changes: 13 additions & 5 deletions service/reader/reflect.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@ import (
)

func combineSlices(slice1, slice2 interface{}) interface{} {
val1 := reflect.ValueOf(slice1)
val2 := reflect.ValueOf(slice2)
sliceVal1 := reflect.ValueOf(slice1)
if sliceVal1.Kind() == reflect.Ptr {
sliceVal1 = sliceVal1.Elem()
}

sliceVal2 := reflect.ValueOf(slice2)
if sliceVal2.Kind() == reflect.Ptr {
sliceVal2 = sliceVal2.Elem()
}
// Create a new slice with the combined length of slice1 and slice2
combined := reflect.MakeSlice(reflect.SliceOf(val1.Type().Elem()), val1.Len()+val2.Len(), val1.Len()+val2.Len())
sum := sliceVal1.Len() + sliceVal2.Len()
combined := reflect.MakeSlice(reflect.SliceOf(sliceVal1.Type().Elem()), sum, sum)
// Copy elements from the first slice
reflect.Copy(combined, val1)
reflect.Copy(combined, sliceVal1)
// Copy elements from the second slice
reflect.Copy(combined.Slice(val1.Len(), combined.Len()), val2)
reflect.Copy(combined.Slice(sliceVal1.Len(), combined.Len()), sliceVal2)
return combined.Interface()
}
7 changes: 5 additions & 2 deletions service/reader/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (s *Service) queryWithPartitions(ctx context.Context, session *Session, aVi
}
return nil
}
exec, e := s.queryWithHandler(ctx, session, aView, collector, columnInMatcher, parametrizedSQL, db, handler, &readData)
exec, e := s.queryWithHandler(ctx, session, aView, collectors[index], columnInMatcher, parametrizedSQL, db, handler, &readData)
mux.Lock()
if exec != nil {
executions = append(executions, exec...)
Expand Down Expand Up @@ -554,7 +554,7 @@ func (s *Service) queryWithPartitions(ctx context.Context, session *Session, aVi
if newReducer, ok := partitioner.(view.ReducerProvider); ok {
reducer := newReducer.Reducer(ctx)
reduced := reducer.Reduce(result.Dest())
collector.SetDest(reduced)
result.SetDest(reduced)
}

resultValue := reflect.ValueOf(result.Dest())
Expand All @@ -564,6 +564,9 @@ func (s *Service) queryWithPartitions(ctx context.Context, session *Session, aVi
return executions, err
}
}

collector.SetDest(result.Dest())

return executions, err
}

Expand Down
66 changes: 40 additions & 26 deletions view/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ type VisitorFn func(value interface{}) error
// If View or any of the View.With MatchStrategy support Parallel fetching, it is important to call MergeData
// when all needed View was fetched
type Collector struct {
mutex sync.Mutex
parent *Collector

dest interface{}
mutex sync.Mutex
parent *Collector
destValue reflect.Value
appender *xunsafe.Appender
valuePosition map[string]map[interface{}][]int //stores positions in main slice, based on _field name, indexed by _field value.
types map[string]*xunsafe.Type
Expand All @@ -48,16 +47,22 @@ type Collector struct {
}

func (r *Collector) SetDest(dest interface{}) {
r.dest = dest
r.appender = r.slice.Appender(xunsafe.AsPointer(dest))
destValue := reflect.ValueOf(dest)
if destValue.Kind() == reflect.Ptr {
r.destValue.Elem().Set(destValue.Elem())
} else {
r.destValue.Elem().Set(destValue)
}
}

func (r *Collector) Clone() *Collector {
dest := reflect.MakeSlice(r.view.Schema.SliceType(), 0, 1).Interface()
slicePtrValue := reflect.New(r.view.Schema.SliceType())
dest := reflect.MakeSlice(r.view.Schema.SliceType(), 0, 1)
slicePtrValue.Elem().Set(dest)
return &Collector{
parent: r.parent,
dest: dest,
appender: r.slice.Appender(xunsafe.AsPointer(dest)),
destValue: slicePtrValue,
appender: r.slice.Appender(xunsafe.ValuePointer(&slicePtrValue)),
valuePosition: r.valuePosition,
types: r.types,
relation: r.relation,
Expand Down Expand Up @@ -142,7 +147,7 @@ func NewCollector(slice *xunsafe.Slice, view *View, dest interface{}, viewMetaHa
wg := sync.WaitGroup{}
wg.Add(1)
return &Collector{
dest: ensuredDest,
destValue: reflect.ValueOf(ensuredDest),
valuePosition: make(map[string]map[interface{}][]int),
appender: slice.Appender(xunsafe.AsPointer(ensuredDest)),
slice: slice,
Expand All @@ -158,7 +163,9 @@ func NewCollector(slice *xunsafe.Slice, view *View, dest interface{}, viewMetaHa

func ensureDest(dest interface{}, view *View) interface{} {
if _, ok := dest.(*interface{}); ok {
return reflect.MakeSlice(view.Schema.SliceType(), 0, 1).Interface()
rValue := reflect.New(view.Schema.SliceType())
rValue.Elem().Set(reflect.MakeSlice(view.Schema.SliceType(), 0, 1))
return rValue.Elem()
}
return dest
}
Expand Down Expand Up @@ -405,7 +412,9 @@ func (r *Collector) Relations(selector *Statelet) ([]*Collector, error) {
continue
}

dest := reflect.MakeSlice(r.view.With[i].Of.View.Schema.SliceType(), 0, 1).Interface()
destPtr := reflect.New(r.view.With[i].Of.View.Schema.SliceType())
dest := reflect.MakeSlice(r.view.With[i].Of.View.Schema.SliceType(), 0, 1)
destPtr.Elem().Set(dest)
slice := r.view.With[i].Of.View.Schema.Slice()
wg := sync.WaitGroup{}

Expand All @@ -423,8 +432,8 @@ func (r *Collector) Relations(selector *Statelet) ([]*Collector, error) {
result[counter] = &Collector{
parent: r,
viewMetaHandler: handler,
dest: dest,
appender: slice.Appender(xunsafe.AsPointer(dest)),
destValue: destPtr,
appender: slice.Appender(xunsafe.ValuePointer(&destPtr)),
valuePosition: make(map[string]map[interface{}][]int),
types: make(map[string]*xunsafe.Type),
values: make(map[string]*[]interface{}),
Expand Down Expand Up @@ -481,7 +490,7 @@ func (r *Collector) ViewMetaHandler(rel *Relation) (func(viewMeta interface{}) e
return nil
}

slicePtr := xunsafe.AsPointer(r.dest)
slicePtr := xunsafe.AsPointer(r.DestPtr())
for _, position := range positions {
ownerPtr := xunsafe.AsPointer(r.slice.ValuePointerAt(slicePtr, position))
metaParentHolderField.SetValue(ownerPtr, viewMeta)
Expand All @@ -497,9 +506,14 @@ func (r *Collector) View() *View {
return r.view
}

// Project returns collector slice ptr
func (r *Collector) DestPtr() interface{} {
return r.destValue.Interface()
}

// Project returns collector slice
func (r *Collector) Dest() interface{} {
return r.dest
return r.destValue.Elem().Interface()
}

// ReadAll if Collector uses readAll flag, it means that his Relations can fetch all data View in the same time, (no matching parent data)
Expand Down Expand Up @@ -527,10 +541,10 @@ func (r *Collector) mergeToParent() {

for i, link := range links {
valuePositions := r.parentValuesPositions(r.relation.On[i].Column)
destPtr := xunsafe.AsPointer(r.dest)
destPtr := xunsafe.AsPointer(r.DestPtr())
holderField := r.relation.holderField
parentSlice := r.parent.slice
parentDestPtr := xunsafe.AsPointer(r.parent.dest)
parentDestPtr := xunsafe.AsPointer(r.parent.DestPtr())

field := link.xField
for i := 0; i < r.slice.Len(destPtr); i++ {
Expand All @@ -551,7 +565,7 @@ func (r *Collector) mergeToParent() {
appender := r.slice.Appender(holderField.ValuePointer(xunsafe.AsPointer(parentValue)))
appender.Append(value)
r.Lock().Unlock()
r.view.Logger.ObjectReconciling(r.dest, value, parentValue, position)
r.view.Logger.ObjectReconciling(r.Dest(), value, parentValue, position)
}
}
}
Expand All @@ -566,7 +580,7 @@ func (r *Collector) ParentPlaceholders() ([]interface{}, []string) {
if r.parent == nil || r.ReadAll() {
return []interface{}{}, nil
}
destPtr := xunsafe.AsPointer(r.parent.dest)
destPtr := xunsafe.AsPointer(r.parent.DestPtr())
sliceLen := r.parent.slice.Len(destPtr)
result := make([]interface{}, 0)
outer:
Expand Down Expand Up @@ -631,14 +645,14 @@ func (r *Collector) Fetched() {
}

func (r *Collector) Len() int {
if r.dest != nil {
return (*reflect.SliceHeader)(xunsafe.AsPointer(r.dest)).Len
if r.DestPtr() != nil {
return (*reflect.SliceHeader)(xunsafe.AsPointer(r.DestPtr())).Len
}
return 0
}

func (r *Collector) Slice() (unsafe.Pointer, *xunsafe.Slice) {
return xunsafe.AsPointer(r.dest), r.slice
return xunsafe.AsPointer(r.DestPtr()), r.slice
}

func (r *Collector) Relation() *Relation {
Expand All @@ -654,14 +668,14 @@ func (r *Collector) createTreeIfNeeded() {
return
}

aTree := BuildTree(r.view.Schema.Type(), r.view.Schema.Slice(), r.dest, r.view.SelfReference, r.view.CaseFormat)
aTree := BuildTree(r.view.Schema.Type(), r.view.Schema.Slice(), r.DestPtr(), r.view.SelfReference, r.view.CaseFormat)
if aTree != nil {
reflect.ValueOf(r.dest).Elem().Set(reflect.ValueOf(aTree).Elem())
r.SetDest(aTree)
}
}

func (r *Collector) OnSkip(_ []interface{}) error {
sliceLen := r.slice.Len(xunsafe.AsPointer(r.dest))
sliceLen := r.slice.Len(xunsafe.AsPointer(r.DestPtr()))
if sliceLen == 0 {
return nil
}
Expand Down

0 comments on commit 07d69ad

Please sign in to comment.