Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: multiple subscriptions && _st_d_within in subscriptions (fix #3239) #4551

Merged
merged 14 commits into from
May 13, 2020
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ Read more about the session argument for computed fields in the [docs](https://h

(Add entries here in the order of: server, console, cli, docs, others)

- server: fix mishandling of GeoJSON inputs in subscriptions (fix #3239)
- console: avoid count queries for large tables (#4692)
- console: add read replica support section to pro popup (#4118)
- console: allow modifying default value for PK (fix #4075) (#4679)
33 changes: 4 additions & 29 deletions server/src-lib/Hasura/GraphQL/Execute.hs
Original file line number Diff line number Diff line change
@@ -331,32 +331,6 @@ getMutOp ctx sqlGenCtx userInfo manager reqHeaders selSet =
ordByCtx = _gOrdByCtx ctx
insCtxMap = _gInsCtxMap ctx

getSubsOpM
:: ( MonadError QErr m
, MonadReader r m
, Has QueryCtxMap r
, Has FieldMap r
, Has OrdByCtx r
, Has SQLGenCtx r
, Has UserInfo r
, MonadIO m
, HasVersion
)
=> PGExecCtx
-> QueryReusability
-> VQ.Field
-> QueryActionExecuter
-> m (EL.LiveQueryPlan, Maybe EL.ReusableLiveQueryPlan)
getSubsOpM pgExecCtx initialReusability fld actionExecuter =
case VQ._fName fld of
"__typename" ->
throwVE "you cannot create a subscription on '__typename' field"
_ -> do
(astUnresolved, finalReusability) <- runReusabilityTWith initialReusability $
GR.queryFldToPGAST fld actionExecuter
let varTypes = finalReusability ^? _Reusable
EL.buildLiveQueryPlan pgExecCtx (VQ._fAlias fld) astUnresolved varTypes

getSubsOp
:: ( MonadError QErr m
, MonadIO m
@@ -368,10 +342,11 @@ getSubsOp
-> UserInfo
-> QueryReusability
-> QueryActionExecuter
-> VQ.Field
-> VQ.SelSet
-> m (EL.LiveQueryPlan, Maybe EL.ReusableLiveQueryPlan)
getSubsOp pgExecCtx gCtx sqlGenCtx userInfo queryReusability actionExecuter fld =
runE gCtx sqlGenCtx userInfo $ getSubsOpM pgExecCtx queryReusability fld actionExecuter
getSubsOp pgExecCtx gCtx sqlGenCtx userInfo queryReusability actionExecuter fields =
runE gCtx sqlGenCtx userInfo $ EL.buildLiveQueryPlan pgExecCtx queryReusability actionExecuter fields
-- runE gCtx sqlGenCtx userInfo $ getSubsOpM pgExecCtx queryReusability fld actionExecuter

execRemoteGQ
:: ( HasVersion
100 changes: 65 additions & 35 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs
Original file line number Diff line number Diff line change
@@ -44,43 +44,61 @@ import Data.Has
import Data.UUID (UUID)

import qualified Hasura.GraphQL.Resolve as GR
import qualified Hasura.GraphQL.Resolve.Action as RA
import qualified Hasura.GraphQL.Resolve.Types as GR
import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH
import qualified Hasura.GraphQL.Validate as GV
import qualified Hasura.GraphQL.Validate.Types as GV
import qualified Hasura.SQL.DML as S

import Hasura.Db
import Hasura.EncJSON
import Hasura.GraphQL.Utils
import Hasura.RQL.Types
import Hasura.SQL.Error
import Hasura.SQL.Types
import Hasura.SQL.Value
import Hasura.Server.Version (HasVersion)

-- -------------------------------------------------------------------------------------------------
-- Multiplexed queries

newtype MultiplexedQuery = MultiplexedQuery { unMultiplexedQuery :: Q.Query }
deriving (Show, Eq, Hashable, J.ToJSON)

mkMultiplexedQuery :: Q.Query -> MultiplexedQuery
mkMultiplexedQuery baseQuery =
MultiplexedQuery . Q.fromText $ foldMap Q.getQueryText [queryPrefix, baseQuery, querySuffix]
mkMultiplexedQuery :: Map.HashMap G.Alias GR.QueryRootFldResolved -> MultiplexedQuery
mkMultiplexedQuery rootFields = MultiplexedQuery . Q.fromBuilder . toSQL $ S.mkSelect
{ S.selExtr =
-- SELECT _subs.result_id, _fld_resp.root AS result
[ S.Extractor (mkQualIden (Iden "_subs") (Iden "result_id")) Nothing
, S.Extractor (mkQualIden (Iden "_fld_resp") (Iden "root")) (Just . S.Alias $ Iden "result") ]
, S.selFrom = Just $ S.FromExp [S.FIJoin $
S.JoinExpr subsInputFromItem S.LeftOuter responseLateralFromItem (S.JoinOn $ S.BELit True)]
}
where
queryPrefix =
[Q.sql|
select
_subs.result_id, _fld_resp.root as result
from
unnest(
$1::uuid[], $2::json[]
) _subs (result_id, result_vars)
left outer join lateral
(
|]

querySuffix =
[Q.sql|
) _fld_resp ON ('true')
|]
-- FROM unnest($1::uuid[], $2::json[]) _subs (result_id, result_vars)
subsInputFromItem = S.FIUnnest
[S.SEPrep 1 `S.SETyAnn` S.TypeAnn "uuid[]", S.SEPrep 2 `S.SETyAnn` S.TypeAnn "json[]"]
(S.Alias $ Iden "_subs")
[S.SEIden $ Iden "result_id", S.SEIden $ Iden "result_vars"]

-- LEFT OUTER JOIN LATERAL ( ... ) _fld_resp
responseLateralFromItem = S.mkLateralFromItem selectRootFields (S.Alias $ Iden "_fld_resp")
selectRootFields = S.mkSelect
{ S.selExtr = [S.Extractor rootFieldsJsonAggregate (Just . S.Alias $ Iden "root")]
, S.selFrom = Just . S.FromExp $
flip map (Map.toList rootFields) $ \(fieldAlias, resolvedAST) ->
S.mkSelFromItem (GR.toSQLSelect resolvedAST) (S.Alias $ aliasToIden fieldAlias)
}

-- json_build_object('field1', field1.root, 'field2', field2.root, ...)
rootFieldsJsonAggregate = S.SEFnApp "json_build_object" rootFieldsJsonPairs Nothing
rootFieldsJsonPairs = flip concatMap (Map.keys rootFields) $ \fieldAlias ->
[ S.SELit (G.unName $ G.unAlias fieldAlias)
, mkQualIden (aliasToIden fieldAlias) (Iden "root") ]

mkQualIden prefix = S.SEQIden . S.QIden (S.QualIden prefix Nothing) -- TODO fix this Nothing of course
aliasToIden = Iden . G.unName . G.unAlias

-- | Resolves an 'GR.UnresolvedVal' by converting 'GR.UVPG' values to SQL expressions that refer to
-- the @result_vars@ input object, collecting variable values along the way.
@@ -103,11 +121,13 @@ resolveMultiplexedValue = \case
GR.UVSQL sqlExp -> pure sqlExp
GR.UVSession -> pure $ fromResVars (PGTypeScalar PGJSON) ["session"]
where
fromResVars ty jPath =
flip S.SETyAnn (S.mkTypeAnn ty) $ S.SEOpApp (S.SQLOp "#>>")
fromResVars pgType jPath = addTypeAnnotation pgType $ S.SEOpApp (S.SQLOp "#>>")
[ S.SEQIden $ S.QIden (S.QualIden (Iden "_subs") Nothing) (Iden "result_vars")
, S.SEArray $ map S.SELit jPath
]
addTypeAnnotation pgType = flip S.SETyAnn (S.mkTypeAnn pgType) . case pgType of
PGTypeScalar scalarType -> withConstructorFn scalarType
PGTypeArray _ -> id

newtype CohortId = CohortId { unCohortId :: UUID }
deriving (Show, Eq, Hashable, J.ToJSON, Q.FromCol)
@@ -230,7 +250,6 @@ data LiveQueryPlan
data ParameterizedLiveQueryPlan
= ParameterizedLiveQueryPlan
{ _plqpRole :: !RoleName
, _plqpAlias :: !G.Alias
, _plqpQuery :: !MultiplexedQuery
} deriving (Show)
$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) ''ParameterizedLiveQueryPlan)
@@ -249,30 +268,41 @@ buildLiveQueryPlan
:: ( MonadError QErr m
, MonadReader r m
, Has UserInfo r
, Has GR.FieldMap r
, Has GR.OrdByCtx r
, Has GR.QueryCtxMap r
, Has SQLGenCtx r
, HasVersion
, MonadIO m
)
=> PGExecCtx
-> G.Alias
-> GR.QueryRootFldUnresolved
-> Maybe GV.ReusableVariableTypes
-> GV.QueryReusability
-> RA.QueryActionExecuter
-> GV.SelSet
-> m (LiveQueryPlan, Maybe ReusableLiveQueryPlan)
buildLiveQueryPlan pgExecCtx fieldAlias astUnresolved varTypes = do
userInfo <- asks getter
buildLiveQueryPlan pgExecCtx initialReusability actionExecutioner fields = do
((resolvedASTs, (queryVariableValues, syntheticVariableValues)), finalReusability) <-
GV.runReusabilityTWith initialReusability . flip runStateT mempty $
fmap Map.fromList . for (toList fields) $ \field -> case GV._fName field of
"__typename" -> throwVE "you cannot create a subscription on '__typename' field"
_ -> do
unresolvedAST <- GR.queryFldToPGAST field actionExecutioner
resolvedAST <- GR.traverseQueryRootFldAST resolveMultiplexedValue unresolvedAST
pure (GV._fAlias field, resolvedAST)

(astResolved, (queryVariableValues, syntheticVariableValues)) <- flip runStateT mempty $
GR.traverseQueryRootFldAST resolveMultiplexedValue astUnresolved
let pgQuery = mkMultiplexedQuery $ GR.toPGQuery astResolved
userInfo <- asks getter
let multiplexedQuery = mkMultiplexedQuery resolvedASTs
roleName = _uiRole userInfo
parameterizedPlan = ParameterizedLiveQueryPlan roleName fieldAlias pgQuery
parameterizedPlan = ParameterizedLiveQueryPlan roleName multiplexedQuery

-- We need to ensure that the values provided for variables
-- are correct according to Postgres. Without this check
-- an invalid value for a variable for one instance of the
-- subscription will take down the entire multiplexed query
-- We need to ensure that the values provided for variables are correct according to Postgres.
-- Without this check an invalid value for a variable for one instance of the subscription will
-- take down the entire multiplexed query.
validatedQueryVars <- validateVariables pgExecCtx queryVariableValues
validatedSyntheticVars <- validateVariables pgExecCtx (toList syntheticVariableValues)
let cohortVariables = CohortVariables (_uiSession userInfo) validatedQueryVars validatedSyntheticVars
plan = LiveQueryPlan parameterizedPlan cohortVariables
varTypes = finalReusability ^? GV._Reusable
reusablePlan = ReusableLiveQueryPlan parameterizedPlan validatedSyntheticVars <$> varTypes
pure (plan, reusablePlan)

18 changes: 3 additions & 15 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs
Original file line number Diff line number Diff line change
@@ -44,7 +44,6 @@ import qualified Data.HashMap.Strict as Map
import qualified Data.Time.Clock as Clock
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import qualified Language.GraphQL.Draft.Syntax as G
import qualified ListT
import qualified StmContainers.Map as STMMap
import qualified System.Metrics.Distribution as Metrics
@@ -65,11 +64,7 @@ import Hasura.Session
-- -------------------------------------------------------------------------------------------------
-- Subscribers

data Subscriber
= Subscriber
{ _sRootAlias :: !G.Alias
, _sOnChangeCallback :: !OnChange
}
newtype Subscriber = Subscriber { _sOnChangeCallback :: OnChange }

-- | live query onChange metadata, used for adding more extra analytics data
data LiveQueryMetadata
@@ -85,7 +80,6 @@ data LiveQueryResponse
}

type LGQResponse = GQResult LiveQueryResponse

type OnChange = LGQResponse -> IO ()

newtype SubscriberId = SubscriberId { _unSinkId :: UUID.UUID }
@@ -183,7 +177,6 @@ data CohortSnapshot

pushResultToCohort
:: GQResult EncJSON
-- ^ a response that still needs to be wrapped with each 'Subscriber'’s root 'G.Alias'
-> Maybe ResponseHash
-> LiveQueryMetadata
-> CohortSnapshot
@@ -202,13 +195,8 @@ pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot =
pushResultToSubscribers sinks
where
CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot
pushResultToSubscribers = A.mapConcurrently_ $ \(Subscriber alias action) ->
let aliasText = G.unName $ G.unAlias alias
wrapWithAlias response = LiveQueryResponse
{ _lqrPayload = encJToLBS $ encJFromAssocList [(aliasText, response)]
, _lqrExecutionTime = dTime
}
in action (wrapWithAlias <$> result)
response = result <&> \payload -> LiveQueryResponse (encJToLBS payload) dTime
pushResultToSubscribers = A.mapConcurrently_ $ \(Subscriber action) -> action response

-- -------------------------------------------------------------------------------------------------
-- Pollers
4 changes: 2 additions & 2 deletions server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs
Original file line number Diff line number Diff line change
@@ -107,11 +107,11 @@ addLiveQuery logger lqState plan onResultAction = do
where
LiveQueriesState lqOpts pgExecCtx lqMap = lqState
LiveQueriesOptions batchSize refetchInterval = lqOpts
LiveQueryPlan (ParameterizedLiveQueryPlan role alias query) cohortKey = plan
LiveQueryPlan (ParameterizedLiveQueryPlan role query) cohortKey = plan

handlerId = PollerKey role query

!subscriber = Subscriber alias onResultAction
!subscriber = Subscriber onResultAction
addToCohort sinkId handlerC =
TMap.insert subscriber sinkId $ _cNewSubscribers handlerC

22 changes: 16 additions & 6 deletions server/src-lib/Hasura/GraphQL/Resolve.hs
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ module Hasura.GraphQL.Resolve
, QueryRootFldUnresolved
, QueryRootFldResolved
, toPGQuery
, toSQLSelect

, RIntro.schemaR
, RIntro.typeR
@@ -68,12 +69,12 @@ traverseQueryRootFldAST f = \case

toPGQuery :: QueryRootFldResolved -> Q.Query
toPGQuery = \case
QRFPk s -> DS.selectQuerySQL DS.JASSingleObject s
QRFSimple s -> DS.selectQuerySQL DS.JASMultipleRows s
QRFAgg s -> DS.selectAggQuerySQL s
QRFActionSelect s -> DS.selectQuerySQL DS.JASSingleObject s
QRFActionExecuteObject s -> DS.selectQuerySQL DS.JASSingleObject s
QRFActionExecuteList s -> DS.selectQuerySQL DS.JASMultipleRows s
QRFPk s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASSingleObject s
QRFSimple s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASMultipleRows s
QRFAgg s -> Q.fromBuilder $ toSQL $ DS.mkAggSelect s
QRFActionSelect s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASSingleObject s
QRFActionExecuteObject s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASSingleObject s
QRFActionExecuteList s -> Q.fromBuilder $ toSQL $ DS.mkSQLSelect DS.JASMultipleRows s

validateHdrs
:: (Foldable t, QErrM m) => UserInfo -> t Text -> m ()
@@ -189,3 +190,12 @@ getOpCtx f = do
opCtxMap <- asks getter
onNothing (Map.lookup f opCtxMap) $ throw500 $
"lookup failed: opctx: " <> showName f

toSQLSelect :: QueryRootFldResolved -> S.Select
toSQLSelect = \case
QRFPk s -> DS.mkSQLSelect DS.JASSingleObject s
QRFSimple s -> DS.mkSQLSelect DS.JASMultipleRows s
QRFAgg s -> DS.mkAggSelect s
QRFActionSelect s -> DS.mkSQLSelect DS.JASSingleObject s
QRFActionExecuteObject s -> DS.mkSQLSelect DS.JASSingleObject s
QRFActionExecuteList s -> DS.mkSQLSelect DS.JASSingleObject s
2 changes: 1 addition & 1 deletion server/src-lib/Hasura/GraphQL/Resolve/Action.hs
Original file line number Diff line number Diff line change
@@ -171,7 +171,7 @@ resolveActionMutationSync field executionContext sessionVariables = do
(_fType field) $ _fSelSet field
astResolved <- RS.traverseAnnSimpleSel resolveValTxt selectAstUnresolved
let jsonAggType = mkJsonAggSelect outputType
return $ (,respHeaders) $ asSingleRowJsonResp (RS.selectQuerySQL jsonAggType astResolved) []
return $ (,respHeaders) $ asSingleRowJsonResp (Q.fromBuilder $ toSQL $ RS.mkSQLSelect jsonAggType astResolved) []
where
ActionExecutionContext actionName outputType outputFields definitionList resolvedWebhook confHeaders
forwardClientHeaders = executionContext
22 changes: 12 additions & 10 deletions server/src-lib/Hasura/GraphQL/Validate.hs
Original file line number Diff line number Diff line change
@@ -21,8 +21,8 @@ import Data.Has

import qualified Data.HashMap.Strict as Map
import qualified Data.HashSet as HS
import qualified Data.Sequence as Seq
import qualified Language.GraphQL.Draft.Syntax as G
import qualified Data.Sequence as Seq

import Hasura.GraphQL.Schema
import Hasura.GraphQL.Transport.HTTP.Protocol
@@ -58,8 +58,7 @@ getTypedOp opNameM selSets opDefs =
throwVE $ "operationName cannot be used when " <>
"an anonymous operation exists in the document"
(Nothing, [selSet], []) ->
return $ G.TypedOperationDefinition
G.OperationTypeQuery Nothing [] [] selSet
return $ G.TypedOperationDefinition G.OperationTypeQuery Nothing [] [] selSet
(Nothing, [], [opDef]) ->
return opDef
(Nothing, _, _) ->
@@ -145,7 +144,7 @@ validateFrag (G.FragmentDefinition n onTy dirs selSet) = do
data RootSelSet
= RQuery !SelSet
| RMutation !SelSet
| RSubscription !Field
| RSubscription !SelSet
deriving (Show, Eq)

validateGQ
@@ -172,12 +171,15 @@ validateGQ (QueryParts opDef opRoot fragDefsL varValsM) = do
G.OperationTypeQuery -> return $ RQuery selSet
G.OperationTypeMutation -> return $ RMutation selSet
G.OperationTypeSubscription ->
case Seq.viewl selSet of
Seq.EmptyL -> throw500 "empty selset for subscription"
fld Seq.:< rst -> do
unless (null rst) $
throwVE "subscription must select only one top level field"
return $ RSubscription fld
case selSet of
Seq.Empty -> throw500 "empty selset for subscription"
(_ Seq.:<| rst) -> do
-- As an internal testing feature, we support subscribing to multiple
-- selection sets. First check if the corresponding directive is set.
let multipleAllowed = elem (G.Directive "_multiple_top_level_fields" []) (G._todDirectives opDef)
unless (multipleAllowed || null rst) $
throwVE "subscriptions must select one top level field"
return $ RSubscription selSet

isQueryInAllowlist :: GQLExecDoc -> HS.HashSet GQLQuery -> Bool
isQueryInAllowlist q = HS.member gqlQuery
3 changes: 3 additions & 0 deletions server/src-lib/Hasura/GraphQL/Validate/Types.hs
Original file line number Diff line number Diff line change
@@ -825,6 +825,9 @@ class (Monad m) => MonadReusability m where
instance (MonadReusability m) => MonadReusability (ReaderT r m) where
recordVariableUse a b = lift $ recordVariableUse a b
markNotReusable = lift markNotReusable
instance (MonadReusability m) => MonadReusability (StateT s m) where
recordVariableUse a b = lift $ recordVariableUse a b
markNotReusable = lift markNotReusable

newtype ReusabilityT m a = ReusabilityT { unReusabilityT :: StateT QueryReusability m a }
deriving (Functor, Applicative, Monad, MonadError e, MonadReader r, MonadIO)
Loading