Combine async and multiproc for speedup #6

Closed
lmmx opened this issue Mar 23, 2021 · 5 comments
Labels
difficult Extra attention is needed

Comments

@lmmx
Owner

lmmx commented Mar 23, 2021

Look into the aioprocessing library

  • ChannelListings waits for all requests to return (even though they are handled asynchronously and could therefore use a callback)
  • Once all schedule GET requests have returned, they are parsed on all cores with multiprocessing
  • The multiprocessing is a synchronous, blocking step. Rewriting it with aioprocessing would let the batches begin before the last GET request has returned, so they should finish sooner
  • This would shave some waiting time off the listings generation process and make it feasible to generate listings for multiple channels
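
A minimal sketch of the idea in the bullets above: each response is handed to a worker pool as soon as it arrives, instead of after the whole batch has returned. This does not use aioprocessing; it swaps in the standard library's `loop.run_in_executor`. The names `fetch_schedule`, `parse_schedule` and `build_listings` are hypothetical stand-ins, not part of beeb, and the "request" is simulated with a sleep.

```python
from __future__ import annotations

import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor


def parse_schedule(raw: str) -> list[str]:
    """Hypothetical stand-in for the CPU-bound schedule parsing step."""
    return raw.split(",")


async def fetch_schedule(day: int) -> str:
    """Hypothetical stand-in for one schedule GET request (no network)."""
    await asyncio.sleep(0.01 * (3 - day % 3))  # responses arrive out of order
    return f"day{day}-a,day{day}-b"


async def fetch_then_parse(day: int, executor: Executor) -> list[str]:
    raw = await fetch_schedule(day)
    loop = asyncio.get_running_loop()
    # Hand the parse to the pool as soon as this response is back,
    # without waiting for the other requests in the batch.
    return await loop.run_in_executor(executor, parse_schedule, raw)


async def build_listings(days: range, executor: Executor) -> list[list[str]]:
    # gather returns the results in the order of the inputs
    return await asyncio.gather(*(fetch_then_parse(d, executor) for d in days))
```

Passing a `concurrent.futures.ProcessPoolExecutor` instead of a thread pool would spread the parsing over several cores, provided the parse function is defined at module level so it can be pickled.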
@lmmx lmmx added the difficult Extra attention is needed label Mar 23, 2021
@lmmx
Owner Author

lmmx commented Mar 24, 2021

ProgrammeCatalogue is built by iterating over the episodes in the list ChannelListings.all_broadcasts. It avoids pulling already-encountered programmes ("brands") by keeping a list of episode titles (importantly, these are only added from the programme/"brand" title once it has been returned from the EpisodeMetadataPidJson)

If the GET requests for episode metadata become asynchronous, it would no longer be possible to avoid these superfluous GET requests (or, more optimistically, it would not be possible within an async batch)
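
To illustrate why the de-duplication stops working inside a concurrent batch, here is a small sketch under assumed data shapes. The record layout, `BRAND_TITLES`, and both functions are hypothetical; the real ProgrammeCatalogue code may differ. The brand title only enters the "seen" set once a response is back, so requests that are already in flight cannot be skipped.

```python
import asyncio

# Hypothetical broadcast records: an episode title plus the PID used for the metadata request.
BROADCASTS = [
    {"title": "Today", "pid": "p1"},
    {"title": "Today", "pid": "p2"},
    {"title": "Front Row", "pid": "p3"},
]
# Hypothetical lookup standing in for the brand title in each metadata response.
BRAND_TITLES = {"p1": "Today", "p2": "Today", "p3": "Front Row"}


def sync_catalogue(broadcasts):
    seen, requested = set(), []
    for ep in broadcasts:
        if ep["title"] in seen:
            continue  # brand already pulled, skip the request
        requested.append(ep["pid"])
        seen.add(BRAND_TITLES[ep["pid"]])  # only known once the response is back
    return seen, requested


async def async_catalogue(broadcasts):
    seen, requested = set(), []

    async def pull(ep):
        if ep["title"] in seen:
            return
        requested.append(ep["pid"])
        await asyncio.sleep(0)  # stand-in for awaiting the GET request
        seen.add(BRAND_TITLES[ep["pid"]])

    await asyncio.gather(*(pull(ep) for ep in broadcasts))
    return seen, requested
```

In this toy data the synchronous loop issues two requests while the concurrent batch issues all three, because every task checks the "seen" set before any response has arrived.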

Nevertheless I expect it would give a speedup, so I will retain both forms when refactoring the synchronous approach to an async one (so I can compare before/after)

The current time to load the past 30 days is about 40-60 seconds

@lmmx
Owner Author

lmmx commented Mar 24, 2021

The problem as it stands is that the EpisodeMetadataPidJson class (by virtue of being based on beeb.api.serialisation.PullMixIn) can only be pulled synchronously

We can defer this from happening on init (beeb.api.serialisation.JsonHandler gates the pull() call by the init kwarg defer_pull=False), but the .url attribute will still need to be parsed. This can be swapped in by making a new method in beeb.share.http_utils
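
A rough sketch of the deferral pattern described above, assuming a handler that pulls on init unless told otherwise. `DeferredJson` and its injected fetch callables are hypothetical and only mimic the `defer_pull` kwarg; they are not the beeb classes.

```python
import asyncio


class DeferredJson:
    """Hypothetical handler: pulls on init unless defer_pull=True."""

    def __init__(self, url, fetch, defer_pull=False):
        self.url = url
        self._fetch = fetch  # synchronous callable, injected so the sketch needs no network
        self.data = None
        if not defer_pull:
            self.pull()

    def pull(self):
        # Synchronous path: blocks until the response is available
        self.data = self._fetch(self.url)

    async def async_pull(self, async_fetch):
        # Asynchronous path: the URL is already known, only the request is awaited
        self.data = await async_fetch(self.url)
```

With `defer_pull=True` the object can be constructed cheaply for every episode, and the URLs collected from the instances can then be fetched in one async batch.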

@lmmx
Owner Author

lmmx commented Mar 24, 2021

Error (timeout?) occurring due to a bug in the HTTP/2 implementation in httpx: encode/httpx#841


Traceback (most recent call last):
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_exceptions.py", line 326, in map_exceptions
    yield
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1492, in _send_single_request
    (status_code, headers, stream, ext) = await transport.arequest(
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_transports/default.py", line 169, in arequest
    return await self._pool.arequest(
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 218, in arequest
    response = await connection.arequest(
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/connection.py", line 106, in arequest
    return await self.connection.arequest(method, url, headers, stream, ext)
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/http2.py", line 119, in arequest
    return await h2_stream.arequest(method, url, headers, stream, ext)
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/http2.py", line 297, in arequest
    status_code, headers = await self.receive_response(timeout)
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/http2.py", line 371, in receive_response
    event = await self.connection.wait_for_event(self.stream_id, timeout)
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/http2.py", line 198, in wait_for_event
    await self.receive_events(timeout)
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/http2.py", line 215, in receive_events
    raise RemoteProtocolError(event)
httpcore.RemoteProtocolError: <ConnectionTerminated error_code:ErrorCodes.NO_ERROR, last_stream_id:1999, additional_data:None>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/louis/dev/beeb/src/beeb/nav/sched/catalogue.py", line 16, in __init__
    self.pull_and_parse(listings, async_pull=async_pull)
  File "/home/louis/dev/beeb/src/beeb/nav/sched/catalogue.py", line 20, in pull_and_parse
    return self.async_pull_and_parse(listings)
  File "/home/louis/dev/beeb/src/beeb/nav/sched/catalogue.py", line 45, in async_pull_and_parse
    fetch_episode_metadata(listings, pbar=pbar, verbose=verbose)
  File "/home/louis/dev/beeb/src/beeb/nav/sched/async_utils.py", line 98, in fetch_episode_metadata
    return asyncio.run(async_fetch_episodes(listings, pbar, verbose))
  File "/home/louis/miniconda3/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/home/louis/miniconda3/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/louis/dev/beeb/src/beeb/nav/sched/async_utils.py", line 94, in async_fetch_episodes
    return await zs
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/core.py", line 32, in wait_stream
    async for item in streamer:
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/stream/advanced.py", line 59, in base_combine
    result = task.result()
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/stream/combine.py", line 73, in smap
    async for item in streamer:
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/stream/advanced.py", line 59, in base_combine
    result = task.result()
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/stream/create.py", line 85, in call
    yield await func(*args, **kwargs)
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/stream/transform.py", line 52, in starfunc
    return await func(*args)
  File "/home/louis/dev/beeb/src/beeb/nav/sched/async_utils.py", line 12, in fetch
    response = await session.get(str(url))
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1539, in get
    return await self.request(
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1361, in request
    response = await self.send(
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1396, in send
    response = await self._send_handling_auth(
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1434, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1466, in _send_handling_redirects
    response = await self._send_single_request(request, timeout)
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1492, in _send_single_request
    (status_code, headers, stream, ext) = await transport.arequest(
  File "/home/louis/miniconda3/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_exceptions.py", line 343, in map_exceptions
    raise mapped_exc(message, **kwargs) from exc  # type: ignore
httpx.RemoteProtocolError: <ConnectionTerminated error_code:ErrorCodes.NO_ERROR, last_stream_id:1999, additional_data:None>

The last line in particular: "NO_ERROR", "additional_data:None"

Consider switching http2 off for the broadcasts (or... batch/retry?)

Solution: RTFM! As mentioned in the issue tracker thread above, this is not a bug but a feature of httpx.AsyncClient (unlike an individual GET request): the default timeout is 5 seconds, and this default can be increased to avoid the exception being thrown

client = httpx.Client()              # Use a default 5s timeout everywhere.
client = httpx.Client(timeout=10.0)  # Use a default 10s timeout everywhere.
client = httpx.Client(timeout=None)  # Disable all timeouts by default.

Upon setting this, I'm still seeing timeout errors (hanging indefinitely if set to None), even with HTTP/1, so they are not due to HTTP/2 specifically (I'd suggest a retry mechanism in that case)

I don't get the timeouts as frequently when I set the task limit lower (I tried increasing it from 30 to 100 and definitely saw more hangs/timeouts at 100)

  • Resolved: turned off HTTP/2 (but kept it for schedule listing queries) and set the task limit to 20
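
The effect of a task limit can be sketched with plain asyncio. The tracebacks suggest the thread's own code goes through aiostream; here an `asyncio.Semaphore` is swapped in to show the same idea, and `run_limited` is a hypothetical helper name.

```python
import asyncio


async def run_limited(coro_fns, task_limit=20):
    """Run zero-argument coroutine functions with at most task_limit in flight."""
    semaphore = asyncio.Semaphore(task_limit)

    async def guarded(fn):
        # Wait for a free slot before starting the request
        async with semaphore:
            return await fn()

    # Results come back in the order the functions were given
    return await asyncio.gather(*(guarded(fn) for fn in coro_fns))
```

Lowering `task_limit` trades some throughput for fewer simultaneous connections, which is consistent with the observation above that hangs were more frequent at 100 than at 30.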

@lmmx
Owner Author

lmmx commented Mar 25, 2021

Still obtaining an error (rarely) from HTTP/1.1 (this one?)

Traceback (most recent call last):                                                                                          
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_exceptions.py", line 326, in map_exceptions               
    yield                                                                                                                   
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1492, in _send_single_request            
    (status_code, headers, stream, ext) = await transport.arequest(                                                         
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_transports/default.py", line 169, in arequest             
    return await self._pool.arequest(                                                                                       
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 218, in arequest       
    response = await connection.arequest(                                                                                   
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/connection.py", line 106, in arequest            
    return await self.connection.arequest(method, url, headers, stream, ext)                                                
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/http11.py", line 72, in arequest                 
    ) = await self._receive_response(timeout)                                                                               
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/http11.py", line 133, in _receive_response       
    event = await self._receive_event(timeout)                                                                              
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_async/http11.py", line 169, in _receive_event          
    event = self.h11_state.next_event()                                                                                     
  File "/home/louis/miniconda3/lib/python3.8/contextlib.py", line 131, in __exit__                                          
    self.gen.throw(type, value, traceback)                                                                                  
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions             
    raise to_exc(exc) from None                                                                                             
httpcore.RemoteProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE             
                                                                                                                            
The above exception was the direct cause of the following exception:                                                        
                                                                                                                            
Traceback (most recent call last):                                                                                          
  File "<string>", line 1, in <module>                                                                                      
  File "/home/louis/dev/beeb/src/beeb/nav/sched/catalogue.py", line 17, in __init__                                         
    self.async_pull_and_parse(listings)                                                                                     
  File "/home/louis/dev/beeb/src/beeb/nav/sched/catalogue.py", line 46, in async_pull_and_parse                             
    fetch_episode_metadata(listings, pbar=pbar, verbose=verbose)                                                            
  File "/home/louis/dev/beeb/src/beeb/nav/sched/async_utils.py", line 69, in fetch_episode_metadata                         
    return asyncio.run(async_fetch_episodes(listings, pbar, verbose))                                                       
  File "/home/louis/miniconda3/lib/python3.8/asyncio/runners.py", line 43, in run                                           
    return loop.run_until_complete(main)                                                                                    
  File "/home/louis/miniconda3/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete                       
    return future.result()                                                                                                  
  File "/home/louis/dev/beeb/src/beeb/nav/sched/async_utils.py", line 65, in async_fetch_episodes                           
    return await zs                                                                                                         
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/core.py", line 32, in wait_stream                      
    async for item in streamer:                                                                                             
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/stream/advanced.py", line 59, in base_combine          
    result = task.result()                                                                                                  
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/stream/combine.py", line 73, in smap                   
    async for item in streamer:                                                                                             
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/stream/advanced.py", line 59, in base_combine          
    result = task.result()                                                                                                  
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/stream/create.py", line 85, in call                    
    yield await func(*args, **kwargs)                                                                                       
  File "/home/louis/miniconda3/lib/python3.8/site-packages/aiostream/stream/transform.py", line 52, in starfunc             
    return await func(*args)   
  File "/home/louis/dev/beeb/src/beeb/nav/sched/async_utils.py", line 12, in fetch                                          
    response = await session.get(str(url))                    
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1539, in get                             
    return await self.request(                                
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1361, in request                         
    response = await self.send(                               
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1396, in send                            
    response = await self._send_handling_auth(                
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1434, in _send_handling_auth             
    response = await self._send_handling_redirects(           
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1466, in _send_handling_redirects        
    response = await self._send_single_request(request, timeout)                                                            
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_client.py", line 1492, in _send_single_request            
    (status_code, headers, stream, ext) = await transport.arequest(                                                         
  File "/home/louis/miniconda3/lib/python3.8/contextlib.py", line 131, in __exit__                                          
    self.gen.throw(type, value, traceback)                    
  File "/home/louis/miniconda3/lib/python3.8/site-packages/httpx/_exceptions.py", line 343, in map_exceptions               
    raise mapped_exc(message, **kwargs) from exc  # type: ignore                                                            
httpx.RemoteProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE

@lmmx
Owner Author

lmmx commented Mar 25, 2021

Fixed the async bugs with up to 3 retries (now silent). No need for multiproc: async processing with aiostream.starmap is sufficiently fast (6cf0dde)
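
For reference, a silent retry wrapper of the kind described could look roughly like this. It is a sketch, not the code in the commit: `TransientError` is a hypothetical stand-in for a transport error such as httpx.RemoteProtocolError, and `fetch` is any coroutine function taking a URL.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical stand-in for a transport error such as httpx.RemoteProtocolError."""


async def fetch_with_retries(fetch, url, max_retries=3):
    """Await fetch(url), retrying silently up to max_retries times on TransientError."""
    for attempt in range(max_retries + 1):
        try:
            return await fetch(url)
        except TransientError:
            if attempt == max_retries:
                raise  # out of retries: surface the error to the caller
            await asyncio.sleep(0)  # a real client might back off here
```

With `max_retries=3` a request is attempted at most four times in total, and the error only propagates if every attempt fails.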

@lmmx lmmx closed this as completed Mar 25, 2021