-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* update defer tests to use test table * expand test cases * wip * dig in to specs and solidify client and test implementations * finalize shape of defer tests * add spread fragments * lint * close body * update followschema generated test server to match * Update client/incremental_http.go comment wording * lint --------- Co-authored-by: Steve Coffman <StevenACoffman@users.noreply.github.com>
- Loading branch information
1 parent
de8ff90
commit 3736848
Showing
12 changed files
with
739 additions
and
257 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
package client | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"mime" | ||
"mime/multipart" | ||
"net/http" | ||
"net/http/httptest" | ||
) | ||
|
||
type IncrementalHandler struct { | ||
close func() error | ||
next func(response any) error | ||
} | ||
|
||
func (i *IncrementalHandler) Close() error { | ||
return i.close() | ||
} | ||
|
||
func (i *IncrementalHandler) Next(response any) error { | ||
return i.next(response) | ||
} | ||
|
||
type IncrementalInitialResponse struct { | ||
Data any `json:"data"` | ||
Label string `json:"label"` | ||
Path []any `json:"path"` | ||
HasNext bool `json:"hasNext"` | ||
Errors json.RawMessage `json:"errors"` | ||
Extensions map[string]any `json:"extensions"` | ||
} | ||
|
||
type IncrementalData struct { | ||
// Support for "items" for @stream is not yet available, only "data" for | ||
// @defer, as per the 2023 spec. Similarly, this retains a more complete | ||
// list of fields, but not "id," and represents a mid-point between the | ||
// 2022 and 2023 specs. | ||
|
||
Data any `json:"data"` | ||
Label string `json:"label"` | ||
Path []any `json:"path"` | ||
HasNext bool `json:"hasNext"` | ||
Errors json.RawMessage `json:"errors"` | ||
Extensions map[string]any `json:"extensions"` | ||
} | ||
|
||
type IncrementalResponse struct { | ||
// Does not include the pending or completed fields from the 2023 spec. | ||
|
||
Incremental []IncrementalData `json:"incremental"` | ||
HasNext bool `json:"hasNext"` | ||
Errors json.RawMessage `json:"errors"` | ||
Extensions map[string]any `json:"extensions"` | ||
} | ||
|
||
func errorIncremental(err error) *IncrementalHandler { | ||
return &IncrementalHandler{ | ||
close: func() error { return nil }, | ||
next: func(response any) error { | ||
return err | ||
}, | ||
} | ||
} | ||
|
||
// IncrementalHTTP returns a GraphQL response handler for the current | ||
// GQLGen implementation of the [incremental delivery over HTTP spec]. | ||
// The IncrementalHTTP spec provides for "streaming" responses triggered by | ||
// the use of @stream or @defer as an alternate approach to SSE. To that end, | ||
// the client retains the interface of the handler returned from | ||
// Client.SSE. | ||
// | ||
// IncrementalHTTP delivery using multipart/mixed is just the structure | ||
// of the response: the payloads are specified by the defer-stream spec, | ||
// which are in transition. For more detail, see the links in the | ||
// definition for transport.MultipartMixed. We use the name | ||
// IncrementalHTTP here to distinguish from the multipart form upload | ||
// (the term "multipart" usually referring to the latter). | ||
// | ||
// IncrementalHandler is not safe for concurrent use, or for production | ||
// use at all. | ||
// | ||
// [incremental delivery over HTTP spec]: https://github.com/graphql/graphql-over-http/blob/main/rfcs/IncrementalDelivery.md | ||
func (p *Client) IncrementalHTTP(ctx context.Context, query string, options ...Option) *IncrementalHandler { | ||
r, err := p.newRequest(query, options...) | ||
if err != nil { | ||
return errorIncremental(fmt.Errorf("request: %w", err)) | ||
} | ||
r.Header.Set("Accept", "multipart/mixed") | ||
|
||
w := httptest.NewRecorder() | ||
p.h.ServeHTTP(w, r) | ||
|
||
res := w.Result() //nolint:bodyclose // Remains open since we are reading from it incrementally. | ||
if res.StatusCode >= http.StatusBadRequest { | ||
return errorIncremental(fmt.Errorf("http %d: %s", w.Code, w.Body.String())) | ||
} | ||
mediaType, params, err := mime.ParseMediaType(res.Header.Get("Content-Type")) | ||
if err != nil { | ||
return errorIncremental(fmt.Errorf("parse content-type: %w", err)) | ||
} | ||
if mediaType != "multipart/mixed" { | ||
return errorIncremental(fmt.Errorf("expected content-type multipart/mixed, got %s", mediaType)) | ||
} | ||
|
||
// TODO: worth checking the deferSpec either to confirm this client | ||
// supports it exactly, or simply to make sure it is within some | ||
// expected range. | ||
deferSpec, ok := params["deferspec"] | ||
if !ok || deferSpec == "" { | ||
return errorIncremental(errors.New("expected deferSpec in content-type")) | ||
} | ||
|
||
boundary, ok := params["boundary"] | ||
if !ok || boundary == "" { | ||
return errorIncremental(errors.New("expected boundary in content-type")) | ||
} | ||
mr := multipart.NewReader(res.Body, boundary) | ||
|
||
ctx, cancel := context.WithCancelCause(ctx) | ||
initial := true | ||
|
||
return &IncrementalHandler{ | ||
close: func() error { | ||
res.Body.Close() | ||
cancel(context.Canceled) | ||
return nil | ||
}, | ||
next: func(response any) (err error) { | ||
defer func() { | ||
if err != nil { | ||
res.Body.Close() | ||
cancel(err) | ||
} | ||
}() | ||
|
||
var data any | ||
var rawErrors json.RawMessage | ||
|
||
type nextPart struct { | ||
*multipart.Part | ||
Err error | ||
} | ||
|
||
nextPartCh := make(chan nextPart) | ||
go func() { | ||
var next nextPart | ||
next.Part, next.Err = mr.NextPart() | ||
nextPartCh <- next | ||
}() | ||
|
||
var next nextPart | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case next = <-nextPartCh: | ||
} | ||
|
||
if next.Err == io.EOF { | ||
res.Body.Close() | ||
cancel(context.Canceled) | ||
return nil | ||
} | ||
if err = next.Err; err != nil { | ||
return err | ||
} | ||
if ct := next.Header.Get("Content-Type"); ct != "application/json" { | ||
err = fmt.Errorf(`expected content-type "application/json", got %q`, ct) | ||
return err | ||
} | ||
|
||
if initial { | ||
initial = false | ||
data = IncrementalInitialResponse{} | ||
} else { | ||
data = IncrementalResponse{} | ||
} | ||
if err = json.NewDecoder(next.Part).Decode(&data); err != nil { | ||
return err | ||
} | ||
|
||
// We want to unpack even if there is an error, so we can see partial | ||
// responses. | ||
err = unpack(data, response, p.dc) | ||
if len(rawErrors) != 0 { | ||
err = RawJsonError{rawErrors} | ||
return err | ||
} | ||
return err | ||
}, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Oops, something went wrong.