
async scan #3724

Merged
merged 42 commits into from
Aug 31, 2020

Conversation

oliver-sanders
Member

@oliver-sanders oliver-sanders commented Jul 30, 2020

Closes: #3615
Addresses: cylc/cylc-uiserver#96
Sibling to: cylc/cylc-uiserver#150, cylc/cylc-doc#138, conda-forge/cylc-flow-feedstock#10.

Re-writes cylc scan and its underlying logic for simplicity, performance and better integration with the Cylc UI Server and TUI.

This is a precursor to:

Highlights:

  • Removes multi-user & port-scanning logic.
  • Separates out filtering, contact file reading and network information requests into separate functions.
  • Makes it super easy to roll your own scan client by moving logic into reusable components.
  • Converts from the legacy REST endpoints to GraphQL.
  • Scanning of all workflows in any state under one interface.
  • Performs directory listing asynchronously (which can be very slow on network filesystems).
  • Interlaces different stages of the scan operation for performance (e.g. can perform directory listings, whilst contacting flows).
  • Reduction in worst-case scan times.
  • The whole scan logic is now asynchronous.
  • Unify cylc scan and cylc print.

Async Pipes:

I've had a pet-project for a little while now of constructing asynchronous pipes in Python so this was a nice chance to use it.

Construct a pipe using | syntax like on the command line:

pipe = scan | is_active(True) | contact_info

The pipe is an async generator, iterate through it like so:

async for flow in pipe:
    # ...

Once items start coming out of scan they get pushed into the next stage of the pipe; in theory each stage of the pipe could be running over different items simultaneously.

Sounds complicated, but it actually only takes a few lines of Python to make this happen; most of cylc.flow.async_util is just documentation.
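As an illustration of how little machinery this needs, here's a minimal sketch of wiring `|` composition onto async functions. This is not the cylc.flow.async_util implementation; all names (`Pipe`, `naturals`, `double`) are made up for the example.

```python
import asyncio


class Pipe:
    """Compose an async generator with coroutine stages using ``|``."""

    def __init__(self, source, stages=None):
        self.source = source          # async generator factory
        self.stages = stages or []    # coroutines applied to each item

    def __or__(self, stage):
        # "pipe | stage" returns a new Pipe with the stage appended
        return Pipe(self.source, self.stages + [stage])

    async def __aiter__(self):
        # pull items from the source and push them through each stage
        async for item in self.source():
            for stage in self.stages:
                item = await stage(item)
            yield item


async def naturals():
    for i in range(3):
        yield i


async def double(x):
    return x * 2


async def main():
    results = []
    pipe = Pipe(naturals) | double
    async for item in pipe:
        results.append(item)
    return results


print(asyncio.run(main()))  # [0, 2, 4]
```

The real implementation does more (queueing, filtering stages, ordering), but the `__or__` overload above is the heart of the syntax.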

Here's your minimal scan client under the new system:

import asyncio
 
from cylc.flow.suite_files import ContactFileFields as CFF 
from cylc.flow.network.scan_nt import (
    scan,
    is_active,
    contact_info,
)                                                                                    
 
 
async def scanner():
    pipe = scan | is_active(True) | contact_info
    async for flow in pipe:
        print(f'{flow["name"]} {flow[CFF.HOST]}')
 
 
asyncio.run(scanner())

File System Efficiency:

In order to perform a scan we potentially need to perform a very large number of directory listings.

  • One for the cylc-run directory.
  • One for each cylc-run/* directory (this is recursive!).
  • One for the .service dir of each registered flow.

Before this change, all of these directory listings happened one after the other; each blocked not only the others but also the other operations involved in the scan (e.g. contacting the flows themselves).

Now listings can happen simultaneously thanks to a bit of libuv magic. libuv is the library behind Node.js; a few lines of code are required to marry the libuv event loop up with asyncio, after which it's plain sailing.
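The PR uses pyuv for this. Purely to illustrate the same idea with the standard library, concurrent listings can also be sketched by pushing the blocking os.scandir calls onto the asyncio thread executor (the helper names here are made up):

```python
import asyncio
import os
import tempfile
from pathlib import Path


def _listdir(path):
    """Blocking directory listing (runs in a worker thread)."""
    return [Path(path, entry.name) for entry in os.scandir(path)]


async def scandir(path):
    """Asynchronous directory listing via the default thread executor."""
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, _listdir, path)


async def main():
    with tempfile.TemporaryDirectory() as tmp:
        for name in ('a', 'b'):
            Path(tmp, name).mkdir()
        # both listings can be in flight at the same time
        listings = await asyncio.gather(scandir(tmp), scandir(tmp))
        return sorted(p.name for p in listings[0])


print(asyncio.run(main()))  # ['a', 'b']
```

Threads carry more overhead than libuv's event-loop I/O, which is presumably why the PR went the pyuv route, but the concurrency pattern is the same.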

Scan Client:

The scan client is now pretty simple. I've knocked together a basic interface, though this can be easily changed.

(screenshot: the new cylc scan output, 2020-07-30)

  • cylc scan with no opts or args remains as compatible with Cylc 7 as it can be.
  • New name and tree formats replace cylc print.
  • Can now display stopped workflows.
  • Can now filter by workflow state (running, held, stopping, stopped).
  • New rich format.
  • Only contacts workflows when actually necessary (with no opts there is no need to do so).
  • Uses colour, but has a --colour-blind option which still uses colour while ensuring colour isn't required to interpret the display (--color=never disables colour entirely).

TODO:

  • Remove the old scan interface - done.
  • Fix any tests broken by syntax changes - wrote new integration tests.
  • Consider adding a functional test for the new scan client - done.
  • Fix tests/f/broadcast/00-simple.t

Questions:

  • What back-compatibility do we require to the Cylc7 cylc scan interface? None
  • Do we want to replace cylc scan -t raw? No
  • Do we need to back-support the old cylc scan -t json interface? No
  • Ok to kill cylc print? The only thing print does which scan doesn't is list the reg dir (which scan could do if desired).

Requirements check-list

  • I have read CONTRIBUTING.md and added my name as a Code Contributor.
  • Contains logically grouped changes (else tidy your branch by rebase).
  • Does not contain off-topic changes (use other PRs for other changes).
  • Appropriate tests are included (unit and/or functional).
  • Appropriate change log entry included.
  • Documented in scan api cylc-doc#138
  • Created an issue at scan: dependency changes conda-forge/cylc-flow-feedstock#10

@oliver-sanders oliver-sanders added this to the cylc-8.0a3 milestone Jul 30, 2020
@oliver-sanders oliver-sanders self-assigned this Jul 30, 2020
@oliver-sanders oliver-sanders mentioned this pull request Jul 30, 2020
Comment on lines +256 to +302
# (imports from the surrounding module, shown for context)
import asyncio
from functools import partial
from pathlib import Path

import pyuv


def _scandir(future, path, request):
    """Callback helper for scandir()."""
    future.set_result([
        Path(path, directory.name)
        for directory in request.result
    ])


async def scandir(path):
    """Asynchronous directory listing using pyuv."""
    ret = asyncio.Future()

    loop = pyuv.Loop.default_loop()
    pyuv.fs.scandir(loop, str(path), callback=partial(_scandir, ret, path))
    loop.run()

    return await ret
Member Author

This is the wrapper which allows us to use pyuv with asyncio.

@oliver-sanders oliver-sanders added the question Flag this as a question for the next Cylc project meeting. label Jul 30, 2020
@oliver-sanders oliver-sanders added the efficiency For notable efficiency improvements label Jul 30, 2020
@hjoliver
Member

The standard format is a bit dull, for compatibility with Cylc7.

Does it need to be compatible?

@oliver-sanders
Member Author

Does it need to be compatible?

That is the question, people may well be scraping cylc scan, though given the interface breakages in other commands do we need to preserve this interface?

@TomekTrzeciak
Contributor

Does it need to be compatible?

That is the question, people may well be scraping cylc scan, though given the interface breakages in other commands do we need to preserve this interface?

Is there some machine-readable output format, say JSON? Might be better to provide something like that rather than trying to maintain backward compatibility for human-intended output.

@oliver-sanders
Member Author

Yes, both the new and old scan clients provide a JSON format, however, I've broken that too:

$ cylc scan -t json
[
    [
        "generic",
        "olivers-macbook-pro.local",
        "43098",
        "43094",
        "5",
        {
            "name": "generic",
            "owner": "oliver",
            "version": "8.0a2"
        }
    ]
]
$ cylc scan-nt -t json
[
    {
        "name": "generic",
        "path": "/Users/oliver/cylc-run/generic",
        "contact": "/Users/oliver/cylc-run/generic/.service/contact",
        "CYLC_API": "5",
        "CYLC_SSH_USE_LOGIN_SHELL": "True",
        "CYLC_SUITE_HOST": "olivers-macbook-pro.local",
        "CYLC_SUITE_NAME": "generic",
        "CYLC_SUITE_OWNER": "oliver",
        "CYLC_SUITE_PORT": "43098",
        "CYLC_SUITE_PROCESS": "54860 /Users/oliver/miniconda3/envs/cylc8/bin/python3.7 /Users/oliver/miniconda3/envs/cylc8/bin/cylc-run generic",
        "CYLC_SUITE_PUBLISH_PORT": "43094",
        "CYLC_SUITE_RUN_DIR_ON_SUITE_HOST": "/Users/oliver/cylc-run/generic",
        "CYLC_SUITE_UUID": "33000c07-565e-4d93-84f4-39d6f315bf31",
        "CYLC_VERSION": "8.0a2"
    }
]

It would be possible to re-instate the old JSON format [name: str, host: str, port: str, api_version: str, data: dict].

Note that the addition of the publisher port to this output has already broken Cylc 7/8 JSON scan compatibility.
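For anyone who does need the old shape, mapping the new per-flow dict back onto the old tuple is a few lines of user code. This is a hypothetical sketch, not part of cylc; the field names are taken from the example outputs above:

```python
# One flow from the new-style `cylc scan -t json` output (abridged)
flow = {
    'name': 'generic',
    'CYLC_SUITE_HOST': 'olivers-macbook-pro.local',
    'CYLC_SUITE_PORT': '43098',
    'CYLC_API': '5',
    'CYLC_SUITE_OWNER': 'oliver',
    'CYLC_VERSION': '8.0a2',
}

# Rebuild the old [name, host, port, api_version, data] shape
old_format = [
    flow['name'],
    flow['CYLC_SUITE_HOST'],
    flow['CYLC_SUITE_PORT'],
    flow['CYLC_API'],
    {
        'name': flow['name'],
        'owner': flow['CYLC_SUITE_OWNER'],
        'version': flow['CYLC_VERSION'],
    },
]
print(old_format[0], old_format[2])  # generic 43098
```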

@TomekTrzeciak
Contributor

Yes, both the new and old scan clients provide a JSON format, however, I've broken that too
...
It would be possible to re-instate the old JSON format [name: str, host: str, port: str, api_version: str, data: dict].

The new format is much nicer though - much easier to grep, trivial to turn it into shell variables with a bit of sed or awk.

My 2 cents - you're allowed to break compatibility at major versions. That kind of opportunity doesn't happen very often, so make the most of it.

(PS. you could use json.dump(..., sort_keys=True) for even more stable output).
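For example, sorting the keys makes the output stable and diff-friendly regardless of dict insertion order:

```python
import json

data = {'name': 'generic', 'CYLC_API': '5'}
print(json.dumps(data, sort_keys=True))  # {"CYLC_API": "5", "name": "generic"}
```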

@oliver-sanders
Member Author

oliver-sanders commented Jul 31, 2020

Sorted the keys, good suggestion. I'd rather not go back to the old JSON format as it's less explicit and more fragile, but I've added it to the questions above.

The other format likely to be used by scrapers is "raw"

$ cylc scan -t raw
<suite>|<user>|<host>|port|<port>

I'd rather ditch it but it's trivial to re-implement.

@oliver-sanders oliver-sanders force-pushed the cylc-scan-nt branch 2 times, most recently from fd7ed4d to 8e33040, July 31, 2020 12:48
@hjoliver
Member

hjoliver commented Aug 3, 2020

I don't mind breaking the old cylc scan output format (if you could call it that). Scraping that is a pretty niche application I think, unlikely to be done by the average user. Maybe by very few ops-type people who want to automate (via scripts) interactions with many suites? And not too hard to adapt such scripts to the new improved output anyway.

Comment on lines 44 to 45
$(cylc scan --color=never --publisher -n "${SUITE_NAME}" \
| awk -F' ' -v RS='\n([ \t]*\n)+' '{print $1 " " $3}')
Member Author

Didn't need to provide host:port via cylc scan, the internal logic does this for us.



@pipe
async def scan(run_dir=None, scan_dir=None, max_depth=4):
Member Author

@oliver-sanders oliver-sanders Aug 7, 2020

The max_depth is a new safety feature to avoid going too deep down the rabbit warren in the event that:

  • Someone puts something which isn't a workflow into their cylc-run dir (a not entirely uncommon occurrence).
  • In case the way we recognise flows changes (e.g. the suite.rc file gets renamed or converted to a Python file).

What is a sensible default value for this?

Set as 4 for now as that's what's required by the Cylc functional test battery.
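For illustration, a depth-limited recursion looks something like the following. This is a hedged sketch, not the cylc implementation; it assumes (as the PR does elsewhere) that a flow is recognised by the presence of a .service directory, and the helper names are made up:

```python
import asyncio
import os
import tempfile
from pathlib import Path


async def scan(path, max_depth=4, depth=0):
    if depth > max_depth:
        return []  # safety valve: don't descend forever
    flows = []
    for entry in os.scandir(path):
        if not entry.is_dir():
            continue
        child = Path(entry.path)
        if (child / '.service').is_dir():
            flows.append(child)  # looks like a registered flow, stop here
        else:
            flows.extend(await scan(child, max_depth, depth + 1))
    return flows


async def main():
    with tempfile.TemporaryDirectory() as tmp:
        flow = Path(tmp, 'nested', 'my-flow')
        (flow / '.service').mkdir(parents=True)
        return [p.name for p in await scan(tmp)]


print(asyncio.run(main()))  # ['my-flow']
```

(The sketch only shows the depth limit; the real scan also does the listings concurrently.)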

Member

@dwsutherland dwsutherland Aug 20, 2020

CLI option? or too rare to care?

Member Author

I think probably too rare to care, or at least the default should be sufficient that we wouldn't expect users to hit it.

@oliver-sanders
Member Author

I've managed to do something to anger tests/f/broadcast/00-simple.t, which is notoriously hard to debug, but that aside this should be ready for review.

The details of the new cylc scan client are all up for debate/input, it's only displaying a small number of fields at the moment but it's trivial to expand that.

@oliver-sanders oliver-sanders marked this pull request as ready for review August 7, 2020 14:53
cylc/flow/hostuserutil.py (review comment resolved)
@oliver-sanders
Member Author

oliver-sanders commented Aug 27, 2020

If this was a classic pipeline (ignoring the letter transformations here for clarity) I'd expect the following sequence of events.

  • speak yields h
  • louder(h) runs while speak is yielding e
  • longer(h) runs while louder(e) runs and speak is yielding l
  • (and so on)

In your example the generator (speak) is not asynchronous as there is no await anywhere in it. This means the generator does not "yield" control until it has finished processing the last item, so the pipe will be blocked and unable to continue. To make it async, stick an await asyncio.sleep in it.

Result, no concurrency:

You have pointed out a perfectly legit issue with the pipe logic, which is essentially depth-first, meaning we won't get any real concurrency.
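To make the await point concrete, here's a small self-contained sketch (hypothetical names, not cylc code): an async generator with no await starves a concurrently scheduled task until it has finished, while awaiting asyncio.sleep(0) hands control back to the event loop on each item so the two interleave.

```python
import asyncio

events = []


async def speak(cooperative):
    for letter in 'hel':
        if cooperative:
            await asyncio.sleep(0)  # hand control back to the event loop
        events.append(f'speak {letter}')
        yield letter


async def ticker():
    for _ in range(3):
        events.append('tick')
        await asyncio.sleep(0)


async def main(cooperative):
    events.clear()
    tick = asyncio.ensure_future(ticker())
    async for _ in speak(cooperative):
        pass
    await tick
    return list(events)


# without the await the ticker cannot run until speak has finished
print(asyncio.run(main(False)))
# with it, the two tasks interleave
print(asyncio.run(main(True)))
```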

@oliver-sanders
Member Author

oliver-sanders commented Aug 27, 2020

Ok, moved to a queue-based system which allows the pipe to follow paths which aren't depth-first.

Here's an example which inserts random sleeps into each stage of the pipe:

import asyncio
from random import random

from cylc.flow.async_util import pipe


@pipe
async def a_range(x):
    for y in range(x):
        await asyncio.sleep(random())
        print(f'# a_range -> {y}')
        yield y


@pipe
async def a_double(x):
    print(f'# a_double({x})')
    await asyncio.sleep(random())
    return x ** 2


@pipe
async def a_str(x):
    print(f'# a_str({x})')
    await asyncio.sleep(random())
    return str(x)


async def consumer():
    pipe = a_range(5) | a_double | a_str
    async for item in pipe:
        print(
            f'=> {repr(item)}'
        )


asyncio.run(consumer())

If it's concurrent you should get a different order of events every time with results not necessarily yielded in order.

run 1:

# a_range -> 0
# a_double(0)
# a_str(0)
# a_range -> 1
# a_double(1)
# a_str(1)
=> '0'
# a_range -> 2
# a_double(2)
# a_str(4)
=> '1'
# a_range -> 3
# a_double(3)
# a_range -> 4
# a_double(4)
=> '4'
# a_str(9)
=> '9'
# a_str(16)
=> '16'

run 2:

# a_range -> 0
# a_double(0)
# a_str(0)
# a_range -> 1
# a_double(1)
=> '0'
# a_range -> 2
# a_double(2)
# a_str(1)
# a_range -> 3
# a_double(3)
# a_str(4)
=> '1'
=> '4'
# a_range -> 4
# a_double(4)
# a_str(16)
=> '16'
# a_str(9)
=> '9'

Setting all of the sleeps to 1 second you get the expected concurrency with all three stages running at the same time and in a predictable order:

# a_range -> 0
# a_double(0)
# a_range -> 1
# a_double(1)
# a_str(0)
# a_range -> 2  # yielding the third item
# a_double(2)   # processing the second item
# a_str(1)      # processing the first item
=> '0'
# a_range -> 3
# a_double(3)
# a_str(4)
=> '1'

@hjoliver
Member

hjoliver commented Aug 27, 2020

Well that certainly sped things up; my "speak" example now runs in 4 seconds instead of 20.

However, your new implementation is what I referred to in Element chat yesterday as the "extreme concurrency" model. This is more concurrency than a pipeline is supposed to have, which I think is fine (and actually desirable) for the scan application, but it kind of calls into question the pipe name and the special pipeline syntax, doesn't it? Because it doesn't preserve order coming out the end of the pipe.

With random sleeps in my example, I get (e.g.):

SPEAK hello
...
HEARD  L_E_H_O_L_   # should be H_E_L_L_O_ 

To belabor the point, in a shell pipeline I'd expect the file bar to be identical to foo here:

cat foo | tr b B | tr B b > bar

but your pipe would reorder the lines if some of them took much longer to "translate" than others. (Bad example if there are other B's in the file, but you get my point).

@hjoliver
Member

In Cylc graph notation, here's a pipeline A | B | C for processing 3 items:

R3/^/P1 = """
   A => B => C
   A[-P1] => A
   B[-P1] => B
   C[-P1] => C"""

so C.3, B.2, and A.1 (e.g.) can run concurrently (they're at different points along the one pipe) but A.1, A.2, A.3 can't (that would be 3 concurrent pipes).

But you've done this, which is 3 pipes:

R3/^/P1 = "A => B => C"

@hjoliver
Member

Maybe we could support both modes, to justify the cool syntax? Or both separately, with a slightly different name and syntax for the multiple parallel pipes case?

@dwsutherland
Member

dwsutherland commented Aug 27, 2020

Because it doesn't preserve order coming out the end of the pipe.

I think that's fine if your printing just collects whichever items are done (have reached the end of the pipe) from a queue and prints them one at a time (right?). Then if we want to order the result, we wait for all the concurrent work to finish and sort the results (which would still be faster)?

@hjoliver
Member

But how do you know what the order should be at the other end @dwsutherland ?

@dwsutherland
Member

But how do you know what the order should be at the other end @dwsutherland ?

That's optional, user determined (or default). (?)

@hjoliver
Member

(Note I'm not saying we don't want this maximum concurrency for cylc scan, I'm just arguing about whether or not we can call it a "pipe" and use pipe syntax to do it, because classic pipelines - like shell command pipes for example - don't behave like that).

@dwsutherland
Member

(Note I'm not saying we don't want this maximum concurrency for cylc scan, I'm just arguing about whether or not we can call it a "pipe" and use pipe syntax to do it, because classic pipelines - like shell command pipes for example - don't behave like that).

I guess I need to understand it more. I thought the pipe bit was sequential, and the concurrent bit was the multiple pushes through the pipe.

@hjoliver
Member

hjoliver commented Aug 27, 2020

See my description of the Cylc graph pipeline implementation above. A pipeline has a special kind of concurrency, like (literally) a single pipe with multiple processors spaced along its length. That's why it's called a "pipe". There's only one of each processor, and the data passes through the pipe sequentially, but once the pipe is full all of the processors are running concurrently (on different data elements). That's still more concurrency than passing one data element through the whole pipe before pushing the next element in the front, which is what was happening on this branch yesterday. But it's less concurrency than creating a whole new pipe for each data element.
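This "classic pipe" model can be sketched with one worker per stage connected by bounded queues: different stages run concurrently on different items, but there is only one worker per stage, so output order is preserved. A hypothetical sketch (not the cylc implementation), using the upper/longer stages from the "hello" example:

```python
import asyncio

END = object()  # sentinel marking the end of the stream


async def stage(func, inbox, outbox):
    """One worker per stage: pull from inbox, process, push to outbox."""
    while True:
        item = await inbox.get()
        if item is END:
            await outbox.put(END)  # propagate shutdown down the pipe
            return
        await outbox.put(await func(item))


async def upper(x):
    await asyncio.sleep(0.01)
    return x.upper()


async def longer(x):
    await asyncio.sleep(0.01)
    return x + '_'


async def produce(q):
    for letter in 'hello':
        await q.put(letter)
    await q.put(END)


async def main():
    # bounded queues mean each stage holds at most one item in flight
    q1, q2, q3 = (asyncio.Queue(maxsize=1) for _ in range(3))
    tasks = [
        asyncio.ensure_future(produce(q1)),
        asyncio.ensure_future(stage(upper, q1, q2)),
        asyncio.ensure_future(stage(longer, q2, q3)),
    ]
    out = []
    while True:
        item = await q3.get()
        if item is END:
            break
        out.append(item)
    await asyncio.gather(*tasks)
    return ''.join(out)


print(asyncio.run(main()))  # H_E_L_L_O_
```

Because each stage is a single FIFO worker, the pipe fills up and all stages run at once, yet the order out the far end matches the order in.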

@hjoliver
Member

(Sorry @oliver-sanders - we should have this conversation on Element rather than cluttering up your PR!)

@dwsutherland
Member

Latest push looks good, appears functionally sound 👍

And concurrent "pipes" working:

(oliver-flow) sutherlander@cortex-vbox:bin$ time ./foo.py 
SPEAK hello
(spork h)
(spork e)
(spork l)
(spork l)
(spork o)
SPOKE hello
 > upper h (sleep...)
 > upper e (sleep...)
 > upper l (sleep...)
 > upper l (sleep...)
 > upper o (sleep...)
 < upper h -> H
 > longer H (sleep...)
 < upper e -> E
 > longer E (sleep...)
 < upper l -> L
 > longer L (sleep...)
 < upper l -> L
 > longer L (sleep...)
 < upper o -> O
 > longer O (sleep...)
 < longer H -> H_
(consumed H_)
 < longer E -> E_
(consumed E_)
 < longer L -> L_
 < longer L -> L_
(consumed L_)
 < longer O -> O_
(consumed L_)
(consumed O_)
HEARD H_E_L_L_O_

real	0m4.079s
user	0m3.981s
sys	0m0.076s

Comment on lines +227 to +231
A generator to begin our pipe with:
>>> @pipe
... async def arange():
...     for i in range(10):
...         yield i
Member

In your example the generator (speak) is not asynchronous as there is no await in it anywhere.

I was following this example of yours in the code (although I admit to being confused about whether an async generator needs an await or not - I thought maybe yield passes control back to the event loop).

Member

(OK I think I understand how that works now).

Member Author

yield yields control to a generator in synchronous code; await yields control to a coroutine in asynchronous code. We have an async generator here, so both paradigms still apply, but only in their own context.

@hjoliver
Member

hjoliver commented Aug 28, 2020

And concurrent "pipes" working:

That's only with equal sleeps in my coroutines. If you change to asyncio.sleep(random()) the order will not be preserved. Which as discussed (at length at this point!) is OK for cylc scan but it's not a "pipe".

@dwsutherland
Member

dwsutherland commented Aug 28, 2020

And concurrent "pipes" working:

That's only with equal sleeps in my coroutines. If you change to asyncio.sleep(random()) the order will not be preserved. Which as discussed (at length at this point!) is OK for cylc scan but it's not a "pipe".

Agreed. We don't need all the scanned workflows to collectively form something coherent (e.g. H_E_L_L_O_).

@oliver-sanders
Member Author

However, your new implementation is what I referred to in Element chat yesterday to as the "extreme concurrency" model.

Yes, and purposefully so.

Because it doesn't preserve order coming out the end of the pipe.

It never claimed to; it's an async pipe.

It's possible to add a "preserve order" mode at some point, however, this isn't desirable for the cylc scan case (we have cylc scan --sort for those purposes anyway).

The main motives were:

  • To move IO time into async-sleep time so that TUI/UIS/whatever can do useful things whilst the scan is happening. In my UIS PR the UIS starts opening connections to new flows whilst the scan is still doing its thing.
  • To handle FS/network delays, for example NFS taking 5 seconds to read one flow's contact file or to list one directory. Concurrency protects us against worst-case scenarios, and delays affect the minimum possible subset of flows.

@oliver-sanders
Member Author

oliver-sanders commented Aug 28, 2020

Ok, three more commits which:

  • Make async pipes ordered by default.
    • This enables "classic" pipes which still benefit from the same concurrency as unordered pipes.
    • Change the behaviour using pipe.preserve_order=(True|False).
  • Make filesystem scanning more concurrent (I was going to do this in a follow-up).
    • This allows multiple scandir operations to happen concurrently.
    • This better protects us against a single operation hanging.

Note: Order in async pipes isn't necessarily meaningful as async generators don't necessarily yield results in any kind of dependable order anyway (and in our case the scan generator doesn't).

@dwsutherland, @datamel you might want to cast your eyes over these commits as they change the async logic.

@hjoliver
Member

hjoliver commented Aug 28, 2020

It never proclaimed to, it's an async pipe.

It is async, for sure; I'm just arguing that it's not a pipe, it's more like a bunch of pipes.

My classic pipe still has concurrency (albeit less of it), so I presume it could still have an async implementation and so still be called an "async pipe".

Of course if "async pipe" is actually a known thing and doesn't have classic pipe behavior then I guess I'm just wrong about the meaning of the term.

But either way there certainly are potential applications where preserved ordering would be expected, like line-by-line text file processing.

Make async pipes ordered by default.

  • This enables "classic" pipes which still benefit from the same concurrency as unordered pipes.

Great, even better 👍

@hjoliver
Member

hjoliver commented Aug 28, 2020

I forgot to say up above, it's a great idea (especially with the pipe syntax) and I reckon you should consider making it available as a separate self-contained Python package. Then, as a generic library package, I'm sure people would want both kinds of behaviour for different applications.

Member

@hjoliver hjoliver left a comment

Happy now 👍 (Thanks for assuaging my pipe-OCD 😬 )

@hjoliver hjoliver merged commit 6205645 into cylc:master Aug 31, 2020
@oliver-sanders oliver-sanders deleted the cylc-scan-nt branch September 1, 2020 08:27
@hjoliver hjoliver modified the milestones: cylc-8.0a3, cylc-8.0b0 Feb 25, 2021