From bfdf5f432d29657ef0287da3b63c81f4c72ea082 Mon Sep 17 00:00:00 2001 From: Alejandro Serrano Date: Fri, 20 Mar 2020 15:58:07 +0100 Subject: [PATCH] Subscriptions for GraphQL (#146) Co-authored-by: Flavio Corpa --- graphql/exe/Main.hs | 22 +- graphql/mu-graphql.cabal | 8 + graphql/src/Mu/GraphQL/Query/Definition.hs | 21 +- graphql/src/Mu/GraphQL/Query/Parse.hs | 129 +++++- graphql/src/Mu/GraphQL/Query/Run.hs | 391 ++++++++++++++---- graphql/src/Mu/GraphQL/Server.hs | 109 ++--- .../src/Mu/GraphQL/Subscription/Protocol.hs | 158 +++++++ 7 files changed, 690 insertions(+), 148 deletions(-) create mode 100644 graphql/src/Mu/GraphQL/Subscription/Protocol.hs diff --git a/graphql/exe/Main.hs b/graphql/exe/Main.hs index 151dd7a9..3d334ac5 100644 --- a/graphql/exe/Main.hs +++ b/graphql/exe/Main.hs @@ -11,6 +11,8 @@ module Main where +import Data.Conduit +import Data.Conduit.Combinators (yieldMany) import Data.List (find) import Data.Maybe (fromMaybe, listToMaybe) import Data.Proxy @@ -37,7 +39,8 @@ main = do ("Access-Control-Allow-Origin", "*") , ("Access-Control-Allow-Headers", "Content-Type") ] - run 8000 $ hm $ graphQLAppQuery libraryServer (Proxy @"Query") + run 8000 $ hm $ graphQLApp libraryServer + (Proxy @('Just "Query")) (Proxy @'Nothing) (Proxy @('Just "Subscription")) type ServiceDefinition = 'Package ('Just "library") @@ -63,7 +66,10 @@ type ServiceDefinition , ObjectField "books" '[] '[] ('RetSingle ('ListRef ('ObjectRef "Book"))) ] - , Object "Mutation" '[] '[] + , Object "Subscription" '[] + '[ ObjectField "books" '[] + '[] ('RetStream ('ObjectRef "Book")) + ] ] type ServiceMapping = '[ @@ -85,9 +91,10 @@ libraryServer :<&>: (noContext findAuthor :<||>: noContext findBookTitle :<||>: noContext allAuthors - :<||>: noContext allBooks + :<||>: noContext allBooks' :<||>: H0) - :<&>: H0 :<&>: S0 + :<&>: (noContext allBooksConduit :<||>: H0) + :<&>: S0 where findBook i = find ((==i) . fst3) library @@ -108,7 +115,12 @@ libraryServer , title =~ rx] allAuthors = pure $ fst3 <$> library - allBooks = pure [(aid, bid) | (aid, _, books) <- library, (bid, _) <- books] + allBooks = [(aid, bid) | (aid, _, books) <- library, (bid, _) <- books] + allBooks' = pure allBooks + + allBooksConduit :: ConduitM (Integer, Integer) Void m () -> m () + allBooksConduit sink + = runConduit $ yieldMany allBooks .| sink -- helpers diff --git a/graphql/mu-graphql.cabal b/graphql/mu-graphql.cabal index 9e571efd..ab568116 100644 --- a/graphql/mu-graphql.cabal +++ b/graphql/mu-graphql.cabal @@ -22,27 +22,34 @@ library Mu.GraphQL.Query.Definition Mu.GraphQL.Query.Parse Mu.GraphQL.Query.Run + Mu.GraphQL.Subscription.Protocol -- other-extensions: build-depends: aeson + , async , base >=4.12 && <5 , bytestring , conduit , graphql-parser , http-types + , list-t , mtl , mu-rpc , mu-schema , scientific , sop-core + , stm , stm-chans , stm-conduit + , stm-containers , text , unordered-containers , wai + , wai-websockets , warp , warp-tls + , websockets hs-source-dirs: src default-language: Haskell2010 @@ -55,6 +62,7 @@ executable library-graphql ghc-options: -Wall build-depends: base >=4.12 && <5 + , conduit , mu-graphql , mu-rpc , mu-schema diff --git a/graphql/src/Mu/GraphQL/Query/Definition.hs b/graphql/src/Mu/GraphQL/Query/Definition.hs index 0e5ec75f..e4b509ef 100644 --- a/graphql/src/Mu/GraphQL/Query/Definition.hs +++ b/graphql/src/Mu/GraphQL/Query/Definition.hs @@ -11,13 +11,20 @@ import Data.Text import Mu.Rpc import Mu.Schema -data Document (p :: Package snm mnm anm) (qr :: Maybe snm) (mut :: Maybe snm) where - QueryDoc :: LookupService ss qr ~ 'Service qr qanns qms - => ServiceQuery ('Package pname ss) (LookupService ss qr) - -> Document ('Package pname ss) ('Just qr) mut - MutationDoc :: LookupService ss mut ~ 'Service mut manns mms - => ServiceQuery ('Package pname ss) (LookupService ss mut) - -> Document ('Package pname ss) qr ('Just mut) +data Document (p :: Package snm mnm anm) + (qr :: Maybe snm) (mut :: Maybe snm) (sub :: Maybe snm) where + QueryDoc + :: LookupService ss qr ~ 'Service qr qanns qms + => ServiceQuery ('Package pname ss) (LookupService ss qr) + -> Document ('Package pname ss) ('Just qr) mut sub + MutationDoc + :: LookupService ss mut ~ 'Service mut manns mms + => ServiceQuery ('Package pname ss) (LookupService ss mut) + -> Document ('Package pname ss) qr ('Just mut) sub + SubscriptionDoc + :: LookupService ss sub ~ 'Service sub manns mms + => OneMethodQuery ('Package pname ss) (LookupService ss sub) + -> Document ('Package pname ss) qr mut ('Just sub) type ServiceQuery (p :: Package snm mnm anm) (s :: Service snm mnm anm) = [OneMethodQuery p s] diff --git a/graphql/src/Mu/GraphQL/Query/Parse.hs b/graphql/src/Mu/GraphQL/Query/Parse.hs index 7d25d803..87dd5439 100644 --- a/graphql/src/Mu/GraphQL/Query/Parse.hs +++ b/graphql/src/Mu/GraphQL/Query/Parse.hs @@ -10,17 +10,21 @@ {-# language TypeOperators #-} {-# language UndecidableInstances #-} {-# language ViewPatterns #-} -{-# OPTIONS_GHC -Wincomplete-patterns #-} +{-# OPTIONS_GHC -Wincomplete-patterns -fno-warn-orphans #-} module Mu.GraphQL.Query.Parse where import Control.Monad.Except +import qualified Data.Aeson as A +import Data.Coerce (coerce) +import qualified Data.Foldable as F import qualified Data.HashMap.Strict as HM import Data.Int (Int32) import Data.Kind import Data.List (find) import Data.Maybe import Data.Proxy +import Data.Scientific (floatingOrInteger) import Data.SOP.NS import qualified Data.Text as T import GHC.TypeLits @@ -32,15 +36,26 @@ import Mu.Rpc import Mu.Schema type VariableMapC = HM.HashMap T.Text GQL.ValueConst -type VariableMap = HM.HashMap T.Text GQL.Value -type FragmentMap = HM.HashMap T.Text GQL.FragmentDefinition +type VariableMap = HM.HashMap T.Text GQL.Value +type FragmentMap = HM.HashMap T.Text GQL.FragmentDefinition + +instance A.FromJSON GQL.ValueConst where + parseJSON A.Null = pure GQL.VCNull + parseJSON (A.Bool b) = pure $ GQL.VCBoolean b + parseJSON (A.String s) = pure $ GQL.VCString $ coerce s + parseJSON (A.Number n) = pure $ either GQL.VCFloat GQL.VCInt $ floatingOrInteger n + parseJSON (A.Array xs) = GQL.VCList . GQL.ListValueG . F.toList <$> traverse A.parseJSON xs + parseJSON (A.Object o) = GQL.VCObject . GQL.ObjectValueG . fmap toObjFld . HM.toList <$> traverse A.parseJSON o + where + toObjFld :: (T.Text, GQL.ValueConst) -> GQL.ObjectFieldG GQL.ValueConst + toObjFld (k, v) = GQL.ObjectFieldG (coerce k) v parseDoc :: - forall qr mut p f. - ( MonadError T.Text f, ParseTypedDoc p qr mut ) => + forall qr mut sub p f. + ( MonadError T.Text f, ParseTypedDoc p qr mut sub ) => Maybe T.Text -> VariableMapC -> GQL.ExecutableDocument -> - f (Document p qr mut) + f (Document p qr mut sub) -- If there's no operation name, there must be only one query parseDoc Nothing vmap (GQL.ExecutableDocument defns) = case GQL.partitionExDefs defns of @@ -68,10 +83,10 @@ fragmentsToMap = HM.fromList . map fragmentToThingy fragmentToThingy f = (GQL.unName $ GQL._fdName f, f) parseTypedDoc :: - (MonadError T.Text f, ParseTypedDoc p qr mut) => + (MonadError T.Text f, ParseTypedDoc p qr mut sub) => VariableMapC -> FragmentMap -> GQL.TypedOperationDefinition -> - f (Document p qr mut) + f (Document p qr mut sub) parseTypedDoc vmap frmap tod = let defVmap = parseVariableMap (GQL._todVariableDefinitions tod) finalVmap = constToValue <$> HM.union vmap defVmap -- first one takes precedence @@ -81,19 +96,44 @@ parseTypedDoc vmap frmap tod GQL.OperationTypeMutation -> parseTypedDocMutation finalVmap frmap (GQL._todSelectionSet tod) GQL.OperationTypeSubscription - -> throwError "subscriptions are not (yet) supported" + -> parseTypedDocSubscription finalVmap frmap (GQL._todSelectionSet tod) -class ParseTypedDoc (p :: Package') (qr :: Maybe Symbol) (mut :: Maybe Symbol) where +class ParseTypedDoc (p :: Package') + (qr :: Maybe Symbol) (mut :: Maybe Symbol) (sub :: Maybe Symbol) where parseTypedDocQuery :: MonadError T.Text f => VariableMap -> FragmentMap -> GQL.SelectionSet -> - f (Document p qr mut) + f (Document p qr mut sub) parseTypedDocMutation :: MonadError T.Text f => VariableMap -> FragmentMap -> GQL.SelectionSet -> - f (Document p qr mut) + f (Document p qr mut sub) + parseTypedDocSubscription :: + MonadError T.Text f => + VariableMap -> FragmentMap -> + GQL.SelectionSet -> + f (Document p qr mut sub) + +instance + ( p ~ 'Package pname ss, + LookupService ss qr ~ 'Service qr qanns qmethods, + KnownName qr, ParseMethod p qmethods, + LookupService ss mut ~ 'Service mut manns mmethods, + KnownName mut, ParseMethod p mmethods, + LookupService ss sub ~ 'Service sub sanns smethods, + KnownName sub, ParseMethod p smethods + ) => ParseTypedDoc p ('Just qr) ('Just mut) ('Just sub) where + parseTypedDocQuery vmap frmap sset + = QueryDoc <$> parseQuery Proxy Proxy vmap frmap sset + parseTypedDocMutation vmap frmap sset + = MutationDoc <$> parseQuery Proxy Proxy vmap frmap sset + parseTypedDocSubscription vmap frmap sset + = do q <- parseQuery Proxy Proxy vmap frmap sset + case q of + [one] -> pure $ SubscriptionDoc one + _ -> throwError "subscriptions may only have one field" instance ( p ~ 'Package pname ss, @@ -101,38 +141,95 @@ instance KnownName qr, ParseMethod p qmethods, LookupService ss mut ~ 'Service mut manns mmethods, KnownName mut, ParseMethod p mmethods - ) => ParseTypedDoc p ('Just qr) ('Just mut) where + ) => ParseTypedDoc p ('Just qr) ('Just mut) 'Nothing where parseTypedDocQuery vmap frmap sset = QueryDoc <$> parseQuery Proxy Proxy vmap frmap sset parseTypedDocMutation vmap frmap sset = MutationDoc <$> parseQuery Proxy Proxy vmap frmap sset + parseTypedDocSubscription _ _ _ + = throwError "no subscriptions are defined in the schema" + +instance + ( p ~ 'Package pname ss, + LookupService ss qr ~ 'Service qr qanns qmethods, + KnownName qr, ParseMethod p qmethods, + LookupService ss sub ~ 'Service sub sanns smethods, + KnownName sub, ParseMethod p smethods + ) => ParseTypedDoc p ('Just qr) 'Nothing ('Just sub) where + parseTypedDocQuery vmap frmap sset + = QueryDoc <$> parseQuery Proxy Proxy vmap frmap sset + parseTypedDocMutation _ _ _ + = throwError "no mutations are defined in the schema" + parseTypedDocSubscription vmap frmap sset + = do q <- parseQuery Proxy Proxy vmap frmap sset + case q of + [one] -> pure $ SubscriptionDoc one + _ -> throwError "subscriptions may only have one field" instance ( p ~ 'Package pname ss, LookupService ss qr ~ 'Service qr qanns qmethods, KnownName qr, ParseMethod p qmethods - ) => ParseTypedDoc p ('Just qr) 'Nothing where + ) => ParseTypedDoc p ('Just qr) 'Nothing 'Nothing where parseTypedDocQuery vmap frmap sset = QueryDoc <$> parseQuery Proxy Proxy vmap frmap sset parseTypedDocMutation _ _ _ = throwError "no mutations are defined in the schema" + parseTypedDocSubscription _ _ _ + = throwError "no subscriptions are defined in the schema" + +instance + ( p ~ 'Package pname ss, + LookupService ss mut ~ 'Service mut manns mmethods, + KnownName mut, ParseMethod p mmethods, + LookupService ss sub ~ 'Service sub sanns smethods, + KnownName sub, ParseMethod p smethods + ) => ParseTypedDoc p 'Nothing ('Just mut) ('Just sub) where + parseTypedDocQuery _ _ _ + = throwError "no queries are defined in the schema" + parseTypedDocMutation vmap frmap sset + = MutationDoc <$> parseQuery Proxy Proxy vmap frmap sset + parseTypedDocSubscription vmap frmap sset + = do q <- parseQuery Proxy Proxy vmap frmap sset + case q of + [one] -> pure $ SubscriptionDoc one + _ -> throwError "subscriptions may only have one field" instance ( p ~ 'Package pname ss, LookupService ss mut ~ 'Service mut manns mmethods, KnownName mut, ParseMethod p mmethods - ) => ParseTypedDoc p 'Nothing ('Just mut) where + ) => ParseTypedDoc p 'Nothing ('Just mut) 'Nothing where parseTypedDocQuery _ _ _ = throwError "no queries are defined in the schema" parseTypedDocMutation vmap frmap sset = MutationDoc <$> parseQuery Proxy Proxy vmap frmap sset + parseTypedDocSubscription _ _ _ + = throwError "no subscriptions are defined in the schema" + +instance + ( p ~ 'Package pname ss, + LookupService ss sub ~ 'Service sub sanns smethods, + KnownName sub, ParseMethod p smethods + ) => ParseTypedDoc p 'Nothing 'Nothing ('Just sub) where + parseTypedDocQuery _ _ _ + = throwError "no queries are defined in the schema" + parseTypedDocMutation _ _ _ + = throwError "no mutations are defined in the schema" + parseTypedDocSubscription vmap frmap sset + = do q <- parseQuery Proxy Proxy vmap frmap sset + case q of + [one] -> pure $ SubscriptionDoc one + _ -> throwError "subscriptions may only have one field" instance - ParseTypedDoc p 'Nothing 'Nothing where + ParseTypedDoc p 'Nothing 'Nothing 'Nothing where parseTypedDocQuery _ _ _ = throwError "no queries are defined in the schema" parseTypedDocMutation _ _ _ = throwError "no mutations are defined in the schema" + parseTypedDocSubscription _ _ _ + = throwError "no subscriptions are defined in the schema" parseVariableMap :: [GQL.VariableDefinition] -> VariableMapC parseVariableMap vmap diff --git a/graphql/src/Mu/GraphQL/Query/Run.hs b/graphql/src/Mu/GraphQL/Query/Run.hs index 916d48e1..fb43ff97 100644 --- a/graphql/src/Mu/GraphQL/Query/Run.hs +++ b/graphql/src/Mu/GraphQL/Query/Run.hs @@ -17,8 +17,10 @@ module Mu.GraphQL.Query.Run ( GraphQLApp , runPipeline +, runSubscriptionPipeline , runDocument , runQuery +, runSubscription -- * Typeclass to be able to run query handlers , RunQueryFindHandler ) where @@ -30,6 +32,7 @@ import qualified Data.Aeson as Aeson import qualified Data.Aeson.Types as Aeson import Data.Conduit import Data.Conduit.Combinators (sinkList, yieldMany) +import Data.Conduit.Internal (ConduitT (..), Pipe (..)) import Data.Conduit.TQueue import Data.Maybe import qualified Data.Text as T @@ -45,42 +48,80 @@ import Mu.Server data GraphQLError = GraphQLError ServerError [T.Text] -type GraphQLApp p qr mut m chn hs - = (ParseTypedDoc p qr mut, RunDocument p qr mut m chn hs) +type GraphQLApp p qr mut sub m chn hs + = (ParseTypedDoc p qr mut sub, RunDocument p qr mut sub m chn hs) runPipeline - :: forall qr mut p m chn hs. GraphQLApp p qr mut m chn hs + :: forall qr mut sub p m chn hs. GraphQLApp p qr mut sub m chn hs => (forall a. m a -> ServerErrorIO a) -> ServerT chn p m hs - -> Proxy qr -> Proxy mut + -> Proxy qr -> Proxy mut -> Proxy sub -> Maybe T.Text -> VariableMapC -> GQL.ExecutableDocument -> IO Aeson.Value -runPipeline f svr _ _ opName vmap doc - = case parseDoc @qr @mut opName vmap doc of - Left e -> - pure $ - Aeson.object [ - ("errors", Aeson.Array [ - Aeson.object [ ("message", Aeson.String e) ] ])] - Right (d :: Document p qr mut) -> do +runPipeline f svr _ _ _ opName vmap doc + = case parseDoc @qr @mut @sub opName vmap doc of + Left e -> pure $ singleErrValue e + Right (d :: Document p qr mut sub) -> do (data_, errors) <- runWriterT (runDocument f svr d) case errors of [] -> pure $ Aeson.object [ ("data", data_) ] _ -> pure $ Aeson.object [ ("data", data_), ("errors", Aeson.listValue errValue errors) ] - where - errValue :: GraphQLError -> Aeson.Value - errValue (GraphQLError (ServerError _ msg) path) - = Aeson.object [ - ("message", Aeson.String $ T.pack msg) - , ("path", Aeson.toJSON path) - ] - -class RunDocument (p :: Package') (qr :: Maybe Symbol) (mut :: Maybe Symbol) m chn hs where + +runSubscriptionPipeline + :: forall qr mut sub p m chn hs. GraphQLApp p qr mut sub m chn hs + => (forall a. m a -> ServerErrorIO a) + -> ServerT chn p m hs + -> Proxy qr -> Proxy mut -> Proxy sub + -> Maybe T.Text -> VariableMapC -> GQL.ExecutableDocument + -> ConduitT Aeson.Value Void IO () + -> IO () +runSubscriptionPipeline f svr _ _ _ opName vmap doc sink + = case parseDoc @qr @mut @sub opName vmap doc of + Left e + -> yieldSingleError e sink + Right (d :: Document p qr mut sub) + -> runDocumentSubscription f svr d sink + +singleErrValue :: T.Text -> Aeson.Value +singleErrValue e + = Aeson.object [ ("errors", Aeson.Array [ + Aeson.object [ ("message", Aeson.String e) ] ])] + +errValue :: GraphQLError -> Aeson.Value +errValue (GraphQLError (ServerError _ msg) path) + = Aeson.object [ + ("message", Aeson.String $ T.pack msg) + , ("path", Aeson.toJSON path) + ] + +yieldSingleError :: Monad m + => T.Text -> ConduitM Aeson.Value Void m () -> m () +yieldSingleError e sink = + runConduit $ yieldMany ([singleErrValue e] :: [Aeson.Value]) .| sink + +yieldError :: Monad m + => ServerError -> [T.Text] + -> ConduitM Aeson.Value Void m () -> m () +yieldError e path sink = do + let val = Aeson.object [ ("errors", Aeson.listValue errValue [GraphQLError e path]) ] + runConduit $ yieldMany ([val] :: [Aeson.Value]) .| sink + +class RunDocument (p :: Package') + (qr :: Maybe Symbol) + (mut :: Maybe Symbol) + (sub :: Maybe Symbol) + m chn hs where runDocument :: (forall a. m a -> ServerErrorIO a) -> ServerT chn p m hs - -> Document p qr mut + -> Document p qr mut sub -> WriterT [GraphQLError] IO Aeson.Value + runDocumentSubscription :: + (forall a. m a -> ServerErrorIO a) + -> ServerT chn p m hs + -> Document p qr mut sub + -> ConduitT Aeson.Value Void IO () + -> IO () instance ( p ~ 'Package pname ss @@ -90,30 +131,120 @@ instance , KnownSymbol mut , RunQueryFindHandler m p hs chn ss (LookupService ss mut) hs , MappingRight chn mut ~ () - ) => RunDocument p ('Just qr) ('Just mut) m chn hs where + , KnownSymbol sub + , RunQueryFindHandler m p hs chn ss (LookupService ss sub) hs + , MappingRight chn sub ~ () + ) => RunDocument p ('Just qr) ('Just mut) ('Just sub) m chn hs where runDocument f svr (QueryDoc q) - = runQuery f svr () q + = runQuery f svr [] () q runDocument f svr (MutationDoc q) - = runQuery f svr () q + = runQuery f svr [] () q + runDocument _ _ (SubscriptionDoc _) + = pure $ singleErrValue "cannot execute subscriptions in this wire" + runDocumentSubscription f svr (SubscriptionDoc d) + = runSubscription f svr [] () d + runDocumentSubscription f svr d = yieldDocument f svr d + instance ( p ~ 'Package pname ss , KnownSymbol qr , RunQueryFindHandler m p hs chn ss (LookupService ss qr) hs , MappingRight chn qr ~ () - ) => RunDocument p ('Just qr) 'Nothing m chn hs where + , KnownSymbol mut + , RunQueryFindHandler m p hs chn ss (LookupService ss mut) hs + , MappingRight chn mut ~ () + ) => RunDocument p ('Just qr) ('Just mut) 'Nothing m chn hs where runDocument f svr (QueryDoc q) - = runQuery f svr () q + = runQuery f svr [] () q + runDocument f svr (MutationDoc q) + = runQuery f svr [] () q + runDocumentSubscription = yieldDocument + +instance + ( p ~ 'Package pname ss + , KnownSymbol qr + , RunQueryFindHandler m p hs chn ss (LookupService ss qr) hs + , MappingRight chn qr ~ () + , KnownSymbol sub + , RunQueryFindHandler m p hs chn ss (LookupService ss sub) hs + , MappingRight chn sub ~ () + ) => RunDocument p ('Just qr) 'Nothing ('Just sub) m chn hs where + runDocument f svr (QueryDoc q) + = runQuery f svr [] () q + runDocument _ _ (SubscriptionDoc _) + = pure $ singleErrValue "cannot execute subscriptions in this wire" + runDocumentSubscription f svr (SubscriptionDoc d) + = runSubscription f svr [] () d + runDocumentSubscription f svr d = yieldDocument f svr d + +instance + ( p ~ 'Package pname ss + , KnownSymbol qr + , RunQueryFindHandler m p hs chn ss (LookupService ss qr) hs + , MappingRight chn qr ~ () + ) => RunDocument p ('Just qr) 'Nothing 'Nothing m chn hs where + runDocument f svr (QueryDoc q) + = runQuery f svr [] () q + runDocumentSubscription = yieldDocument + +instance + ( p ~ 'Package pname ss + , KnownSymbol mut + , RunQueryFindHandler m p hs chn ss (LookupService ss mut) hs + , MappingRight chn mut ~ () + , KnownSymbol sub + , RunQueryFindHandler m p hs chn ss (LookupService ss sub) hs + , MappingRight chn sub ~ () + ) => RunDocument p 'Nothing ('Just mut) ('Just sub) m chn hs where + runDocument f svr (MutationDoc q) + = runQuery f svr [] () q + runDocument _ _ (SubscriptionDoc _) + = pure $ singleErrValue "cannot execute subscriptions in this wire" + runDocumentSubscription f svr (SubscriptionDoc d) + = runSubscription f svr [] () d + runDocumentSubscription f svr d = yieldDocument f svr d + instance ( p ~ 'Package pname ss , KnownSymbol mut , RunQueryFindHandler m p hs chn ss (LookupService ss mut) hs , MappingRight chn mut ~ () - ) => RunDocument p 'Nothing ('Just mut) m chn hs where + ) => RunDocument p 'Nothing ('Just mut) 'Nothing m chn hs where runDocument f svr (MutationDoc q) - = runQuery f svr () q + = runQuery f svr [] () q + runDocumentSubscription = yieldDocument + instance - RunDocument p 'Nothing 'Nothing m chn hs where + ( p ~ 'Package pname ss + , KnownSymbol sub + , RunQueryFindHandler m p hs chn ss (LookupService ss sub) hs + , MappingRight chn sub ~ () + ) => RunDocument p 'Nothing 'Nothing ('Just sub) m chn hs where + runDocument _ _ (SubscriptionDoc _) + = pure $ singleErrValue "cannot execute subscriptions in this wire" + runDocumentSubscription f svr (SubscriptionDoc d) + = runSubscription f svr [] () d + +instance + RunDocument p 'Nothing 'Nothing 'Nothing m chn hs where runDocument _ = error "this should never be called" + runDocumentSubscription _ = error "this should never be called" + +yieldDocument :: + forall p qr mut sub m chn hs. + RunDocument p qr mut sub m chn hs + => (forall a. m a -> ServerErrorIO a) + -> ServerT chn p m hs + -> Document p qr mut sub + -> ConduitT Aeson.Value Void IO () + -> IO () +yieldDocument f svr doc sink = do + (data_, errors) <- runWriterT (runDocument @p @qr @mut @sub @m @chn @hs f svr doc) + let (val :: Aeson.Value) + = case errors of + [] -> Aeson.object [ ("data", data_) ] + _ -> Aeson.object [ ("data", data_), ("errors", Aeson.listValue errValue errors) ] + runConduit $ yieldMany ([val] :: [Aeson.Value]) .| sink runQuery :: forall m p s pname ss hs sname sanns ms chn inh. @@ -123,10 +254,27 @@ runQuery , inh ~ MappingRight chn sname ) => (forall a. m a -> ServerErrorIO a) -> ServerT chn p m hs + -> [T.Text] -> inh -> ServiceQuery p s -> WriterT [GraphQLError] IO Aeson.Value -runQuery f whole@(Services ss) = runQueryFindHandler f whole ss +runQuery f whole@(Services ss) path = runQueryFindHandler f whole path ss + +runSubscription + :: forall m p s pname ss hs sname sanns ms chn inh. + ( RunQueryFindHandler m p hs chn ss s hs + , p ~ 'Package pname ss + , s ~ 'Service sname sanns ms + , inh ~ MappingRight chn sname ) + => (forall a. m a -> ServerErrorIO a) + -> ServerT chn p m hs + -> [T.Text] + -> inh + -> OneMethodQuery p s + -> ConduitT Aeson.Value Void IO () + -> IO () +runSubscription f whole@(Services ss) path + = runSubscriptionFindHandler f whole path ss class RunQueryFindHandler m p whole chn ss s hs where runQueryFindHandler @@ -135,39 +283,55 @@ class RunQueryFindHandler m p whole chn ss s hs where , inh ~ MappingRight chn sname ) => (forall a. m a -> ServerErrorIO a) -> ServerT chn p m whole + -> [T.Text] -> ServicesT chn ss m hs -> inh -> ServiceQuery p s -> WriterT [GraphQLError] IO Aeson.Value + runSubscriptionFindHandler + :: ( p ~  'Package pname wholess + , s ~ 'Service sname sanns ms + , inh ~ MappingRight chn sname ) + => (forall a. m a -> ServerErrorIO a) + -> ServerT chn p m whole + -> [T.Text] + -> ServicesT chn ss m hs + -> inh + -> OneMethodQuery p s + -> ConduitT Aeson.Value Void IO () + -> IO () instance TypeError ('Text "Could not find handler for " ':<>: 'ShowType s) => RunQueryFindHandler m p whole chn '[] s '[] where runQueryFindHandler _ = error "this should never be called" + runSubscriptionFindHandler _ = error "this should never be called" instance {-# OVERLAPPABLE #-} RunQueryFindHandler m p whole chn ss s hs => RunQueryFindHandler m p whole chn (other ': ss) s (h ': hs) where - runQueryFindHandler f whole (_ :<&>: that) = runQueryFindHandler f whole that + runQueryFindHandler f whole path (_ :<&>: that) + = runQueryFindHandler f whole path that + runSubscriptionFindHandler f whole path (_ :<&>: that) + = runSubscriptionFindHandler f whole path that instance {-# OVERLAPS #-} (s ~ 'Service sname sanns ms, KnownName sname, RunMethod m p whole chn sname ms h) => RunQueryFindHandler m p whole chn (s ': ss) s (h ': hs) where - runQueryFindHandler f whole (this :<&>: _) inh queries + runQueryFindHandler f whole path (this :<&>: _) inh queries = Aeson.object . catMaybes <$> mapM runOneQuery queries where -- if we include the signature we have to write -- an explicit type signature for 'runQueryFindHandler' runOneQuery (OneMethodQuery nm args) - = pass (do (val, methodName) <- runMethod f whole (Proxy @sname) inh this args - let realName = fromMaybe methodName nm - -- choose between given name, - -- or fallback to method name - newVal = fmap (realName,) val - pure (newVal, map (updateErrs realName)) ) - where -- add the additional path component to the errors - updateErrs :: T.Text -> GraphQLError -> GraphQLError - updateErrs methodName (GraphQLError err loc) = GraphQLError err (methodName : loc) + = runMethod f whole (Proxy @sname) path nm inh this args -- handle __typename runOneQuery (TypeNameQuery nm) = let realName = fromMaybe "__typename" nm in pure $ Just (realName, Aeson.String $ T.pack $ nameVal (Proxy @sname)) + -- subscriptions should only have one element + runSubscriptionFindHandler f whole path (this :<&>: _) inh (OneMethodQuery nm args) sink + = runMethodSubscription f whole (Proxy @sname) path nm inh this args sink + runSubscriptionFindHandler _ _ _ _ _ (TypeNameQuery nm) sink + = let realName = fromMaybe "__typename" nm + o = Aeson.object [(realName, Aeson.String $ T.pack $ nameVal (Proxy @sname))] + in runConduit $ yieldMany ([o] :: [Aeson.Value]) .| sink class RunMethod m p whole chn sname ms hs where runMethod @@ -175,65 +339,143 @@ class RunMethod m p whole chn sname ms hs where , inh ~ MappingRight chn sname ) => (forall a. m a -> ServerErrorIO a) -> ServerT chn p m whole - -> Proxy sname -> inh + -> Proxy sname -> [T.Text] -> Maybe T.Text -> inh + -> HandlersT chn inh ms m hs + -> NS (ChosenMethodQuery p) ms + -> WriterT [GraphQLError] IO (Maybe (T.Text, Aeson.Value)) + runMethodSubscription + :: ( p ~ 'Package pname wholess + , inh ~ MappingRight chn sname ) + => (forall a. m a -> ServerErrorIO a) + -> ServerT chn p m whole + -> Proxy sname -> [T.Text] -> Maybe T.Text -> inh -> HandlersT chn inh ms m hs -> NS (ChosenMethodQuery p) ms - -> WriterT [GraphQLError] IO (Maybe Aeson.Value, T.Text) + -> ConduitT Aeson.Value Void IO () + -> IO () instance RunMethod m p whole chn s '[] '[] where runMethod _ = error "this should never be called" + runMethodSubscription _ = error "this should never be called" instance (RunMethod m p whole chn s ms hs, KnownName mname, RunHandler m p whole chn args r h) => RunMethod m p whole chn s ('Method mname anns args r ': ms) (h ': hs) where - runMethod f whole _ inh (h :<||>: _) (Z (ChosenMethodQuery args ret)) - = (, T.pack $ nameVal (Proxy @mname)) <$> runHandler f whole (h inh) args ret - runMethod f whole p inh (_ :<||>: r) (S cont) - = runMethod f whole p inh r cont + -- handle normal methods + runMethod f whole _ path nm inh (h :<||>: _) (Z (ChosenMethodQuery args ret)) + = ((realName ,) <$>) <$> runHandler f whole (path ++ [realName]) (h inh) args ret + where realName = fromMaybe (T.pack $ nameVal (Proxy @mname)) nm + runMethod f whole p path nm inh (_ :<||>: r) (S cont) + = runMethod f whole p path nm inh r cont + -- handle subscriptions + runMethodSubscription f whole _ path nm inh (h :<||>: _) (Z (ChosenMethodQuery args ret)) sink + = runHandlerSubscription f whole (path ++ [realName]) (h inh) args ret sink + where realName = fromMaybe (T.pack $ nameVal (Proxy @mname)) nm + runMethodSubscription f whole p path nm inh (_ :<||>: r) (S cont) sink + = runMethodSubscription f whole p path nm inh r cont sink class Handles chn args r m h => RunHandler m p whole chn args r h where - runHandler :: (forall a. m a -> ServerErrorIO a) - -> ServerT chn p m whole - -> h - -> NP (ArgumentValue p) args - -> ReturnQuery p r - -> WriterT [GraphQLError] IO (Maybe Aeson.Value) + runHandler + :: (forall a. m a -> ServerErrorIO a) + -> ServerT chn p m whole + -> [T.Text] + -> h + -> NP (ArgumentValue p) args + -> ReturnQuery p r + -> WriterT [GraphQLError] IO (Maybe Aeson.Value) + runHandlerSubscription + :: (forall a. m a -> ServerErrorIO a) + -> ServerT chn p m whole + -> [T.Text] + -> h + -> NP (ArgumentValue p) args + -> ReturnQuery p r + -> ConduitT Aeson.Value Void IO () + -> IO () instance (ArgumentConversion chn ref t, RunHandler m p whole chn rest r h) => RunHandler m p whole chn ('ArgSingle aname aanns ref ': rest) r (t -> h) where - runHandler f whole h (ArgumentValue one :* rest) - = runHandler f whole (h (convertArg (Proxy @chn) one)) rest + runHandler f whole path h (ArgumentValue one :* rest) + = runHandler f whole path (h (convertArg (Proxy @chn) one)) rest + runHandlerSubscription f whole path h (ArgumentValue one :* rest) + = runHandlerSubscription f whole path (h (convertArg (Proxy @chn) one)) rest instance ( MonadError ServerError m , FromRef chn ref t , ArgumentConversion chn ('ListRef ref) [t] , RunHandler m p whole chn rest r h ) => RunHandler m p whole chn ('ArgStream aname aanns ref ': rest) r (ConduitT () t m () -> h) where - runHandler f whole h (ArgumentStream lst :* rest) + runHandler f whole path h (ArgumentStream lst :* rest) + = let converted :: [t] = convertArg (Proxy @chn) lst + in runHandler f whole path (h (yieldMany converted)) rest + runHandlerSubscription f whole path h (ArgumentStream lst :* rest) sink = let converted :: [t] = convertArg (Proxy @chn) lst - in runHandler f whole (h (yieldMany converted)) rest + in runHandlerSubscription f whole path (h (yieldMany converted)) rest sink instance (MonadError ServerError m) => RunHandler m p whole chn '[] 'RetNothing (m ()) where - runHandler f _ h Nil _ = do + runHandler f _ path h Nil _ = do res <- liftIO $ runExceptT (f h) case res of Right _ -> pure $ Just Aeson.Null - Left e -> tell [GraphQLError e []] >> pure Nothing + Left e -> tell [GraphQLError e path] >> pure Nothing + runHandlerSubscription f _ path h Nil _ sink = do + res <- liftIO $ runExceptT (f h) + case res of + Right _ -> runConduit $ yieldMany ([] :: [Aeson.Value]) .| sink + Left e -> yieldError e path sink instance (MonadError ServerError m, ResultConversion m p whole chn r l) => RunHandler m p whole chn '[] ('RetSingle r) (m l) where - runHandler f whole h Nil (RSingle q) = do + runHandler f whole path h Nil (RSingle q) = do res <- liftIO $ runExceptT (f h) case res of - Right v -> convertResult f whole q v - Left e -> tell [GraphQLError e []] >> pure Nothing + Right v -> convertResult f whole path q v + Left e -> tell [GraphQLError e path] >> pure Nothing + runHandlerSubscription f whole path h Nil (RSingle q) sink = do + res <- liftIO $ runExceptT (f h) + val <- case res of + Right v -> do + (data_, errors) <- runWriterT (convertResult f whole path q v) + case errors of + [] -> pure $ Aeson.object [ ("data", fromMaybe Aeson.Null data_) ] + _ -> pure $ Aeson.object [ ("data", fromMaybe Aeson.Null data_) + , ("errors", Aeson.listValue errValue errors) ] + Left e -> pure $ Aeson.object [ ("errors", Aeson.listValue errValue [GraphQLError e path]) ] + runConduit $ yieldMany ([val] :: [Aeson.Value]) .| sink instance (MonadIO m, MonadError ServerError m, ResultConversion m p whole chn r l) => RunHandler m p whole chn '[] ('RetStream r) (ConduitT l Void m () -> m ()) where - runHandler f whole h Nil (RStream q) = do + runHandler f whole path h Nil (RStream q) = do queue <- liftIO newTMQueueIO res <- liftIO $ runExceptT $ f $ h (sinkTMQueue queue) case res of Right _ -> do info <- runConduit $ sourceTMQueue queue .| sinkList - Just . Aeson.toJSON . catMaybes <$> traverse (convertResult f whole q) info + Just . Aeson.toJSON . catMaybes <$> traverse (convertResult f whole path q) info Left e -> tell [GraphQLError e []] >> pure Nothing + runHandlerSubscription f whole path h Nil (RStream q) sink = do + res <- liftIO $ runExceptT $ f $ h + (transPipe liftIO (mapInputM convert (error "this should not be called") sink)) + case res of + Right _ -> return () + Left e -> yieldError e path sink + where + convert :: l -> IO Aeson.Value + convert v = do + (data_, errors) <- runWriterT (convertResult f whole path q v) + case errors of + [] -> pure $ Aeson.object [ ("data", fromMaybe Aeson.Null data_) ] + _ -> pure $ Aeson.object [ ("data", fromMaybe Aeson.Null data_) + , ("errors", Aeson.listValue errValue errors) ] + +mapInputM :: Monad m + => (i1 -> m i2) -- ^ map initial input to new input + -> (i2 -> m (Maybe i1)) -- ^ map new leftovers to initial leftovers + -> ConduitT i2 o m r + -> ConduitT i1 o m r +mapInputM f f' (ConduitT c0) = ConduitT $ \rest -> let + go (HaveOutput p o) = HaveOutput (go p) o + go (NeedInput p c) = NeedInput (\i -> PipeM $ go . p <$> f i) (go . c) + go (Done r) = rest r + go (PipeM mp) = PipeM $ fmap go mp + go (Leftover p i) = PipeM $ (\x -> maybe id (flip Leftover) x (go p)) <$> f' i + in go (c0 Done) class FromRef chn ref t => ArgumentConversion chn ref t where @@ -253,33 +495,34 @@ instance ArgumentConversion chn ref t class ToRef chn r l => ResultConversion m p whole chn r l where convertResult :: (forall a. m a -> ServerErrorIO a) -> ServerT chn p m whole + -> [T.Text] -> ReturnQuery' p r -> l -> WriterT [GraphQLError] IO (Maybe Aeson.Value) instance Aeson.ToJSON t => ResultConversion m p whole chn ('PrimitiveRef t) t where - convertResult _ _ RetPrimitive = pure . Just . Aeson.toJSON + convertResult _ _ _ RetPrimitive = pure . Just . Aeson.toJSON instance ( ToSchema sch l r , RunSchemaQuery sch (sch :/: l) ) => ResultConversion m p whole chn ('SchemaRef sch l) r where - convertResult _ _ (RetSchema r) t + convertResult _ _ _ (RetSchema r) t = pure $ Just $ runSchemaQuery (toSchema' @_ @_ @sch @r t) r instance ( MappingRight chn ref ~ t , MappingRight chn sname ~ t , LookupService ss ref ~ 'Service sname sanns ms , RunQueryFindHandler m ('Package pname ss) whole chn ss ('Service sname sanns ms) whole) => ResultConversion m ('Package pname ss) whole chn ('ObjectRef ref) t where - convertResult f whole (RetObject q) h - = Just <$> runQuery @m @('Package pname ss) @(LookupService ss ref) f whole h q + convertResult f whole path (RetObject q) h + = Just <$> runQuery @m @('Package pname ss) @(LookupService ss ref) f whole path h q instance ResultConversion m p whole chn r s => ResultConversion m p whole chn ('OptionalRef r) (Maybe s) where - convertResult _ _ _ Nothing + convertResult _ _ _ _ Nothing = pure Nothing - convertResult f whole (RetOptional q) (Just x) - = convertResult f whole q x + convertResult f whole path (RetOptional q) (Just x) + = convertResult f whole path q x instance ResultConversion m p whole chn r s => ResultConversion m p whole chn ('ListRef r) [s] where - convertResult f whole (RetList q) xs - = Just . Aeson.toJSON . catMaybes <$> mapM (convertResult f whole q) xs + convertResult f whole path (RetList q) xs + = Just . Aeson.toJSON . catMaybes <$> mapM (convertResult f whole path q) xs class RunSchemaQuery sch r where runSchemaQuery diff --git a/graphql/src/Mu/GraphQL/Server.hs b/graphql/src/Mu/GraphQL/Server.hs index 777e39de..a36f431b 100644 --- a/graphql/src/Mu/GraphQL/Server.hs +++ b/graphql/src/Mu/GraphQL/Server.hs @@ -6,7 +6,6 @@ {-# language RankNTypes #-} {-# language ScopedTypeVariables #-} {-# language TypeApplications #-} -{-# OPTIONS_GHC -fno-warn-orphans #-} module Mu.GraphQL.Server ( GraphQLApp @@ -21,29 +20,29 @@ module Mu.GraphQL.Server ( , graphQLAppTransQuery ) where -import Control.Applicative ((<|>)) -import Control.Monad (join) -import qualified Data.Aeson as A -import Data.Aeson.Text (encodeToLazyText) -import Data.ByteString.Lazy (fromStrict, toStrict) -import Data.Coerce (coerce) -import qualified Data.Foldable as F -import Data.HashMap.Strict (empty, toList) +import Control.Applicative ((<|>)) +import Control.Monad (join) +import qualified Data.Aeson as A +import Data.Aeson.Text (encodeToLazyText) +import Data.ByteString.Lazy (fromStrict, toStrict) +import qualified Data.HashMap.Strict as HM import Data.Proxy -import Data.Scientific (floatingOrInteger) -import qualified Data.Text as T -import Data.Text.Encoding (decodeUtf8) -import qualified Data.Text.Lazy.Encoding as T -import Language.GraphQL.Draft.Parser (parseExecutableDoc) -import Language.GraphQL.Draft.Syntax +import qualified Data.Text as T +import Data.Text.Encoding (decodeUtf8) +import qualified Data.Text.Lazy.Encoding as T +import Language.GraphQL.Draft.Parser (parseExecutableDoc) +import Network.HTTP.Types.Header (hContentType) +import Network.HTTP.Types.Method (StdMethod (..), parseMethod) +import Network.HTTP.Types.Status (ok200) +import Network.Wai +import Network.Wai.Handler.Warp (Port, Settings, run, runSettings) +import qualified Network.Wai.Handler.WebSockets as WS +import qualified Network.WebSockets as WS + import Mu.GraphQL.Query.Parse import Mu.GraphQL.Query.Run +import Mu.GraphQL.Subscription.Protocol import Mu.Server -import Network.HTTP.Types.Header (hContentType) -import Network.HTTP.Types.Method (StdMethod (..), parseMethod) -import Network.HTTP.Types.Status (ok200) -import Network.Wai -import Network.Wai.Handler.Warp (Port, Settings, run, runSettings) data GraphQLInput = GraphQLInput T.Text VariableMapC (Maybe T.Text) @@ -51,60 +50,64 @@ instance A.FromJSON GraphQLInput where parseJSON = A.withObject "GraphQLInput" $ \v -> GraphQLInput <$> v A..: "query" - <*> (v A..: "variables" <|> pure empty) + <*> (v A..: "variables" <|> pure HM.empty) <*> v A..:? "operationName" -instance A.FromJSON ValueConst where - parseJSON A.Null = pure VCNull - parseJSON (A.Bool b) = pure $ VCBoolean b - parseJSON (A.String s) = pure $ VCString $ coerce s - parseJSON (A.Number n) = pure $ either VCFloat VCInt $ floatingOrInteger n - parseJSON (A.Array xs) = VCList . ListValueG . F.toList <$> traverse A.parseJSON xs - parseJSON (A.Object o) = VCObject . ObjectValueG . fmap toObjFld . toList <$> traverse A.parseJSON o - where - toObjFld :: (T.Text, ValueConst) -> ObjectFieldG ValueConst - toObjFld (k, v) = ObjectFieldG (coerce k) v - -- | Turn a Mu GraphQL 'Server' into a WAI 'Application'. -- -- These 'Application's can be later combined using, -- for example, @wai-routes@, or you can add middleware -- from @wai-extra@, among others. graphQLApp :: - ( GraphQLApp p qr mut ServerErrorIO chn hs ) + ( GraphQLApp p qr mut sub ServerErrorIO chn hs ) => ServerT chn p ServerErrorIO hs -> Proxy qr -> Proxy mut + -> Proxy sub -> Application graphQLApp = graphQLAppTrans id graphQLAppQuery :: forall qr p chn hs. - ( GraphQLApp p ('Just qr) 'Nothing ServerErrorIO chn hs ) + ( GraphQLApp p ('Just qr) 'Nothing 'Nothing ServerErrorIO chn hs ) => ServerT chn p ServerErrorIO hs -> Proxy qr -> Application graphQLAppQuery svr _ - = graphQLApp svr (Proxy @('Just qr)) (Proxy @'Nothing) + = graphQLApp svr (Proxy @('Just qr)) (Proxy @'Nothing) (Proxy @'Nothing) graphQLAppTransQuery :: forall qr m p chn hs. - ( GraphQLApp p ('Just qr) 'Nothing m chn hs ) + ( GraphQLApp p ('Just qr) 'Nothing 'Nothing m chn hs ) => (forall a. m a -> ServerErrorIO a) -> ServerT chn p m hs -> Proxy qr -> Application graphQLAppTransQuery f svr _ - = graphQLAppTrans f svr (Proxy @('Just qr)) (Proxy @'Nothing) + = graphQLAppTrans f svr (Proxy @('Just qr)) (Proxy @'Nothing) (Proxy @'Nothing) graphQLAppTrans :: - ( GraphQLApp p qr mut m chn hs ) + ( GraphQLApp p qr mut sub m chn hs ) => (forall a. m a -> ServerErrorIO a) -> ServerT chn p m hs -> Proxy qr -> Proxy mut + -> Proxy sub -> Application -graphQLAppTrans f server q m req res = +graphQLAppTrans f server q m s + = WS.websocketsOr WS.defaultConnectionOptions + (wsGraphQLAppTrans f server q m s) + (httpGraphQLAppTrans f server q m s) + +httpGraphQLAppTrans :: + ( GraphQLApp p qr mut sub m chn hs ) + => (forall a. m a -> ServerErrorIO a) + -> ServerT chn p m hs + -> Proxy qr + -> Proxy mut + -> Proxy sub + -> Application +httpGraphQLAppTrans f server q m s req res = case parseMethod (requestMethod req) of Left err -> toError $ decodeUtf8 err Right GET -> do @@ -115,7 +118,7 @@ graphQLAppTrans f server q m req res = case A.eitherDecode $ fromStrict vars of Left err -> toError $ T.pack err Right vrs -> execQuery opN vrs qry - (Just (Just qry), _) -> execQuery opN empty qry + (Just (Just qry), _) -> execQuery opN HM.empty qry _ -> toError "Error parsing query" Right POST -> do body <- strictRequestBody req @@ -125,7 +128,7 @@ graphQLAppTrans f server q m req res = Left err -> toError $ T.pack err Right (GraphQLInput qry vars opName) -> execQuery opName vars qry Just "application/graphql" -> - execQuery Nothing empty (decodeUtf8 $ toStrict body) + execQuery Nothing HM.empty (decodeUtf8 $ toStrict body) _ -> toError "No `Content-Type` header found!" _ -> toError "Unsupported method" where @@ -133,30 +136,44 @@ graphQLAppTrans f server q m req res = execQuery opn vals qry = case parseExecutableDoc qry of Left err -> toError err - Right doc -> runPipeline f server q m opn vals doc >>= toResponse + Right doc -> runPipeline f server q m s opn vals doc >>= toResponse toError :: T.Text -> IO ResponseReceived toError err = toResponse $ A.object [ ("errors", A.Array [ A.object [ ("message", A.String err) ] ])] toResponse :: A.Value -> IO ResponseReceived toResponse = res . responseBuilder ok200 [] . T.encodeUtf8Builder . encodeToLazyText +wsGraphQLAppTrans + :: ( GraphQLApp p qr mut sub m chn hs ) + => (forall a. m a -> ServerErrorIO a) + -> ServerT chn p m hs + -> Proxy qr + -> Proxy mut + -> Proxy sub + -> WS.ServerApp +wsGraphQLAppTrans f server q m s conn + = do conn' <- WS.acceptRequest conn + flip protocol conn' $ runSubscriptionPipeline f server q m s + -- | Run a Mu 'graphQLApp' using the given 'Settings'. -- -- Go to 'Network.Wai.Handler.Warp' to declare 'Settings'. runGraphQLAppSettings :: - ( GraphQLApp p qr mut ServerErrorIO chn hs ) + ( GraphQLApp p qr mut sub ServerErrorIO chn hs ) => Settings -> ServerT chn p ServerErrorIO hs -> Proxy qr -> Proxy mut + -> Proxy sub -> IO () -runGraphQLAppSettings st svr q m = runSettings st (graphQLApp svr q m) +runGraphQLAppSettings st svr q m s = runSettings st (graphQLApp svr q m s) -- | Run a Mu 'graphQLApp' on the given port. runGraphQLApp :: - ( GraphQLApp p qr mut ServerErrorIO chn hs ) + ( GraphQLApp p qr mut sub ServerErrorIO chn hs ) => Port -> ServerT chn p ServerErrorIO hs -> Proxy qr -> Proxy mut + -> Proxy sub -> IO () -runGraphQLApp port svr q m = run port (graphQLApp svr q m) +runGraphQLApp port svr q m s = run port (graphQLApp svr q m s) diff --git a/graphql/src/Mu/GraphQL/Subscription/Protocol.hs b/graphql/src/Mu/GraphQL/Subscription/Protocol.hs new file mode 100644 index 00000000..cfd3326a --- /dev/null +++ b/graphql/src/Mu/GraphQL/Subscription/Protocol.hs @@ -0,0 +1,158 @@ +{-# language FlexibleContexts #-} +{-# language OverloadedStrings #-} +{-# language ScopedTypeVariables #-} +{- +This module implements the protocol as specified in +https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md +-} +module Mu.GraphQL.Subscription.Protocol where + +import Control.Applicative +import Control.Concurrent +import Control.Concurrent.Async +import Control.Concurrent.STM +import Control.Monad (forM_) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Data.Aeson ((.:), (.:?), (.=)) +import qualified Data.Aeson as A +import Data.Conduit +import qualified Data.HashMap.Strict as HM +import qualified Data.Text as T +import Language.GraphQL.Draft.Parser (parseExecutableDoc) +import Language.GraphQL.Draft.Syntax (ExecutableDocument) +import qualified ListT as L +import Network.WebSockets +import qualified StmContainers.Map as M + +import Mu.GraphQL.Query.Parse + +protocol :: ( Maybe T.Text -> VariableMapC -> ExecutableDocument + -> ConduitT A.Value Void IO () + -> IO () ) + -> Connection -> IO () +protocol f conn = start + where + -- listen for GQL_CONNECTION_INIT + start = do + msg <- receiveJSON conn + case msg of + Just (GQLConnectionInit _) + -> do -- send GQL_CONNECTION_ACK + sendJSON conn GQLConnectionAck + vars <- M.newIO + -- send GQL_KEEP_ALIVE each 1s. + withAsync keepAlive $ \ka -> + -- start listening for incoming messages + listen ka vars + _ -> start -- Keep waiting + -- keep-alive + keepAlive = do + sendJSON conn GQLKeepAlive + threadDelay 1000000 + keepAlive + -- listen for messages from client + listen ka vars = do + msg <- receiveJSON conn + case msg of + Just (GQLStart i q v o) -- start handling + -> withAsync (handle i q v o >> atomically (M.delete i vars)) $ \t -> do + atomically $ M.insert t i vars + listen ka vars + Just (GQLStop i) -- stop with handling that query + -> do r <- atomically $ M.lookup i vars + case r of + Nothing -> return () + Just a -> do cancel a + atomically $ M.delete i vars + listen ka vars + Just GQLTerminate -- terminate all queries + -> do cancelAll ka vars + sendClose conn ("GraphQL session terminated" :: T.Text) + _ -> listen ka vars -- Keep going + -- Handle a single query + handle i q v o + = case parseExecutableDoc q of + Left err -> sendJSON conn (GQLError i (A.toJSON err)) + Right d -> do + f o v d (cndt i) + sendJSON conn (GQLComplete i) + -- Conduit which sends the results via the wire + cndt i = do + msg <- await + case msg of + Nothing -> return () + Just v -> do liftIO $ sendJSON conn (GQLData i v) + cndt i + -- Cancel all pending subscriptions + cancelAll ka vars + = do cancel ka + vs <- atomically $ L.toList $ M.listT vars + forM_ (map snd vs) cancel + +receiveJSON :: A.FromJSON a => Connection -> IO (Maybe a) +receiveJSON conn = do + d <- receiveData conn + return $ A.decode d + +sendJSON :: A.ToJSON a => Connection -> a -> IO () +sendJSON conn v + = sendTextData conn (A.encode v) + +data ClientMessage + = GQLConnectionInit { initPayload :: Maybe A.Value } + | GQLStart { clientMsgId :: T.Text + , query :: T.Text + , variables :: VariableMapC + , operationName :: Maybe T.Text} + | GQLStop { clientMsgId :: T.Text } + | GQLTerminate + deriving Show + +data ServerMessage + = GQLConnectionError { errorPayload :: Maybe A.Value } + | GQLConnectionAck + | GQLData { serverMsgId :: T.Text + , payload :: A.Value } + | GQLError { serverMsgId :: T.Text + , payload :: A.Value } + | GQLComplete { serverMsgId :: T.Text} + | GQLKeepAlive + deriving Show + +instance A.FromJSON ClientMessage where + parseJSON = A.withObject "ClientMessage" $ \v -> do + ty :: String <- v .: "type" + case ty of + "connection_init" + -> GQLConnectionInit <$> v .:? "payload" + "start" + -> do i <- v .: "id" + (q,vrs,opN) <- v .: "payload" >>= parsePayload + pure $ GQLStart i q vrs opN + "stop" + -> GQLStop <$> v .: "id" + "terminate" + -> pure GQLTerminate + _ -> empty + where + parsePayload = A.withObject "ClientMessage/GQL_START" $ + \v -> (,,) <$> v .: "query" + <*> (v .: "variables" <|> pure HM.empty) + <*> v .:? "operationName" + +theType :: (A.KeyValue kv) => T.Text -> kv +theType t = "type" .= t + +instance A.ToJSON ServerMessage where + toJSON (GQLConnectionError e) + = A.object [theType "connection_error", "payload" .= e] + toJSON GQLConnectionAck + = A.object [theType "connection_acl"] + toJSON (GQLData i p) + = A.object [theType "data", "id" .= i, "payload" .= p] + toJSON (GQLError i p) + = A.object [theType "error", "id" .= i, "payload" .= p] + toJSON (GQLComplete i) + = A.object [theType "complete", "id" .= i] + toJSON GQLKeepAlive + = A.object [theType "connection_keep_alive"]