Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] feat: reduce allocs & improve throughput #998

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions v2/pkg/engine/datasource/httpclient/nethttpclient.go
Original file line number Diff line number Diff line change
@@ -199,11 +199,6 @@ func makeHTTPRequest(client *http.Client, ctx context.Context, url, method, head
}

if !enableTrace {
if response.ContentLength > 0 {
out.Grow(int(response.ContentLength))
} else {
out.Grow(1024 * 4)
}
_, err = out.ReadFrom(respReader)
return
}
42 changes: 42 additions & 0 deletions v2/pkg/engine/resolve/fetch.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package resolve

import (
"bytes"
"encoding/json"
"slices"
"sync"

"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
)
@@ -20,6 +22,42 @@ type Fetch interface {
FetchKind() FetchKind
Dependencies() FetchDependencies
DataSourceInfo() DataSourceInfo

GetBuffer() *bytes.Buffer
ReportResponseSize(out *bytes.Buffer)
}

// FetchBufferSizeCalculator calculates the right size for a buffer based on the previous 64 fetches.
// Instead of using a buffer with a random default size and growing it to the right cap,
// FetchBufferSizeCalculator uses information about previous fetches to suggest a reasonable size.
// Overall, this has shown to reduce bytes.growSlice operations to almost zero in hot paths.
//
// The zero value is ready to use.
type FetchBufferSizeCalculator struct {
	// mux guards count and total. A plain Mutex is used instead of RWMutex:
	// every GetBuffer (read) is paired 1:1 with a ReportResponseSize (write),
	// so reads do not outnumber writes and RWMutex bookkeeping would only add
	// overhead on this hot path.
	mux   sync.Mutex
	count int // number of fetches recorded in the current window
	total int // sum of reported buffer capacities in the current window
}

// GetBuffer returns a fresh buffer pre-sized to the average capacity observed
// over the recent fetch window, or 4KiB if nothing has been reported yet.
func (f *FetchBufferSizeCalculator) GetBuffer() *bytes.Buffer {
	f.mux.Lock()
	defer f.mux.Unlock()
	size := 1024 * 4 // default before any samples exist
	if f.count > 0 {
		size = f.total / f.count
		if size <= 0 {
			// defend against zero-cap reports so we never hand out a useless buffer
			size = 1024 * 4
		}
	}
	return bytes.NewBuffer(make([]byte, 0, size))
}

// ReportResponseSize records the capacity of a finished fetch buffer so that
// subsequent GetBuffer calls can pre-size appropriately. The rolling window
// restarts every 64 reports so the average adapts to changing response sizes.
func (f *FetchBufferSizeCalculator) ReportResponseSize(out *bytes.Buffer) {
	if out == nil {
		// nothing to record; callers on error paths may not have a buffer
		return
	}
	f.mux.Lock()
	defer f.mux.Unlock()
	inc := out.Cap()
	if f.count >= 64 { // reset after 64 fetches (the previous `> 64` check reset one report late)
		f.total = inc
		f.count = 1
		return
	}
	f.count++
	f.total += inc
}

type FetchItem struct {
@@ -71,6 +109,7 @@ const (
)

type SingleFetch struct {
FetchBufferSizeCalculator
FetchConfiguration
FetchDependencies
InputTemplate InputTemplate
@@ -140,6 +179,7 @@ func (_ *SingleFetch) FetchKind() FetchKind {
// allows to join nested fetches to the same subgraph into a single fetch
// representations variable will contain multiple items according to amount of entities matching this query
type BatchEntityFetch struct {
FetchBufferSizeCalculator
FetchDependencies
Input BatchInput
DataSource DataSource
@@ -182,6 +222,7 @@ func (_ *BatchEntityFetch) FetchKind() FetchKind {
// EntityFetch - represents nested entity fetch on object field
// representations variable will contain single item
type EntityFetch struct {
FetchBufferSizeCalculator
FetchDependencies
Input EntityInput
DataSource DataSource
@@ -217,6 +258,7 @@ func (_ *EntityFetch) FetchKind() FetchKind {
// Usually, you want to batch fetches within a list, which is the default behavior of SingleFetch
// However, if the data source does not support batching, you can use this fetch to make parallel fetches within a list
type ParallelListItemFetch struct {
FetchBufferSizeCalculator
Fetch *SingleFetch
Traces []*SingleFetch
Trace *DataSourceLoadTrace
32 changes: 20 additions & 12 deletions v2/pkg/engine/resolve/loader.go
Original file line number Diff line number Diff line change
@@ -155,38 +155,41 @@ func (l *Loader) resolveSingle(item *FetchItem) error {
switch f := item.Fetch.(type) {
case *SingleFetch:
res := &result{
out: &bytes.Buffer{},
out: f.GetBuffer(),
}
err := l.loadSingleFetch(l.ctx.ctx, f, item, items, res)
if err != nil {
return err
}
f.ReportResponseSize(res.out)
err = l.mergeResult(item, res, items)
if l.ctx.LoaderHooks != nil && res.loaderHookContext != nil {
l.ctx.LoaderHooks.OnFinished(res.loaderHookContext, res.statusCode, res.ds, goerrors.Join(res.err, l.ctx.subgraphErrors))
}
return err
case *BatchEntityFetch:
res := &result{
out: &bytes.Buffer{},
out: f.GetBuffer(),
}
err := l.loadBatchEntityFetch(l.ctx.ctx, item, f, items, res)
if err != nil {
return errors.WithStack(err)
}
f.ReportResponseSize(res.out)
err = l.mergeResult(item, res, items)
if l.ctx.LoaderHooks != nil && res.loaderHookContext != nil {
l.ctx.LoaderHooks.OnFinished(res.loaderHookContext, res.statusCode, res.ds, goerrors.Join(res.err, l.ctx.subgraphErrors))
}
return err
case *EntityFetch:
res := &result{
out: &bytes.Buffer{},
out: f.GetBuffer(),
}
err := l.loadEntityFetch(l.ctx.ctx, item, f, items, res)
if err != nil {
return errors.WithStack(err)
}
f.ReportResponseSize(res.out)
err = l.mergeResult(item, res, items)
if l.ctx.LoaderHooks != nil && res.loaderHookContext != nil {
l.ctx.LoaderHooks.OnFinished(res.loaderHookContext, res.statusCode, res.ds, goerrors.Join(res.err, l.ctx.subgraphErrors))
@@ -201,11 +204,10 @@ func (l *Loader) resolveSingle(item *FetchItem) error {
for i := range items {
i := i
results[i] = &result{
out: &bytes.Buffer{},
out: f.GetBuffer(),
}
if l.ctx.TracingOptions.Enable {
f.Traces[i] = new(SingleFetch)
*f.Traces[i] = *f.Fetch
f.Traces[i] = f.Fetch
g.Go(func() error {
return l.loadFetch(ctx, f.Traces[i], item, items[i:i+1], results[i])
})
@@ -220,6 +222,7 @@ func (l *Loader) resolveSingle(item *FetchItem) error {
return errors.WithStack(err)
}
for i := range results {
f.ReportResponseSize(results[i].out)
err = l.mergeResult(item, results[i], items[i:i+1])
if l.ctx.LoaderHooks != nil && results[i].loaderHookContext != nil {
l.ctx.LoaderHooks.OnFinished(results[i].loaderHookContext, results[i].statusCode, results[i].ds, goerrors.Join(results[i].err, l.ctx.subgraphErrors))
@@ -369,7 +372,8 @@ func (l *Loader) itemsData(items []*astjson.Value) *astjson.Value {
func (l *Loader) loadFetch(ctx context.Context, fetch Fetch, fetchItem *FetchItem, items []*astjson.Value, res *result) error {
switch f := fetch.(type) {
case *SingleFetch:
res.out = &bytes.Buffer{}
res.out = fetch.GetBuffer()
defer fetch.ReportResponseSize(res.out)
return l.loadSingleFetch(ctx, f, fetchItem, items, res)
case *ParallelListItemFetch:
results := make([]*result, len(items))
@@ -380,11 +384,10 @@ func (l *Loader) loadFetch(ctx context.Context, fetch Fetch, fetchItem *FetchIte
for i := range items {
i := i
results[i] = &result{
out: &bytes.Buffer{},
out: fetch.GetBuffer(),
}
if l.ctx.TracingOptions.Enable {
f.Traces[i] = new(SingleFetch)
*f.Traces[i] = *f.Fetch
f.Traces[i] = f.Fetch
g.Go(func() error {
return l.loadFetch(ctx, f.Traces[i], fetchItem, items[i:i+1], results[i])
})
@@ -399,12 +402,17 @@ func (l *Loader) loadFetch(ctx context.Context, fetch Fetch, fetchItem *FetchIte
return errors.WithStack(err)
}
res.nestedMergeItems = results
for i := range results {
fetch.ReportResponseSize(results[i].out)
}
return nil
case *EntityFetch:
res.out = &bytes.Buffer{}
res.out = fetch.GetBuffer()
defer fetch.ReportResponseSize(res.out)
return l.loadEntityFetch(ctx, fetchItem, f, items, res)
case *BatchEntityFetch:
res.out = &bytes.Buffer{}
res.out = fetch.GetBuffer()
defer fetch.ReportResponseSize(res.out)
return l.loadBatchEntityFetch(ctx, fetchItem, f, items, res)
}
return nil
6 changes: 5 additions & 1 deletion v2/pkg/engine/resolve/resolvable.go
Original file line number Diff line number Diff line change
@@ -159,7 +159,11 @@ func (r *Resolvable) ResolveNode(node Node, data *astjson.Value, out io.Writer)
r.printErr = nil
r.authorizationError = nil
r.errors = r.astjsonArena.NewArray()

defer func() {
// remove references to buffers when no longer needed
r.out = nil
r.errors = nil
}()
hasErrors := r.walkNode(node, data)
if hasErrors {
return fmt.Errorf("error resolving node")
35 changes: 29 additions & 6 deletions v2/pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"runtime"
"sync"
"time"

@@ -67,6 +68,8 @@ type Resolver struct {

propagateSubgraphErrors bool
propagateSubgraphStatusCodes bool

resolvableBufferPool *pool.LimitBufferPool
}

func (r *Resolver) SetAsyncErrorWriter(w AsyncErrorWriter) {
@@ -142,6 +145,8 @@ type ResolverOptions struct {
ResolvableOptions ResolvableOptions
// AllowedCustomSubgraphErrorFields defines which fields are allowed in the subgraph error when in passthrough mode
AllowedSubgraphErrorFields []string
// BufferPoolOptions defines the size & limits of the resolvable buffer pool
BufferPoolOptions pool.LimitBufferPoolOptions
}

// New returns a new Resolver, ctx.Done() is used to cancel all active subscriptions & streams
@@ -175,6 +180,19 @@ func New(ctx context.Context, options ResolverOptions) *Resolver {
allowedErrorFields[field] = struct{}{}
}

if options.BufferPoolOptions.MaxBuffers == 0 {
options.BufferPoolOptions.MaxBuffers = runtime.GOMAXPROCS(-1)
}
if options.BufferPoolOptions.MaxBuffers < 8 {
options.BufferPoolOptions.MaxBuffers = 8
}
if options.BufferPoolOptions.MaxBufferSize == 0 {
options.BufferPoolOptions.MaxBufferSize = 1024 * 1024 * 10 // 10MB
}
if options.BufferPoolOptions.DefaultBufferSize < 1024*8 {
options.BufferPoolOptions.DefaultBufferSize = 1024 * 8 // 8KB
}

resolver := &Resolver{
ctx: ctx,
options: options,
@@ -188,6 +206,7 @@ func New(ctx context.Context, options ResolverOptions) *Resolver {
triggerUpdateBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
allowedErrorExtensionFields: allowedExtensionFields,
allowedErrorFields: allowedErrorFields,
resolvableBufferPool: pool.NewLimitBufferPool(ctx, options.BufferPoolOptions),
}
resolver.maxConcurrency = make(chan struct{}, options.MaxConcurrency)
for i := 0; i < options.MaxConcurrency; i++ {
@@ -242,6 +261,7 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons
}()

t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields)
defer t.resolvable.Reset() // set all references to nil, e.g. pointers to buffers

err := t.resolvable.Init(ctx, data, response.Info.OperationType)
if err != nil {
@@ -254,15 +274,17 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons
return nil, err
}
}

buf := &bytes.Buffer{}
err = t.resolvable.Resolve(ctx.ctx, response.Data, response.Fetches, buf)
buf := r.resolvableBufferPool.Get()
defer r.resolvableBufferPool.Put(buf)
err = t.resolvable.Resolve(ctx.ctx, response.Data, response.Fetches, buf.Buf)
if err != nil {
return nil, err
}

_, err = buf.WriteTo(writer)
return resp, err
_, err = buf.Buf.WriteTo(writer)
if err != nil {
return nil, err
}
return resp, nil
}

type trigger struct {
@@ -287,6 +309,7 @@ func (r *Resolver) executeSubscriptionUpdate(ctx *Context, sub *sub, sharedInput
fmt.Printf("resolver:trigger:subscription:update:%d\n", sub.id.SubscriptionID)
}
t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields)
defer t.resolvable.Reset() // reset all references

input := make([]byte, len(sharedInput))
copy(input, sharedInput)
Loading