async scan #150

Merged
merged 10 commits into from
Sep 8, 2020

Conversation

oliver-sanders
Member

@oliver-sanders oliver-sanders commented Jul 30, 2020

Addresses: #96
Sibling: cylc/cylc-flow#3724 [MERGED]

This utilises the new scan interface for listing flows.

Note: I was hoping to use this to add stopped flows to the data store, however, I've been having issues there as once you add a stopped flow the resolvers kick in straight away causing errors.
@dwsutherland any ideas on how best to prevent the UIS from trying to sync with stopped flows?

Highlights:

  • Async scanning.
  • Create flow clients only when added.
  • Track stopped flows.

Requirements check-list

  • I have read CONTRIBUTING.md and added my name as a Code Contributor.
  • Contains logically grouped changes (else tidy your branch by rebase).
  • Does not contain off-topic changes (use other PRs for other changes).
  • Appropriate tests are included (unit and/or functional).
  • No change log entry required (why? e.g. invisible to users).
  • No documentation update required.
  • No dependency changes.

@oliver-sanders oliver-sanders self-assigned this Jul 30, 2020
@oliver-sanders oliver-sanders mentioned this pull request Jul 30, 2020
15 tasks
@dwsutherland
Member

dwsutherland commented Jul 30, 2020

@dwsutherland any ideas on how best to prevent the UIS from trying to sync with stopped flows?

We might actually be able to hit two birds with one stone here.

Obviously we need to be able to distinguish between live and dead suites, and only sync live ones .. Perhaps with a couple of sets in the workflow manager:

self.the_dead = set()
self.the_living = set()

But we should probably setup a data-store PBWorkflow() data entry for all, with status field running/whatever or dead ... And start new syncs on the difference of self.workflow_mgr.the_living and self.synced (or the like) ... This means:

  • We solve the attempted sync of dead workflows
  • We present dead workflows to the WUI, so the UI can start/resurrect them (which is something on the TODO list).

(P.S. this doesn't restrict our ability to have other conditions for sync, i.e. fine grained sync for only gscan, and only sync when user is connected)
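
A minimal sketch of the set-difference idea above, using plain Python sets. The names `workflows_to_sync`, `workflows_to_unsync` and the example IDs are hypothetical and only illustrate the proposal; they are not code from this PR.

```python
from typing import Set


def workflows_to_sync(the_living: Set[str], synced: Set[str]) -> Set[str]:
    """Return live workflows that do not yet have a sync running."""
    return the_living - synced


def workflows_to_unsync(the_living: Set[str], synced: Set[str]) -> Set[str]:
    """Return synced workflows that are no longer live."""
    return synced - the_living


# Hypothetical workflow IDs in the "owner|name" form used in this thread.
the_living = {'user|foo', 'user|bar'}
the_dead = {'user|baz'}
synced = {'user|bar', 'user|qux'}

# Only live, not-yet-synced flows get a new sync; dead flows are never
# considered because they are not members of the_living.
print(sorted(workflows_to_sync(the_living, synced)))
print(sorted(workflows_to_unsync(the_living, synced)))
```

Dead workflows would still get a data-store entry for display; they are simply never candidates for a sync.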

@oliver-sanders
Member Author

oliver-sanders commented Jul 31, 2020

Ok, so in the workflows_mgr we now have:

  • active {running, held}
  • inactive {stopped, registered}
  • stopping {running&stopping, held&stopping}

The last commit bypasses the whole sync-workflow stuff to allow the flow to get through to the UI.

Correct approach?

@dwsutherland
Member

Correct approach?

Yes, and from here we can:

  • Collect contact information from the inactive ones and make a data-store entry for them (in order to display them)
  • Create an endpoint to start workflows.

@dwsutherland
Member

dwsutherland commented Aug 4, 2020

  • Create an endpoint to start workflows.

Can just be a graphql mutation that does/uses cylc_run.py code I suppose

@oliver-sanders
Member Author

Can just be a graphql mutation that does/uses cylc_run.py code I suppose

Should be fairly straightforward, with just one complication: it will need to handle multiple Cylc versions. The de facto way to do that is via the wrapper script, which is kinda CLI-only but hey-ho.

I added an option to cylc run to produce JSON output (containing the contact file stuff) to make the work a little bit easier.

@oliver-sanders
Member Author

Collect contact information from the inactive ones and make a data-store entry for them (in order to display them)

I had a go at adding the status into the data store, however, this doesn't appear to be coming out at the UI end, any idea how this should be done?

@dwsutherland
Member

Had to make these changes to get it working again:

(oliver-uis) sutherlander@cortex-vbox:uiserver$ git diff
diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py
index 7b240ea..58cea38 100644
--- a/cylc/uiserver/data_store_mgr.py
+++ b/cylc/uiserver/data_store_mgr.py
@@ -39,8 +39,8 @@ from functools import partial
 import logging
 from time import sleep
 
+from cylc.flow.network import MSG_TIMEOUT
 from cylc.flow.network.server import PB_METHOD_MAP
-from cylc.flow.network.scan import MSG_TIMEOUT
 from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
 from cylc.flow.data_store_mgr import (
     EDGES, DATA_TEMPLATE, ALL_DELTAS, DELTAS_MAP, WORKFLOW,
diff --git a/cylc/uiserver/workflows_mgr.py b/cylc/uiserver/workflows_mgr.py
index 3d6cf0e..956b40c 100644
--- a/cylc/uiserver/workflows_mgr.py
+++ b/cylc/uiserver/workflows_mgr.py
@@ -34,10 +34,9 @@ import zmq.asyncio
 from cylc.flow import flags, ID_DELIM
 from cylc.flow.exceptions import ClientError, ClientTimeout
 from cylc.flow.hostuserutil import is_remote_host, get_host_ip_by_name
-from cylc.flow.network import API
+from cylc.flow.network import API, MSG_TIMEOUT
 from cylc.flow.network.client import SuiteRuntimeClient
-from cylc.flow.network.scan import MSG_TIMEOUT
-from cylc.flow.network.scan_nt import (
+from cylc.flow.network.scan import (
     api_version,
     contact_info,
     is_active,

Member

@dwsutherland dwsutherland left a comment


Collect contact information from the inactive ones and make a data-store entry for them (in order to display them)

I had a go at adding the status into the data store, however, this doesn't appear to be coming out at the UI end, any idea how this should be done?

Making the following changes:

(oliver-uis) sutherlander@cortex-vbox:uiserver$ git diff workflows_mgr.py
diff --git a/cylc/uiserver/workflows_mgr.py b/cylc/uiserver/workflows_mgr.py
index 3d6cf0e..7e334e7 100644
--- a/cylc/uiserver/workflows_mgr.py
+++ b/cylc/uiserver/workflows_mgr.py
@@ -34,10 +34,9 @@ import zmq.asyncio
 from cylc.flow import flags, ID_DELIM
 from cylc.flow.exceptions import ClientError, ClientTimeout
 from cylc.flow.hostuserutil import is_remote_host, get_host_ip_by_name
-from cylc.flow.network import API
+from cylc.flow.network import API, MSG_TIMEOUT
 from cylc.flow.network.client import SuiteRuntimeClient
-from cylc.flow.network.scan import MSG_TIMEOUT
-from cylc.flow.network.scan_nt import (
+from cylc.flow.network.scan import (
     api_version,
     contact_info,
     is_active,
@@ -138,7 +137,8 @@ class WorkflowsManager:
         # start scanning
         async for flow in self._scan_pipe:
             name = flow['name']
-            wid = f'{getuser()}{ID_DELIM}{flow["name"]}'
+            owner = getuser()
+            wid = f'{owner}{ID_DELIM}{name}'
 
             if not flow.get('contact'):
                 # this flow isn't running
@@ -146,7 +146,7 @@ class WorkflowsManager:
                     self.active.pop(wid)
                 if wid not in inactive_before:
                     await self.uiserver.data_store_mgr.register_workflow(
-                        wid
+                        wid, name, owner
                     )
                     self.inactive.add(wid)
                 continue
@@ -154,8 +154,8 @@ class WorkflowsManager:
             # reshape the data for the uiserver
             flow.update({
                 'id': wid,
-                # 'name': name,
-                'owner': getuser(),
+                'name': name,
+                'owner': owner,
                 'host': flow[ContactFileFields.HOST],
                 'port': flow[ContactFileFields.PORT],
                 'pub_port': flow[ContactFileFields.PUBLISH_PORT],
sutherlander@cortex-vbox:uiserver$ git diff data_store_mgr.py
diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py
index 7b240ea..f6e3e40 100644
--- a/cylc/uiserver/data_store_mgr.py
+++ b/cylc/uiserver/data_store_mgr.py
@@ -39,8 +39,8 @@ from functools import partial
 import logging
 from time import sleep
 
+from cylc.flow.network import MSG_TIMEOUT
 from cylc.flow.network.server import PB_METHOD_MAP
-from cylc.flow.network.scan import MSG_TIMEOUT
 from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
 from cylc.flow.data_store_mgr import (
     EDGES, DATA_TEMPLATE, ALL_DELTAS, DELTAS_MAP, WORKFLOW,
@@ -90,9 +90,12 @@ class DataStoreMgr:
         )
         await self.entire_workflow_update(ids=[w_id])
 
-    async def register_workflow(self, w_id):
+    async def register_workflow(self, w_id, name, owner):
         flow = deepcopy(DATA_TEMPLATE)
-        flow['status'] = 'stopped'
+        flow[WORKFLOW].id = w_id
+        flow[WORKFLOW].name = name
+        flow[WORKFLOW].owner = owner
+        flow[WORKFLOW].status = 'stopped'
         self.data[w_id] = flow
 
     def purge_workflow(self, w_id):

I was able to get it communicating with the WUI and stopped workflows showing:
image
image
image

However, there are some data fields the WUI summary view/panel is expecting to be there, so breaks:
image

Maybe the WUI needs changed, to display what it can.. But perhaps we can also pull a little more data together for the workflow element (i.e. set last_updated to the time it was last active)..

Nice work! ⭐


# tidy up stopped flows
for wid in active_before - set(self.active):
    client = self.active[wid]['req_client']
Member


I get an error here when stopping a workflow:

2020-08-20 15:47:20,142 tornado.application ERROR    Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f9dc2c3cad0>>, <Task finished coro=<WorkflowsManager.gather_workflows() done, defined at /home/sutherlander/cylc/cylc-uiserver/cylc/uiserver/workflows_mgr.py:129> exception=KeyError('sutherlander|quz')>)
Traceback (most recent call last):
  File "/home/sutherlander/.envs/oliver-uis/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/home/sutherlander/.envs/oliver-uis/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/home/sutherlander/cylc/cylc-uiserver/cylc/uiserver/workflows_mgr.py", line 187, in gather_workflows
    client = self.active[wid]['req_client']
KeyError: 'sutherlander|quz'

Probably because of the self.active.pop(wid) calls in the preceding loop.
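
To make the failure mode concrete, here is a small reproduction on toy data, followed by one possible tolerant pattern. `tidy_stopped` and the dictionary layout are assumptions for illustration; the fix that actually landed in the PR may look different.

```python
def tidy_stopped(active, running_now):
    """Remove flows that are no longer running and return their clients.

    active: dict mapping workflow id -> info dict holding 'req_client'
    running_now: ids the latest scan reports as running
    dict.pop returns the removed entry, so the client stays reachable
    after the flow has been dropped from the active mapping.
    """
    stopped_clients = []
    for wid in sorted(set(active) - set(running_now)):
        info = active.pop(wid)
        stopped_clients.append(info['req_client'])
    return stopped_clients


# The failing pattern from the traceback, reproduced on toy data.
active = {'user|quz': {'req_client': 'client-quz'}}
active_before = set(active)
active.pop('user|quz')  # an earlier loop already removed the entry
try:
    for wid in active_before - set(active):
        # every wid here is, by construction, missing from active
        active[wid]['req_client']
except KeyError as exc:
    print('KeyError:', exc)

# The tolerant version looks the entry up while removing it.
active = {
    'user|quz': {'req_client': 'client-quz'},
    'user|foo': {'req_client': 'client-foo'},
}
print(tidy_stopped(active, {'user|foo'}))
```

The point is that `active_before - set(self.active)` yields exactly the IDs that are no longer keys of `self.active`, so indexing with them cannot succeed.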

Member


This is still an issue when stopping suites:

.
.
.
  File "/home/sutherlander/cylc/cylc-uiserver/cylc/uiserver/workflows_mgr.py", line 181, in gather_workflows
    client = self.active[wid]['req_client']
KeyError: 'sutherlander|baz'

@dwsutherland dwsutherland added this to the 0.3 milestone Aug 20, 2020
@kinow
Member

kinow commented Aug 26, 2020

@dwsutherland , here's a diff to get the UI working with the offline workflows.

diff --git a/src/components/cylc/gscan/index.js b/src/components/cylc/gscan/index.js
index a420115..717fc4e 100644
--- a/src/components/cylc/gscan/index.js
+++ b/src/components/cylc/gscan/index.js
@@ -25,23 +25,25 @@
  */
 function getWorkflowSummary (workflow) {
   const states = new Map()
-  for (const taskProxy of workflow.taskProxies) {
-    // a task in waiting, may not have any jobs
-    if (taskProxy.jobs) {
-      for (const job of taskProxy.jobs) {
-        // TODO: temporary fix, as the backend is sending ready jobs, but they will change in cylc flow&uiserver in the future
-        if (job.state === 'ready') {
-          continue
+  if (workflow.taskProxies) {
+    for (const taskProxy of workflow.taskProxies) {
+      // a task in waiting, may not have any jobs
+      if (taskProxy.jobs) {
+        for (const job of taskProxy.jobs) {
+          // TODO: temporary fix, as the backend is sending ready jobs, but they will change in cylc flow&uiserver in the future
+          if (job.state === 'ready') {
+            continue
+          }
+          if (!states.has(job.state)) {
+            states.set(job.state, new Set())
+          }
+          states.get(job.state).add(`${taskProxy.name}.${taskProxy.cyclePoint}`)
         }
-        if (!states.has(job.state)) {
-          states.set(job.state, new Set())
-        }
-        states.get(job.state).add(`${taskProxy.name}.${taskProxy.cyclePoint}`)
       }
     }
-  }
-  for (const [stateName, tasksSet] of states.entries()) {
-    states.set(stateName, [...tasksSet].sort())
+    for (const [stateName, tasksSet] of states.entries()) {
+      states.set(stateName, [...tasksSet].sort())
+    }
   }
   return new Map([...states.entries()].sort())
 }
diff --git a/src/styles/cylc/_header.scss b/src/styles/cylc/_header.scss
index 54fc0b1..67b28ff 100644
--- a/src/styles/cylc/_header.scss
+++ b/src/styles/cylc/_header.scss
@@ -22,7 +22,7 @@
   .layout {
     .c-environment-info {
       font-size: 1rem;
-      font-weight: map-get($font-weights, 'regular');
+      font-weight:g map-get($font-weights, 'regular');
 
       .v-chip {
         font-size: 1rem;
diff --git a/src/views/Workflow.vue b/src/views/Workflow.vue
index 7c14ba0..77a6701 100644
--- a/src/views/Workflow.vue
+++ b/src/views/Workflow.vue
@@ -198,7 +198,9 @@ export default {
         this.$workflowService
           .startDeltasSubscription(WORKFLOW_TREE_DELTAS_SUBSCRIPTION, this.variables, {
             next: function next (response) {
-              applyDeltas(response.data.deltas, vm.tree)
+              if (response.data && response.data.deltas) {
+                applyDeltas(response.data.deltas, vm.tree)
+              }
               vm.isLoading = false
             },
             error: function error (err) {
@@ -275,7 +277,7 @@ export default {
       Vue.set(this.widgets, id, MutationsView.name)
     },
     removeAllWidgets () {
-      const dockWidgets = this.dock.widgets()
+      const dockWidgets = this.$refs.lumino.dock.widgets()
       const widgets = []
       each(iter(dockWidgets), widget => {
         widgets.push(widget)

I tested it applying your two diffs first (just copied into my desktop as diff1.txt and diff2.txt, then git apply diff1.txt...).

image

We will probably need more than just this diff, probably some better icons, a better way to tell in the view that the workflow has no data because it's offline (or display some cached data, etc). But seeing the offline workflows is really useful! ⭐

@kinow
Member

kinow commented Aug 26, 2020

@dwsutherland (or @oliver-sanders ) with both @dwsutherland's and my Cylc UI diffs applied, I can see three workflows.

I get the same if I simplify the GScan query:

subscription {
  workflows {
    id
    status
  }
}

image

But if I use cylc scan --states=running,stopped, I get a lot more workflows.

(venv) kinow@ranma:~/Development/python/workspace/cylc-uiserver$ cylc scan --states=running,stopped
3698
aws
cat
complex
dog
families2
five ranma:43085
reflow1
test-json

Is that because the Cylc version is filtered somewhere? Or maybe some other attribute of the workflow is being filtered somewhere?

@hjoliver
Member

@kinow -

a better way to tell in the view that the workflow has no data because it's offline (or display some cached data, etc)

Eventually we'll want the UIS to populate some n-distance window for the stopped workflow, from its database, so we can view what last happened (say) in the tree view etc., just like for live flows.

@kinow
Member

kinow commented Aug 26, 2020

@kinow -

a better way to tell in the view that the workflow has no data because it's offline (or display some cached data, etc)

Eventually we'll want the UIS to populate some n-distance window for the stopped workflow, from its database, so we can view what last happened (say) in the tree view etc., just like for live flows.

Gotcha, I think that won't require any work in the UI then. The UIS just sends the data to the UI as it does for other workflows, and it's displayed in the same way. ⚡

@hjoliver
Member

Gotcha, I think that won't require any work in the UI then. The UIS just sends the data to the UI as it does for other workflows, and it's displayed in the same way.

Exactly, you've got it easy from now on 🤣

@oliver-sanders oliver-sanders marked this pull request as ready for review August 26, 2020 09:48
@oliver-sanders
Member Author

oliver-sanders commented Aug 26, 2020

  • Applied the diffs above, thanks @dwsutherland.
  • Removed the need to re-shape the data.
  • I think we are good to go, no longer a draft, let's see what the tests say.

Member

@kinow kinow left a comment


Running the sibling PR with this one, I'm getting the following after jupyterhub + cylc run five:

[I 2020-08-27T09:55:47.287 JupyterHub log:174] 200 GET /hub/api/authorizations/token/[secret] (kinow@127.0.0.1) 21.14ms
2020-08-27 09:55:49,735 cylc.flow.network.resolvers WARNING  Traceback (most recent call last):
  File "/home/kinow/Development/python/workspace/cylc-flow/cylc/flow/network/resolvers.py", line 396, in subscribe_delta
    root, info, **args)
  File "/home/kinow/Development/python/workspace/cylc-flow/cylc/flow/network/schema.py", line 358, in get_workflows
    return await resolvers.get_workflows(args)
  File "/home/kinow/Development/python/workspace/cylc-flow/cylc/flow/network/resolvers.py", line 218, in get_workflows
    for flow in await self.get_workflows_data(args)],
  File "/home/kinow/Development/python/workspace/cylc-flow/cylc/flow/network/resolvers.py", line 210, in get_workflows_data
    for flow in self.data_store_mgr.data.values()
  File "/home/kinow/Development/python/workspace/cylc-flow/cylc/flow/network/resolvers.py", line 211, in <listcomp>
    if workflow_filter(flow, args)
  File "/home/kinow/Development/python/workspace/cylc-flow/cylc/flow/network/resolvers.py", line 72, in workflow_filter
    w_atts = collate_workflow_atts(flow[WORKFLOW])
TypeError: 'PbWorkflow' object is not subscriptable

@kinow
Member

kinow commented Aug 26, 2020

Really confusing. I've attached a debugger, and everything worked. 🤔

Destroyed my venv, re-created it, installed cylc-uiserver, then cylc-flow from the other branch (both in editable mode).

Traceback printed again. Then attached the debugger... and everything worked haha. Looking at the debugger, I see no PbWorkflow, only dict's, and everything works fine. I wonder if it's something that happens before I am able to attach the debugger…

Member

@dwsutherland dwsutherland left a comment


Just about there, a couple more fixes needed 👍


# tidy up stopped flows
for wid in active_before - set(self.active):
    client = self.active[wid]['req_client']
Member


This is still an issue when stopping suites:

.
.
.
  File "/home/sutherlander/cylc/cylc-uiserver/cylc/uiserver/workflows_mgr.py", line 181, in gather_workflows
    client = self.active[wid]['req_client']
KeyError: 'sutherlander|baz'

Member Author

@oliver-sanders oliver-sanders left a comment


Ok, re-jigged the state change logic, it's now lengthy but super-explicit.

I can't get the stop_workflow method to work though, it should reset any data from the contact file (host, port, etc) as well as the workflow status. @dwsutherland can you give me a pointer on how to wrangle the data store?
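
As a rough illustration of what "lengthy but super-explicit" state-change logic can look like, the sketch below maps each (old state, new state) pair to a named action. The `ACTIONS` table, the action names and `plan_actions` are hypothetical; only the `active` and `inactive` categories come from this thread, and the `stopping` category is deliberately left out to keep the example short.

```python
from typing import Dict, List, Optional, Tuple

# (old state, new state) -> hypothetical action name.
# None means the flow was not seen in that scan.
ACTIONS: Dict[Tuple[Optional[str], Optional[str]], str] = {
    (None, 'inactive'): 'register',
    (None, 'active'): 'register+connect',
    ('inactive', 'active'): 'connect',
    ('active', 'inactive'): 'disconnect+stop',
    ('active', None): 'disconnect+unregister',
    ('inactive', None): 'unregister',
}


def plan_actions(
    before: Dict[str, str], after: Dict[str, str]
) -> List[Tuple[str, str]]:
    """Compare two scans and list the action needed for each changed flow."""
    plan = []
    for wid in sorted(set(before) | set(after)):
        change = (before.get(wid), after.get(wid))
        if change[0] == change[1]:
            continue  # no state change, nothing to do
        # transitions missing from the table are flagged, not guessed at
        plan.append((wid, ACTIONS.get(change, 'unhandled')))
    return plan


before = {'user|a': 'inactive', 'user|b': 'active'}
after = {'user|a': 'active', 'user|b': 'inactive', 'user|c': 'inactive'}
for wid, action in plan_actions(before, after):
    print(wid, action)
```

Listing every pair makes it harder to look up a flow in a mapping it has just been removed from, which is the kind of error reported earlier in the review.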

 # If the client is already established it's not overridden,
 # so the following callbacks can happen at the same time.
 ioloop.PeriodicCallback(
-    self.workflows_mgr.gather_workflows, 7000).start()
+    self.workflows_mgr.update, 1000).start()
Member Author


Temporary change...

@dwsutherland
Member

dwsutherland commented Aug 28, 2020

can you give me a pointer on how to wrangle the data store?

Will have a look in the next few days.. Couple of things

  • Fields are attributes (not dictionary keys). I gave creating a delta a quick go:
sutherlander@cortex-vbox:uiserver$ git diff
diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py
index e8cd7bf..3ae93ba 100644
--- a/cylc/uiserver/data_store_mgr.py
+++ b/cylc/uiserver/data_store_mgr.py
@@ -43,7 +43,7 @@ from cylc.flow.network.server import PB_METHOD_MAP
 from cylc.flow.network import MSG_TIMEOUT
 from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
 from cylc.flow.data_store_mgr import (
-    EDGES, DATA_TEMPLATE, ALL_DELTAS, DELTAS_MAP, WORKFLOW,
+    EDGES, DATA_TEMPLATE, ALL_DELTAS, DELTAS_MAP, WORKFLOW, DELTA_UPDATED,
     apply_delta, generate_checksum, create_delta_store
 )
 from .workflows_mgr import workflow_request
@@ -103,9 +103,13 @@ class DataStoreMgr:
 
     def stop_workflow(self, w_id):
         print(f'$ stop_workflow({w_id})')
-        self.data[w_id]['status'] = 'stopped'
-        self.data[w_id]['host'] = None
-        self.data[w_id]['port'] = None
+        delta_store = create_delta_store(workflow_id=w_id)
+        flow = delta_store[DELTA_UPDATED][WORKFLOW]
+        flow.status = 'stopped'
+        flow.host = ''
+        flow.port = 0
+        for delta_queue in self.delta_queues[w_id].values():
+            delta_queue.put((w_id, ALL_DELTAS, delta_store))
  • The absence to presence of a workflow in self.data also triggers a delta push to the UIS (for the subscriptions that don't specify a workflow in the args)
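
The two points above can be sketched without the real protobuf classes. In the example below, `Workflow` is a plain dataclass standing in for the protobuf workflow message, and the delta is a simple dict; both are assumptions for illustration and not the cylc-flow API. It shows fields being set as attributes and the same change being queued for each subscriber.

```python
import queue
import time
from dataclasses import dataclass


@dataclass
class Workflow:
    """Stand-in for the workflow message: fields are attributes."""
    id: str = ''
    status: str = ''
    host: str = ''
    port: int = 0


def stop_workflow(data, delta_queues, w_id):
    """Mark a workflow as stopped and push the change to subscribers."""
    # Build the change once so the store and subscribers agree on it.
    delta = {'time': time.time(), 'status': 'stopped', 'host': '', 'port': 0}
    flow = data[w_id]
    # Apply to the existing entry via attribute access, not flow['status'].
    for name in ('status', 'host', 'port'):
        setattr(flow, name, delta[name])
    # Queue the change for every subscription watching this workflow.
    for delta_queue in delta_queues.get(w_id, {}).values():
        delta_queue.put((w_id, delta))
    return delta


data = {'user|five': Workflow('user|five', 'running', 'ranma', 43085)}
subscriber = queue.Queue()
delta_queues = {'user|five': {'sub-1': subscriber}}

stop_workflow(data, delta_queues, 'user|five')
print(data['user|five'])
print(subscriber.get_nowait()[0])
```

Assigning `flow['status'] = ...` on such an object would fail, since it does not support item assignment.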

@dwsutherland
Member

dwsutherland commented Aug 29, 2020

@oliver-sanders - Here's what I did to get it working:

(oliver-uis) sutherlander@cortex-vbox:cylc-uiserver$ git diff
diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py
index e8cd7bf..0872f42 100644
--- a/cylc/uiserver/data_store_mgr.py
+++ b/cylc/uiserver/data_store_mgr.py
@@ -37,7 +37,7 @@ from concurrent.futures import ThreadPoolExecutor
 from copy import deepcopy
 from functools import partial
 import logging
-from time import sleep
+import time
 
 from cylc.flow.network.server import PB_METHOD_MAP
 from cylc.flow.network import MSG_TIMEOUT
@@ -93,6 +93,9 @@ class DataStoreMgr:
 
     async def register_workflow(self, w_id, name, owner):
         print(f'$ register_workflow({w_id})')
+        # set delta queue for subscription resolver
+        self.delta_queues[w_id] = {}
+        # Create inactive workflow data
         data = deepcopy(DATA_TEMPLATE)
         flow = data[WORKFLOW]
         flow.id = w_id
@@ -103,9 +106,18 @@ class DataStoreMgr:
 
     def stop_workflow(self, w_id):
         print(f'$ stop_workflow({w_id})')
-        self.data[w_id]['status'] = 'stopped'
-        self.data[w_id]['host'] = None
-        self.data[w_id]['port'] = None
+        # Create stopped workflow delta
+        delta = DELTAS_MAP[WORKFLOW]()
+        delta.time = time.time()
+        flow = delta.updated
+        flow.status = 'stopped'
+        flow.host = ''
+        flow.port = 0
+        # Apply to existing workflow data
+        apply_delta(WORKFLOW, delta, self.data[w_id])
+        self.data[w_id]['delta_times'][WORKFLOW] = delta.time
+        # Queue delta for subscription push
+        self.delta_store_to_queues(w_id, ALL_DELTAS, delta)
 
     def purge_workflow(self, w_id):
         """Purge the manager of a workflow's subscription and data."""
@@ -158,7 +170,7 @@ class DataStoreMgr:
             while loop_cnt < self.INIT_DATA_WAIT_TIME:
                 if w_id in self.data:
                     break
-                sleep(self.INIT_DATA_RETRY_DELAY)
+                time.sleep(self.INIT_DATA_RETRY_DELAY)
                 loop_cnt += 1
                 continue
         if topic == 'shutdown':
@@ -184,7 +196,7 @@ class DataStoreMgr:
         self.delta_store_to_queues(w_id, topic, delta)
 
     def delta_store_to_queues(self, w_id, topic, delta):
-        # Queue delta for graphql subscription resolving
+        """Create delta store and queue for graphql subscription resolving."""
         if self.delta_queues[w_id]:
             delta_store = create_delta_store(delta, w_id)
             for delta_queue in self.delta_queues[w_id].values():
@@ -249,11 +261,6 @@ class DataStoreMgr:
         if ids is None:
             ids = []
 
-        # Prune old data
-        for w_id in list(self.data):
-            if w_id not in self.workflows_mgr.active:
-                del self.data[w_id]
-
         # Request new data
         req_method = 'pb_entire_workflow'
         req_kwargs = (
@@ -280,4 +287,5 @@ class DataStoreMgr:
                         }
                         continue
                     new_data[field.name] = {n.id: n for n in value}
+                # Create new or overwrite existing workflow data
                 self.data[w_id] = new_data

@hjoliver
Member

(cylc-flow sibling PR now merged)

@oliver-sanders
Member Author

oliver-sanders commented Sep 1, 2020

Created a function for setting/clearing contact file data.

All seems to work fine, except for one problem: when you stop, then restart a suite, the status gets stuck as "stopped".

(hover over gif)

The associated stdout demonstrates that sync_workflow is being called properly:

# oliver|c inactive->active
_connect(oliver|c)
$ sync_workflow(oliver|c)
# oliver|c active->inactive
_disconnect(oliver|c)
$ stop_workflow(oliver|c)
# oliver|c inactive->active
_connect(oliver|c)
$ sync_workflow(oliver|c)
# oliver|c active->inactive
_disconnect(oliver|c)
$ stop_workflow(oliver|c)
# oliver|c inactive->active
_connect(oliver|c)
$ sync_workflow(oliver|c)
# oliver|c active->inactive
_disconnect(oliver|c)
$ stop_workflow(oliver|c)
# oliver|c inactive->active
_connect(oliver|c)
$ sync_workflow(oliver|c)
# oliver|c active->inactive
_disconnect(oliver|c)
$ stop_workflow(oliver|c)

@dwsutherland
Member

All seems to work fine, except for one problem: when you stop, then restart a suite, the status gets stuck as "stopped".

This should fix it:

sutherlander@cortex-vbox:cylc-uiserver$ git diff
diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py
index 347dcf0..dda67ec 100644
--- a/cylc/uiserver/data_store_mgr.py
+++ b/cylc/uiserver/data_store_mgr.py
@@ -39,6 +39,7 @@ from functools import partial
 import logging
 import time
 
+from cylc.flow import ID_DELIM
 from cylc.flow.network.server import PB_METHOD_MAP
 from cylc.flow.network import MSG_TIMEOUT
 from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
@@ -80,11 +81,14 @@ class DataStoreMgr:
             flow.owner = contact_data['owner']
             flow.host = contact_data[CFF.HOST]
             flow.port = int(contact_data[CFF.PORT])
+            flow.pub_port = int(contact_data[CFF.PUBLISH_PORT])
             flow.api_version = int(contact_data[CFF.API])
         else:
             # wipe pre-existing contact-file data
+            flow.owner, flow.name = w_id.split(ID_DELIM)
             flow.host = ''
             flow.port = 0
+            flow.pub_port = 0
             flow.api_version = 0
             flow.status = 'stopped'
 
@@ -106,14 +110,14 @@ class DataStoreMgr:
         """
         print(f'$ sync_workflow({w_id})')
 
-        self.update_contact(w_id, contact_data)
-
         if self.loop is None:
             self.loop = asyncio.get_running_loop()
+        # don't sync if subscription exists
         if w_id in self.w_subs:
             return
 
         self.delta_queues[w_id] = {}
+        self.update_contact(w_id, contact_data)
 
         # Might be options other than threads to achieve
         # non-blocking subscriptions, but this works.
@@ -124,7 +128,7 @@ class DataStoreMgr:
                 w_id,
                 contact_data['name'],
                 contact_data[CFF.HOST],
-                contact_data[CFF.PORT]
+                contact_data[CFF.PUBLISH_PORT]
             )
         )
         await self.entire_workflow_update(ids=[w_id])
@@ -144,18 +148,20 @@ class DataStoreMgr:
         print(f'$ stop_workflow({w_id})')
         self.update_contact(w_id)
 
-    def purge_workflow(self, w_id):
+    def purge_workflow(self, w_id, data=True):
         """Purge the manager of a workflow's subscription and data."""
         print(f'$ purge_workflow({w_id})')
         if w_id in self.w_subs:
             self.w_subs[w_id].stop()
             del self.w_subs[w_id]
-        if w_id in self.data:
-            del self.data[w_id]
-        if w_id in self.delta_queues:
-            del self.delta_queues[w_id]
-        self.executors[w_id].shutdown(wait=True)
-        del self.executors[w_id]
+        if data:
+            if w_id in self.data:
+                del self.data[w_id]
+            if w_id in self.delta_queues:
+                del self.delta_queues[w_id]
+        if w_id in self.executors:
+            self.executors[w_id].shutdown(wait=True)
+            del self.executors[w_id]
 
     def start_subscription(self, w_id, reg, host, port):
         """Instantiate and run subscriber data-store sync.
diff --git a/cylc/uiserver/workflows_mgr.py b/cylc/uiserver/workflows_mgr.py
index a173dec..7df1300 100644
--- a/cylc/uiserver/workflows_mgr.py
+++ b/cylc/uiserver/workflows_mgr.py
@@ -226,6 +226,7 @@ class WorkflowsManager:
 
         The workflow can't do this itself, because it's not running.
         """
+        self.uiserver.data_store_mgr.purge_workflow(wid, data=False)
         self.uiserver.data_store_mgr.stop_workflow(wid)
 
     async def update(self):

Hopefully the last one (it's looking good now!)
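
One plausible reading of why the status stuck, sketched on a toy class: if the subscription handle survives a stop, the early return in the sync path skips the refresh on restart. `MiniStore` and its attributes are hypothetical stand-ins, not the real `DataStoreMgr`.

```python
class MiniStore:
    """Toy stand-in for a data store manager (all names hypothetical)."""

    def __init__(self):
        self.w_subs = {}   # workflow id -> subscription handle
        self.status = {}   # workflow id -> last known status

    def sync_workflow(self, w_id):
        # don't sync if a subscription already exists
        if w_id in self.w_subs:
            return
        self.w_subs[w_id] = object()
        self.status[w_id] = 'running'

    def stop_workflow(self, w_id, purge_subscription):
        # dropping the subscription here is what lets a later sync proceed
        if purge_subscription:
            self.w_subs.pop(w_id, None)
        self.status[w_id] = 'stopped'


def stop_then_restart(purge_subscription):
    """Run one start, stop, restart cycle and report the final status."""
    store = MiniStore()
    store.sync_workflow('user|c')
    store.stop_workflow('user|c', purge_subscription)
    store.sync_workflow('user|c')
    return store.status['user|c']


print(stop_then_restart(purge_subscription=False))
print(stop_then_restart(purge_subscription=True))
```

This mirrors the shape of the diff above, where the subscription is purged (with the data kept) before the workflow is marked as stopped.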

@hjoliver
Member

hjoliver commented Sep 1, 2020

A couple of tests failed with:

>       assert not mgr.workflows
E       AttributeError: 'WorkflowsManager' object has no attribute 'workflows'

There's also a lot of this?:

RuntimeError: Event loop is closed

@codecov-commenter

codecov-commenter commented Sep 2, 2020

Codecov Report

Merging #150 into master will increase coverage by 1.90%.
The diff coverage is 45.65%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #150      +/-   ##
==========================================
+ Coverage   50.75%   52.66%   +1.90%     
==========================================
  Files           6        7       +1     
  Lines         396      488      +92     
  Branches       64       80      +16     
==========================================
+ Hits          201      257      +56     
- Misses        192      228      +36     
  Partials        3        3              
Impacted Files                     Coverage Δ
cylc/uiserver/main.py              69.11% <ø> (ø)
cylc/uiserver/data_store_mgr.py    24.05% <20.40%> (-1.97%) ⬇️
cylc/uiserver/workflows_mgr.py     60.41% <55.00%> (+12.50%) ⬆️
cylc/uiserver/conftest.py          100.00% <100.00%> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 0eba8f3...6f05b62.

@oliver-sanders
Member Author

There's also a lot of this?:
RuntimeError: Event loop is closed

Not sure what's going on there. It seems to be harmless, but it's pretty ugly. I can reduce the error to occur once per test module rather than once per test, but that's not a great improvement. From Google, it looks like some people have hit this with pytest_asyncio+django.

@oliver-sanders
Member Author

Ok, managed to eliminate the error with test teardown logic.
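
The comment does not say what the teardown logic looks like, so the following is only a guess at the general shape of such a fix: cancel any tasks that are still pending and close the loop explicitly, so nothing tries to use it afterwards. `managed_event_loop` is a hypothetical helper name and uses only the standard library.

```python
import asyncio
from contextlib import contextmanager


@contextmanager
def managed_event_loop():
    """Yield a fresh event loop and tidy it up afterwards."""
    loop = asyncio.new_event_loop()
    try:
        yield loop
    finally:
        # Cancel anything still pending so no task outlives the loop.
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancelled tasks finish unwinding.
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
```

In a pytest suite the same setup and teardown could sit either side of the `yield` in a fixture, assuming the tests take their loop from that fixture.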

Member

@kinow kinow left a comment

@dwsutherland should be able to understand the changes in the WorkflowMgr. The rest looks OK to me. Added one minor comment.

@oliver-sanders here's the UI after I applied the diff below.

image

diff --git a/src/components/cylc/gscan/GScan.vue b/src/components/cylc/gscan/GScan.vue
index 1bcbe05..126daa6 100644
--- a/src/components/cylc/gscan/GScan.vue
+++ b/src/components/cylc/gscan/GScan.vue
@@ -113,8 +113,10 @@ export default {
      */
     workflowsSummaries () {
       const workflowSummaries = new Map()
-      for (const workflow of this.workflows) {
-        workflowSummaries.set(workflow.name, getWorkflowSummary(workflow))
+      if (this.workflows) {
+        for (const workflow of this.workflows) {
+          workflowSummaries.set(workflow.name, getWorkflowSummary(workflow))
+        }
       }
       return workflowSummaries
     }
diff --git a/src/components/cylc/gscan/index.js b/src/components/cylc/gscan/index.js
index a420115..717fc4e 100644
--- a/src/components/cylc/gscan/index.js
+++ b/src/components/cylc/gscan/index.js
@@ -25,23 +25,25 @@
  */
 function getWorkflowSummary (workflow) {
   const states = new Map()
-  for (const taskProxy of workflow.taskProxies) {
-    // a task in waiting, may not have any jobs
-    if (taskProxy.jobs) {
-      for (const job of taskProxy.jobs) {
-        // TODO: temporary fix, as the backend is sending ready jobs, but they will change in cylc flow&uiserver in the future
-        if (job.state === 'ready') {
-          continue
+  if (workflow.taskProxies) {
+    for (const taskProxy of workflow.taskProxies) {
+      // a task in waiting, may not have any jobs
+      if (taskProxy.jobs) {
+        for (const job of taskProxy.jobs) {
+          // TODO: temporary fix, as the backend is sending ready jobs, but they will change in cylc flow&uiserver in the future
+          if (job.state === 'ready') {
+            continue
+          }
+          if (!states.has(job.state)) {
+            states.set(job.state, new Set())
+          }
+          states.get(job.state).add(`${taskProxy.name}.${taskProxy.cyclePoint}`)
         }
-        if (!states.has(job.state)) {
-          states.set(job.state, new Set())
-        }
-        states.get(job.state).add(`${taskProxy.name}.${taskProxy.cyclePoint}`)
       }
     }
-  }
-  for (const [stateName, tasksSet] of states.entries()) {
-    states.set(stateName, [...tasksSet].sort())
+    for (const [stateName, tasksSet] of states.entries()) {
+      states.set(stateName, [...tasksSet].sort())
+    }
   }
   return new Map([...states.entries()].sort())
 }
diff --git a/src/views/Workflow.vue b/src/views/Workflow.vue
index 48a63d8..b8667bf 100644
--- a/src/views/Workflow.vue
+++ b/src/views/Workflow.vue
@@ -212,7 +212,9 @@ export default {
         this.$workflowService
           .startDeltasSubscription(WORKFLOW_TREE_DELTAS_SUBSCRIPTION, this.variables, {
             next: function next (response) {
-              applyDeltas(response.data.deltas, vm.tree)
+              if (response.data && response.data.deltas) {
+                applyDeltas(response.data.deltas, vm.tree)
+              }
               vm.isLoading = false
             },
             error: function error (err) {

My impression is that I had some performance issues when running everything the first time, but I had a lot of other windows open, plus documents and a YouTube live stream on another monitor.

I've closed everything and started again. The UI looks a bit slower. I guess that might be because I am too used to having only a couple of simple suites 😆 or maybe there are more scans to load the stopped workflows? Something to investigate later, I believe.

👍 Great to see the stopped workflows! This also solves the current issue on master of ImportError: cannot import name 'MSG_TIMEOUT' from 'cylc.flow.network.scan'. 🎉

flow.port = 0
# flow.pub_port = 0
flow.api_version = 0
flow.status = 'stopped'
Member

Use cylc.flow.suite_status.SuiteStatus enum's .STOPPED value here?
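
A rough sketch of what that suggestion might look like. The `SuiteStatus` class here is a local stand-in defined for illustration (its member values are assumed, not copied from `cylc.flow.suite_status`), and `flow` is a plain namespace rather than the real object from the snippet under review.

```python
from enum import Enum
from types import SimpleNamespace


class SuiteStatus(Enum):
    """Hypothetical stand-in for cylc.flow.suite_status.SuiteStatus."""
    RUNNING = 'running'
    STOPPED = 'stopped'


# Hypothetical stand-in for the flow object in the snippet under review.
flow = SimpleNamespace()
flow.port = 0
flow.api_version = 0
# Take the string from the enum rather than repeating the literal.
flow.status = SuiteStatus.STOPPED.value
```

The point of the change is that the status string is defined in one place, so a typo in the literal cannot drift from the value the rest of the code compares against.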

@kinow
Member

kinow commented Sep 4, 2020

Ah, almost forgot, there's also some extra logging in the JupyterHub console. Not important, but that was the only difference I noticed.

2020-09-04 23:03:14,982 cylc.uiserver.main INFO     Listening on 55213 and serving static content from /home/kinow/Development/python/workspace/cylc-uiserver/../cylc-ui/dist
2020-09-04 23:03:14,982 cylc.uiserver.main INFO     Starting Cylc UI
2020-09-04 23:03:14,982 cylc.uiserver.main INFO     /home/kinow/Development/python/workspace/cylc-uiserver/../cylc-ui/dist
2020-09-04 23:03:14,995 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|3698)
2020-09-04 23:03:14,996 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|3804)
2020-09-04 23:03:14,997 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|aws)
2020-09-04 23:03:14,998 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|cat)
2020-09-04 23:03:15,000 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|complex)
2020-09-04 23:03:15,001 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|dog)
2020-09-04 23:03:15,002 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|families2)
2020-09-04 23:03:15,003 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|five)
2020-09-04 23:03:15,004 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|reflow1)
2020-09-04 23:03:15,006 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|test-json)
2020-09-04 23:03:15,007 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|cit-20200904T224814+12/i.test_scan_api/-held-)
2020-09-04 23:03:15,008 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|cit-20200904T224814+12/i.test_scan_api/-running-)
2020-09-04 23:03:15,009 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|cit-20200904T225229+12/i.test_scan_api/-held-)
2020-09-04 23:03:15,010 cylc.uiserver.data_store_mgr DEBUG    register_workflow(kinow|cit-20200904T225229+12/i.test_scan_api/-running-)
[I 2020-09-04T23:03:15.135 JupyterHub log:174] 302 GET /hub/spawn/kinow -> /hub/spawn-pending/kinow (kinow@::1) 1006.15ms

@oliver-sanders
Member Author

The extra logging was purposeful; it may help to diagnose issues or to distinguish scan issues from comms issues.

When we wrap jupyterhub as cylc hub, we must remember to default to --log-level=INFO.
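
To show the effect of an INFO default, here is a small sketch using only Python's standard `logging` module. The logger name is made up and this is not how JupyterHub itself is configured; it only demonstrates that DEBUG records such as the `register_workflow(...)` lines are dropped once the level is INFO.

```python
import io
import logging

# Capture output in memory so the result can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)

log = logging.getLogger('sketch.data_store_mgr')  # hypothetical logger name
log.addHandler(handler)
log.propagate = False
log.setLevel(logging.INFO)

log.debug('register_workflow(user|flow)')  # below INFO, so dropped
log.info('Starting Cylc UI')               # at INFO, so emitted

output = stream.getvalue()
```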

Member

@dwsutherland dwsutherland left a comment

Just that one change that broke it, and we're good to go. 🎊

Member

@dwsutherland dwsutherland left a comment

All good now! Well done 👍

@dwsutherland dwsutherland merged commit 9354334 into cylc:master Sep 8, 2020
@oliver-sanders oliver-sanders deleted the cylc-scan-nt branch September 8, 2020 22:55
@oliver-sanders oliver-sanders restored the cylc-scan-nt branch September 8, 2020 22:55
@oliver-sanders oliver-sanders deleted the cylc-scan-nt branch September 8, 2020 22:55
5 participants