Subscription support #49

Closed
F21 opened this issue Nov 4, 2015 · 17 comments
@F21

F21 commented Nov 4, 2015

Some initial work has been done to implement subscriptions in graphql-js: graphql/graphql-js#189

Here's the relay discussion for getting support into relay: facebook/relay#541

Would love to see subscriptions land here once they are in the reference implementation 😄

@sogko
Member

sogko commented Nov 4, 2015

I would love to see this get into graphql-go as well 👍🏻

As always, PRs are most welcome 😃

@sogko sogko added this to the 0.4.18 milestone Mar 11, 2016
@sogko
Member

sogko commented Jun 1, 2016

Subscription operation support has now been added to graphql-go.

To define a subscription root:

    schema, err := graphql.NewSchema(graphql.SchemaConfig{
        Query: ...,
        Mutation: ...,
        Subscription: graphql.NewObject(graphql.ObjectConfig{
            Name: "SubscriptionRoot",
            Fields: graphql.Fields{
                "subscribeUser": &graphql.Field{
                    Type: graphql.String,
                },
            },
        }),
    })

To make a subscription query:

query Foo {
  ...
}

mutation Bar {
  ...
}

subscription Baz {
  subscribeUser
}

Closing this.

Cheers!

@sogko sogko closed this as completed Jun 1, 2016
@nikhilmahesh

Any working example?

@magbicaleman

I'd also love to see an example. @nikhilmahesh did you get anything?

@truongsinh

@magbicaleman

magbicaleman commented Mar 29, 2017

@truongsinh I saw this, but it returns a random int, etc. I'd like to see a real-world example, which I believe I found in the testing steps here: https://github.com/graphql-go/graphql/blob/master/kitchen-sink.graphql#L32

The truth is I don't know exactly how subscriptions are supposed to work, and seeing both the server side and the front end would be extremely helpful.

@mikeifomin

mikeifomin commented Apr 15, 2017

I guess there isn't any full-featured example. Moreover, such an example would depend on a WebSocket library, client code, etc.
Subscriptions over WebSockets use a small protocol; for example, check out a Node.js implementation and some of the protocol message types.
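
To give a feel for how small that protocol is, here is a rough sketch of the message envelope of the legacy `graphql-ws` subprotocol (the one used by subscriptions-transport-ws), written with the standard library only. The Go names (`Message`, `StartPayload`, `DecodeStart`, `EncodeData`) are invented for this sketch; only the wire-level `type` strings are taken from the protocol, and the list is not exhaustive.

```go
package main

import (
	"encoding/json"
	"errors"
)

// Wire-level message types of the legacy graphql-ws subprotocol (subset).
const (
	TypeConnectionInit = "connection_init" // client -> server
	TypeConnectionAck  = "connection_ack"  // server -> client
	TypeStart          = "start"           // client -> server: begin an operation
	TypeData           = "data"            // server -> client: one execution result
	TypeStop           = "stop"            // client -> server: end an operation
	TypeComplete       = "complete"        // server -> client: no more results
)

// Message is the common envelope. Payload stays raw because its shape
// depends on Type.
type Message struct {
	ID      string          `json:"id,omitempty"`
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

// StartPayload carries the GraphQL document of a "start" message.
type StartPayload struct {
	Query         string                 `json:"query"`
	Variables     map[string]interface{} `json:"variables,omitempty"`
	OperationName string                 `json:"operationName,omitempty"`
}

// DecodeStart parses raw bytes and returns the operation ID and payload
// of a "start" message. Any other message type is reported as an error.
func DecodeStart(raw []byte) (string, StartPayload, error) {
	var msg Message
	var p StartPayload
	if err := json.Unmarshal(raw, &msg); err != nil {
		return "", p, err
	}
	if msg.Type != TypeStart {
		return "", p, errors.New("not a start message: " + msg.Type)
	}
	if err := json.Unmarshal(msg.Payload, &p); err != nil {
		return "", p, err
	}
	return msg.ID, p, nil
}

// EncodeData wraps one execution result in a "data" message addressed to
// the operation with the given ID.
func EncodeData(id string, result interface{}) ([]byte, error) {
	payload, err := json.Marshal(result)
	if err != nil {
		return nil, err
	}
	return json.Marshal(Message{ID: id, Type: TypeData, Payload: payload})
}
```

A server would typically call `DecodeStart` on each incoming text frame, remember the returned operation ID, and later send `EncodeData(id, result)` frames back on the same connection.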

@gabrielruiu

Has a full-featured example been published recently?

@amrosebirani

Is there any activity here, or any discussion around building full-fledged subscription support, like apollo-subscription-server, into the Go GraphQL server? I'm really interested in contributing if needed. Otherwise, if the functionality is already there, I can help set up an example implementation.

@Jannis

Jannis commented Dec 20, 2017

Some might be interested: we've just released https://github.com/functionalfoundry/graphqlws/, which allows you to implement subscription-enabled GraphQL servers that work with typical clients (like Apollo) out of the box. It integrates seamlessly with graphql-go/graphql. It sets up a separate endpoint (WebSocket) but consumes the same schema and makes use of the subscription root in this schema.

I still need to create a useful example. The easiest way to connect it to a database is to have a pubsub mechanism to listen for database changes and whenever something changes, you identify which subscriptions may be affected (e.g. by looking at their top-level query fields), re-run their queries and push data out to them (this is for instance what Graphcool does).
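
The "identify which subscriptions may be affected" step could be sketched roughly as below. This is not part of graphqlws; `Registry` and its methods are hypothetical names, and the sketch assumes the caller has already extracted the top-level field names from each subscription query.

```go
package main

import (
	"sort"
	"sync"
)

// Registry indexes active subscriptions by the top-level fields they select.
type Registry struct {
	mu      sync.RWMutex
	byField map[string]map[string]struct{} // field name -> set of subscription IDs
}

// NewRegistry returns an empty registry.
func NewRegistry() *Registry {
	return &Registry{byField: make(map[string]map[string]struct{})}
}

// Add registers a subscription under each top-level field of its query.
func (r *Registry) Add(subID string, topLevelFields ...string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, f := range topLevelFields {
		if r.byField[f] == nil {
			r.byField[f] = make(map[string]struct{})
		}
		r.byField[f][subID] = struct{}{}
	}
}

// Remove drops a subscription from every field it was registered under.
func (r *Registry) Remove(subID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for f, ids := range r.byField {
		delete(ids, subID)
		if len(ids) == 0 {
			delete(r.byField, f)
		}
	}
}

// Affected returns the sorted IDs of the subscriptions that select the
// field whose underlying data changed.
func (r *Registry) Affected(changedField string) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	ids := make([]string, 0, len(r.byField[changedField]))
	for id := range r.byField[changedField] {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}
```

When the pubsub layer reports a change to, say, `posts`, the server would re-run the query of each subscription in `Affected("posts")` and push the result to that subscription's connection.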

I've described the library a bit here: https://medium.com/functional-foundry/building-graphql-servers-with-subscriptions-in-go-2a60f11dc9f5

Hope this helps!

@ghost

ghost commented Jun 26, 2018

This approach is pretty inefficient and just won't scale, IMHO.

Most DBs have triggers at a table or bucket level. But how can you work out which query maps to that trigger?
Then you are re-running the whole query and returning all of it to the client!

If a mutation comes in and its event fires various reactions, then you can maybe isolate which data changed in the middle tier and return only the resulting delta.

Another easy way is for every view model (which is basically what a query maps to) to hold a sequence ID from a materialised view in the DB.
Then it's very easy to know exactly which clients' view models are out of date and push updates to them through a durable message queue of sorts.
So the essence of this is holding a sequence number, but most GraphQL server implementations don't hold the query results in the DB as a materialised view.

The other way is to hold just a sequence number per query and not keep any data in the DB except the sequence ID. Then, when a mutation results in a new materialised view record, just shove it onto the queue.
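
A minimal in-memory sketch of the sequence-number idea, assuming one counter per view (query) and one "last delivered" value per client. `SeqTracker` and its methods are hypothetical names; a real setup would keep the counters in the database and deliver updates through a durable queue, as described above.

```go
package main

import (
	"sort"
	"sync"
)

// SeqTracker keeps one sequence number per view and, for every client
// watching a view, the last sequence number delivered to that client.
type SeqTracker struct {
	mu      sync.Mutex
	current map[string]uint64            // view -> latest sequence
	seen    map[string]map[string]uint64 // view -> client -> last delivered sequence
}

// NewSeqTracker returns an empty tracker.
func NewSeqTracker() *SeqTracker {
	return &SeqTracker{
		current: make(map[string]uint64),
		seen:    make(map[string]map[string]uint64),
	}
}

// Watch registers a client as up to date with the view's current sequence.
func (t *SeqTracker) Watch(view, client string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.seen[view] == nil {
		t.seen[view] = make(map[string]uint64)
	}
	t.seen[view][client] = t.current[view]
}

// Bump records a change to the view and returns its new sequence number.
func (t *SeqTracker) Bump(view string) uint64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.current[view]++
	return t.current[view]
}

// Stale lists, in sorted order, the clients whose last delivered sequence
// is behind the view's current sequence.
func (t *SeqTracker) Stale(view string) []string {
	t.mu.Lock()
	defer t.mu.Unlock()
	var stale []string
	for client, seq := range t.seen[view] {
		if seq < t.current[view] {
			stale = append(stale, client)
		}
	}
	sort.Strings(stale)
	return stale
}

// Ack marks that a watching client has received updates up to seq.
func (t *SeqTracker) Ack(view, client string, seq uint64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if last, ok := t.seen[view][client]; ok && seq > last {
		t.seen[view][client] = seq
	}
}
```

A mutation handler would call `Bump` for each view it touches; a delivery loop would then push to every client in `Stale(view)` and call `Ack` once the update has been sent.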

@ghost

ghost commented Jul 12, 2018

Is it still in progress? No examples from @Jannis or @sogko?

@chris-ramon
Member

Is it still in progress? No examples from @Jannis or @sogko?

Hi @Dverlik, thanks for reaching out with a question. Subscription support has actually been ready to use in the lib for a while now, so I went ahead and wrote a small working example. There are also some other interesting ideas on how to optimize the stream of data back to the clients via WebSockets, but here is a very simple implementation:

output

package main

import (
	"encoding/json"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/graphql-go/graphql"
	"github.com/graphql-go/handler"
)

type Post struct {
	ID    int `json:"id"`
	Likes int `json:"likes"` // tag matches the GraphQL field name "likes"
}

type ConnectionACKMessage struct {
	OperationID string `json:"id,omitempty"`
	Type        string `json:"type"`
	Payload     struct {
		Query string `json:"query"`
	} `json:"payload,omitempty"`
}

var PostType = graphql.NewObject(graphql.ObjectConfig{
	Name: "Post",
	Fields: graphql.Fields{
		"id": &graphql.Field{
			Type: graphql.Int,
		},
		"likes": &graphql.Field{
			Type: graphql.Int,
		},
	},
})

type Subscriber struct {
	ID            int
	Conn          *websocket.Conn
	RequestString string
	OperationID   string
}

func main() {
	var upgrader = websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
		Subprotocols: []string{"graphql-ws"},
	}
	var posts = []*Post{
		&Post{ID: 1, Likes: 1},
		&Post{ID: 2, Likes: 2},
	}
	var subscribers sync.Map
	schema, err := graphql.NewSchema(graphql.SchemaConfig{
		Query: graphql.NewObject(graphql.ObjectConfig{
			Name: "Query",
			Fields: graphql.Fields{
				"posts": &graphql.Field{
					Type: graphql.NewList(PostType),
					Resolve: func(p graphql.ResolveParams) (interface{}, error) {
						return posts, nil
					},
				},
			},
		}),
		Subscription: graphql.NewObject(graphql.ObjectConfig{
			Name: "Subscription",
			Fields: graphql.Fields{
				"postLikesSubscribe": &graphql.Field{
					Type: graphql.NewList(PostType),
					Resolve: func(p graphql.ResolveParams) (interface{}, error) {
						return posts, nil
					},
				},
			},
		}),
	})
	if err != nil {
		log.Fatal(err)
	}
	h := handler.New(&handler.Config{
		Schema:     &schema,
		Pretty:     true,
		GraphiQL:   false,
		Playground: true,
	})
	http.Handle("/graphql", h)
	http.HandleFunc("/subscriptions", func(w http.ResponseWriter, r *http.Request) {
		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			log.Printf("failed to do websocket upgrade: %v", err)
			return
		}
		connectionACK, err := json.Marshal(map[string]string{
			"type": "connection_ack",
		})
		if err != nil {
			log.Printf("failed to marshal ws connection ack: %v", err)
			return
		}
		if err := conn.WriteMessage(websocket.TextMessage, connectionACK); err != nil {
			log.Printf("failed to write to ws connection: %v", err)
			return
		}
		go func() {
			for {
				_, p, err := conn.ReadMessage()
				if websocket.IsCloseError(err, websocket.CloseGoingAway) {
					return
				}
				if err != nil {
					log.Printf("failed to read websocket message: %v", err)
					return
				}
				var msg ConnectionACKMessage
				if err := json.Unmarshal(p, &msg); err != nil {
					log.Printf("failed to unmarshal: %v", err)
					return
				}
				if msg.Type == "start" {
					length := 0
					subscribers.Range(func(key, value interface{}) bool {
						length++
						return true
					})
					var subscriber = Subscriber{
						ID:            length + 1,
						Conn:          conn,
						RequestString: msg.Payload.Query,
						OperationID:   msg.OperationID,
					}
					subscribers.Store(subscriber.ID, &subscriber)
				}
			}
		}()
	})
	go func() {
		for {
			time.Sleep(1 * time.Second)
			for _, post := range posts {
				post.Likes = post.Likes + 1
			}
			subscribers.Range(func(key, value interface{}) bool {
				subscriber, ok := value.(*Subscriber)
				if !ok {
					return true
				}
				payload := graphql.Do(graphql.Params{
					Schema:        schema,
					RequestString: subscriber.RequestString,
				})
				message, err := json.Marshal(map[string]interface{}{
					"type":    "data",
					"id":      subscriber.OperationID,
					"payload": payload,
				})
				if err != nil {
					log.Printf("failed to marshal message: %v", err)
					return true
				}
				if err := subscriber.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
					if err == websocket.ErrCloseSent {
						subscribers.Delete(key)
						return true
					}
					log.Printf("failed to write to ws connection: %v", err)
					return true
				}
				return true
			})
		}
	}()
	log.Printf("server running on port :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}
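
One caveat with the example above: subscriber IDs are derived from the current number of entries (`length + 1`), so once a subscriber has been deleted, a new one can be given an ID that is still in use and overwrite it. A possible alternative, sketched here with the standard library only, is a counter that never hands out the same ID twice. `SubscriberSet` and its methods are hypothetical names, not part of graphql-go or gorilla/websocket.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// SubscriberSet stores subscribers under IDs that are never reused.
type SubscriberSet struct {
	nextID uint64   // first field so 64-bit atomic access is aligned; use sync/atomic only
	items  sync.Map // uint64 ID -> subscriber value
}

// Add stores v under a fresh ID and returns that ID.
func (s *SubscriberSet) Add(v interface{}) uint64 {
	id := atomic.AddUint64(&s.nextID, 1)
	s.items.Store(id, v)
	return id
}

// Remove deletes the subscriber with the given ID, if present.
func (s *SubscriberSet) Remove(id uint64) {
	s.items.Delete(id)
}

// Len counts the subscribers currently stored.
func (s *SubscriberSet) Len() int {
	n := 0
	s.items.Range(func(_, _ interface{}) bool {
		n++
		return true
	})
	return n
}
```

In the handler, `subscribers.Store(subscriber.ID, &subscriber)` would then become a single `Add(&subscriber)` call, and the returned ID could be kept for removal when the connection closes.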

chris-ramon added a commit to chris-ramon/gqlgen that referenced this issue Sep 12, 2018
@cescoferraro

@chris-ramon
Subscriptions do not seem to work in GraphiQL.
Using the Playground, I can get them to work.
What is the reason for that?

@cescoferraro

I am trying to solve this on the handler library
graphql-go/handler#69

@iamtakingiteasy

iamtakingiteasy commented Jun 1, 2019

While this kind of subscription is nice for repeating the same request over and over, it does little to simplify async communication within a single request, and it requires external ways of keeping the state of a specific connection.

https://github.com/functionalfoundry/graphqlws mitigates that to some degree, but has a rather concealed and non-extensible implementation.

So in my actual project I had to write my own implementation, which led to my pet project https://github.com/eientei/wsgraphql. It does basically the same things, but provides a bit more control and an easier way to keep track of state within the same subscription request (via a mutable context instance, which is arguably questionable).

But it is still far from ideal. The proper Go way of doing things would be either a context plus a chan<- interface{}, or a method like Send(update interface{}) error (likely having the same context and channel underneath), made available to the subscription callback. However, graphql-go chooses to encapsulate the resolving logic within the graphql.Do chain of calls. That makes subscriptions more a way of repeating an otherwise synchronous request over an asynchronous channel than a true asynchronous server channel, and it requires persistent state somewhere across multiple immediate "requests" to actually provide the required usage pattern. See the subscription example in https://godoc.org/github.com/eientei/wsgraphql#ex-package
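
To make the "context plus channel" shape concrete, here is a minimal sketch using only the standard library. It is not graphql-go's or wsgraphql's API: `SubscribeFunc`, `Run`, `Counter`, and `Take` are hypothetical names chosen for illustration.

```go
package main

import "context"

// SubscribeFunc is a hypothetical subscription callback: it pushes updates
// into the channel until it is finished or ctx is cancelled.
type SubscribeFunc func(ctx context.Context, updates chan<- interface{}) error

// Run starts fn in a goroutine. The updates channel is closed when fn
// returns, and fn's error (possibly nil) is then available on the second
// channel.
func Run(ctx context.Context, fn SubscribeFunc) (<-chan interface{}, <-chan error) {
	updates := make(chan interface{})
	errc := make(chan error, 1) // buffered so the goroutine never blocks on it
	go func() {
		defer close(updates)
		errc <- fn(ctx, updates)
	}()
	return updates, errc
}

// Counter is a toy subscription that emits 1..n and stops early if the
// context is cancelled.
func Counter(n int) SubscribeFunc {
	return func(ctx context.Context, updates chan<- interface{}) error {
		for i := 1; i <= n; i++ {
			select {
			case updates <- i:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}
}

// Take runs fn, collects at most limit updates (limit must be positive),
// then cancels the subscription and returns the updates and fn's error.
func Take(fn SubscribeFunc, limit int) ([]interface{}, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	updates, errc := Run(ctx, fn)
	var got []interface{}
	for u := range updates {
		got = append(got, u)
		if len(got) == limit {
			cancel() // ask the callback to stop producing
			break
		}
	}
	return got, <-errc
}
```

In a WebSocket server, the loop inside `Take` would instead write each update to the client's connection, and `cancel` would be called when the client sends a stop message or disconnects.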

@tobhoster

Is there an efficient way to implement subscriptions with graphql-go/graphql?

cgxxv pushed a commit to cgxxv/gqlgen that referenced this issue Mar 25, 2022