Skip to content

Commit

Permalink
Fixes skupperproject#302 - Added PLOG event generation from the HTTP1…
Browse files Browse the repository at this point in the history
… adaptor

Fixes skupperproject#302 - Re-added the recursive deletion for records.

More aggressive ending of plog records.

Temp

Added ':' to prefix the record-id attribute encoded with the protocol encapsulation.
  • Loading branch information
ted-ross committed Apr 29, 2022
1 parent 96730e8 commit 91fab9b
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 19 deletions.
3 changes: 2 additions & 1 deletion include/qpid/dispatch/protocol_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ typedef enum plog_attribute {
PLOG_ATTRIBUTE_BUILD_VERSION = 32, // String
PLOG_ATTRIBUTE_LINK_COST = 33, // uint
PLOG_ATTRIBUTE_DIRECTION = 34, // String
PLOG_ATTRIBUTE_RESOURCE = 35, // String
} plog_attribute_t;

#define VALID_REF_ATTRS 0x00000000000000e6
#define VALID_UINT_ATTRS 0x0000000207800119
#define VALID_STRING_ATTRS 0x00000005787ffe00
#define VALID_STRING_ATTRS 0x0000000d787ffe00
#define VALID_TRACE_ATTRS 0x0000000080000000


Expand Down
12 changes: 10 additions & 2 deletions src/adaptors/http1/http1_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,11 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn)

// cleanup outstanding requests
//
if (hconn->type == HTTP1_CONN_SERVER)
if (hconn->type == HTTP1_CONN_SERVER) {
qdr_http1_server_conn_cleanup(hconn);
else
} else {
qdr_http1_client_conn_cleanup(hconn);
}

h1_codec_connection_free(hconn->http_conn);
if (rconn) {
Expand All @@ -121,6 +122,9 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn)

sys_atomic_destroy(&hconn->q2_restart);

plog_end_record(hconn->plog);
hconn->plog = 0;

free(hconn->cfg.host);
free(hconn->cfg.port);
free(hconn->cfg.address);
Expand Down Expand Up @@ -681,12 +685,16 @@ static void qd_http1_adaptor_final(void *adaptor_context)
qd_http_listener_t *li = DEQ_HEAD(adaptor->listeners);
while (li) {
DEQ_REMOVE_HEAD(qdr_http1_adaptor->listeners);
plog_end_record(li->plog);
li->plog = 0;
qd_http_listener_decref(li);
li = DEQ_HEAD(adaptor->listeners);
}
qd_http_connector_t *ct = DEQ_HEAD(adaptor->connectors);
while (ct) {
DEQ_REMOVE_HEAD(qdr_http1_adaptor->connectors);
plog_end_record(ct->plog);
ct->plog = 0;
qd_http_connector_decref(ct);
ct = DEQ_HEAD(adaptor->connectors);
}
Expand Down
33 changes: 29 additions & 4 deletions src/adaptors/http1/http1_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
#include "qpid/dispatch/protocol_adaptor.h"

#include <proton/listener.h>
#include <proton/netaddr.h>
#include <proton/proactor.h>
#include <proton/raw_connection.h>
#include <proton/netaddr.h>


Expand Down Expand Up @@ -142,6 +144,7 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li)
hconn->handler_context.context = hconn;
sys_atomic_init(&hconn->q2_restart, 0);

hconn->client.listener = li;
hconn->client.next_msg_id = 1;

// configure the HTTP/1.x library
Expand Down Expand Up @@ -176,6 +179,8 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li)
hconn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);

hconn->plog = plog_start_record(PLOG_RECORD_FLOW, hconn->client.listener->plog);

sys_mutex_lock(qdr_http1_adaptor->lock);
DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn);
sys_mutex_unlock(qdr_http1_adaptor->lock);
Expand Down Expand Up @@ -425,6 +430,7 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
}

hconn->in_http1_octets += length;
plog_set_uint64(hconn->plog, PLOG_ATTRIBUTE_OCTETS, hconn->in_http1_octets);
error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
}
return error;
Expand Down Expand Up @@ -459,8 +465,15 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
case PN_RAW_CONNECTION_CONNECTED: {
_setup_client_connection(hconn);

const struct pn_netaddr_t *na = pn_raw_connection_remote_addr(hconn->raw_conn);
if (na) {
const pn_netaddr_t *na = pn_raw_connection_remote_addr(hconn->raw_conn);
if (!!na) {
char host[200];
char port[50];
if (pn_netaddr_host_port(na, host, 200, port, 50) == 0) {
plog_set_string(hconn->plog, PLOG_ATTRIBUTE_SOURCE_HOST, host);
plog_set_string(hconn->plog, PLOG_ATTRIBUTE_SOURCE_PORT, port);
}

char buf[128];
if (pn_netaddr_str(na, buf, sizeof(buf)) > 0) {
qd_log(log, QD_LOG_INFO,
Expand Down Expand Up @@ -776,6 +789,11 @@ static int _client_rx_request_cb(h1_codec_request_state_t *hrs,
creq->version_minor = version_minor;
DEQ_INIT(creq->responses);

creq->base.plog = plog_start_record(PLOG_RECORD_FLOW, hconn->plog);
plog_set_string(creq->base.plog, PLOG_ATTRIBUTE_METHOD, method);
plog_set_string(creq->base.plog, PLOG_ATTRIBUTE_RESOURCE, target);
plog_latency_start(creq->base.plog);

qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"] HTTP request received: msg-id=%"PRIu64" method=%s target=%s version=%"PRIi32".%"PRIi32,
hconn->conn_id, creq->base.msg_id, method, target, version_major, version_minor);
Expand All @@ -802,6 +820,9 @@ static int _client_rx_request_cb(h1_codec_request_state_t *hrs,

qd_compose_insert_string(creq->request_props, PATH_PROP_KEY);
qd_compose_insert_string(creq->request_props, target);

qd_compose_insert_symbol(creq->request_props, QD_AP_FLOW_ID);
plog_serialize_identity(creq->base.plog, creq->request_props);
}

h1_codec_request_state_set_context(hrs, (void*) creq);
Expand Down Expand Up @@ -1759,8 +1780,11 @@ static void _write_pending_response(_client_request_t *hreq)
if (rmsg && DEQ_HEAD(rmsg->out_data)) {
uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &rmsg->out_data);
hreq->base.out_http1_octets += written;
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written",
hreq->base.hconn->conn_id, written);
if (written) {
plog_latency_end(hreq->base.plog);
qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] %"PRIu64" octets written",
hreq->base.hconn->conn_id, written);
}
}
}
}
Expand All @@ -1787,6 +1811,7 @@ static void _client_request_free(_client_request_t *hreq)
rmsg = DEQ_HEAD(hreq->responses);
}

plog_end_record(hreq->base.plog);
free__client_request_t(hreq);
}
}
Expand Down
15 changes: 9 additions & 6 deletions src/adaptors/http1/http1_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ struct qdr_http1_request_base_t {
qd_timestamp_t start;
qd_timestamp_t stop;
uint64_t out_http1_octets;
plog_record_t *plog;
};
DEQ_DECLARE(qdr_http1_request_base_t, qdr_http1_request_list_t);

Expand Down Expand Up @@ -136,9 +137,10 @@ struct qdr_http1_connection_t {
// State if connected to an HTTP client
//
struct {
char *client_ip_addr;
char *reply_to_addr; // set once link is up
uint64_t next_msg_id;
qd_http_listener_t *listener;
char *client_ip_addr;
char *reply_to_addr; // set once link is up
uint64_t next_msg_id;
} client;

// State if connected to an HTTP server
Expand Down Expand Up @@ -167,10 +169,11 @@ struct qdr_http1_connection_t {
//
qdr_http1_request_list_t requests;

// statistics
// statistics and logging
//
uint64_t in_http1_octets;
uint64_t out_http1_octets;
plog_record_t *plog;
uint64_t in_http1_octets;
uint64_t out_http1_octets;

// flags
//
Expand Down
48 changes: 45 additions & 3 deletions src/adaptors/http1/http1_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include "adaptors/adaptor_utils.h"
#include "http1_private.h"

#include <proton/netaddr.h>
#include <proton/proactor.h>
#include <proton/raw_connection.h>

//
// This file contains code specific to HTTP server processing. The raw
Expand Down Expand Up @@ -467,6 +469,13 @@ static void _do_reconnect(void *context)
connecting = true;
hconn->raw_conn = pn_raw_connection();
pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);

//
// Reset the octet counter to monitor the new raw connection.
//
hconn->in_http1_octets = 0;
hconn->plog = plog_start_record(PLOG_RECORD_FLOW, hconn->server.connector->plog);

// this next call may immediately reschedule the connection on another I/O
// thread. After this call hconn may no longer be valid!
pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port);
Expand Down Expand Up @@ -521,6 +530,7 @@ static int _handle_conn_read_event(qdr_http1_connection_t *hconn)
}

hconn->in_http1_octets += length;
plog_set_uint64(hconn->plog, PLOG_ATTRIBUTE_OCTETS, hconn->in_http1_octets);
error = h1_codec_connection_rx_data(hconn->http_conn, &blist, length);
}
return error;
Expand Down Expand Up @@ -557,6 +567,17 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] HTTP/1.x server %s connection established",
hconn->conn_id, hconn->cfg.host_port);
}

const pn_netaddr_t *na = pn_raw_connection_local_addr(hconn->raw_conn);
if (!!na) {
char host[200];
char port[50];
if (pn_netaddr_host_port(na, host, 200, port, 50) == 0) {
plog_set_string(hconn->plog, PLOG_ATTRIBUTE_SOURCE_HOST, host);
plog_set_string(hconn->plog, PLOG_ATTRIBUTE_SOURCE_PORT, port);
}
}

hconn->server.link_timeout = 0;
_setup_server_links(hconn);

Expand Down Expand Up @@ -627,6 +648,9 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
hconn->server.reconnect_pause += RETRY_PAUSE_MSEC;
}

plog_end_record(hconn->plog);
hconn->plog = 0;

// prevent core activation
sys_mutex_lock(qdr_http1_adaptor->lock);
hconn->raw_conn = 0;
Expand Down Expand Up @@ -984,6 +1008,11 @@ static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
_server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
assert(rmsg && !rmsg->msg);

//
// Stop the timer on server latency
//
plog_latency_end(hreq->base.plog);

// start building the AMQP message

rmsg->msg = qd_message();
Expand All @@ -1007,6 +1036,7 @@ static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
char u32_str[64];
snprintf(u32_str, sizeof(u32_str), "%"PRIu32, h1_codec_request_state_response_code(hrs));
qd_compose_insert_string(props, u32_str);
plog_set_string(hreq->base.plog, PLOG_ATTRIBUTE_RESULT, u32_str);
}
qd_compose_insert_null(props); // reply-to
qd_compose_insert_ulong(props, hreq->base.msg_id); // correlation-id
Expand Down Expand Up @@ -1340,6 +1370,8 @@ static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn,
DEQ_INIT(hreq->responses);
DEQ_INSERT_TAIL(hconn->requests, &hreq->base);

hreq->base.plog = plog_start_record(PLOG_RECORD_FLOW, hconn->plog);

qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] New HTTP Request msg-id=%"PRIu64" reply-to=%s.",
hconn->conn_id, hconn->out_link_id, msg_id, reply_to);
Expand Down Expand Up @@ -1408,6 +1440,11 @@ static uint64_t _send_request_headers(_server_request_t *hreq, qd_message_t *msg
free(version_str);
}

ref = qd_parse_value_by_key(app_props, QD_AP_FLOW_ID);
if (ref) {
plog_set_ref_from_parsed(hreq->base.plog, PLOG_ATTRIBUTE_COUNTERFLOW, ref);
}

// done copying and converting!

qd_log(hconn->adaptor->log, QD_LOG_TRACE,
Expand Down Expand Up @@ -1490,8 +1527,8 @@ static uint64_t _send_request_headers(_server_request_t *hreq, qd_message_t *msg
//
static void _encode_request_message(_server_request_t *hreq)
{
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_message_t *msg = qdr_delivery_message(hreq->request_dlv);
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_message_t *msg = qdr_delivery_message(hreq->request_dlv);

if (!hreq->headers_encoded) {
hreq->request_dispo = _send_request_headers(hreq, msg);
Expand Down Expand Up @@ -1575,6 +1612,7 @@ static void _send_request_message(_server_request_t *hreq)

default:
// encoding failure
plog_set_string(hreq->base.plog, PLOG_ATTRIBUTE_REASON, "Encoding failure");
_cancel_request(hreq);
return;
}
Expand Down Expand Up @@ -1647,6 +1685,7 @@ uint64_t qdr_http1_server_core_link_deliver(qdr_http1_adaptor_t *adaptor,
//
qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
DLV_FMT" Client has aborted the request", DLV_ARGS(delivery));
plog_set_string(hreq->base.plog, PLOG_ATTRIBUTE_REASON, "Client abort");
_cancel_request(hreq);
return 0;
}
Expand Down Expand Up @@ -1703,6 +1742,7 @@ static void _server_request_free(_server_request_t *hreq)
rmsg = DEQ_HEAD(hreq->responses);
}

plog_end_record(hreq->base.plog);
free__server_request_t(hreq);
}
}
Expand All @@ -1714,10 +1754,12 @@ static void _write_pending_request(_server_request_t *hreq)
assert(DEQ_PREV(&hreq->base) == 0); // preserve order!
uint64_t written = qdr_http1_write_out_data(hreq->base.hconn, &hreq->out_data);
hreq->base.out_http1_octets += written;
if (written)
if (written) {
plog_latency_start(hreq->base.plog);
qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
"[C%"PRIu64"][L%"PRIu64"] %"PRIu64" request octets written to server",
hreq->base.hconn->conn_id, hreq->base.hconn->out_link_id, written);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/adaptors/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ DEQ_DECLARE(qd_http_listener_t, qd_http_listener_list_t);

qd_http_listener_t *qd_http_listener(qd_server_t *server, qd_server_event_handler_t handler);
void qd_http_listener_decref(qd_http_listener_t* li);
inline static void qd_http_listener_incref(qd_http_listener_t *li) { sys_atomic_inc(&li->ref_count); }

typedef struct qd_http_connector_t qd_http_connector_t;
struct qd_http_connector_t {
Expand All @@ -91,6 +92,7 @@ DEQ_DECLARE(qd_http_connector_t, qd_http_connector_list_t);

qd_http_connector_t *qd_http_connector(qd_server_t *server);
void qd_http_connector_decref(qd_http_connector_t* c);
inline static void qd_http_connector_incref(qd_http_connector_t *c) { sys_atomic_inc(&c->ref_count); }



Expand Down
1 change: 1 addition & 0 deletions src/adaptors/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,7 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl)
"Deleted TcpConnector for %s, %s:%s",
ct->config->address, ct->config->host, ct->config->port);
close_egress_dispatcher_connection((qdr_tcp_connection_t*) ct->dispatcher_conn);
ct->dispatcher_conn = 0;
DEQ_REMOVE(tcp_adaptor->connectors, ct);
qd_tcp_connector_decref(ct);
}
Expand Down
2 changes: 1 addition & 1 deletion src/amqp.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <stdlib.h>
#include <string.h>

const char * const QD_AP_FLOW_ID = "flowid";
const char * const QD_AP_FLOW_ID = ":flowid";
const char * const QD_CAPABILITY_ROUTER_CONTROL = "qd.router";
const char * const QD_CAPABILITY_ROUTER_DATA = "qd.router-data";
const char * const QD_CAPABILITY_EDGE_DOWNLINK = "qd.router-edge-downlink";
Expand Down
Loading

0 comments on commit 91fab9b

Please sign in to comment.