Skip to content

Commit

Permalink
Add Server-Sent Events transport (#2498)
Browse files Browse the repository at this point in the history
* Add new transport via server-sent events

* Add graphql-sse option to chat example

* Add SSE transport to documentation

* Reorder imports and handle test err to fix golangci-lint remarks
  • Loading branch information
jmic authored Jan 7, 2023
1 parent b09608d commit f0a090d
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 9 deletions.
5 changes: 4 additions & 1 deletion _examples/chat/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@
"@apollo/client": "^3.2.3",
"apollo-utilities": "^1.0.26",
"graphql": "^14.0.2",
"graphql-sse": "^2.0.0",
"graphql-tag": "^2.10.0",
"graphql-ws": "^5.8.1",
"react": "^16.6.3",
"react-dom": "^16.6.3",
"react-scripts": "^2.1.1",
"styled-components": "^5.2.0",
"subscriptions-transport-ws": "^0.9.5"
"subscriptions-transport-ws": "^0.9.5",
"typescript": "^4.9.4"
},
"scripts": {
"start": "react-scripts start",
"start:graphql-transport-ws": "REACT_APP_WS_PROTOCOL=graphql-transport-ws npm run start",
"start:graphql-sse": "REACT_APP_SSE_PROTOCOL=true npm run start",
"build": "react-scripts build",
"test": "react-scripts test --env=jsdom",
"eject": "react-scripts eject"
Expand Down
6 changes: 6 additions & 0 deletions _examples/chat/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ or to run the app with the `graphql-ws` implementation (and the newer `graphql-t
```bash
npm run start:graphql-transport-ws
```

or to run the app with the `graphql-sse` implementation do

```bash
npm run start:graphql-sse
```
1 change: 1 addition & 0 deletions _examples/chat/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func main() {

srv := handler.New(chat.NewExecutableSchema(chat.New()))

srv.AddTransport(transport.SSE{})
srv.AddTransport(transport.POST{})
srv.AddTransport(transport.Websocket{
KeepAlivePingInterval: 10 * time.Second,
Expand Down
30 changes: 30 additions & 0 deletions _examples/chat/src/graphql-sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import {
ApolloLink,
Operation,
FetchResult,
Observable,
} from '@apollo/client/core';
import { print } from 'graphql';
import { createClient, ClientOptions, Client } from 'graphql-sse';

export class SSELink extends ApolloLink {
private client: Client;

constructor(options: ClientOptions) {
super();
this.client = createClient(options);
}

public request(operation: Operation): Observable<FetchResult> {
return new Observable((sink) => {
return this.client.subscribe<FetchResult>(
{ ...operation, query: print(operation.query) },
{
next: sink.next.bind(sink),
complete: sink.complete.bind(sink),
error: sink.error.bind(sink),
},
);
});
}
}
13 changes: 8 additions & 5 deletions _examples/chat/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ import { WebSocketLink as ApolloWebSocketLink} from '@apollo/client/link/ws';
import { getMainDefinition } from 'apollo-utilities';
import { App } from './App';
import { WebSocketLink as GraphQLWSWebSocketLink } from './graphql-ws'
import { SSELink } from './graphql-sse';

let wsLink;
if (process.env.REACT_APP_WS_PROTOCOL === 'graphql-transport-ws') {
wsLink = new GraphQLWSWebSocketLink({
let subscriptionLink;
if (process.env.REACT_APP_SSE_PROTOCOL) {
subscriptionLink = new SSELink({ url: 'http://localhost:8085/query' });
} else if (process.env.REACT_APP_WS_PROTOCOL === 'graphql-transport-ws') {
subscriptionLink = new GraphQLWSWebSocketLink({
url: `ws://localhost:8085/query`
});
} else {
wsLink = new ApolloWebSocketLink({
subscriptionLink = new ApolloWebSocketLink({
uri: `ws://localhost:8085/query`,
options: {
reconnect: true
Expand All @@ -36,7 +39,7 @@ const link = split(
const { kind, operation } = getMainDefinition(query);
return kind === 'OperationDefinition' && operation === 'subscription';
},
wsLink,
subscriptionLink,
httpLink,
);

Expand Down
25 changes: 25 additions & 0 deletions _examples/chat/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"compilerOptions": {
"target": "es5",
"lib": [
"dom",
"dom.iterable",
"esnext"
],
"allowJs": true,
"skipLibCheck": true,
"esModuleInterop": true,
"allowSyntheticDefaultImports": true,
"strict": true,
"forceConsistentCasingInFileNames": true,
"module": "esnext",
"moduleResolution": "node",
"resolveJsonModule": true,
"isolatedModules": true,
"noEmit": true,
"jsx": "preserve"
},
"include": [
"src"
]
}
35 changes: 35 additions & 0 deletions docs/content/recipes/subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,41 @@ subscription {
Run your query and you should see a response updating with the current timestamp every
second. To gracefully stop the connection click the `Execute query` button again.


## Adding Server-Sent Events transport
You can use instead of WebSocket (or in addition) [Server-Sent Events](https://en.wikipedia.org/wiki/Server-sent_events)
as transport for subscriptions. This can have advantages and disadvantages over transport via WebSocket and requires a
compatible client library, for instance [graphql-sse](https://github.com/enisdenjo/graphql-sse). The connection between
server and client should be HTTP/2+. The client must send the subscription request via POST with
the header `accept: text/event-stream` and `content-type: application/json` in order to be accepted by the SSE transport.
The underling protocol is documented at [distinct connections mode](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md).

Add the SSE transport as first of all other transports, as the order is important. For that reason, `New` instead of
`NewDefaultServer` will be used.
```go
srv := handler.New(generated.NewExecutableSchema(generated.Config{Resolvers: &graph.Resolver{}}))
srv.AddTransport(transport.SSE{}) // <---- This is the important

// default server
srv.AddTransport(transport.Options{})
srv.AddTransport(transport.GET{})
srv.AddTransport(transport.POST{})
srv.AddTransport(transport.MultipartForm{})
srv.SetQueryCache(lru.New(1000))
srv.Use(extension.Introspection{})
srv.Use(extension.AutomaticPersistedQuery{
Cache: lru.New(100),
})
```

The GraphQL playground does not support SSE yet. You can try out the subscription via curl:
```bash
curl -N --request POST --url http://localhost:8080/query \
--data '{"query":"subscription { currentTime { unixTime timeStamp } }"}' \
-H "accept: text/event-stream" -H 'content-type: application/json' \
--verbose
```

## Full Files

Here are all files at the end of this tutorial. Only files changed from the end
Expand Down
19 changes: 16 additions & 3 deletions graphql/handler/testserver/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// a generated server, but it aims to be good enough to test the handler package without relying on codegen.
func New() *TestServer {
next := make(chan struct{})
completeSubscription := make(chan struct{})

schema := gqlparser.MustLoadSchema(&ast.Source{Input: `
type Query {
Expand All @@ -30,7 +31,8 @@ func New() *TestServer {
`})

srv := &TestServer{
next: next,
next: next,
completeSubscription: completeSubscription,
}

srv.Server = handler.New(&graphql.ExecutableSchemaMock{
Expand Down Expand Up @@ -74,6 +76,8 @@ func New() *TestServer {
return &graphql.Response{
Data: []byte(`{"name":"test"}`),
}
case <-completeSubscription:
return nil
}
}
default:
Expand Down Expand Up @@ -143,8 +147,9 @@ func NewError() *TestServer {

type TestServer struct {
*handler.Server
next chan struct{}
complexity int
next chan struct{}
completeSubscription chan struct{}
complexity int
}

func (s *TestServer) SendNextSubscriptionMessage() {
Expand All @@ -155,6 +160,14 @@ func (s *TestServer) SendNextSubscriptionMessage() {
}
}

func (s *TestServer) SendCompleteSubscriptionMessage() {
select {
case s.completeSubscription <- struct{}{}:
case <-time.After(1 * time.Second):
fmt.Println("WARNING: no active subscription")
}
}

func (s *TestServer) SetCalculatedComplexity(complexity int) {
s.complexity = complexity
}
110 changes: 110 additions & 0 deletions graphql/handler/transport/sse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package transport

import (
"encoding/json"
"fmt"
"io"
"log"
"mime"
"net/http"
"strings"

"github.com/vektah/gqlparser/v2/gqlerror"

"github.com/99designs/gqlgen/graphql"
)

type SSE struct{}

var _ graphql.Transport = SSE{}

func (t SSE) Supports(r *http.Request) bool {
if !strings.Contains(r.Header.Get("Accept"), "text/event-stream") {
return false
}
mediaType, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
return false
}
return r.Method == http.MethodPost && mediaType == "application/json"
}

func (t SSE) Do(w http.ResponseWriter, r *http.Request, exec graphql.GraphExecutor) {
ctx := r.Context()
flusher, ok := w.(http.Flusher)
if !ok {
SendErrorf(w, http.StatusInternalServerError, "streaming unsupported")
return
}
defer flusher.Flush()

w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Type", "application/json")

params := &graphql.RawParams{}
start := graphql.Now()
params.Headers = r.Header
params.ReadTime = graphql.TraceTiming{
Start: start,
End: graphql.Now(),
}

bodyString, err := getRequestBody(r)
if err != nil {
gqlErr := gqlerror.Errorf("could not get json request body: %+v", err)
resp := exec.DispatchError(ctx, gqlerror.List{gqlErr})
log.Printf("could not get json request body: %+v", err.Error())
writeJson(w, resp)
return
}

bodyReader := io.NopCloser(strings.NewReader(bodyString))
if err = jsonDecode(bodyReader, &params); err != nil {
w.WriteHeader(http.StatusBadRequest)
gqlErr := gqlerror.Errorf(
"json request body could not be decoded: %+v body:%s",
err,
bodyString,
)
resp := exec.DispatchError(ctx, gqlerror.List{gqlErr})
log.Printf("decoding error: %+v body:%s", err.Error(), bodyString)
writeJson(w, resp)
return
}

rc, OpErr := exec.CreateOperationContext(ctx, params)
if OpErr != nil {
w.WriteHeader(statusFor(OpErr))
resp := exec.DispatchError(graphql.WithOperationContext(ctx, rc), OpErr)
writeJson(w, resp)
return
}

ctx = graphql.WithOperationContext(ctx, rc)

w.Header().Set("Content-Type", "text/event-stream")
fmt.Fprint(w, ":\n\n")
flusher.Flush()

responses, ctx := exec.DispatchOperation(ctx, rc)

for {
response := responses(ctx)
if response == nil {
break
}
writeJsonWithSSE(w, response)
flusher.Flush()
}

fmt.Fprint(w, "event: complete\n\n")
}

func writeJsonWithSSE(w io.Writer, response *graphql.Response) {
b, err := json.Marshal(response)
if err != nil {
panic(err)
}
fmt.Fprintf(w, "event: next\ndata: %s\n\n", b)
}
Loading

0 comments on commit f0a090d

Please sign in to comment.