Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps: remove aiofiles #5308

Merged
merged 1 commit into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion conda-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: cylc-dev
channels:
- conda-forge
dependencies:
- aiofiles >=0.7.0,<0.8.0
- ansimarkup >=1.0.0
- async-timeout>=3.0.0
- colorama >=0.4,<1.0
Expand Down
38 changes: 27 additions & 11 deletions cylc/flow/async_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
"""Utilities for use with asynchronous code."""

import asyncio
from functools import partial, wraps
import os
from pathlib import Path
from typing import List, Union

from aiofiles.os import wrap # type: ignore[attr-defined]

from cylc.flow import LOG


Expand Down Expand Up @@ -60,12 +59,12 @@ class _AsyncPipe:
"""

def __init__(
self,
func,
args=None,
kwargs=None,
filter_stop=True,
preserve_order=True
self,
func,
args=None,
kwargs=None,
filter_stop=True,
preserve_order=True
):
self.func = func
self.args = args or ()
Expand Down Expand Up @@ -393,9 +392,6 @@ def _pipe(func):
return _pipe


async_listdir = wrap(os.listdir)


async def scandir(path: Union[Path, str]) -> List[Path]:
"""Asynchronous directory listing (performs os.listdir in an executor)."""
return [
Expand Down Expand Up @@ -449,3 +445,23 @@ async def unordered_map(coroutine, iterator):
)
for task in done:
yield task._args, task.result()


def make_async(fcn):
"""Make a synchronous function async by running it in an executor.

The default asyncio executor is the ThreadPoolExecutor so this essentially
syntactic sugar for running the wrapped function in a thread.
"""
@wraps(fcn)
async def _fcn(*args, executor=None, **kwargs):
nonlocal fcn
return await asyncio.get_event_loop().run_in_executor(
executor,
partial(fcn, *args, **kwargs),
)

return _fcn


async_listdir = make_async(os.listdir)
3 changes: 1 addition & 2 deletions cylc/flow/main_loop/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,12 @@ async def my_startup_coroutine(schd, state):
^^^^^^^^^^

.. _coroutines: https://docs.python.org/3/library/asyncio-task.html#coroutines
.. _aiofiles: https://github.com/Tinche/aiofiles

Plugins provide asynchronous functions (`coroutines`_) which Cylc will
then run inside the scheduler.

Coroutines should be fast running (read as gentle on the scheduler)
and perform IO asynchronously e.g. by using `aiofiles`_.
and perform IO asynchronously.

Coroutines shouldn't meddle with the state of the scheduler and should be
parallel-safe with other plugins.
Expand Down
40 changes: 13 additions & 27 deletions cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@
Tuple, TYPE_CHECKING, Union
)

import aiofiles
import zmq.auth

import cylc.flow.flags
from cylc.flow import LOG
from cylc.flow.async_util import make_async
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.exceptions import (
CylcError,
Expand All @@ -51,6 +50,7 @@
WorkflowFilesError,
handle_rmtree_err,
)
import cylc.flow.flags
from cylc.flow.loggingutil import (
CylcLogFormatter,
close_log,
Expand Down Expand Up @@ -621,10 +621,18 @@ def get_workflow_srv_dir(reg):
return os.path.join(run_d, WorkflowFiles.Service.DIRNAME)


def load_contact_file(reg: str) -> Dict[str, str]:
def load_contact_file(id_: str, run_dir=None) -> Dict[str, str]:
"""Load contact file. Return data as key=value dict."""
if not run_dir:
path = Path(get_contact_file_path(id_))
else:
path = Path(
run_dir,
WorkflowFiles.Service.DIRNAME,
WorkflowFiles.Service.CONTACT
)
try:
with open(get_contact_file_path(reg)) as f:
with open(path) as f:
file_content = f.read()
except IOError:
raise ServiceFileError("Couldn't load contact file")
Expand All @@ -640,29 +648,7 @@ def load_contact_file(reg: str) -> Dict[str, str]:
return data


async def load_contact_file_async(reg, run_dir=None):
if not run_dir:
path = Path(get_contact_file_path(reg))
else:
path = Path(
run_dir,
WorkflowFiles.Service.DIRNAME,
WorkflowFiles.Service.CONTACT
)
try:
async with aiofiles.open(path, mode='r') as cont:
data = {}
async for line in cont:
key, value = [item.strip() for item in line.split("=", 1)]
# BACK COMPAT: contact pre "suite" to "workflow" conversion.
# from:
# Cylc 8
# remove at:
# Cylc 8.x
data[key.replace('SUITE', 'WORKFLOW')] = value
Comment on lines -655 to -662
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will removing this catch anyone out?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a duplicate of what was already in load_contact_file() so not actually removed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've replaced this with the sync load_contact_file interface run inside a thread via make_async.

return data
except IOError:
raise ServiceFileError("Couldn't load contact file")
load_contact_file_async = make_async(load_contact_file)


def register(
Expand Down
3 changes: 0 additions & 3 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ packages = find_namespace:
include_package_data = True
python_requires = >=3.7
install_requires =
aiofiles==0.7.*
ansimarkup>=1.0.0
async-timeout>=3.0.0
colorama>=0.4,<=1
Expand Down Expand Up @@ -123,9 +122,7 @@ tests =
testfixtures>=6.11.0
# Type annotation stubs
# http://mypy-lang.blogspot.com/2021/05/the-upcoming-switch-to-modular-typeshed.html
types-aiofiles>=0.7.0
types-Jinja2>=0.1.3
types-aiofiles>=0.1.3
types-pkg_resources>=0.1.2
types-protobuf>=0.1.10
types-six>=0.1.6
Expand Down