Client signals #2313 #2429

Merged
merged 27 commits into master from clientsession_signals on Nov 18, 2017
Changes from 4 commits
Commits (27)
e946ec5
Client signals #2313
pfreixes Oct 24, 2017
1a01001
Removed not already implemented signal
pfreixes Oct 29, 2017
2c17707
Merge branch 'master' into clientsession_signals
pfreixes Oct 29, 2017
b2f24e9
Merge branch 'master' into clientsession_signals
asvetlov Oct 29, 2017
971ad5c
Removed support for on_content and on_headers signals
pfreixes Oct 31, 2017
4ce2534
Merge branch 'master' into clientsession_signals
pfreixes Nov 1, 2017
fcf7427
fixed small sutff
pfreixes Nov 1, 2017
e7bbc52
Get rid of no longer used helpers.create_future
pfreixes Nov 1, 2017
7a0015d
Update 2313.feature
asvetlov Nov 1, 2017
ef573a2
Merge branch 'master' into clientsession_signals
pfreixes Nov 1, 2017
151ec9b
Merge branch 'master' into clientsession_signals
asvetlov Nov 3, 2017
d3ae4a9
Merge branch 'master' into clientsession_signals
asvetlov Nov 3, 2017
c45725c
on_request_start receives the whole URL object
pfreixes Nov 3, 2017
078c728
on_request_end and error flavor receive the method, URL and headers
pfreixes Nov 3, 2017
ea5d363
Merge remote-tracking branch 'upstream/master' into clientsession_sig…
pfreixes Nov 13, 2017
607db37
TraceConfig as a way to configure the ClientSession tracing
pfreixes Nov 8, 2017
b4a5f03
Merge remote-tracking branch 'upstream/master' into clientsession_sig…
pfreixes Nov 14, 2017
3640aec
Fix flake import issues
pfreixes Nov 14, 2017
436e8eb
Increase dns tracing coverage
pfreixes Nov 14, 2017
e1da600
Merge branch 'master' into clientsession_signals
asvetlov Nov 15, 2017
1dc1a37
Tracing signals are explicits
pfreixes Nov 15, 2017
8449a96
Removed not used session kw argument
pfreixes Nov 15, 2017
8fd0e9d
Accept multiple TraceConfig objects for the ClientSession class
pfreixes Nov 17, 2017
3d3caac
Renamed trace context vars
pfreixes Nov 18, 2017
bd5c8e5
Merge branch 'master' into clientsession_signals
pfreixes Nov 18, 2017
c232cdf
Fix invalid test func definition
pfreixes Nov 18, 2017
031bb3b
Fixed docstring params codification
pfreixes Nov 18, 2017
1 change: 1 addition & 0 deletions CHANGES/2313.feature
@@ -0,0 +1 @@
ClientSession publishes a set of signals to track the HTTP request execution
26 changes: 23 additions & 3 deletions aiohttp/_http_parser.pyx
@@ -61,6 +61,10 @@ cdef class HttpParser:
object _last_error
bint _auto_decompress

object _on_headers_received
object _on_content_received
object _trace_context

Py_buffer py_buf

def __cinit__(self):
@@ -82,7 +86,9 @@
object protocol, object loop, object timer=None,
size_t max_line_size=8190, size_t max_headers=32768,
size_t max_field_size=8190, payload_exception=None,
response_with_body=True, auto_decompress=True):
response_with_body=True, auto_decompress=True,
on_headers_received=None, on_content_received=None,
trace_context=None):
cparser.http_parser_init(self._cparser, mode)
self._cparser.data = <void*>self
self._cparser.content_length = 0
@@ -122,6 +128,10 @@
self._csettings.on_chunk_header = cb_on_chunk_header
self._csettings.on_chunk_complete = cb_on_chunk_complete

self._on_headers_received = on_headers_received
self._on_content_received = on_content_received
self._trace_context = trace_context

self._last_error = None

cdef _process_header(self):
@@ -215,10 +225,16 @@

self._messages.append((msg, payload))

if self._on_headers_received is not None:
self._on_headers_received.send(self._trace_context)

cdef _on_message_complete(self):
self._payload.feed_eof()
self._payload = None

if self._on_content_received is not None:
self._on_content_received.send(self._trace_context)

cdef _on_chunk_header(self):
self._payload.begin_http_chunk_receiving()

@@ -339,10 +355,14 @@ cdef class HttpResponseParserC(HttpParser):
size_t max_line_size=8190, size_t max_headers=32768,
size_t max_field_size=8190, payload_exception=None,
response_with_body=True, read_until_eof=False,
auto_decompress=True):
auto_decompress=True, on_headers_received=None,
on_content_received=None, trace_context=None):
self._init(cparser.HTTP_RESPONSE, protocol, loop, timer,
max_line_size, max_headers, max_field_size,
payload_exception, response_with_body, auto_decompress)
payload_exception, response_with_body, auto_decompress,
on_headers_received=on_headers_received,
on_content_received=on_content_received,
trace_context=trace_context)

cdef object _on_status_complete(self):
if self._buf:
118 changes: 114 additions & 4 deletions aiohttp/client.py
@@ -8,6 +8,7 @@
import sys
import traceback
import warnings
from types import SimpleNamespace

from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr
from yarl import URL
@@ -28,6 +29,7 @@
strip_auth_from_url)
from .http import WS_KEY, WebSocketReader, WebSocketWriter
from .http_websocket import WSHandshakeError, ws_ext_gen, ws_ext_parse
from .signals import FuncSignal, Signal
from .streams import FlowControlDataQueue


@@ -96,6 +98,7 @@ def __init__(self, *, connector=None, loop=None, cookies=None,

if cookies is not None:
self._cookie_jar.update_cookies(cookies)

self._connector = connector
self._connector_owner = connector_owner
self._default_auth = auth
@@ -108,6 +111,15 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
self._auto_decompress = auto_decompress
self._trust_env = trust_env

self._on_request_start = Signal()
self._on_request_end = Signal()
self._on_request_exception = Signal()
self._on_request_redirect = FuncSignal()
self._on_request_headers_sent = FuncSignal()
self._on_request_content_sent = FuncSignal()
self._on_request_headers_received = FuncSignal()
self._on_request_content_received = FuncSignal()

# Convert to list of tuples
if headers:
headers = CIMultiDict(headers)
@@ -161,7 +173,8 @@ def _request(self, method, url, *,
verify_ssl=None,
fingerprint=None,
ssl_context=None,
proxy_headers=None):
proxy_headers=None,
trace_context=None):

# NOTE: timeout clamps existing connect and read timeouts. We cannot
# set the default to None because we need to detect if the user wants
@@ -218,6 +231,18 @@
handle = tm.start()

url = URL(url)

if trace_context is None:
trace_context = SimpleNamespace()
Member commented:

I think the API is wrong here.
The user will never pass a trace_context into async with session.get() explicitly.
That is another level of abstraction.
What the user will do is set up the session properly at the initialization stage, by subscribing to signals and (optionally) providing a trace context factory that creates a new container for user data.
I even doubt whether we need a factory parameter, at least at the current stage.
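
A minimal sketch of the factory-style setup described here, roughly the shape the PR later adopts with TraceConfig (commit 607db37). The names TraceConfig, trace_config_ctx_factory, trace_configs, and the (session, trace_config_ctx, params) callback signature follow the API aiohttp eventually shipped; they are not part of the diff shown in this revision.

import asyncio
from types import SimpleNamespace

import aiohttp

async def on_request_start(session, trace_config_ctx, params):
    # trace_config_ctx is the per-request container built by the factory.
    trace_config_ctx.start = asyncio.get_running_loop().time()

async def on_request_end(session, trace_config_ctx, params):
    elapsed = asyncio.get_running_loop().time() - trace_config_ctx.start
    print('%s %s took %.3fs' % (params.method, params.url, elapsed))

async def main():
    trace_config = aiohttp.TraceConfig(trace_config_ctx_factory=SimpleNamespace)
    trace_config.on_request_start.append(on_request_start)
    trace_config.on_request_end.append(on_request_end)
    async with aiohttp.ClientSession(trace_configs=[trace_config]) as session:
        async with session.get('http://example.com') as resp:
            await resp.text()

asyncio.run(main())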

pfreixes (Contributor, Author) replied on Nov 3, 2017:


The rationale behind this implementation is the following:

Give the developer the freedom to have fine-grained control of their request calls, building a trace context for each request.

Perhaps

async def on_request_start(trace_context, host, port, headers):
    trace_context['start'] = loop.time()

async def on_request_end(trace_context, resp):
    await send_metrics(
        time=loop.time() - trace_context['start'],
        query=trace_context['query']
    )


session = ClientSession()
session.add_on_requests_start(on_request_start)
session.add_on_requests_end(on_request_end)

resp = session.get("http://localhost?query=foo", trace={'query':'foo'})
resp = session.get("http://localhost?query=foo", trace={'query':'bar'})

This example shows how the same ClientSession is used to make different queries that might have divergent traces.

In case the user is keen on sharing information between all requests that belong to the same ClientSession, they might use a closure pattern, perhaps:

def on_request_end(query):
    async def on_request_end(trace_context, resp):
        await send_metrics(
            time=loop.time() - trace_context['start'],
            query=query
        )
    return on_request_end

session = ClientSession()
session.add_on_requests_start(on_request_start)
session.add_on_requests_end(on_request_end(query='foo'))

I can see your point that forcing the user to populate each request call can be less friendly, but from my experience, having a way to pass a context that carries information about the current execution is a must. Also, take into account that this granularity allows the user to implement either a Session or a Request context.

Another solution would be to implement two different contexts, one for the session and another for the request, but IMHO that overcomplicates the implementation right now.

Thoughts?


yield from self.on_request_start.send(
Member commented:

await

trace_context,
method,
url.host,
Member commented:


The whole URL object, maybe? The query part might be interesting for a tracer.

pfreixes (Contributor, Author) replied:


Yes, I will do that
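
For illustration, once the whole yarl.URL object is passed, a handler can read the query directly; the signature below assumes the on_request_start shape used in this revision, with url in place of host and port:

async def on_request_start(trace_context, method, url, headers):
    # url is a yarl.URL, so the query string is already parsed into a multidict.
    trace_context.query = dict(url.query)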

url.port,
headers
)

timer = tm.timer()
try:
with timer:
@@ -261,12 +286,20 @@ def _request(self, method, url, *,
proxy=proxy, proxy_auth=proxy_auth, timer=timer,
session=self, auto_decompress=self._auto_decompress,
verify_ssl=verify_ssl, fingerprint=fingerprint,
ssl_context=ssl_context, proxy_headers=proxy_headers)
ssl_context=ssl_context, proxy_headers=proxy_headers,
on_headers_sent=self.on_request_headers_sent,
on_content_sent=self.on_request_content_sent,
on_headers_received=self.on_request_headers_received,
on_content_received=self.on_request_content_received,
trace_context=trace_context)

# connection timeout
try:
with CeilTimeout(self._conn_timeout, loop=self._loop):
conn = yield from self._connector.connect(req)
conn = yield from self._connector.connect(
req,
trace_context=trace_context
)
except asyncio.TimeoutError as exc:
raise ServerTimeoutError(
'Connection timeout '
@@ -291,6 +324,9 @@
# redirects
if resp.status in (
301, 302, 303, 307, 308) and allow_redirects:

self.on_request_redirect.send(trace_context, resp)
Member commented:


What is the intended usage of this signal?
How does one figure out which initial request was redirected?
The same is true for all other signals.
Maybe we should always pass the URL, method, and headers?

pfreixes (Contributor, Author) replied:


The idea was to give a way to trace when a redirect happens. I completely agree that the signal parameters are not consistent enough to give the user sufficient information; let's add the method, URL, and headers.

What do you mean by "The same is true for all other signals"?

pfreixes (Contributor, Author) added:


I will do the same for on_request_end and on_request_exception, allowing the user to know the source URL in case the request came from a redirect.
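
For example, a subscriber could record each hop on the trace context; this sketch assumes the synchronous FuncSignal used for on_request_redirect in this revision, its (trace_context, resp) call signature, list-style .append() subscription (the signal classes are list-based), and an already created session:

def trace_redirect(trace_context, resp):
    # Called once per redirect hop with the intermediate response.
    hops = getattr(trace_context, 'redirect_hops', [])
    hops.append((resp.status, resp.headers.get('Location')))
    trace_context.redirect_hops = hops

session.on_request_redirect.append(trace_redirect)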


redirects += 1
history.append(resp)
if max_redirects and redirects >= max_redirects:
@@ -354,15 +390,17 @@ def _request(self, method, url, *,
handle.cancel()

resp._history = tuple(history)
yield from self.on_request_end.send(trace_context, resp)
return resp

except Exception:
except Exception as e:
# cleanup timer
tm.close()
if handle:
handle.cancel()
handle = None

yield from self.on_request_exception.send(trace_context, e)
raise

def ws_connect(self, url, *,
@@ -654,6 +692,78 @@ def loop(self):
"""Session's loop."""
return self._loop

@property
def on_request_start(self):
return self._on_request_start

@property
def on_request_redirect(self):
return self._on_request_redirect

@property
def on_request_end(self):
return self._on_request_end

@property
def on_request_exception(self):
return self._on_request_exception

# connector signals

@property
def on_request_queued_start(self):
return self._connector.on_queued_start
Member commented:


I think every session should have its own signals.
A connector might be shared between sessions via connector_owner=False or session.detach().
Shared subscriptions make a mess: nobody knows when a recipient is dead.

pfreixes (Contributor, Author) replied:


OK, I can see the idea of the connector_owner parameter now; my fault, I hadn't checked it, and that led me to a wrong implementation. If the connector ends up shared between sessions, it is absolutely necessary not to mix up the signals.

On the other hand, the connector_owner implementation worries me a bit: it leaves it in the user's hands to set it to True or False, which means that if the user provides an alternative connector but forgets to pass connector_owner=False, closing the session will automatically close the connections of a connector that might be shared by another Session.

Should the connector_owner param be handled internally by the Session object?
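
A minimal sketch of the sharing scenario under discussion, using the existing connector and connector_owner parameters; it only illustrates why signals attached to a shared connector would leak across sessions:

import aiohttp

connector = aiohttp.TCPConnector(limit=10)

# connector_owner=False: closing one session must not close the shared pool.
session_a = aiohttp.ClientSession(connector=connector, connector_owner=False)
session_b = aiohttp.ClientSession(connector=connector, connector_owner=False)

# If the tracing signals lived on the connector, a callback registered via
# session_a would also fire for requests made through session_b.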


@property
def on_request_queued_end(self):
return self._connector.on_queued_end

@property
def on_request_createconn_start(self):
return self._connector.on_createconn_start

@property
def on_request_createconn_end(self):
return self._connector.on_createconn_end

@property
def on_request_reuseconn(self):
return self._connector.on_reuseconn

@property
def on_request_resolvehost_start(self):
return self._connector.on_resolvehost_start

@property
def on_request_resolvehost_end(self):
return self._connector.on_resolvehost_end

@property
def on_request_dnscache_hit(self):
return self._connector.on_dnscache_hit

@property
def on_request_dnscache_miss(self):
return self._connector.on_dnscache_miss

# req resp signals

@property
def on_request_headers_sent(self):
return self._on_request_headers_sent

@property
def on_request_content_sent(self):
return self._on_request_content_sent

@property
def on_request_headers_received(self):
return self._on_request_headers_received

@property
def on_request_content_received(self):
return self._on_request_content_received

def detach(self):
"""Detach connector from session without closing the former.

10 changes: 8 additions & 2 deletions aiohttp/client_proto.py
@@ -129,15 +129,21 @@ def set_response_params(self, *, timer=None,
skip_payload=False,
skip_status_codes=(),
read_until_eof=False,
auto_decompress=True):
auto_decompress=True,
on_headers_received=None,
on_content_received=None,
trace_context=None):
self._skip_payload = skip_payload
self._skip_status_codes = skip_status_codes
self._read_until_eof = read_until_eof
self._parser = HttpResponseParser(
self, self._loop, timer=timer,
payload_exception=ClientPayloadError,
read_until_eof=read_until_eof,
auto_decompress=auto_decompress)
auto_decompress=auto_decompress,
on_headers_received=on_headers_received,
on_content_received=on_content_received,
trace_context=trace_context)

if self._tail:
data, self._tail = self._tail, b''
33 changes: 29 additions & 4 deletions aiohttp/client_reqrep.py
@@ -87,7 +87,9 @@ def __init__(self, method, url, *,
proxy=None, proxy_auth=None,
timer=None, session=None, auto_decompress=True,
verify_ssl=None, fingerprint=None, ssl_context=None,
proxy_headers=None):
proxy_headers=None, on_headers_sent=None,
on_content_sent=None, on_headers_received=None,
on_content_received=None, trace_context=None):

if verify_ssl is False and ssl_context is not None:
raise ValueError(
@@ -118,6 +120,12 @@ def __init__(self, method, url, *,
self._verify_ssl = verify_ssl
self._ssl_context = ssl_context

self._on_headers_sent = on_headers_sent
self._on_content_sent = on_content_sent
self._on_headers_received = on_headers_received
self._on_content_received = on_content_received
self._trace_context = trace_context

if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))

@@ -399,6 +407,9 @@ async def write_bytes(self, writer, conn):
for chunk in self.body:
writer.write(chunk)

if self._on_content_sent is not None:
self._on_content_sent.send(self._trace_context)

await writer.write_eof()
except OSError as exc:
new_exc = ClientOSError(
@@ -461,14 +472,21 @@ def send(self, conn):
self.method, path, self.version)
writer.write_headers(status_line, self.headers)

if self._on_headers_sent is not None:
self._on_headers_sent.send(self._trace_context)

self._writer = asyncio.ensure_future(
self.write_bytes(writer, conn), loop=self.loop)

self.response = self.response_class(
self.method, self.original_url,
writer=self._writer, continue100=self._continue, timer=self._timer,
request_info=self.request_info,
auto_decompress=self._auto_decompress
auto_decompress=self._auto_decompress,
session=self._session,
on_headers_received=self._on_headers_received,
on_content_received=self._on_content_received,
trace_context=self._trace_context
)

self.response._post_init(self.loop, self._session)
@@ -511,7 +529,9 @@ class ClientResponse(HeadersMixin):

def __init__(self, method, url, *,
writer=None, continue100=None, timer=None,
request_info=None, auto_decompress=True):
request_info=None, auto_decompress=True,
session=None, on_headers_received=None,
on_content_received=None, trace_context=None):
assert isinstance(url, URL)

self.method = method
@@ -527,6 +547,9 @@ def __init__(self, method, url, *,
self._request_info = request_info
self._timer = timer if timer is not None else TimerNoop()
self._auto_decompress = auto_decompress
self._on_headers_received = on_headers_received
self._on_content_received = on_content_received
self._trace_context = trace_context

@property
def url(self):
@@ -612,7 +635,9 @@ async def start(self, connection, read_until_eof=False):
skip_payload=self.method.lower() == 'head',
skip_status_codes=(204, 304),
read_until_eof=read_until_eof,
auto_decompress=self._auto_decompress)
auto_decompress=self._auto_decompress,
on_headers_received=self._on_headers_received,
on_content_received=self._on_content_received)

with self._timer:
while True: