Skip to content

Commit

Permalink
Merge pull request #3390 from kinow/add-subscriptions
Browse files Browse the repository at this point in the history
Add subscriptions
  • Loading branch information
hjoliver authored Oct 17, 2019
2 parents e1a8bcc + 305de13 commit 8564007
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 8 deletions.
117 changes: 109 additions & 8 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@

"""GraphQL API schema via Graphene implementation."""

from graphene import (
Boolean, Field, Float, ID, InputObjectType, Int,
List, Mutation, ObjectType, Schema, String, Union
)
from graphene.types.generic import GenericScalar
from graphene.utils.str_converters import to_snake_case
import asyncio
from typing import Callable, AsyncGenerator, Any

from cylc.flow.task_state import TASK_STATUSES_ORDERED
from cylc.flow.ws_data_mgr import (
ID_DELIM, FAMILIES, FAMILY_PROXIES,
JOBS, TASKS, TASK_PROXIES
)

from graphene import (
Boolean, Field, Float, ID, InputObjectType, Int,
List, Mutation, ObjectType, Schema, String, Union
)
from graphene.types.generic import GenericScalar
from graphene.utils.str_converters import to_snake_case

PROXY_NODES = 'proxy_nodes'

Expand Down Expand Up @@ -1182,4 +1183,104 @@ class Mutations(ObjectType):
description=TaskActions._meta.description)


schema = Schema(query=Queries, mutation=Mutations)
# ** Subscription Related ** #

def to_subscription(func: Callable, sleep_seconds: float = 5.) -> Callable:
"""Wraps a function in a while-true-sleep, transforming
the function into an async-generator, used by the
websockets/subscriptions.
Args:
func (Callable): a callable.
sleep_seconds (float): asyncio sleep interval in seconds.
Returns:
Callable: a callable async-generator wrapping the original callable.
"""
async def gen(*args: Any, **kwargs: Any) -> AsyncGenerator[Any, None]:
"""
Args:
*args: Variable length argument list, varies as per schema.
**kwargs: Arbitrary keyword arguments, varies as per schema.
Returns:
AsyncGenerator[Any, None]: an async generator that will
yield values from resolvers.
"""
while True:
yield await func(*args, **kwargs)
await asyncio.sleep(sleep_seconds)
return gen


class Subscriptions(ObjectType):
"""Defines the subscriptions available in the schema."""
class Meta:
description = """Multi-Workflow root level subscriptions."""
workflows = List(
Workflow,
description=Workflow._meta.description,
ids=List(ID, default_value=[]),
exids=List(ID, default_value=[]),
resolver=to_subscription(get_workflows))
job = Field(
Job,
description=Job._meta.description,
id=ID(required=True),
resolver=to_subscription(get_node_by_id))
jobs = List(
Job,
description=Job._meta.description,
args=all_jobs_args,
resolver=to_subscription(get_nodes_all))
task = Field(
Task,
description=Task._meta.description,
id=ID(required=True),
resolver=to_subscription(get_node_by_id))
tasks = List(
Task,
description=Task._meta.description,
args=all_def_args,
resolver=to_subscription(get_nodes_all))
task_proxy = Field(
TaskProxy,
description=TaskProxy._meta.description,
id=ID(required=True),
resolver=to_subscription(get_node_by_id))
task_proxies = List(
TaskProxy,
description=TaskProxy._meta.description,
args=all_proxy_args,
resolver=to_subscription(get_nodes_all))
family = Field(
Family,
description=Family._meta.description,
id=ID(required=True),
resolver=to_subscription(get_node_by_id))
families = List(
Family,
description=Family._meta.description,
args=all_def_args,
resolver=to_subscription(get_nodes_all))
family_proxy = Field(
FamilyProxy,
description=FamilyProxy._meta.description,
id=ID(required=True),
resolver=to_subscription(get_node_by_id))
family_proxies = List(
FamilyProxy,
description=FamilyProxy._meta.description,
args=all_proxy_args,
resolver=to_subscription(get_nodes_all))
edges = List(
Edge,
description=Edge._meta.description,
args=all_edge_args,
resolver=to_subscription(get_edges_all))
nodes_edges = Field(
NodesEdges,
description=NodesEdges._meta.description,
args=nodes_edges_args_all,
resolver=to_subscription(get_nodes_edges))


schema = Schema(query=Queries, subscription=Subscriptions, mutation=Mutations)
35 changes: 35 additions & 0 deletions cylc/flow/tests/network/test_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Tests for cylc.flow.network.schema"""

import asyncio
from types import AsyncGeneratorType

from cylc.flow.network.schema import to_subscription


def test_to_subscription():
"""Test to_subscription function."""
async def async_callable():
return []

assert asyncio.iscoroutine(async_callable())

async_generator = to_subscription(async_callable)

assert not asyncio.iscoroutine(async_generator())
assert isinstance(async_generator(), AsyncGeneratorType)

0 comments on commit 8564007

Please sign in to comment.