WFS Data Store Sync #95
Conversation
Force-pushed from adc4f12 to 1f3a0b8
@dwsutherland so far all good, I just need a bit more time with data_mgr.py to understand how it is going to work (might need it for subscriptions I guess). So I will finish the review either tonight or tomorrow 👍
There aren't a lot of comments or docstrings in this one yet.. Hopefully I'll push that tonight.
Thanks for the code comments you just added @dwsutherland! Had a quick read, but will read it with ☕ tomorrow.
Force-pushed from 3927ed9 to f5c16cc
Force-pushed from f5c16cc to 347189f
I have no idea how to unit test the data sync... we would have to recreate a workflow (like we do in the cylc-flow unit tests), or create a lot of fake data (which would be annoying to update) and do a publish of it...
This one might be easier to leave with no test for now, or wait until we add either some sort of functional tests here or a cross-project functional test (I think we have an issue somewhere).
Thanks @dwsutherland, the comments are really helpful. Finished reading the rest of the code and didn't find anything wrong. Tested with Cylc UI, running two workflows, with no issues as well. +1
cylc/uiserver/data_mgr.py
Outdated
Reconciliation on failed verification is done by requesting all elements of a
topic, and replacing the respective data-store elements with this.

Subscriptions are currently run in a different thread (via ThreadPoolExecutor).
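A minimal sketch of the verification/reconciliation idea described above, assuming hypothetical helpers (local_checksum, request_all_elements) rather than the real data_mgr.py API:

import hashlib

def local_checksum(elements):
    # Order-independent checksum over the local elements of one topic.
    digest = hashlib.sha256()
    for key in sorted(elements):
        digest.update(key.encode())
        digest.update(str(elements[key]).encode())
    return digest.hexdigest()

def verify_and_reconcile(topic, store, expected_checksum, request_all_elements):
    # On checksum mismatch, request every element of the topic from the
    # workflow service and replace the local set wholesale.
    if local_checksum(store[topic]) != expected_checksum:
        store[topic] = request_all_elements(topic)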
👏 IMO comments like this make maintaining code so much easier! Thanks!!!!
PS: I think Travis will keep failing until cylc-flow's PR is merged.
Oh, actually had something new in the console with this branch: zmq.error.ZMQError: Too many open files
2019-11-13 12:32:47,467 tornado.application ERROR Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f490bece080>>, <Task finished coro=<WorkflowsManager.gather_workflows() done, defined at /home/kinow/Development/python/workspace/cylc-uiserver/cylc/uiserver/workflows_mgr.py:95> exception=ZMQError('Too many open files')>)
Traceback (most recent call last):
File "/home/kinow/Development/python/workspace/cylc-uiserver/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/home/kinow/Development/python/workspace/cylc-uiserver/venv/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "/home/kinow/Development/python/workspace/cylc-uiserver/cylc/uiserver/workflows_mgr.py", line 105, in gather_workflows
items = await asyncio.gather(*gathers)
File "/home/kinow/Development/python/workspace/cylc-uiserver/cylc/uiserver/workflows_mgr.py", line 77, in est_workflow
context=context, timeout=timeout)
File "/home/kinow/Development/python/workspace/cylc-flow/cylc/flow/network/client.py", line 129, in __init__
self.start(host, port)
File "/home/kinow/Development/python/workspace/cylc-flow/cylc/flow/network/__init__.py", line 91, in start
self._start_sequence(*args, **kwargs)
File "/home/kinow/Development/python/workspace/cylc-flow/cylc/flow/network/__init__.py", line 105, in _start_sequence
self._socket_connect(*args, **kwargs)
File "/home/kinow/Development/python/workspace/cylc-flow/cylc/flow/network/__init__.py", line 137, in _socket_connect
self.socket = self.context.socket(self.pattern)
File "/home/kinow/Development/python/workspace/cylc-uiserver/venv/lib/python3.7/site-packages/zmq/sugar/context.py", line 146, in socket
s = self._socket_class(self, socket_type, **kwargs)
File "/home/kinow/Development/python/workspace/cylc-uiserver/venv/lib/python3.7/site-packages/zmq/_future.py", line 134, in __init__
super(_AsyncSocket, self).__init__(context, socket_type, **kwargs)
File "/home/kinow/Development/python/workspace/cylc-uiserver/venv/lib/python3.7/site-packages/zmq/sugar/socket.py", line 59, in __init__
super(Socket, self).__init__(*a, **kw)
File "zmq/backend/cython/socket.pyx", line 328, in zmq.backend.cython.socket.Socket.__init__
zmq.error.ZMQError: Too many open files

I do have a few too many things running in parallel, but no different from what I normally have every day. I don't remember seeing a "Too many open files" problem before. Will take a look at my OS limits.
$ ulimit
unlimited
$ ulimit -Hn
1048576
$ ulimit -n
1024
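For reference, the soft limit can be inspected and (up to the hard limit) raised from inside Python; a sketch only, not something the UI Server necessarily does:

import resource

# Sketch: report the per-process open-file limits and raise the soft
# limit up to the hard limit (Linux/macOS).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft={soft} hard={hard}")
if soft < hard:
    resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))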
Hmmm, restarted the UI Server (while the two workflows are still running), then went to run more benchmarking tests on Cylc UI, then noticed this later in the console:
I guess I have to update my branch of the cylc-flow PR too...
Yup, all good now. No more errors so far 😬 sorry for the noise!
All good. Cylc does raise the open-file count; I remember having to address this with Cylc 7 in NIWA operations.. Thanks 👍
Makes sense. The only other thing I noticed is that stopping the hub takes a bit longer now. I assume it's because the UI Server is waiting for the thread or tasks to finish. But that's expected too, I believe.
Yes, correct. I think it's the thread join, but the amount of time seems to be consistent (there might be a way to daemonise the threads so we can exit more abruptly and quickly).
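A hedged sketch of the "daemonise the threads" idea (not what the code currently does; the workflow id is illustrative): a daemon thread is killed abruptly at interpreter exit instead of being joined, which would speed up shutdown at the cost of skipping any cleanup.

import threading

def subscribe_loop(w_id):
    # Long-running subscriber loop for one workflow (placeholder body).
    ...

# daemon=True means the interpreter will not wait for this thread on exit.
thread = threading.Thread(target=subscribe_loop, args=('wflow1',), daemon=True)
thread.start()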
The UI Server now uses the published [...] (as a result, the UIS shutdown is quicker if the workflows are shut down first).
Looks OK to me, a few comments, mostly for future work.
cylc/uiserver/data_mgr.py
Outdated
""" | ||
# wait until data-store is populated for this workflow | ||
loop_cnt = 0 | ||
while loop_cnt < 5: |
Another sleep loop to try and replace at a later date.
Maybe (?).. Subscription happens first, but needs to wait around for the initial data payload from the REQ client... Publishes are queued until the subscriber receives them; deltas earlier than the payload are ignored.
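Roughly the pattern under discussion, as a hedged sketch (the attempt count and delay are illustrative, not the real values):

import asyncio

async def wait_for_initial_payload(data, w_id, attempts=5, delay=0.5):
    # Poll until the data-store entry for this workflow exists,
    # giving up after a fixed number of attempts.
    for _ in range(attempts):
        if w_id in data:
            return True
        await asyncio.sleep(delay)
    return False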
cylc/uiserver/data_mgr.py
Outdated
""" | ||
if topic == WORKFLOW: | ||
return | ||
if topic == EDGES: |
Why use id for edges and stamp for everything else?
Because edges don't have dynamic fields (no plan to change that (yet!)). Once created they don't change; a suite reload replaces existing ones, and they only get deleted thereafter. (Guess for consistency we could still do it the same way...)
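In sketch form, the distinction being discussed: checksum over stamps (id plus last-update info) for mutable elements, but over ids alone for edges, which never change once created. Field names here are illustrative:

import hashlib

def topic_checksum(elements, use_stamp=True):
    # 'stamp' captures the element id plus its last change, so it detects
    # field updates; edges are immutable, so their 'id' alone is enough.
    digest = hashlib.sha256()
    for element in sorted(elements, key=lambda e: e["id"]):
        key = element["stamp"] if use_stamp else element["id"]
        digest.update(key.encode())
    return digest.hexdigest()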
cylc/uiserver/data_mgr.py
Outdated
apply_delta(topic, delta, self.data[w_id])
self.data[w_id]['delta_times'][topic] = delta_time
self.reconcile_update(topic, delta, w_id)
Could we catch inconsistencies early at this point in the code rather than waiting for the next reconcile_update? e.g.:
else:
    rebuild_store()
Unless I missed your point: the elif delta_time >= self.data[w_id]['delta_times'][topic]: statement filters for valid deltas to apply, so you don't really want to do anything to the store with an invalid delta (delta creation time < latest applied delta creation time).
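As a sketch (attribute names illustrative), the filter amounts to: only apply a delta whose creation time is not older than the last delta applied for that topic; stale deltas are dropped.

def maybe_apply_delta(store, w_id, topic, delta, delta_time, apply_delta):
    # Drop deltas created before the latest applied delta for this topic;
    # otherwise apply it and record the new delta time.
    if delta_time >= store[w_id]['delta_times'][topic]:
        apply_delta(topic, delta, store[w_id])
        store[w_id]['delta_times'][topic] = delta_time
        return True
    return False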
I was wondering about the impact of out of order messages and whether there is the potential for the data store to become corrupted but for us not to notice until the next message comes in.
I think you've got this covered with the topic/checksum system so probably irrelevant:
- If the workflow sends messages: 1, 2, 3
- And the UI Server receives messages: 1, 3, 2
- Then we know something is out of whack; we don't need to wait for message 4 to find that out.
I think this system should pick up on the error, but if messages 1, 2 and 3 relate to different topics, is there the potential for anything to fall through the cracks?
Not sure if the ZMQ message queue can be out of order? If you hit recv on the subscriber you'd just get the oldest first (post connect).. (unless the HWM has been hit, in which case you'd miss any pruned messages).
Different topics each hit their respective part of the data store. There's no confusion of topic, as the topic and respective data arrive together.
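For context, a minimal PUB/SUB sketch (endpoint and topic are illustrative, not the cylc network layer) showing why topic and payload cannot get mixed up: they arrive together in one multipart message, oldest first.

import zmq

context = zmq.Context.instance()
sub = context.socket(zmq.SUB)
sub.connect("tcp://localhost:5556")            # illustrative endpoint
sub.setsockopt(zmq.SUBSCRIBE, b"task_proxies")

# Each publish is delivered as [topic, payload] in a single multipart
# message, in publish order (unless the HWM forced messages to be dropped).
topic, payload = sub.recv_multipart()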
Force-pushed from 4f4b6f2 to 247689e
Force-pushed from 247689e to be928a8
Force-pushed from 6039dae to c98208e
Force-pushed from c98208e to a96949f
Force-pushed from a96949f to 5fafcf1
Codecov Report
@@ Coverage Diff @@
## master #95 +/- ##
==========================================
- Coverage 55.42% 48.81% -6.61%
==========================================
Files 6 6
Lines 258 338 +80
Branches 38 57 +19
==========================================
+ Hits 143 165 +22
- Misses 113 170 +57
- Partials 2 3 +1
Continue to review full report at Codecov.
LGTM. Tested (lots) as working. Discussed sync verification (again) with @dwsutherland offline (Riot.im).
I'll merge this, for compatibility with the back end; @oliver-sanders' comments above suggest he's as good as approved it.
These changes partially address cylc/cylc-flow#3207
This branch is the companion of cylc/cylc-flow#3389 and needs to be merged after it
(CI will fail until that PR is merged)
Functional/Done:
To Do: (excluding below)

The sync appears solid; verification seldom fails (I haven't seen one yet), and there's reconciliation on failure..

The data store is synchronised per topic (task_proxies, jobs, workflow ...etc), and these topic messages/deltas/updates are subscribed-to/received, processed, and verified independently. This means update frequency can differ between topics, while order and content are orchestrated by the workflow service (WFS).

Topic elements (e.g. task_proxies) are verified against the respective WFS via a checksum sent with the delta, and a new set is requested on failure (deltas only contain the elements and fields that were updated).
We need to figure out whether creating a thread for each subscription loop (i.e. one per workflow) is scalable.. As in, do they yield execution to other threads while awaiting a publish? I did it to avoid blocking the main asyncio loop with subscription loops.. Sockets can't be passed between threads, but I think it's fine for them to live out their lifetime on one thread (will see what happens with a large load of workflows). Perhaps if we could dynamically update (add/remove) an asyncio.gather (or something), then we could avoid threads?... See the sketch below for the current thread-based shape.

But for now it's fully functional! 🎉
Requirements check-list:
[x] I have read CONTRIBUTING.md and added my name as a Code Contributor.