Skip to content

Commit

Permalink
Merge pull request #6230 from yosefe/topic/ucp-proto-improve-new-prot…
Browse files Browse the repository at this point in the history
…ocols-logging

UCP/PROTO: Improve new protocols logging
  • Loading branch information
yosefe authored Jan 31, 2021
2 parents c664678 + 48d5b56 commit 6c3588b
Show file tree
Hide file tree
Showing 18 changed files with 437 additions and 256 deletions.
2 changes: 1 addition & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ ForEachMacros: ['FOR_EACH_ENTITY',
'UCS_TEST_F',
'UCX_PERF_TEST_FOREACH']
StatementMacros : []
TypenameMacros: ['khash_t']
TypenameMacros: ['khash_t', 'ucs_array_t']
WhitespaceSensitiveMacros: []

# CPP
Expand Down
11 changes: 7 additions & 4 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -2226,10 +2226,11 @@ static void ucp_ep_config_print(FILE *stream, ucp_worker_h worker,

void ucp_ep_print_info(ucp_ep_h ep, FILE *stream)
{
ucp_worker_h worker = ep->worker;
ucp_worker_h worker = ep->worker;
ucp_ep_config_t *config = ucp_ep_config(ep);
ucp_rsc_index_t aux_rsc_index;
ucp_lane_index_t wireup_msg_lane;
ucs_string_buffer_t strb;
uct_ep_h wireup_ep;

UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);
Expand All @@ -2241,7 +2242,7 @@ void ucp_ep_print_info(ucp_ep_h ep, FILE *stream)

/* if there is a wireup lane, set aux_rsc_index to the stub ep resource */
aux_rsc_index = UCP_NULL_RESOURCE;
wireup_msg_lane = ucp_ep_config(ep)->key.wireup_msg_lane;
wireup_msg_lane = config->key.wireup_msg_lane;
if (wireup_msg_lane != UCP_NULL_LANE) {
wireup_ep = ep->uct_eps[wireup_msg_lane];
if (ucp_wireup_ep_test(wireup_ep)) {
Expand All @@ -2253,9 +2254,11 @@ void ucp_ep_print_info(ucp_ep_h ep, FILE *stream)
fprintf(stream, "#\n");

if (worker->context->config.ext.proto_enable) {
ucs_string_buffer_init(&strb);
ucp_proto_select_dump(worker, ep->cfg_index, UCP_WORKER_CFG_INDEX_NULL,
&config->proto_select, stream);
fprintf(stream, "#\n");
&config->proto_select, &strb);
ucs_string_buffer_dump(&strb, "# ", stream);
ucs_string_buffer_cleanup(&strb);
}

UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
Expand Down
55 changes: 34 additions & 21 deletions src/ucp/core/ucp_rkey.c
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_ep_rkey_unpack, (ep, rkey_buffer, rkey_p),
ucp_rkey_resolve_inner(rkey, ep);
}

ucs_trace("unpacked rkey %p with md_map 0x%lx type %s", rkey, rkey->md_map,
ucs_memory_type_names[rkey->mem_type]);
*rkey_p = rkey;
status = UCS_OK;

Expand All @@ -311,43 +313,33 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_ep_rkey_unpack, (ep, rkey_buffer, rkey_p),
goto out_unlock;
}

void ucp_rkey_dump_packed(const void *rkey_buffer, char *buffer, size_t max)
void ucp_rkey_dump_packed(const void *rkey_buffer, ucs_string_buffer_t *strb)
{
char *p = buffer;
char *endp = buffer + max;
const uint8_t *rkey_buf = rkey_buffer;
ucs_memory_type_t mem_type;
ucp_md_map_t md_map;
unsigned md_index;
uint8_t md_size;
int first;

snprintf(p, endp - p, "{");
p += strlen(p);

md_map = *(ucp_md_map_t*)(rkey_buf);
rkey_buf += sizeof(ucp_md_map_t) + sizeof(uint8_t);
rkey_buf += sizeof(ucp_md_map_t);

mem_type = *(uint8_t*)rkey_buf;
rkey_buf += sizeof(uint8_t);

ucs_string_buffer_appendf(strb, "{%s", ucs_memory_type_names[mem_type]);

first = 1;
ucs_for_each_bit(md_index, md_map) {
md_size = *rkey_buf;
rkey_buf += sizeof(uint8_t);

if (!first) {
snprintf(p, endp - p, ",");
p += strlen(p);
}
first = 0;

snprintf(p, endp - p, "%d:", md_index);
p += strlen(p);

ucs_str_dump_hex(rkey_buf, md_size, p, endp - p, SIZE_MAX);
p += strlen(p);
ucs_string_buffer_appendf(strb, ",%d:", md_index);
ucs_string_buffer_append_hex(strb, rkey_buf, md_size, SIZE_MAX);

rkey_buf += md_size;
}

snprintf(p, endp - p, "}");
ucs_string_buffer_appendf(strb, "}");
}

ucs_status_t ucp_rkey_ptr(ucp_rkey_h rkey, uint64_t raddr, void **addr_p)
Expand Down Expand Up @@ -375,6 +367,8 @@ void ucp_rkey_destroy(ucp_rkey_h rkey)
unsigned remote_md_index, rkey_index;
ucp_worker_h UCS_V_UNUSED worker;

ucs_trace("destroying rkey %p", rkey);

rkey_index = 0;
ucs_for_each_bit(remote_md_index, rkey->md_map) {
uct_rkey_release(rkey->tl_rkey[rkey_index].cmpt,
Expand Down Expand Up @@ -516,3 +510,22 @@ void ucp_rkey_resolve_inner(ucp_rkey_h rkey, ucp_ep_h ep)
rkey->cache.rma_proto->name, rkey->cache.rma_lane, rkey->cache.rma_rkey,
rkey->cache.amo_proto->name, rkey->cache.amo_lane, rkey->cache.amo_rkey);
}

void ucp_rkey_config_dump_brief(const ucp_rkey_config_key_t *rkey_config_key,
ucs_string_buffer_t *strb)
{
ucs_string_buffer_appendf(strb, "%s memory, md_map 0x%" PRIx64,
ucs_memory_type_names[rkey_config_key->mem_type],
rkey_config_key->md_map);
}

void ucp_rkey_proto_select_dump(ucp_worker_h worker,
ucp_worker_cfg_index_t rkey_cfg_index,
ucs_string_buffer_t *strb)
{
const ucp_rkey_config_t *rkey_config = &worker->rkey_config[rkey_cfg_index];

ucp_proto_select_dump_short(&rkey_config->put_short, "put_short", strb);
ucp_proto_select_dump(worker, rkey_config->key.ep_cfg_index, rkey_cfg_index,
&rkey_config->proto_select, strb);
}
12 changes: 11 additions & 1 deletion src/ucp/core/ucp_rkey.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "ucp_types.h"

#include <ucp/core/ucp_context.h>
#include <ucp/proto/proto_select.h>


Expand Down Expand Up @@ -145,6 +146,15 @@ ssize_t ucp_rkey_pack_uct(ucp_context_h context, ucp_md_map_t md_map,
void *rkey_buffer);


void ucp_rkey_dump_packed(const void *rkey_buffer, char *buffer, size_t max);
void ucp_rkey_dump_packed(const void *rkey_buffer, ucs_string_buffer_t *strb);


void ucp_rkey_config_dump_brief(const ucp_rkey_config_key_t *rkey_config_key,
ucs_string_buffer_t *strb);


void ucp_rkey_proto_select_dump(ucp_worker_h worker,
ucp_worker_cfg_index_t rkey_cfg_index,
ucs_string_buffer_t *strb);

#endif
36 changes: 24 additions & 12 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -1653,11 +1653,11 @@ static char* ucp_worker_add_feature_rsc(ucp_context_h context,
return p;
}

static void ucp_worker_print_used_tls(const ucp_ep_config_key_t *key,
ucp_context_h context,
ucp_worker_cfg_index_t config_idx)
char *ucp_worker_print_used_tls(const ucp_ep_config_key_t *key,
ucp_context_h context,
ucp_worker_cfg_index_t config_idx, char *info,
size_t max)
{
char info[256] = {0};
ucp_lane_map_t tag_lanes_map = 0;
ucp_lane_map_t rma_lanes_map = 0;
ucp_lane_map_t amo_lanes_map = 0;
Expand All @@ -1666,12 +1666,8 @@ static void ucp_worker_print_used_tls(const ucp_ep_config_key_t *key,
ucp_lane_index_t lane;
char *p, *endp;

if (!ucs_log_is_enabled(UCS_LOG_LEVEL_INFO)) {
return;
}

p = info;
endp = p + sizeof(info);
endp = p + max;

snprintf(p, endp - p, "ep_cfg[%d]: ", config_idx);
p += strlen(p);
Expand Down Expand Up @@ -1719,7 +1715,8 @@ static void ucp_worker_print_used_tls(const ucp_ep_config_key_t *key,
p, endp - p);
ucp_worker_add_feature_rsc(context, key, stream_lanes_map, "stream",
p, endp - p);
ucs_info("%s", info);

return info;
}

static ucs_status_t ucp_worker_init_mpools(ucp_worker_h worker)
Expand Down Expand Up @@ -1825,6 +1822,7 @@ ucp_worker_get_ep_config(ucp_worker_h worker, const ucp_ep_config_key_t *key,
ucp_ep_config_t *ep_config;
ucp_memtype_thresh_t *max_eager_short;
ucs_status_t status;
char tl_info[256];

/* Search for the given key in the ep_config array */
for (ep_cfg_index = 0; ep_cfg_index < worker->ep_config_count;
Expand Down Expand Up @@ -1875,7 +1873,8 @@ ucp_worker_get_ep_config(ucp_worker_h worker, const ucp_ep_config_key_t *key,
}

if (print_cfg) {
ucp_worker_print_used_tls(key, context, ep_cfg_index);
ucs_info("%s", ucp_worker_print_used_tls(key, context, ep_cfg_index,
tl_info, sizeof(tl_info)));
}

++worker->ep_config_count;
Expand Down Expand Up @@ -2630,10 +2629,12 @@ void ucp_worker_release_address(ucp_worker_h worker, ucp_address_t *address)
void ucp_worker_print_info(ucp_worker_h worker, FILE *stream)
{
ucp_context_h context = worker->context;
ucp_worker_cfg_index_t rkey_cfg_index;
ucp_rsc_index_t rsc_index;
ucs_string_buffer_t strb;
ucp_address_t *address;
size_t address_length;
ucs_status_t status;
ucp_rsc_index_t rsc_index;
int first;

UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);
Expand Down Expand Up @@ -2668,6 +2669,17 @@ void ucp_worker_print_info(ucp_worker_h worker, FILE *stream)

fprintf(stream, "#\n");

if (context->config.ext.proto_enable) {
ucs_string_buffer_init(&strb);
for (rkey_cfg_index = 0; rkey_cfg_index < worker->rkey_config_count;
++rkey_cfg_index) {
ucp_rkey_proto_select_dump(worker, rkey_cfg_index, &strb);
ucs_string_buffer_appendf(&strb, "\n");
}
ucs_string_buffer_dump(&strb, "# ", stream);
ucs_string_buffer_cleanup(&strb);
}

UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
}

Expand Down
5 changes: 5 additions & 0 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ void ucp_worker_discard_uct_ep(ucp_ep_h ucp_ep, uct_ep_h uct_ep,
uct_pending_purge_callback_t purge_cb,
void *purge_arg);

char *ucp_worker_print_used_tls(const ucp_ep_config_key_t *key,
ucp_context_h context,
ucp_worker_cfg_index_t config_idx, char *info,
size_t max);

/* must be called with async lock held */
static UCS_F_ALWAYS_INLINE void
ucp_worker_flush_ops_count_inc(ucp_worker_h worker)
Expand Down
28 changes: 20 additions & 8 deletions src/ucp/proto/proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
#define UCP_PROTO_ID_INVALID ((ucp_proto_id_t)-1)


/** Maximal length of ucp_proto_config_str_func_t output */
#define UCP_PROTO_CONFIG_STR_MAX 128


/* Protocol identifier */
typedef unsigned ucp_proto_id_t;

Expand Down Expand Up @@ -142,14 +146,18 @@ typedef ucs_status_t
/**
* Dump protocol-specific configuration.
*
* @param [in] priv Protocol private data, which was previously filled by
* @ref ucp_proto_init_func_t.
* @param [out] strb Filled with a string of protocol configuration text.
* The user is responsible to release the string by
* calling @ref ucs_string_buffer_cleanup.
* @param [in] min_length Return information starting from this message length.
* @param [in] max_length Return information up to this message length (inclusive).
* @param [in] priv Protocol private data, which was previously filled by
* @ref ucp_proto_init_func_t.
* @param [out] strb Protocol configuration text should be written to this
* string buffer. This function should only **append**
* data to the buffer, and should not initialize, release
* or erase any data already in the buffer.
*/
typedef void
(*ucp_proto_config_str_func_t)(const void *priv, ucs_string_buffer_t *strb);
typedef void (*ucp_proto_config_str_func_t)(size_t min_length,
size_t max_length, const void *priv,
ucs_string_buffer_t *strb);


/**
Expand All @@ -160,7 +168,11 @@ struct ucp_proto {
unsigned flags; /* Protocol flags for special handling */
ucp_proto_init_func_t init; /* Initialization function */
ucp_proto_config_str_func_t config_str; /* Configuration dump function */
uct_pending_callback_t progress; /* UCT progress function */

/* Initial UCT progress function, can be changed during the protocol
* request lifetime to implement different stages
*/
uct_pending_callback_t progress;
};


Expand Down
Loading

0 comments on commit 6c3588b

Please sign in to comment.