GraphQL subscriptions for AWS Lambda and API Gateway WebSockets.
Have all the functionality of GraphQL subscriptions on a stateful server without the cost.
Note: This project uses the graphql-ws protocol under the hood.
Seriously, read this first before you even think about using this.
This is in beta
This is beta and should be treated as such.
AWS API Gateway Limitations
There are a few noteworthy limitations to the AWS API Gateway WebSocket implementation.
Note: If you work at AWS and want to run through this, hit me up!
Default socket idleness detection in API Gateway is unpredictable.
It is strongly recommended to use socket idleness detection listed here. Alternatively, client->server pinging can be used to keep a connection alive.
API Gateway's current socket closing functionality doesn't support any kind of message/payload. Along with this, graphql-ws won't support error messages.
Because of this limitation, there is no clear way to communicate subprotocol errors to the client. In the case of a subprotocol error the socket will be closed by the server (with no meaningful disconnect payload).
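The client->server pinging mentioned in the limitations above can be sketched as follows. This is a minimal sketch and not part of subscriptionless: startKeepAlive, pingMessage, the send callback and the 30 second interval are hypothetical. Only the ping message shape is taken from the graphql-ws protocol.

```typescript
// Whatever writes a string to your socket, e.g. (data) => ws.send(data).
type Send = (data: string) => void;

// Serialized graphql-ws ping message.
function pingMessage(): string {
  return JSON.stringify({ type: 'ping' });
}

// Hypothetical helper: sends a ping on a fixed interval so the
// connection is not considered idle. Returns a function that stops it.
function startKeepAlive(send: Send, intervalMs = 30_000): () => void {
  const timer = setInterval(() => send(pingMessage()), intervalMs);
  return () => clearInterval(timer);
}
```

Call the returned function when the socket closes so the timer does not keep the process alive.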
import { createInstance } from 'subscriptionless';
const instance = createInstance({
schema,
});
export const gatewayHandler = instance.gatewayHandler;
Set up API Gateway to route WebSocket events to the exported handler.
💾 serverless framework example
functions:
websocket:
name: my-subscription-lambda
handler: ./handler.gatewayHandler
events:
- websocket:
route: $connect
- websocket:
route: $disconnect
- websocket:
route: $default
💾 terraform example
resource "aws_apigatewayv2_api" "ws" {
name = "websocket-api"
protocol_type = "WEBSOCKET"
route_selection_expression = "$request.body.action"
}
resource "aws_apigatewayv2_route" "default_route" {
api_id = aws_apigatewayv2_api.ws.id
route_key = "$default"
target = "integrations/${aws_apigatewayv2_integration.default_integration.id}"
}
resource "aws_apigatewayv2_route" "connect_route" {
api_id = aws_apigatewayv2_api.ws.id
route_key = "$connect"
target = "integrations/${aws_apigatewayv2_integration.default_integration.id}"
}
resource "aws_apigatewayv2_route" "disconnect_route" {
api_id = aws_apigatewayv2_api.ws.id
route_key = "$disconnect"
target = "integrations/${aws_apigatewayv2_integration.default_integration.id}"
}
resource "aws_apigatewayv2_integration" "default_integration" {
api_id = aws_apigatewayv2_api.ws.id
integration_type = "AWS_PROXY"
integration_uri = aws_lambda_function.gateway_handler.invoke_arn
}
resource "aws_lambda_permission" "apigateway_invoke_lambda" {
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.gateway_handler.function_name
principal = "apigateway.amazonaws.com"
}
resource "aws_apigatewayv2_deployment" "ws" {
api_id = aws_apigatewayv2_api.ws.id
triggers = {
redeployment = sha1(join(",", tolist([
jsonencode(aws_apigatewayv2_integration.default_integration),
jsonencode(aws_apigatewayv2_route.default_route),
jsonencode(aws_apigatewayv2_route.connect_route),
jsonencode(aws_apigatewayv2_route.disconnect_route),
])))
}
depends_on = [
aws_apigatewayv2_route.default_route,
aws_apigatewayv2_route.connect_route,
aws_apigatewayv2_route.disconnect_route
]
}
resource "aws_apigatewayv2_stage" "ws" {
api_id = aws_apigatewayv2_api.ws.id
name = "example"
deployment_id = aws_apigatewayv2_deployment.ws.id
}
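All three routes in the examples above point at the same handler, so the handler has to tell the events apart. The sketch below shows one way such a dispatch could look, using the requestContext.routeKey field that API Gateway sets on WebSocket events. It is illustrative only and does not show how subscriptionless is implemented; describeEvent and WsEvent are hypothetical names.

```typescript
// Minimal shape of an API Gateway WebSocket proxy event
// (only the fields used in this sketch).
interface WsEvent {
  requestContext: { routeKey: string; connectionId: string };
  body?: string | null;
}

// Hypothetical dispatcher: describes what a handler would do per route.
function describeEvent(event: WsEvent): string {
  const { routeKey, connectionId } = event.requestContext;
  switch (routeKey) {
    case '$connect':
      return `open ${connectionId}`;
    case '$disconnect':
      return `close ${connectionId}`;
    default:
      // $default receives every message frame sent by the client.
      return `message ${connectionId}: ${event.body ?? ''}`;
  }
}
```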
In-flight connections and subscriptions need to be persisted.
📖 Changing DynamoDB table names
Use the tableNames argument to override the default table names.
const instance = createInstance({
/* ... */
tableNames: {
connections: 'my_connections',
subscriptions: 'my_subscriptions',
},
});
💾 serverless framework example
resources:
Resources:
# Table for tracking connections
connectionsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: ${self:provider.environment.CONNECTIONS_TABLE}
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
# Table for tracking subscriptions
subscriptionsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: ${self:provider.environment.SUBSCRIPTIONS_TABLE}
AttributeDefinitions:
- AttributeName: id
AttributeType: S
- AttributeName: topic
AttributeType: S
- AttributeName: connectionId
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
- AttributeName: topic
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: ConnectionIndex
KeySchema:
- AttributeName: connectionId
KeyType: HASH
Projection:
ProjectionType: ALL
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
- IndexName: TopicIndex
KeySchema:
- AttributeName: topic
KeyType: HASH
Projection:
ProjectionType: ALL
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
💾 terraform example
resource "aws_dynamodb_table" "connections-table" {
name = "subscriptionless_connections"
billing_mode = "PROVISIONED"
read_capacity = 1
write_capacity = 1
hash_key = "id"
attribute {
name = "id"
type = "S"
}
ttl {
attribute_name = "ttl"
enabled = true
}
}
resource "aws_dynamodb_table" "subscriptions-table" {
name = "subscriptionless_subscriptions"
billing_mode = "PROVISIONED"
read_capacity = 1
write_capacity = 1
hash_key = "id"
range_key = "topic"
attribute {
name = "id"
type = "S"
}
attribute {
name = "topic"
type = "S"
}
attribute {
name = "connectionId"
type = "S"
}
global_secondary_index {
name = "ConnectionIndex"
hash_key = "connectionId"
write_capacity = 1
read_capacity = 1
projection_type = "ALL"
}
global_secondary_index {
name = "TopicIndex"
hash_key = "topic"
write_capacity = 1
read_capacity = 1
projection_type = "ALL"
}
ttl {
attribute_name = "ttl"
enabled = true
}
}
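Both tables enable DynamoDB TTL on a ttl attribute. DynamoDB expects that attribute to hold a Unix epoch timestamp in seconds, not milliseconds. The helper below is a sketch of computing such a value; ttlFromNow is a hypothetical name and the two hour lifetime is an assumption chosen to match API Gateway's maximum WebSocket connection duration, not a value taken from subscriptionless.

```typescript
// Returns an expiry time as Unix epoch seconds, suitable for a
// DynamoDB TTL attribute. nowMs is injectable to keep this testable.
function ttlFromNow(lifetimeSeconds: number, nowMs: number = Date.now()): number {
  return Math.floor(nowMs / 1000) + lifetimeSeconds;
}

// Assumed lifetime: two hours.
const exampleTtl = ttlFromNow(2 * 60 * 60);
```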
Set up server->client pinging for socket idleness detection.
Note: While not a hard requirement, this is strongly recommended.
📖 Configuring instance
Pass a ping argument to configure delays and what state machine to invoke.
const instance = createInstance({
/* ... */
ping: {
interval: 60, // Rate in seconds to send ping message
timeout: 30, // Threshold for pong response before closing socket
machineArn: process.env.PING_STATE_MACHINE_ARN, // State machine to invoke
},
});
Export the resulting handler for use by the state machine.
export const stateMachineHandler = instance.stateMachineHandler;
💾 serverless framework example
Create a function which exports the aforementioned machine handler.
functions:
machine:
handler: src/handler.stateMachineHandler
Use the serverless-step-functions plugin to create a state machine which invokes the machine handler.
stepFunctions:
stateMachines:
ping:
role: !GetAtt IamRoleLambdaExecution.Arn
definition:
StartAt: Wait
States:
Eval:
Type: Task
Resource: !GetAtt machine.Arn
Next: Choose
Wait:
Type: Wait
SecondsPath: '$.seconds'
Next: Eval
Choose:
Type: Choice
Choices:
- Not:
Variable: '$.state'
StringEquals: 'ABORT'
Next: Wait
Default: End
End:
Type: Pass
End: true
The state machine ARN can be passed to your websocket handler function via outputs.
Note: naming of resources will depend on the function/machine naming in the serverless config.
functions:
subscription:
handler: src/handler.gatewayHandler
environment:
PING_STATE_MACHINE_ARN: ${self:resources.Outputs.PingStateMachine.Value}
# ...
resources:
Outputs:
PingStateMachine:
Value:
Ref: PingStepFunctionsStateMachine
On connection_init, the state machine will be invoked. Ensure that the websocket handler has the following permissions.
- Effect: Allow
Resource: !GetAtt PingStepFunctionsStateMachine.Arn
Action:
- states:StartExecution
The state machine itself will need the following permissions:
- Effect: Allow
Resource: !GetAtt connectionsTable.Arn
Action:
- dynamodb:GetItem
- dynamodb:UpdateItem
- Effect: Allow
Resource: '*'
Action:
- execute-api:*
Note: For a full reproduction, see the example project.
💾 terraform example
Create a function which can be invoked by the state machine.
resource "aws_lambda_function" "machine" {
function_name = "machine"
runtime = "nodejs14.x"
filename = data.archive_file.handler.output_path
source_code_hash = data.archive_file.handler.output_base64sha256
handler = "example.stateMachineHandler"
role = aws_iam_role.state_machine_function.arn
environment {
variables = {
CONNECTIONS_TABLE = aws_dynamodb_table.connections-table.id
SUBSCRIPTIONS_TABLE = aws_dynamodb_table.subscriptions-table.id
}
}
}
Create the following state machine which will be invoked by the gateway handler.
resource "aws_sfn_state_machine" "ping_state_machine" {
name = "ping-state-machine"
role_arn = aws_iam_role.state_machine.arn
definition = jsonencode({
StartAt = "Wait"
States = {
Wait = {
Type = "Wait"
SecondsPath = "$.seconds"
Next = "Eval"
}
Eval = {
Type = "Task"
Resource = aws_lambda_function.machine.arn
Next = "Choose"
}
Choose = {
Type = "Choice"
Choices = [{
Not = {
Variable = "$.state"
StringEquals = "ABORT"
}
Next = "Wait"
}]
Default = "End"
}
End = {
Type = "Pass"
End = true
}
}
})
}
The state machine ARN can be passed to your websocket handler via an environment variable.
resource "aws_lambda_function" "gateway_handler" {
# ...
environment {
variables = {
# ...
PING_STATE_MACHINE_ARN = aws_sfn_state_machine.ping_state_machine.arn
}
}
}
Note: For a full reproduction, see the example project.
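Both state machine definitions above describe the same loop: wait for $.seconds, invoke the machine handler, then repeat unless the handler's output has state equal to ABORT. The sketch below simulates that control flow in plain TypeScript so the transitions are easier to follow. runPingLoop, MachineState and EvalTask are hypothetical names, and the handler's fields other than seconds and state are not shown in this document.

```typescript
interface MachineState {
  seconds: number; // read by the Wait state via SecondsPath
  state: string; // checked by the Choice state
}

// Stand-in for the Lambda invoked by the Eval task state.
type EvalTask = (input: MachineState) => MachineState;

// Walks Wait -> Eval -> Choose until the Choice falls through to End.
// Records the visited state names instead of actually sleeping.
function runPingLoop(
  initial: MachineState,
  evalTask: EvalTask,
  maxSteps = 100
): string[] {
  const visited: string[] = [];
  let current = initial;
  for (let i = 0; i < maxSteps; i++) {
    visited.push(`Wait(${current.seconds})`);
    current = evalTask(current);
    visited.push('Eval', 'Choose');
    if (current.state === 'ABORT') {
      // Default branch of the Choice state.
      visited.push('End');
      return visited;
    }
  }
  throw new Error('loop did not terminate within maxSteps');
}
```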
subscriptionless uses its own PubSub implementation which loosely implements the Apollo PubSub Interface.
Note: Unlike the Apollo PubSub library, this implementation is (mostly) stateless.
📖 Subscribing to topics
Use the subscribe function to associate incoming subscriptions with a topic.
import { subscribe } from 'subscriptionless/subscribe';
export const resolver = {
Subscription: {
mySubscription: {
resolve: (event, args, context) => {/* ... */},
subscribe: subscribe('MY_TOPIC'),
}
}
}
📖 Filtering events
Wrap any subscribe function call in a withFilter to provide filter conditions.
Note: If a function is provided, it will be called on subscription start and must return a serializable object.
import { withFilter, subscribe } from 'subscriptionless/subscribe';
// Subscription agnostic filter
withFilter(subscribe('MY_TOPIC'), {
attr1: '`attr1` must have this value',
attr2: {
attr3: 'Nested attributes work fine',
},
});
// Subscription specific filter
withFilter(subscribe('MY_TOPIC'), (root, args, context, info) => ({
userId: args.userId,
}));
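One way to read a filter object is as a partial match against the published payload: every attribute named in the filter, including nested ones, has to be present with the same value, and extra payload attributes are ignored. The sketch below illustrates that reading. It is an assumption about the semantics rather than the library's code, and matchesFilter is a hypothetical name.

```typescript
type Json = string | number | boolean | null | Json[] | JsonObject;
interface JsonObject {
  [key: string]: Json;
}

function isObject(value: Json | undefined): value is JsonObject {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Returns true when every attribute in the filter is matched by the payload.
function matchesFilter(payload: Json | undefined, filter: Json | undefined): boolean {
  if (!isObject(filter)) {
    // Primitives and arrays are compared by their JSON representation.
    return JSON.stringify(payload) === JSON.stringify(filter);
  }
  if (!isObject(payload)) {
    return false;
  }
  for (const key of Object.keys(filter)) {
    if (!matchesFilter(payload[key], filter[key])) {
      return false;
    }
  }
  return true;
}
```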
📖 Concatenating topic subscriptions
Join multiple topic subscriptions together using concat.
import { concat, subscribe } from 'subscriptionless/subscribe';
concat(subscribe('TOPIC_1'), subscribe('TOPIC_2'));
📖 Publishing events
Use the publish method on your subscriptionless instance to publish events to active subscriptions.
instance.publish({
topic: 'MY_TOPIC',
payload: 'HELLO',
});
Events can come from many sources:
// SNS Event
export const snsHandler = (event) =>
Promise.all(
event.Records.map((r) =>
instance.publish({
topic: r.Sns.TopicArn.substring(r.Sns.TopicArn.lastIndexOf(':') + 1), // Get topic name (e.g. "MY_TOPIC")
payload: JSON.parse(r.Sns.Message),
})
)
);
// Manual Invocation
export const invocationHandler = (payload) =>
instance.publish({ topic: 'MY_TOPIC', payload });
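The substring/lastIndexOf expression in the SNS handler above relies on the topic name being the last colon-separated segment of the topic ARN. Pulled out into a helper, it behaves as shown below. The name topicNameFromArn and the sample ARN are illustrative only.

```typescript
// Extracts the topic name from an SNS topic ARN of the form
// arn:aws:sns:<region>:<account-id>:<topic-name>.
function topicNameFromArn(topicArn: string): string {
  return topicArn.substring(topicArn.lastIndexOf(':') + 1);
}

const topicName = topicNameFromArn('arn:aws:sns:us-east-1:123456789012:MY_TOPIC');
console.log(topicName); // MY_TOPIC
```

For this to line up with subscribe('MY_TOPIC'), the SNS topic name and the subscription topic have to be the same string.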
Context values are accessible in all resolver-level functions (resolve, subscribe, onSubscribe and onComplete).
📖 Default value
Assuming no context argument is provided, the default value is an object containing a connectionParams attribute.
This attribute contains the (optionally parsed) payload from connection_init.
export const resolver = {
Subscription: {
mySubscription: {
resolve: (event, args, context) => {
console.log(context.connectionParams); // payload from connection_init
},
},
},
};
📖 Setting static context value
An object can be provided via the context attribute when calling createInstance.
const instance = createInstance({
/* ... */
context: {
myAttr: 'hello',
},
});
The default values (above) will be appended to this object prior to execution.
📖 Setting dynamic context value
A function (optionally async) can be provided via the context attribute when calling createInstance.
The default context value is passed as an argument.
const instance = createInstance({
/* ... */
context: ({ connectionParams }) => ({
myAttr: 'hello',
user: connectionParams.user,
}),
});
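The two context forms above can be read as follows: a static object is combined with the default value, while a function receives the default value and its result is used as the context. The sketch below illustrates that reading. It is an assumption about the behaviour and not the library's code; resolveContext and the types are hypothetical, and the precedence for conflicting keys is a guess.

```typescript
interface DefaultContext {
  connectionParams: Record<string, unknown>;
}

type ContextValue = Record<string, unknown>;
type ContextOption =
  | ContextValue
  | ((defaults: DefaultContext) => ContextValue | Promise<ContextValue>);

// Hypothetical resolution of the context option into a context object.
async function resolveContext(
  option: ContextOption | undefined,
  defaults: DefaultContext
): Promise<ContextValue> {
  if (typeof option === 'function') {
    // Dynamic form: the function decides what ends up in the context.
    return option(defaults);
  }
  // Static form (or no option): defaults are added to the object.
  return { ...option, ...defaults };
}
```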
Side effect handlers can be declared on subscription fields to handle onSubscribe (start) and onComplete (stop) events.
📖 Enabling side effects
For onSubscribe and onComplete side effects to work, resolvers must be passed to prepareResolvers prior to schema construction.
import { prepareResolvers } from 'subscriptionless/subscribe';
const schema = makeExecutableSchema({
typeDefs,
resolvers: prepareResolvers(resolvers),
});
📖 Adding side-effect handlers
export const resolver = {
Subscription: {
mySubscription: {
resolve: (event, args, context) => {
/* ... */
},
subscribe: subscribe('MY_TOPIC'),
onSubscribe: (root, args) => {
/* Do something on subscription start */
},
onComplete: (root, args) => {
/* Do something on subscription stop */
},
},
},
};
Global events can be provided when calling createInstance to track the execution cycle of the lambda.
📖 Connect (onConnect)
Called on an incoming API Gateway $connect event.
const instance = createInstance({
/* ... */
onConnect: ({ event }) => {
/* */
},
});
📖 Disconnect (onDisconnect)
Called on an incoming API Gateway $disconnect event.
const instance = createInstance({
/* ... */
onDisconnect: ({ event }) => {
/* */
},
});
📖 Authorization (connection_init)
Called on incoming graphql-ws connection_init message.
onConnectionInit can be used to verify the connection_init payload prior to persistence.
Note: Any sensitive data in the incoming message should be removed at this stage.
const instance = createInstance({
/* ... */
onConnectionInit: ({ message }) => {
const token = message.payload.token;
if (!myValidation(token)) {
throw Error('Token validation failed');
}
// Prevent sensitive data from being written to DB
return {
...message.payload,
token: undefined,
};
},
});
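The example above blanks the token by setting it to undefined. An alternative is to omit the key entirely, which does not depend on how undefined values are handled when the payload is serialized. The sketch below shows that variant; withoutToken is a hypothetical helper and not part of subscriptionless.

```typescript
// Returns a copy of the payload without its token attribute.
function withoutToken<T extends { token?: unknown }>(payload: T): Omit<T, 'token'> {
  const { token: _token, ...rest } = payload;
  return rest;
}

const sanitized = withoutToken({ token: 'secret', user: 'alice' });
console.log('token' in sanitized); // false
```

Inside onConnectionInit, the validated payload could then be returned as withoutToken(message.payload).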
By default, the (optionally parsed) payload will be accessible via context.
📖 Subscribe (onSubscribe)
Called on incoming graphql-ws subscribe message.
const instance = createInstance({
/* ... */
onSubscribe: ({ event, message }) => {
/* */
},
});
📖 Complete (onComplete)
Called on graphql-ws complete message.
const instance = createInstance({
/* ... */
onComplete: ({ event, message }) => {
/* */
},
});
📖 Ping (onPing)
Called on incoming graphql-ws ping message.
const instance = createInstance({
/* ... */
onPing: ({ event, message }) => {
/* */
},
});
📖 Pong (onPong)
Called on incoming graphql-ws pong message.
const instance = createInstance({
/* ... */
onPong: ({ event, message }) => {
/* */
},
});
📖 Error (onError)
Called on unexpected errors during resolution of API Gateway or graphql-ws events.
const instance = createInstance({
/* ... */
onError: (error, context) => {
/* */
},
});