diff --git a/include/qpid/dispatch/protocol_log.h b/include/qpid/dispatch/protocol_log.h index 126928f8c..0764e3123 100644 --- a/include/qpid/dispatch/protocol_log.h +++ b/include/qpid/dispatch/protocol_log.h @@ -90,11 +90,12 @@ typedef enum plog_attribute { PLOG_ATTRIBUTE_BUILD_VERSION = 32, // String PLOG_ATTRIBUTE_LINK_COST = 33, // uint PLOG_ATTRIBUTE_DIRECTION = 34, // String + PLOG_ATTRIBUTE_RESOURCE = 35, // String } plog_attribute_t; #define VALID_REF_ATTRS 0x00000000000000e6 #define VALID_UINT_ATTRS 0x0000000207800119 -#define VALID_STRING_ATTRS 0x00000005787ffe00 +#define VALID_STRING_ATTRS 0x0000000d787ffe00 #define VALID_TRACE_ATTRS 0x0000000080000000 diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c index 86f782a05..e13ccc329 100644 --- a/src/adaptors/http1/http1_adaptor.c +++ b/src/adaptors/http1/http1_adaptor.c @@ -108,10 +108,11 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn) // cleanup outstanding requests // - if (hconn->type == HTTP1_CONN_SERVER) + if (hconn->type == HTTP1_CONN_SERVER) { qdr_http1_server_conn_cleanup(hconn); - else + } else { qdr_http1_client_conn_cleanup(hconn); + } h1_codec_connection_free(hconn->http_conn); if (rconn) { @@ -121,6 +122,9 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn) sys_atomic_destroy(&hconn->q2_restart); + plog_end_record(hconn->plog); + hconn->plog = 0; + free(hconn->cfg.host); free(hconn->cfg.port); free(hconn->cfg.address); @@ -681,12 +685,16 @@ static void qd_http1_adaptor_final(void *adaptor_context) qd_http_listener_t *li = DEQ_HEAD(adaptor->listeners); while (li) { DEQ_REMOVE_HEAD(qdr_http1_adaptor->listeners); + plog_end_record(li->plog); + li->plog = 0; qd_http_listener_decref(li); li = DEQ_HEAD(adaptor->listeners); } qd_http_connector_t *ct = DEQ_HEAD(adaptor->connectors); while (ct) { DEQ_REMOVE_HEAD(qdr_http1_adaptor->connectors); + plog_end_record(ct->plog); + ct->plog = 0; qd_http_connector_decref(ct); ct = DEQ_HEAD(adaptor->connectors); } diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c index a42e8a7bf..5c8c21772 100644 --- a/src/adaptors/http1/http1_client.c +++ b/src/adaptors/http1/http1_client.c @@ -24,7 +24,9 @@ #include "qpid/dispatch/protocol_adaptor.h" #include <proton/listener.h> +#include <proton/netaddr.h> #include <proton/proactor.h> +#include <proton/raw_connection.h> #include <proton/netaddr.h> @@ -142,6 +144,7 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li) hconn->handler_context.context = hconn; sys_atomic_init(&hconn->q2_restart, 0); + hconn->client.listener = li; hconn->client.next_msg_id = 1; // configure the HTTP/1.x library @@ -176,6 +179,8 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li) hconn->raw_conn = pn_raw_connection(); pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context); + hconn->plog = plog_start_record(PLOG_RECORD_FLOW, hconn->client.listener->plog); + sys_mutex_lock(qdr_http1_adaptor->lock); DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn); sys_mutex_unlock(qdr_http1_adaptor->lock); @@ -425,6 +430,7 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn) } hconn->in_http1_octets += length; + plog_set_uint64(hconn->plog, PLOG_ATTRIBUTE_OCTETS, hconn->in_http1_octets); error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length); } return error; @@ -459,8 +465,15 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi case PN_RAW_CONNECTION_CONNECTED: { _setup_client_connection(hconn); - const struct pn_netaddr_t *na = pn_raw_connection_remote_addr(hconn->raw_conn); - if (na) { + const pn_netaddr_t *na = pn_raw_connection_remote_addr(hconn->raw_conn); + if (!!na) { + char host[200]; + char port[50]; + if (pn_netaddr_host_port(na, host, 200, port, 50) == 0) { + plog_set_string(hconn->plog, PLOG_ATTRIBUTE_SOURCE_HOST, host); + plog_set_string(hconn->plog, PLOG_ATTRIBUTE_SOURCE_PORT, port); + } + char buf[128]; if (pn_netaddr_str(na, buf, sizeof(buf)) > 0) { qd_log(log, QD_LOG_INFO, @@ -776,6 +789,11 @@ static int _client_rx_request_cb(h1_codec_request_state_t *hrs, creq->version_minor = version_minor; DEQ_INIT(creq->responses); + creq->base.plog = plog_start_record(PLOG_RECORD_FLOW, hconn->plog); + plog_set_string(creq->base.plog, PLOG_ATTRIBUTE_METHOD, method); + plog_set_string(creq->base.plog, PLOG_ATTRIBUTE_RESOURCE, target); + plog_latency_start(creq->base.plog); + qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP request received: msg-id=%"PRIu64" method=%s target=%s version=%"PRIi32".%"PRIi32, hconn->conn_id, creq->base.msg_id, method, target, version_major, version_minor); @@ -802,6 +820,9 @@ static int _client_rx_request_cb(h1_codec_request_state_t *hrs, qd_compose_insert_string(creq->request_props, PATH_PROP_KEY); qd_compose_insert_string(creq->request_props, target); + + qd_compose_insert_symbol(creq->request_props, QD_AP_FLOW_ID); + plog_serialize_identity(creq->base.plog, creq->request_props); } h1_codec_request_state_set_context(hrs, (void*) creq); @@ -1759,8 +1780,11 @@ static void _write_pending_response(_client_request_t *hreq) if (rmsg && DEQ_HEAD(rmsg->out_data)) { uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &rmsg->out_data); hreq->base.out_http1_octets += written; - qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written", - hreq->base.hconn->conn_id, written); + if (written) { + plog_latency_end(hreq->base.plog); + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written", + hreq->base.hconn->conn_id, written); + } } } } @@ -1787,6 +1811,7 @@ static void _client_request_free(_client_request_t *hreq) rmsg = DEQ_HEAD(hreq->responses); } + plog_end_record(hreq->base.plog); free__client_request_t(hreq); } } diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h index 39bbd1be3..1786abc5f 100644 --- a/src/adaptors/http1/http1_private.h +++ b/src/adaptors/http1/http1_private.h @@ -102,6 +102,7 @@ struct qdr_http1_request_base_t { qd_timestamp_t start; qd_timestamp_t stop; uint64_t out_http1_octets; + plog_record_t *plog; }; DEQ_DECLARE(qdr_http1_request_base_t, qdr_http1_request_list_t); @@ -136,9 +137,10 @@ struct qdr_http1_connection_t { // State if connected to an HTTP client // struct { - char *client_ip_addr; - char *reply_to_addr; // set once link is up - uint64_t next_msg_id; + qd_http_listener_t *listener; + char *client_ip_addr; + char *reply_to_addr; // set once link is up + uint64_t next_msg_id; } client; // State if connected to an HTTP server @@ -167,10 +169,11 @@ struct qdr_http1_connection_t { // qdr_http1_request_list_t requests; - // statistics + // statistics and logging // - uint64_t in_http1_octets; - uint64_t out_http1_octets; + plog_record_t *plog; + uint64_t in_http1_octets; + uint64_t out_http1_octets; // flags // diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index 1278c2eb5..4ff94e82d 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -20,7 +20,9 @@ #include "adaptors/adaptor_utils.h" #include "http1_private.h" +#include <proton/netaddr.h> #include <proton/proactor.h> +#include <proton/raw_connection.h> // // This file contains code specific to HTTP server processing. The raw @@ -467,6 +469,13 @@ static void _do_reconnect(void *context) connecting = true; hconn->raw_conn = pn_raw_connection(); pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context); + + // + // Reset the octet counter to monitor the new raw connection. + // + hconn->in_http1_octets = 0; + hconn->plog = plog_start_record(PLOG_RECORD_FLOW, hconn->server.connector->plog); + // this next call may immediately reschedule the connection on another I/O // thread. After this call hconn may no longer be valid! pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port); @@ -521,6 +530,7 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn) } hconn->in_http1_octets += length; + plog_set_uint64(hconn->plog, PLOG_ATTRIBUTE_OCTETS, hconn->in_http1_octets); error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length); } return error; @@ -557,6 +567,17 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] HTTP/1.x server %s connection established", hconn->conn_id, hconn->cfg.host_port); } + + const pn_netaddr_t *na = pn_raw_connection_local_addr(hconn->raw_conn); + if (!!na) { + char host[200]; + char port[50]; + if (pn_netaddr_host_port(na, host, 200, port, 50) == 0) { + plog_set_string(hconn->plog, PLOG_ATTRIBUTE_SOURCE_HOST, host); + plog_set_string(hconn->plog, PLOG_ATTRIBUTE_SOURCE_PORT, port); + } + } + hconn->server.link_timeout = 0; _setup_server_links(hconn); @@ -627,6 +648,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi hconn->server.reconnect_pause += RETRY_PAUSE_MSEC; } + plog_end_record(hconn->plog); + hconn->plog = 0; + // prevent core activation sys_mutex_lock(qdr_http1_adaptor->lock); hconn->raw_conn = 0; @@ -984,6 +1008,11 @@ static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); assert(rmsg && !rmsg->msg); + // + // Stop the timer on server latency + // + plog_latency_end(hreq->base.plog); + // start building the AMQP message rmsg->msg = qd_message(); @@ -1007,6 +1036,7 @@ static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo char u32_str[64]; snprintf(u32_str, sizeof(u32_str), "%"PRIu32, h1_codec_request_state_response_code(hrs)); qd_compose_insert_string(props, u32_str); + plog_set_string(hreq->base.plog, PLOG_ATTRIBUTE_RESULT, u32_str); } qd_compose_insert_null(props); // reply-to qd_compose_insert_ulong(props, hreq->base.msg_id); // correlation-id @@ -1340,6 +1370,8 @@ static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn, DEQ_INIT(hreq->responses); DEQ_INSERT_TAIL(hconn->requests, &hreq->base); + hreq->base.plog = plog_start_record(PLOG_RECORD_FLOW, hconn->plog); + qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] New HTTP Request msg-id=%"PRIu64" reply-to=%s.", hconn->conn_id, hconn->out_link_id, msg_id, reply_to); @@ -1408,6 +1440,11 @@ static uint64_t _send_request_headers(_server_request_t *hreq, qd_message_t *msg free(version_str); } + ref = qd_parse_value_by_key(app_props, QD_AP_FLOW_ID); + if (ref) { + plog_set_ref_from_parsed(hreq->base.plog, PLOG_ATTRIBUTE_COUNTERFLOW, ref); + } + // done copying and converting! qd_log(hconn->adaptor->log, QD_LOG_TRACE, @@ -1490,8 +1527,8 @@ static uint64_t _send_request_headers(_server_request_t *hreq, qd_message_t *msg // static void _encode_request_message(_server_request_t *hreq) { - qdr_http1_connection_t *hconn = hreq->base.hconn; - qd_message_t *msg = qdr_delivery_message(hreq->request_dlv); + qdr_http1_connection_t *hconn = hreq->base.hconn; + qd_message_t *msg = qdr_delivery_message(hreq->request_dlv); if (!hreq->headers_encoded) { hreq->request_dispo = _send_request_headers(hreq, msg); @@ -1575,6 +1612,7 @@ static void _send_request_message(_server_request_t *hreq) default: // encoding failure + plog_set_string(hreq->base.plog, PLOG_ATTRIBUTE_REASON, "Encoding failure"); _cancel_request(hreq); return; } @@ -1647,6 +1685,7 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t *adaptor, // qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, DLV_FMT" Client has aborted the request", DLV_ARGS(delivery)); + plog_set_string(hreq->base.plog, PLOG_ATTRIBUTE_REASON, "Client abort"); _cancel_request(hreq); return 0; } @@ -1703,6 +1742,7 @@ static void _server_request_free(_server_request_t *hreq) rmsg = DEQ_HEAD(hreq->responses); } + plog_end_record(hreq->base.plog); free__server_request_t(hreq); } } @@ -1714,10 +1754,12 @@ static void _write_pending_request(_server_request_t *hreq) assert(DEQ_PREV(&hreq->base) == 0); // preserve order! uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &hreq->out_data); hreq->base.out_http1_octets += written; - if (written) + if (written) { + plog_latency_start(hreq->base.plog); qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] %"PRIu64" request octets written to server", hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id, written); + } } } diff --git a/src/adaptors/http_common.h b/src/adaptors/http_common.h index 8954a06d8..995027e26 100644 --- a/src/adaptors/http_common.h +++ b/src/adaptors/http_common.h @@ -75,6 +75,7 @@ DEQ_DECLARE(qd_http_listener_t, qd_http_listener_list_t); qd_http_listener_t *qd_http_listener(qd_server_t *server, qd_server_event_handler_t handler); void qd_http_listener_decref(qd_http_listener_t* li); +inline static void qd_http_listener_incref(qd_http_listener_t *li) { sys_atomic_inc(&li->ref_count); } typedef struct qd_http_connector_t qd_http_connector_t; struct qd_http_connector_t { @@ -91,6 +92,7 @@ DEQ_DECLARE(qd_http_connector_t, qd_http_connector_list_t); qd_http_connector_t *qd_http_connector(qd_server_t *server); void qd_http_connector_decref(qd_http_connector_t* c); +inline static void qd_http_connector_incref(qd_http_connector_t *c) { sys_atomic_inc(&c->ref_count); } diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 36251f63d..cd6011b85 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -1449,6 +1449,7 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl) "Deleted TcpConnector for %s, %s:%s", ct->config->address, ct->config->host, ct->config->port); close_egress_dispatcher_connection((qdr_tcp_connection_t*) ct->dispatcher_conn); + ct->dispatcher_conn = 0; DEQ_REMOVE(tcp_adaptor->connectors, ct); qd_tcp_connector_decref(ct); } diff --git a/src/amqp.c b/src/amqp.c index 7c61b9cc0..6f89be06d 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -24,7 +24,7 @@ #include <stdlib.h> #include <string.h> -const char * const QD_AP_FLOW_ID = "flowid"; +const char * const QD_AP_FLOW_ID = ":flowid"; const char * const QD_CAPABILITY_ROUTER_CONTROL = "qd.router"; const char * const QD_CAPABILITY_ROUTER_DATA = "qd.router-data"; const char * const QD_CAPABILITY_EDGE_DOWNLINK = "qd.router-edge-downlink"; diff --git a/src/protocol_log.c b/src/protocol_log.c index 900da76e5..b82d422f0 100644 --- a/src/protocol_log.c +++ b/src/protocol_log.c @@ -139,6 +139,8 @@ static uint64_t next_message_id = 0; static void _plog_set_ref_TH(plog_work_t *work, bool discard); static void _plog_set_string_TH(plog_work_t *work, bool discard); static void _plog_set_int_TH(plog_work_t *work, bool discard); +static const char *_plog_record_type_name(const plog_record_t *record); +static const char *_plog_attribute_name(const plog_attribute_data_t *data); /** * @brief Return the current timestamp in microseconds @@ -221,7 +223,7 @@ static void _plog_strncat_id(char *buffer, size_t n, const plog_identity_t *id) * * @param buffer Target string for concatenation * @param n String size limit - * @param data Data value to extrace the attribute-type from + * @param data Data value to extract the attribute-type from */ static void _plog_strncat_attribute(char *buffer, size_t n, const plog_attribute_data_t *data) { @@ -709,6 +711,7 @@ static const char *_plog_attribute_name(const plog_attribute_data_t *data) case PLOG_ATTRIBUTE_BUILD_VERSION : return "buildVersion"; case PLOG_ATTRIBUTE_LINK_COST : return "linkCost"; case PLOG_ATTRIBUTE_DIRECTION : return "direction"; + case PLOG_ATTRIBUTE_RESOURCE : return "resource"; } return "UNKNOWN"; } @@ -917,7 +920,7 @@ static void _plog_flush_TH(qdr_core_t *core) record->never_flushed = false; record->emit_ordinal++; if (record->ended) { - _plog_free_record_TH(record, false); + _plog_free_record_TH(record, true); } record = DEQ_HEAD(unflushed_records[current_flush_slot]); } @@ -1355,6 +1358,7 @@ void plog_latency_end(plog_record_t *record) if (!!record && record->latency_start > 0) { uint64_t now = _now_in_usec(); plog_set_uint64(record, PLOG_ATTRIBUTE_LATENCY, now - record->latency_start); + record->latency_start = 0; } }