This repository was archived by the owner on Mar 20, 2023. It is now read-only.

RuntimeError: Task got bad yield: 200 #61

Closed
bisoldi opened this issue Mar 25, 2019 · 7 comments

Comments


bisoldi commented Mar 25, 2019

I'm getting a Task got bad yield: 200 error while using the bulk method. I've tried this in different ways (e.g. without my own generator function), and I've also tried an unofficial Helpers class that someone wrote; I get the same error either way.

Here is the stack trace:

File "/Users/brooks.isoldi/git/Futures/css/ingest/PythonElasticsearchIngest/venv/lib/python3.6/site-packages/elasticsearch_async/transport.py", line 150, in main_loop
    method, url, params, body, headers=headers, ignore=ignore, timeout=timeout)
RuntimeError: Task got bad yield: 200

Below is the relevant code:

import asyncio
import gzip
import json
import logging
import traceback

import boto3
from elasticsearch import RequestsHttpConnection
from elasticsearch_async import AsyncElasticsearch
from assume_role_aws4auth import AssumeRoleAWS4Auth

logger = logging.getLogger(__name__)

credentials = boto3.Session().get_credentials()
awsauth = AssumeRoleAWS4Auth(credentials, 'us-east-1', 'es')
event_loop = asyncio.get_event_loop()
es_client = AsyncElasticsearch(hosts=['https://MY-ES_HOST'], http_compress=True, http_auth=awsauth, use_ssl=True,
                               verify_certs=True, connection_class=RequestsHttpConnection, loop=event_loop)


def read_chunk(file_path: str, max_batch_size: int, max_records: int):
    actions: str = ''
    actions_size: int = 0
    num_actions: int = 0
    with gzip.open(file_path, 'rt') as f:
        for line in f:
            request = json.dumps(dict({'index': dict({})})) + '\n' + line + '\n'
            request_size = len(request.encode('utf-8'))

            # Check to see if this record will put us over the limits
            if (actions_size + request_size) > max_batch_size or num_actions == max_records:
                yield actions
                actions = ''
                num_actions = 0
                actions_size = 0

            # Add the record
            actions += request
            num_actions += 1
            actions_size += request_size

    if actions != '':
        yield actions


async def process(filename: str):
    for action_chunk in read_chunk(filename, 10000000, 500):
        try:
            resp = await es_client.bulk(body=action_chunk, index='logs', doc_type='doc', _source=False)
            logger.error(resp)
        except Exception as ex:
            logger.error('Found an exception')
            logger.error(''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)))
        await asyncio.sleep(.1)


event_loop.run_until_complete(process(filename))
pending = asyncio.Task.all_tasks()
event_loop.run_until_complete(asyncio.gather(*pending))

Any thoughts?


ipeluffo commented Jul 15, 2019

@bisoldi, the issue here is that you used the synchronous requests-based connection class (i.e. RequestsHttpConnection), while AsyncElasticsearch makes its requests with async/await; the two are incompatible.
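The mismatch can be reproduced without Elasticsearch at all: when something on the await path hands the event loop a plain value (such as a 200 status code returned by a synchronous connection class) instead of a Future, asyncio raises exactly this error. A minimal sketch, not the library's actual code:

```python
import asyncio


class SyncResponse:
    """Stands in for a synchronous connection class: its __await__
    yields a plain value (an HTTP status code) instead of a Future."""

    def __await__(self):
        yield 200  # a bare int, not an awaitable the event loop understands


async def perform_request():
    return await SyncResponse()


try:
    asyncio.run(perform_request())
except RuntimeError as err:
    print(err)  # Task got bad yield: 200
```

The event loop inspects whatever a coroutine yields; anything that isn't a Future (or None) triggers the RuntimeError seen in the stack trace above.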


bisoldi commented Jul 15, 2019

Hi @ipeluffo, thank you for that! Do you know which connection class I should use with the AsyncElasticsearch client?

I'm guessing the AIOHttpConnection from elasticsearch_async project?

@ipeluffo

Ideally yes, but you'll have the issue of signing requests for AWS.


hamx0r commented Oct 30, 2019

I'm having the same issue: if I use AIOHttpConnection, I'm currently stuck with an HTTP Basic Auth tuple as my only choice for auth, yet I need to be able to use my AWS key and secret. I don't see anything online explaining how I can use a key and secret as Basic Auth params. It looks like AIOHttpConnection needs to be extended to allow AWS4Auth objects. Any other solutions out there?


bisoldi commented Oct 30, 2019

It's been quite a while, but if I remember correctly, the advice above worked without any request-signing issues. I was using an IAM Role though, not a User...


hamx0r commented Oct 30, 2019

Thanks @bisoldi. So are you sending your IAM Role user/password via HTTP Basic Auth, i.e. with something like AsyncElasticsearch(http_auth=(user, password), connection_class=AIOHttpConnection)?

If I use AIOHttpConnection, it only lets me use a username and password, so it looks like one is limited to IAM Roles or other IAM entities with users/passwords, not AWS keys and secrets. I'll try to get an IAM user/pass for my case. For posterity, I've been digging into the code, and aiohttp's client_reqrep looks like the place to update the request headers using the code of aws4auth.py's __call__() method, which mainly sets a bunch of headers on each request. It does not look trivial.
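One hedged way to bridge that gap without patching aiohttp internals: let a requests-style auth object (AWS4Auth is one) sign a throwaway PreparedRequest, then copy the resulting headers into whatever aiohttp call you make. The helper name below is hypothetical; this is a sketch of the idea, not elasticsearch_async's API:

```python
import requests  # AWS4Auth is written against requests' PreparedRequest API


def signed_headers(auth, method, url, body=None):
    # Hypothetical helper: build a dummy prepared request, let the auth
    # callable mutate it (AWS4Auth adds Authorization / X-Amz-* headers),
    # then return the headers for reuse with an aiohttp session.
    req = requests.Request(method=method, url=url, data=body).prepare()
    auth(req)
    return dict(req.headers)
```

An AIOHttpConnection subclass could call something like this from perform_request and merge the result into its outgoing headers. Re-signing on every request matters, because SigV4 signs the method, path, and body, not just the credentials.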


sethmlarson commented Jul 20, 2020

The sync bulk helper doesn't support being used with an AsyncElasticsearch instance. There are async bulk helpers in the elasticsearch[async] package. See #81 for more information.
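For anyone landing here later, the async path looks roughly like the sketch below. It assumes elasticsearch>=7.8 installed with the async extra and a node at localhost:9200 (both assumptions); the action shape is the standard bulk-helper dict:

```python
import asyncio
import json


def gen_actions(lines, index='logs'):
    # Yield one bulk-action dict per JSON line; async_bulk serializes these.
    for line in lines:
        yield {'_index': index, '_source': json.loads(line)}


async def ingest(lines):
    # Imports live inside the coroutine so the action generator above can be
    # used even without `pip install elasticsearch[async]` (7.8+).
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_bulk

    es = AsyncElasticsearch(hosts=['http://localhost:9200'])
    try:
        success, errors = await async_bulk(es, gen_actions(lines))
        return success, errors
    finally:
        await es.close()


# Usage (requires a running cluster):
# asyncio.run(ingest(open('logs.ndjson')))
```

async_bulk handles chunking by size and count itself, which replaces the hand-rolled read_chunk batching in the original report.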
