-
Notifications
You must be signed in to change notification settings - Fork 1
Async http client #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
36 commits
Select commit
Hold shift + click to select a range
28cb974
rename http.py to client.py due to causing No Module Found error for …
SgtMarmite 9913dfd
httpx initial implementation
SgtMarmite 1429b68
methods update, post, patch, delete
SgtMarmite 162833d
update methods
SgtMarmite 8d4a9ac
httpx updates
SgtMarmite 00546a9
add raw methods
SgtMarmite 7006391
update client reference
SgtMarmite c0e2bba
cleanup, readme update
SgtMarmite e98f81b
move examples to separate folder
SgtMarmite 2065246
add new tests
SgtMarmite 3310379
update tests
SgtMarmite 7dcdd61
finalize tests
SgtMarmite 35033cc
add Storage API example to docs
SgtMarmite 7406d34
update backoff calculation
SgtMarmite c294a6a
remove unused imports/installs
SgtMarmite 8033dc9
remove more unused imports/installs
SgtMarmite 5c29ef2
add update_auth_header method + tests
SgtMarmite 3c54755
Update async_storage_client.py
SgtMarmite fedba9c
add correct typehints
SgtMarmite b387e31
enable rate limiting using aiolimiter
SgtMarmite 56bb576
add process_multiple method and example
SgtMarmite 865a711
refactor retry strategy
SgtMarmite 7270fa1
fix tests
SgtMarmite 677b0f5
add workaround to disable INFO msgs coming from httpx library
SgtMarmite f8f4d96
add retry warning message
SgtMarmite 1ccea91
reduce debug messages
SgtMarmite 2acc6bc
remove debug param
SgtMarmite e269ddf
revert logging setup
SgtMarmite 90edc2a
silence httpcore debug messages
SgtMarmite 782f4d5
reenable debug
c663646
Update async_client.py
91f5233
Update async_client.py
4f1b061
bump httpx version
cfe3c9b
make max_requests per second float
6ec6c37
added detailed exception message
kudj 6c59f8a
added detailed exception message
kudj File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -139,4 +139,7 @@ dmypy.json | |
| .idea/ | ||
|
|
||
| # MacOS files | ||
| .DS_Store | ||
| .DS_Store | ||
|
|
||
| # Local test script | ||
| /docs/examples/test.py | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| import time | ||
| import asyncio | ||
| from keboola.http_client import AsyncHttpClient | ||
| import csv | ||
| import httpx | ||
| import os | ||
|
|
||
|
|
||
| async def fetch_pokemon(client, poke_id): | ||
| try: | ||
| r = await client.get(str(poke_id)) | ||
| return r | ||
| except httpx.HTTPStatusError as e: | ||
| if e.response.status_code == 404: | ||
| return None | ||
| else: | ||
| raise | ||
|
|
||
|
|
||
| async def save_to_csv(details): | ||
| filename = "pokemon_details.csv" | ||
| fieldnames = ["name", "height", "weight"] | ||
|
|
||
| file_exists = os.path.isfile(filename) | ||
| mode = "a" if file_exists else "w" | ||
|
|
||
| with open(filename, mode, newline="") as csvfile: | ||
| writer = csv.DictWriter(csvfile, fieldnames=fieldnames) | ||
|
|
||
| if not file_exists: | ||
| writer.writeheader() | ||
|
|
||
| writer.writerow({ | ||
| "name": details["name"], | ||
| "height": details["height"], | ||
| "weight": details["weight"] | ||
| }) | ||
|
|
||
|
|
||
| async def main_async(): | ||
| base_url = "https://pokeapi.co/api/v2/pokemon/" | ||
| start_time = time.time() | ||
|
|
||
| async with AsyncHttpClient(base_url=base_url, max_requests_per_second=20) as c: | ||
| poke_id = 1 | ||
|
|
||
| while True: | ||
| details = await fetch_pokemon(c, poke_id) | ||
| if details is None: | ||
| break | ||
|
|
||
| await save_to_csv(details) | ||
|
|
||
| poke_id += 1 | ||
|
|
||
| end_time = time.time() | ||
| print(f"Async: Fetched details for {poke_id - 1} Pokémon in {end_time - start_time:.2f} seconds.") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main_async()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| import asyncio | ||
| import csv | ||
| import time | ||
| from typing import List | ||
|
|
||
| from keboola.http_client import AsyncHttpClient | ||
|
|
||
|
|
||
| def generate_jobs(nr_of_jobs): | ||
| return [{'method': 'GET', 'endpoint': str(endpoint)} for endpoint in range(1, nr_of_jobs+1)] | ||
|
|
||
| def save_to_csv(results: List[dict]): | ||
| filename = "pokemon_details.csv" | ||
| fieldnames = ["name", "height", "weight"] # Define the fields you want to store | ||
|
|
||
| with open(filename, "w", newline="") as csvfile: | ||
| writer = csv.DictWriter(csvfile, fieldnames=fieldnames) | ||
| writer.writeheader() | ||
| for result in results: | ||
| writer.writerow({ | ||
| "name": result["name"], | ||
| "height": result["height"], | ||
| "weight": result["weight"] | ||
| }) | ||
|
|
||
| async def main_async(): | ||
| base_url = "https://pokeapi.co/api/v2/pokemon/" | ||
| start_time = time.time() | ||
|
|
||
| client = AsyncHttpClient(base_url=base_url, max_requests_per_second=20) | ||
|
|
||
| jobs = generate_jobs(1000) | ||
|
|
||
| results = await client.process_multiple(jobs) | ||
| await client.close() | ||
|
|
||
| end_time = time.time() | ||
| print(f"Fetched details for {len(results)} Pokémon in {end_time - start_time:.2f} seconds.") | ||
|
|
||
| save_to_csv(results) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main_async()) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| from keboola.http_client import HttpClient | ||
|
|
||
| BASE_URL = 'https://connection.keboola.com/v2/storage' | ||
| MAX_RETRIES = 3 | ||
|
|
||
|
|
||
| class KBCStorageClient(HttpClient): | ||
|
|
||
| def __init__(self, storage_token): | ||
| HttpClient.__init__(self, base_url=BASE_URL, max_retries=MAX_RETRIES, backoff_factor=0.3, | ||
| status_forcelist=(429, 500, 502, 504), | ||
| default_http_header={"X-StorageApi-Token": storage_token}) | ||
|
|
||
| def get_files(self, show_expired=None): | ||
| params = {"include": show_expired} | ||
| return self.get('tables', params=params, timeout=5) | ||
|
|
||
| cl = KBCStorageClient("my_token") | ||
|
|
||
| print(cl.get_files()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| import asyncio | ||
| from keboola.http_client import AsyncHttpClient | ||
|
|
||
| BASE_URL = 'https://connection.keboola.com/v2/storage' | ||
| MAX_RETRIES = 3 | ||
|
|
||
| class KBCStorageClient(AsyncHttpClient): | ||
|
|
||
| def __init__(self, storage_token): | ||
| super().__init__( | ||
| base_url=BASE_URL, | ||
| retries=MAX_RETRIES, | ||
| backoff_factor=0.3, | ||
| retry_status_codes=[429, 500, 502, 504], | ||
| auth_header={"X-StorageApi-Token": storage_token} | ||
| ) | ||
|
|
||
| async def get_files(self, show_expired=False): | ||
| params = {"showExpired": show_expired} | ||
| response = await self.get('tables', params=params, timeout=5) | ||
| return response | ||
|
|
||
| async def main(): | ||
| cl = KBCStorageClient("my_token") | ||
| files = await cl.get_files(show_expired=False) | ||
| print(files) | ||
|
|
||
| asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,3 @@ | ||
| requests | ||
| requests | ||
| httpx==0.27.0 | ||
| aiolimiter==1.1.0 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,2 @@ | ||
| from .http import HttpClient # noqa | ||
| from .client import HttpClient # noqa | ||
| from .async_client import AsyncHttpClient # noqa |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to achieve this without having the
main_asyncfunctionasync? E.g. something like this My idea was to have a function, that would run and wait for multiple jobs and just returned all the results (with the disadvantage of having to store all results in memory). Then I would just call something likeresults = client.run_multiple_sync()in my sync function. Is that possible?Also I somehow dislike the name
process_multiplewhat does it do? It actually sends multiple requests (not jobs) and returns list of result futures right? The request is basically the parameters of_request_rawhttp.py equivalent. I think it should be named something likerun_multiple_requests(). Also I can imagine having equivalent for each method, e.g.post_multiple