
Broken pipe when reading AWS Batch logs #915

Closed
2 tasks done
dazza-codes opened this issue Jan 30, 2022 · 3 comments · Fixed by #926

Comments

@dazza-codes
Contributor

dazza-codes commented Jan 30, 2022

Describe the bug

When reading AWS Batch logs, there are frequent failures to read the entire data stream, with errors like the following (not necessarily in this order):

Connection was closed before we received a valid response from endpoint URL: "https://logs.us-east-1.amazonaws.com/".
An HTTP Client raised an unhandled exception: [Errno 32] Broken pipe
An HTTP Client raised an unhandled exception: Response payload is not completed

(See full traceback below for one or more of these errors.)

Could this be fixed by using the solutions noted in aio-libs/aiohttp#4581 to await the EOF?

Checklist

  • I have reproduced in an environment where pip check passes without errors
  • [-] I have provided pip freeze results
    • cannot do this on private code bases
  • I have provided sample code or detailed way to reproduce
    • just try to read many log streams for many AWS Batch jobs; the errors will show up
  • [-] I have tried the same code in botocore to ensure this is an aiobotocore-specific issue
    • it looks like an issue with aiohttp, and likely with how it is used in aiobotocore
  • [-] I have tried similar code in aiohttp to ensure this is an aiobotocore-specific issue
    • the aiohttp issue linked above has comments indicating ways to avoid these errors
  • [-] I have checked the latest and older versions of aiobotocore/aiohttp/python to see if this is a regression / injection
    • no

Sorry, I can't do all those things for this, or I might never get this issue submitted.

Environment:

  • Python Version: 3.7
  • OS name and version: Ubuntu 20.04

Versions

    $ pip show aiobotocore
    Name: aiobotocore
    Version: 2.0.1
    Summary: Async client for aws services using botocore and aiohttp
    Home-page: https://github.com/aio-libs/aiobotocore
    Author: Nikolay Novik
    Author-email: nickolainovik@gmail.com
    License: Apache 2
    Location: /opt/conda/envs/gis/lib/python3.7/site-packages
    Requires: aiohttp, aioitertools, botocore, wrapt
    Required-by: aio-aws, s3fs

    $ poetry show aiobotocore
    name         : aiobotocore
    version      : 2.0.1
    description  : Async client for aws services using botocore and aiohttp

    dependencies
     - aiohttp >=3.3.1
     - aioitertools >=0.5.1
     - boto3 >=1.19.8,<1.19.9
     - botocore >=1.22.8,<1.22.9
     - wrapt >=1.10.10

Additional context

This is a relevant snippet of code from aio-aws, see a full listing at

Note that I tried to read the events from the response as a stream, but that does not work (see the commented-out block in the loop).

    async with config.create_logs_client() as logs_client:
        for tries in range(config.retries + 1):
            try:
                log_events = []

                forward_token = None
                while True:
                    kwargs = {
                        "logGroupName": "/aws/batch/job",
                        "logStreamName": log_stream_name,
                        "startFromHead": True,
                        # "startTime": 123,
                        # "endTime": 123,
                        # "limit": 123,
                    }
                    if forward_token:
                        kwargs["nextToken"] = forward_token

                    await jitter(task_name, 0.0001, 0.01)
                    response = await logs_client.get_log_events(**kwargs)

                    log_page_events = response.get("events", [])
                    log_events.extend(log_page_events)

                    # # Note: for the record, cannot read the response as a stream
                    # if response and response_success(response):
                    #     response_events = response.get("events")
                    #     if response_events:
                    #         async with response_events as stream:
                    #             log_page_events = await stream.read()
                    #             log_events.extend(log_page_events)

                    next_forward_token = response.get("nextForwardToken")
                    if next_forward_token is None:
                        break
                    if forward_token == next_forward_token:
                        break
                    forward_token = next_forward_token

                if log_events:
                    LOGGER.info(
                        "AWS %s (%s:%s) events: %d",
                        task_name,
                        job.job_name,
                        job.job_id,
                        len(log_events),
                    )
                    job.logs = log_events
                    if config.aio_batch_db:
                        await config.aio_batch_db.save_job_logs(job)
                else:
                    LOGGER.warning(
                        "AWS Batch job (%s:%s) has no log events",
                        job.job_name,
                        job.job_id,
                    )

                return log_events

            except botocore.exceptions.ClientError as err:
                error = err.response.get("Error", {})
                if error.get("Code") in RETRY_EXCEPTIONS:
                    # add an extra random sleep period to avoid API throttle
                    await jitter(task_name, config.min_jitter, config.max_jitter)
                else:
                    raise

        raise RetryError(f"AWS {task_name} exceeded retries")
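In the snippet above only botocore.exceptions.ClientError is retried, so an HTTPClientError like the one in the traceback below escapes the retry loop. One possible caller-side stopgap, sketched here under assumptions, is a small generic async retry helper. The name call_with_retries and its parameters are hypothetical (not part of aio-aws or aiobotocore), and the demo uses Python's built-in ConnectionError/BrokenPipeError instead of botocore types so that it runs without AWS.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def call_with_retries(
    func: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    retries: int = 3,
    min_jitter: float = 0.0001,
    max_jitter: float = 0.01,
) -> T:
    """Await func(), retrying if it raises one of the retry_on exception types.

    Hypothetical helper: sleeps a random jitter between attempts and
    re-raises the last error once all retries are used up. Exceptions
    not listed in retry_on propagate immediately.
    """
    if retries < 0:
        raise ValueError("retries must be >= 0")
    for attempt in range(retries + 1):
        try:
            return await func()
        except retry_on:
            if attempt == retries:
                raise  # out of retries: surface the original error
            await asyncio.sleep(random.uniform(min_jitter, max_jitter))
    raise AssertionError("unreachable")


if __name__ == "__main__":
    attempts = {"n": 0}

    async def flaky() -> str:
        # Simulates a request that hits a broken pipe twice, then succeeds.
        attempts["n"] += 1
        if attempts["n"] < 3:
            raise BrokenPipeError(32, "Broken pipe")
        return "ok"

    # BrokenPipeError is a subclass of the built-in ConnectionError.
    print(asyncio.run(call_with_retries(flaky, (ConnectionError,))))  # prints "ok"
```

In the aio-aws loop, one might wrap the call as call_with_retries(lambda: logs_client.get_log_events(**kwargs), (botocore.exceptions.HTTPClientError,)). Whether retrying every HTTPClientError is safe for a given operation is a judgment the caller has to make; the real fix belongs in the library's exception mapping.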

Stacktrace:

  File "/opt/conda/envs/gis/lib/python3.7/site-packages/aio_aws/aio_aws_batch.py", line 896, in aio_batch_get_logs
    result = await task
  File "/opt/conda/envs/gis/lib/python3.7/asyncio/tasks.py", line 560, in _wait_for_one
    return f.result()  # May raise f.exception().
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/aio_aws/aio_aws_batch.py", line 449, in aio_batch_job_logs
    response = await logs_client.get_log_events(**kwargs)
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/aiobotocore/client.py", line 167, in _make_api_call
    operation_model, request_dict, request_context)
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/aiobotocore/client.py", line 186, in _make_request
    return await self._endpoint.make_request(operation_model, request_dict)
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/aiobotocore/endpoint.py", line 83, in _send_request
    exception):
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/aiobotocore/endpoint.py", line 216, in _needs_retry
    caught_exception=caught_exception, request_dict=request_dict)
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/aiobotocore/hooks.py", line 29, in _emit
    response = handler(**kwargs)
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/botocore/retryhandler.py", line 183, in __call__
    if self._checker(attempts, response, caught_exception):
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/botocore/retryhandler.py", line 251, in __call__
    caught_exception)
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/botocore/retryhandler.py", line 269, in _should_retry
    return self._checker(attempt_number, response, caught_exception)
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/botocore/retryhandler.py", line 317, in __call__
    caught_exception)
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/botocore/retryhandler.py", line 223, in __call__
    attempt_number, caught_exception)
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/botocore/retryhandler.py", line 359, in _check_caught_exception
    raise caught_exception
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/aiobotocore/endpoint.py", line 147, in _do_get_response
    http_response = await self._send(request)
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/aiobotocore/endpoint.py", line 229, in _send
    return await self.http_session.send(request)
  File "/opt/conda/envs/gis/lib/python3.7/site-packages/aiobotocore/httpsession.py", line 224, in send
    raise HTTPClientError(error=e)
botocore.exceptions.HTTPClientError: An HTTP Client raised an unhandled exception: [Errno 32] Broken pipe

@thehesiod
Collaborator

Can you try the aiohttp fix? You seem to be in the best position to test this.

@thehesiod
Collaborator

I believe this is an issue with aiohttp rather than aiobotocore; we do not do anything specific with regard to connections. However, let me see if we're missing a retry for this type of error.

@thehesiod
Collaborator

OK, so in the old error handling we treated ClientPayloadError as retryable: https://github.com/aio-libs/aiobotocore/blob/master/aiobotocore/_endpoint_helpers.py#L12

However, exceptions are now mapped to match botocore's, and this one falls through to the generic case (https://github.com/aio-libs/aiobotocore/blob/master/aiobotocore/httpsession.py#L218), so it doesn't get retried.

Checking botocore, I believe this goes under ProtocolError and will then get retried; I'm adding it back in #926.
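To make the mechanism described in these comments concrete, here is a dependency-free sketch of the idea: a low-level transport error is translated into a botocore-style exception, and only some of those exceptions are treated as retryable. Every class is a hypothetical stand-in named after its real aiohttp/botocore counterpart, the RETRYABLE tuple is an assumption, and this is not the code from #926. The sketch only models ClientPayloadError being restored as retryable; everything else, including the broken pipe from the traceback above, is shown falling through to the generic case.

```python
"""Dependency-free sketch of the exception mapping discussed in this thread.

Every class here is a hypothetical stand-in named after its real
aiohttp/botocore counterpart; none of them is the real library type.
"""


class ClientPayloadError(Exception):
    """Stand-in for aiohttp's ClientPayloadError."""


class HTTPClientError(Exception):
    """Stand-in for botocore's generic HTTPClientError."""

    fmt = "An HTTP Client raised an unhandled exception: {error}"

    def __init__(self, error: BaseException, endpoint_url: str = "") -> None:
        # Build the message from the class-level template (unused keys are ignored).
        super().__init__(self.fmt.format(error=error, endpoint_url=endpoint_url))
        self.error = error


class ConnectionClosedError(HTTPClientError):
    """Stand-in for botocore's ConnectionClosedError (treated as retryable)."""

    fmt = (
        "Connection was closed before we received a valid response "
        'from endpoint URL: "{endpoint_url}".'
    )


# Assumption: only "connection closed" style errors count as retryable,
# loosely modelled on botocore's general connection-error retry group.
RETRYABLE = (ConnectionClosedError,)


def translate(exc: BaseException, endpoint_url: str) -> HTTPClientError:
    """Map a low-level transport error to a botocore-style error."""
    if isinstance(exc, ClientPayloadError):
        # The behaviour the fix restores: payload errors become retryable.
        return ConnectionClosedError(exc, endpoint_url)
    # Everything else falls through to the generic, non-retried case.
    return HTTPClientError(exc, endpoint_url)


def should_retry(exc: BaseException) -> bool:
    return isinstance(exc, RETRYABLE)


if __name__ == "__main__":
    url = "https://logs.us-east-1.amazonaws.com/"
    for low_level in (
        ClientPayloadError("Response payload is not completed"),
        BrokenPipeError(32, "Broken pipe"),
    ):
        mapped = translate(low_level, url)
        print(f"{type(mapped).__name__} retry={should_retry(mapped)}: {mapped}")
```

Because ConnectionClosedError subclasses HTTPClientError, callers that catch the generic type still see it, while the retry check can single out the narrower class.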

@thehesiod thehesiod linked a pull request Mar 17, 2022 that will close this issue