Implement async requests #4
I think this would be great. In terms of implementation, I'm a fan of how httpx structures its library: https://github.com/encode/httpx, e.g. https://github.com/encode/httpx/blob/master/httpx/_client.py. It is very explicit about the AsyncClient and the (sync) Client being separate, and doesn't try to magically switch between async and sync. That leads to some code duplication, especially around function signatures, but the predictable performance makes it worth it IMO.
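To make the httpx-style split concrete, here is a minimal sketch of one possible design: the sync and async clients are separate classes, and only the I/O-free pieces (building the request body, finding the "next" link) are shared. All names here (`build_search_body`, `next_request`, `SearchClient`, `AsyncSearchClient`) are hypothetical and are not pystac-client or httpx API. The `post` callables stand in for `httpx.Client.post` / `httpx.AsyncClient.post` plus JSON decoding so the example runs offline, and the example.com URLs are placeholders.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

# Hypothetical type aliases for this sketch
Page = dict[str, Any]                  # one decoded JSON page of search results
Request = tuple[str, dict[str, Any]]   # (url, JSON body) for the next POST


def build_search_body(collections: list[str], limit: int) -> dict[str, Any]:
    """Shared, I/O-free: build the POST body for a search."""
    return {"collections": collections, "limit": limit}


def next_request(page: Page) -> Optional[Request]:
    """Shared, I/O-free: return the page's "next" link as a request, if any."""
    for link in page.get("links", []):
        if link.get("rel") == "next":
            return link["href"], link.get("body", {})
    return None


class SearchClient:
    """Blocking client: `post` is a plain function returning a decoded page."""

    def __init__(self, url: str, post: Callable[[str, dict], Page]) -> None:
        self.url, self.post = url, post

    def items(self, collections: list[str], limit: int = 100) -> list[dict]:
        features: list[dict] = []
        req: Optional[Request] = (self.url, build_search_body(collections, limit))
        while req is not None:
            page = self.post(*req)
            features.extend(page["features"])
            req = next_request(page)
        return features


class AsyncSearchClient:
    """Async client: same method names and arguments, but I/O is awaited."""

    def __init__(self, url: str, post: Callable[[str, dict], Awaitable[Page]]) -> None:
        self.url, self.post = url, post

    async def items(self, collections: list[str], limit: int = 100) -> list[dict]:
        features: list[dict] = []
        req: Optional[Request] = (self.url, build_search_body(collections, limit))
        while req is not None:
            page = await self.post(*req)
            features.extend(page["features"])
            req = next_request(page)
        return features


# Offline demo: a fake two-page API
PAGES = {
    "https://example.com/search": {
        "features": [{"id": "a"}],
        "links": [{"rel": "next", "href": "https://example.com/page2", "body": {}}],
    },
    "https://example.com/page2": {"features": [{"id": "b"}], "links": []},
}


def fake_post(url: str, body: dict) -> Page:
    return PAGES[url]


async def fake_apost(url: str, body: dict) -> Page:
    return PAGES[url]


sync_ids = [f["id"] for f in SearchClient("https://example.com/search", fake_post).items(["c"])]
async_client = AsyncSearchClient("https://example.com/search", fake_apost)
async_ids = [f["id"] for f in asyncio.run(async_client.items(["c"]))]
print(sync_ids, async_ids)
```

In this layout the duplication is confined to the two `items` loops, which differ only by `await`; that is the trade-off described above, kept small by pushing parsing and pagination logic into plain functions.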
Just dropping this hacky implementation of an async search, which uses pystac-client to build up the search parameters and then httpx to make the requests:

    import asyncio

    import httpx
    import pystac_client


    async def query(intersects, max_connections=20):
        # The time frame in which we search for non-cloudy imagery
        search_start = "2018-01-01"
        search_end = "2019-12-31"
        # Note: Client.open() makes a blocking request for the API landing page
        catalog = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")
        # pystac-client only builds the search URL and POST body here
        search = catalog.search(
            collections=["sentinel-2-l2a"],
            intersects=intersects,
            datetime=[search_start, search_end],
            query={"eo:cloud_cover": {"lt": 10}},
            limit=500,
        )
        parameters = search.get_parameters()
        results = []
        timeout = httpx.Timeout(None, connect=20, read=120)
        # Accept either a connection count or an existing asyncio.Semaphore,
        # so several concurrent queries can share a single limit
        if isinstance(max_connections, int):
            max_connections = asyncio.Semaphore(max_connections)
        async with httpx.AsyncClient(timeout=timeout) as client:
            # Start with the first page, then follow "next" links (each with its own POST body)
            url, body = search.url, parameters
            while True:
                async with max_connections:
                    r = await client.post(url, json=body)
                r.raise_for_status()
                resp = r.json()
                results.extend(resp["features"])
                next_links = [x for x in resp["links"] if x["rel"] == "next"]
                if not next_links:
                    break
                url, body = next_links[0]["href"], next_links[0]["body"]
        return results

I timed running 20 searches sequentially and then 20 searches concurrently (on the single-threaded event loop), and saw roughly a 5-6x speedup with the concurrent approach. I haven't carefully benchmarked how much the event loop is blocked by JSON parsing, but IIRC the split was about 90% I/O and 10% JSON parsing. The notebook is at https://gist.github.com/TomAugspurger/50c3573d39213a2cb450d02074e4db01
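A minimal, offline sketch of that sequential-vs-concurrent comparison, assuming `asyncio.sleep` as a stand-in for `await client.post(...)` and a hypothetical `fake_post` that returns three pages per search. It also shows why `query` accepts an `asyncio.Semaphore`: one semaphore shared by all the `asyncio.gather`-ed queries caps the total number of in-flight requests, whereas passing an int gives each query its own independent limit. The timings it prints come from the simulated latency, not from the Planetary Computer measurements above.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional

LATENCY = 0.02          # simulated seconds per HTTP round trip (assumption)
PAGES_PER_SEARCH = 3    # simulated number of result pages per search (assumption)


async def fake_post(search_id: int, page: int) -> dict:
    """Hypothetical stand-in for `await client.post(...)`: sleeps instead of doing I/O."""
    await asyncio.sleep(LATENCY)
    next_page: Optional[int] = page + 1 if page + 1 < PAGES_PER_SEARCH else None
    return {"features": [f"{search_id}-{page}"], "next": next_page}


async def query(search_id: int, limit: asyncio.Semaphore) -> list[str]:
    """Follow pages until there is no "next", holding the shared limit per request."""
    results: list[str] = []
    page: Optional[int] = 0
    while page is not None:
        async with limit:  # at most N requests in flight across *all* queries
            resp = await fake_post(search_id, page)
        results.extend(resp["features"])
        page = resp["next"]
    return results


async def run_sequential(n: int) -> list[list[str]]:
    limit = asyncio.Semaphore(20)
    return [await query(i, limit) for i in range(n)]


async def run_concurrent(n: int) -> list[list[str]]:
    limit = asyncio.Semaphore(20)  # one semaphore shared by every query
    return await asyncio.gather(*(query(i, limit) for i in range(n)))


def timed(run: Callable[[int], Awaitable[list[list[str]]]], n: int):
    start = time.perf_counter()
    out = asyncio.run(run(n))
    return out, time.perf_counter() - start


seq_results, seq_time = timed(run_sequential, 20)
con_results, con_time = timed(run_concurrent, 20)
print(f"sequential {seq_time:.2f}s, concurrent {con_time:.2f}s")
```

With no CPU work per page, the simulated speedup is close to ideal. In real use, JSON parsing runs on the event loop thread and cannot overlap with itself, which would be expected to reduce the gain.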
@matthewhanson what's the status of this? Is this something I can work on?
As an alternative to using HTTPX for concurrent queries, I experimented with gevent. I used @TomAugspurger's gist (thank you, Tom!) as the basis for my own gist, with some refactoring to accommodate slides for a lightning talk at STAC Sprint 8: https://gist.github.com/chuckwondo/6e16cbbc44f8b0e0be41f493c4511796 The summary of the results of running 50 search queries (YMMV):
@gadomski and I chatted at the STAC Sprint about initially testing the waters with gevent only within the CLI.
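For comparison with the gevent approach: gevent makes blocking code cooperative by monkey-patching the standard library (`from gevent import monkey; monkey.patch_all()` before other imports), after which `gevent.pool.Pool(n).map(...)` can fan out ordinary synchronous calls. To keep this sketch runnable without gevent installed, it swaps in a standard-library thread pool, which shares the property that makes a CLI-only experiment appealing: existing synchronous search code runs unchanged. `blocking_search` is a hypothetical stand-in that sleeps instead of calling a real STAC API; this is not the code from the gist.

```python
import time
from concurrent.futures import ThreadPoolExecutor

LATENCY = 0.02  # simulated blocking request time in seconds (assumption)


def blocking_search(search_id: int) -> list[str]:
    """Hypothetical stand-in for an unmodified, synchronous search call."""
    time.sleep(LATENCY)  # blocking wait; like a socket read, it releases the GIL
    return [f"item-{search_id}"]


def run_all(n: int, workers: int = 20) -> list[list[str]]:
    # Same shape as gevent.pool.Pool(workers).map(...): results come back in input order
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(blocking_search, range(n)))


start = time.perf_counter()
results = run_all(50)
elapsed = time.perf_counter() - start
print(f"{len(results)} searches in {elapsed:.2f}s")
```

Threads (like gevent greenlets) only help while the work is waiting on I/O; CPU-bound JSON parsing still runs one call at a time under the GIL.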
The other benefit of having an async option for StacApiIO is that you could then enable "direct" access to ASGI implementations through httpx, using its app/base_url parameters. For instance, with stac-fastapi-pgstac, you could hand the application object straight to an httpx client instead of pointing it at a running server.
This could enable direct access to a pgstac database from pystac-client without needing a running instance of stac-fastapi, which would cut the network hops in half, because data would no longer have to travel database -> server -> client.
Would we be willing to switch over to an exclusively asynchronous implementation?
If we did, I'd want to keep a "blocking" API, as there are some situations where
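One common way to keep a blocking API on top of an async-only core is a thin synchronous wrapper around `asyncio.run()`. A minimal sketch with hypothetical names (`search_async`, `search`): `asyncio.run()` cannot be called while an event loop is already running in the same thread (for example inside a Jupyter cell), so the wrapper checks for that up front and points async callers at the coroutine instead.

```python
import asyncio


async def search_async(collections: list[str]) -> list[dict]:
    """Hypothetical async-first core; a real version would await httpx here."""
    await asyncio.sleep(0)  # placeholder for non-blocking I/O
    return [{"collection": c} for c in collections]


def search(collections: list[str]) -> list[dict]:
    """Blocking facade over the async core."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one
        return asyncio.run(search_async(collections))
    raise RuntimeError("search() is blocking; use `await search_async(...)` in async code")


items = search(["sentinel-2-l2a"])


async def from_async_code() -> str:
    # Calling the blocking facade from inside a running loop is rejected
    try:
        search(["sentinel-2-l2a"])
    except RuntimeError:
        return "RuntimeError"
    return "ok"


outcome = asyncio.run(from_async_code())
print(items, outcome)
```

Each blocking call starts and tears down its own event loop, so this facade suits scripts and CLIs; code that is already async would await `search_async` directly.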