Skip to content

Commit

Permalink
py3: respond to feedback(2)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Mar 8, 2019
1 parent 432d210 commit 287e558
Show file tree
Hide file tree
Showing 14 changed files with 54 additions and 101 deletions.
4 changes: 2 additions & 2 deletions bin/cylc-cat-state
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ if remrun():
sys.exit(0)

import os
import pickle
import json
import re
import sqlite3
import traceback
Expand Down Expand Up @@ -119,7 +119,7 @@ def extract_lines(state):
yield r"time : %(time_str)s (%(time_since_epoch)s)" % state
yield r"initial cycle : %(initial_point)s" % state
yield r"final cycle : %(final_point)s" % state
yield pickle.dumps(state["broadcast_states"]).splitlines()
yield json.dumps(state["broadcast_states"]).splitlines()
yield "Begin task states"
for item in state["task_pool"]:
yield (
Expand Down
2 changes: 1 addition & 1 deletion bin/cylc-check-software
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ latter grouped as Python, TeX or 'other' (neither). 'opt_spec' item format:
<MODULE>: [<MIN VER OR 'None'>, <FUNC TAG>, <GROUP>, <'OTHER' TUPLE>] with
<'OTHER' TUPLE> = ([<BASE CMD(S)>], <VER OPT>, <REGEX>, <OUTFILE ARG>).
"""
req_py_ver_range = ((3,),)
req_py_ver_range = ((3,6),)
opt_spec = {
'EmPy': [None, 'TEMPLATING', 'PY'],
'sphinx': [(1, 5, 3), 'HTMLDOCS', 'PY'],
Expand Down
2 changes: 1 addition & 1 deletion bin/cylc-check-versions
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def main():
print("All", contacted, "accounts have cylc-" + CYLC_VERSION)
else:
print("WARNING: failed to invoke cylc-%s on %d accounts:" % (
CYLC_VERSION, len(list(warn.keys()))))
CYLC_VERSION, len(warn)))
m = max(len(ac) for ac in warn)
for ac, warning in warn.items():
print(' ', ac.ljust(m), warning)
Expand Down
5 changes: 1 addition & 4 deletions bin/cylc-client
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,4 @@ def main():


if __name__ == '__main__':
try:
main()
except Exception as exc:
sys.exit(exc)
main()
1 change: 0 additions & 1 deletion doc/src/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ Requirements:

The following packages are necessary for running tests in Cylc:

- `mock <https://mock.readthedocs.io>`_
- `pytest <https://pytest.org>`_

To generate the HTML User Guide, you will need:
Expand Down
7 changes: 1 addition & 6 deletions lib/cylc/log_diagnosis.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@


class LogAnalyserError(Exception):
def __init__(self, msg):
Exception.__init__(self, msg)
self.msg = msg

def __str__(self):
return self.msg
pass


class LogSpec(object):
Expand Down
8 changes: 3 additions & 5 deletions lib/cylc/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,9 @@ def get_location(cls, suite, owner, host):
# exc.args = (cls.NOT_RUNNING % suite,)
# raise

if host and host.split('.')[0] == 'localhost':
host = get_host()
elif host and '.' not in host: # Not IP and no domain
host = get_fqdn_by_host(host)
else:
if not host:
host = contact[SuiteSrvFilesManager.KEY_HOST]
host = get_fqdn_by_host(host)

port = int(contact[SuiteSrvFilesManager.KEY_PORT])
return host, port
26 changes: 11 additions & 15 deletions lib/cylc/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
def async_map(coroutine, iterator):
"""Map iterator iterator onto a coroutine.
* Yields results in order as an when they are ready
* Yields results in order as and when they are ready.
* Slow workers can block.
Args:
Expand All @@ -52,7 +52,7 @@ def async_map(coroutine, iterator):
Should yield tuples to be passed into the coroutine.
Yields:
list - List of results
list - List of results.
Example:
>>> async def square(number): return number ** 2
Expand All @@ -70,20 +70,18 @@ def async_map(coroutine, iterator):
awaiting.append(task)

index = 0
buff = []
completed_tasks = {}
while awaiting:
completed, awaiting = loop.run_until_complete(
asyncio.wait(awaiting, return_when=asyncio.FIRST_COMPLETED))
buff.extend(completed)
completed_tasks.update({t.ind: t.result() for t in completed})

old_len = -1
while len(buff) != old_len:
old_len = len(buff)
for task in buff:
if task.ind == index:
index += 1
buff.remove(task)
yield task.result()
changed = True
while changed and completed_tasks:
if index in completed_tasks:
yield completed_tasks.pop(index)
changed = True
index += 1


def async_unordered_map(coroutine, iterator):
Expand Down Expand Up @@ -196,7 +194,7 @@ def re_compile_filters(patterns_owner=None, patterns_name=None):
return (cres['owner'], cres['name'])


def get_scan_items_from_fs(owner_pattern=None, reg_pattern=None, updater=None):
def get_scan_items_from_fs(owner_pattern=None, reg_pattern=None):
"""Scrape list of suites from the filesystem.
Walk users' "~/cylc-run/" to get (host, port) from ".service/contact" for
Expand Down Expand Up @@ -231,8 +229,6 @@ def get_scan_items_from_fs(owner_pattern=None, reg_pattern=None, updater=None):
item[1] is not None)))
for run_d, owner in run_dirs:
for dirpath, dnames, _ in os.walk(run_d, followlinks=True):
if updater and updater.quit:
return
# Always descend for top directory, but
# don't descend further if it has a .service/ or log/ dir
if dirpath != run_d and (
Expand Down
66 changes: 15 additions & 51 deletions lib/cylc/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def start(self, ports):
self.port = port
break
else:
raise Exception('No room at the inn, all ports occupied.')
raise IOError('No room at the inn, all ports occupied.')

# start accepting requests
self.register_endpoints()
Expand Down Expand Up @@ -152,14 +152,14 @@ def _listener(self):
# process
try:
message = self.decode(msg, self.secret())
LOG.debug('zmq:recv %s', message)
except Exception as exc: # purposefully catch generic exception
# failed to decode message, possibly resulting from failed
# authentication
response = self.encode(
{'error': {'message': str(exc)}}, self.secret())
else:
# success case - serve the request
LOG.debug('zmq:recv %s', message)
res = self._receiver(message)
response = self.encode(res, self.secret())
LOG.debug('zmq:send %s', res)
Expand Down Expand Up @@ -191,7 +191,7 @@ def _receiver(self, message):
response = method(**args)
except Exception as exc:
# includes incorrect arguments (TypeError)
LOG.error(exc) # note the error server side
LOG.exception(exc) # note the error server side
import traceback
return {'error': {
'message': str(exc), 'traceback': traceback.format_exc()}}
Expand All @@ -214,6 +214,14 @@ def authorise(req_priv_level):
Args:
req_priv_level (cylc.network.Priv): A privilege level for the method.
Wrapped function args:
user
The authenticated user (determined server side)
host
The client host (if provided by client) - non trustworthy
prog
The client program name (if provided by client) - non trustworthy
"""
def wrapper(fcn):
def _authorise(self, *args, user='?', meta=None, **kwargs):
Expand All @@ -222,7 +230,7 @@ def _authorise(self, *args, user='?', meta=None, **kwargs):

usr_priv_level = self.get_priv_level(user)
if usr_priv_level < req_priv_level:
LOG.info(
LOG.warn(
"[client-connect] DENIED (privilege '%s' < '%s') %s@%s:%s",
usr_priv_level, req_priv_level, user, host, prog)
raise Exception('Authorisation failure')
Expand All @@ -238,15 +246,6 @@ class SuiteRuntimeServer(ZMQServer):
This class contains the cylc endpoints.
Note the following argument names are protected:
user
The authenticated user (determined server side)
host
The client host (if provided by client) - non trustworthy
prog
The client program name (if provided by client) - non trustworthy
"""

API = 4 # cylc API version
Expand Down Expand Up @@ -306,8 +305,6 @@ def dry_run_tasks(self, items, check_syntax=True):
items[0] is an identifier for matching a task proxy.
"""
if not isinstance(items, list):
items = [items]
self.schd.command_queue.put(('dry_run_tasks', (items,),
{'check_syntax': check_syntax}))
return (True, 'Command queued')
Expand Down Expand Up @@ -338,8 +335,6 @@ def get_graph_raw(self, start_point_string, stop_point_string,
ungroup_all=False):
"""Return raw suite graph."""
# Ensure that a "None" str is converted to the None value.
if stop_point_string is not None:
stop_point_string = str(stop_point_string)
return self.schd.info_get_graph_raw(
start_point_string, stop_point_string,
group_nodes=group_nodes,
Expand All @@ -364,8 +359,6 @@ def get_suite_state_summary(self):
@ZMQServer.expose
def get_task_info(self, names):
"""Return info of a task."""
if not isinstance(names, list):
names = [names]
return self.schd.info_get_task_info(names)

@authorise(Priv.READ)
Expand All @@ -378,10 +371,8 @@ def get_task_jobfile_path(self, task_id):
@ZMQServer.expose
def get_task_requisites(self, items=None, list_prereqs=False):
"""Return prerequisites of a task."""
if not isinstance(items, list):
items = [items]
return self.schd.info_get_task_requisites(
items, list_prereqs=(list_prereqs in [True, 'True']))
items, list_prereqs=list_prereqs)

@authorise(Priv.CONTROL)
@ZMQServer.expose
Expand All @@ -405,8 +396,6 @@ def hold_tasks(self, items):
items is a list of identifiers for matching task proxies.
"""
if not isinstance(items, list):
items = [items]
self.schd.command_queue.put(("hold_tasks", (items,), {}))
return (True, 'Command queued')

Expand Down Expand Up @@ -441,15 +430,10 @@ def insert_tasks(self, items, stop_point_string=None, no_check=False):
items is a list of identifiers of (families of) task instances.
"""
if not isinstance(items, list):
items = [items]
if stop_point_string == "None":
stop_point_string = None
self.schd.command_queue.put((
"insert_tasks",
(items,),
{"stop_point_string": stop_point_string,
"no_check": no_check in ['True', True]}))
{"stop_point_string": stop_point_string, "no_check": no_check}))
return (True, 'Command queued')

@authorise(Priv.CONTROL)
Expand All @@ -459,8 +443,6 @@ def kill_tasks(self, items):
items is a list of identifiers for matching task proxies.
"""
if not isinstance(items, list):
items = [items]
self.schd.command_queue.put(("kill_tasks", (items,), {}))
return (True, 'Command queued')

Expand Down Expand Up @@ -490,11 +472,8 @@ def poll_tasks(self, items=None, poll_succ=False):
items is a list of identifiers for matching task proxies.
"""
if items is not None and not isinstance(items, list):
items = [items]
self.schd.command_queue.put(
("poll_tasks", (items,),
{"poll_succ": poll_succ in ['True', True]}))
("poll_tasks", (items,), {"poll_succ": poll_succ}))
return (True, 'Command queued')

@authorise(Priv.CONTROL)
Expand Down Expand Up @@ -554,8 +533,6 @@ def release_tasks(self, items):
items is a list of identifiers for matching task proxies.
"""
if not isinstance(items, list):
items = [items]
self.schd.command_queue.put(("release_tasks", (items,), {}))
return (True, 'Command queued')

Expand All @@ -566,8 +543,6 @@ def remove_tasks(self, items, spawn=False):
items is a list of identifiers for matching task proxies.
"""
if not isinstance(items, list):
items = [items]
self.schd.command_queue.put(
("remove_tasks", (items,), {"spawn": spawn}))
return (True, 'Command queued')
Expand All @@ -579,10 +554,6 @@ def reset_task_states(self, items, state=None, outputs=None):
items is a list of identifiers for matching task proxies.
"""
if not isinstance(items, list):
items = [items]
if outputs and not isinstance(outputs, list):
outputs = [outputs]
self.schd.command_queue.put((
"reset_task_states",
(items,), {"state": state, "outputs": outputs}))
Expand Down Expand Up @@ -634,8 +605,6 @@ def spawn_tasks(self, items):
items is a list of identifiers for matching task proxies.
"""
if not isinstance(items, list):
items = [items]
self.schd.command_queue.put(("spawn_tasks", (items,), {}))
return (True, 'Command queued')

Expand All @@ -653,8 +622,6 @@ def take_checkpoints(self, items):
items[0] is the name of the checkpoint.
"""
if not isinstance(items, list):
items = [items]
self.schd.command_queue.put(("take_checkpoints", (items,), {}))
return (True, 'Command queued')

Expand All @@ -665,9 +632,6 @@ def trigger_tasks(self, items, back_out=False):
items is a list of identifiers for matching task proxies.
"""
if not isinstance(items, list):
items = [items]
items = [str(item) for item in items]
self.schd.command_queue.put(
("trigger_tasks", (items,), {"back_out": back_out}))
return (True, 'Command queued')
2 changes: 1 addition & 1 deletion lib/cylc/suite_srv_files_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ def register(self, reg=None, source=None, redirect=False):
source_str = source
os.symlink(source_str, target)

print(('REGISTERED %s -> %s' % (reg, source)))
print('REGISTERED %s -> %s' % (reg, source))
return reg

def create_auth_files(self, reg):
Expand Down
17 changes: 17 additions & 0 deletions lib/cylc/tests/test_task_outputs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
#!/bin/bash
#
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random
import unittest

Expand Down
Loading

0 comments on commit 287e558

Please sign in to comment.