Skip to content

Commit

Permalink
caliper: message tracing and child trace avoidance
Browse files Browse the repository at this point in the history
This update adds tracing for various properties of messages, RPCs,
events, etc.  I'm not certain this will give us all we need to get
matches, but it's on the way.  I've also added some environment
modifications in broker to keep children from being traced, which cuts
down on the extra caliper output significantly (down to one per broker).
The unsetenv approach is also not optimal, but it helps a great deal
while we work on figuring out something cleaner.

This includes switching to using caliper's pkg-config to configure our
build, refactors the caliper usage in handle as suggested by @garlick,
and runs clang-format against everything to ensure line lengths and
spacing is respected.

The profiling_msg_snapshot routine can be called due to messaging
activity inside the connector_init, so the profiling object is not
ready, and caused uninitialized nodes to be emitted.  This is currently
addressed by a check and return.
  • Loading branch information
trws committed Jul 26, 2016
1 parent 0f8a54f commit f977ec8
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 40 deletions.
28 changes: 17 additions & 11 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ AM_MAINTAINER_MODE
AC_DEFINE([_GNU_SOURCE], 1,
[Define _GNU_SOURCE so that we get all necessary prototypes])

##
# Initialize pkg-config for PKG_CHECK_MODULES to avoid conditional issues
##
PKG_PROG_PKG_CONFIG

##
# Checks for programs
##
Expand Down Expand Up @@ -69,17 +74,6 @@ X_AC_CHECK_COND_LIB(dl, dlerror)
X_AC_MALLOC
AC_CHECK_LIB(m, floor)

LIBCALIPER=
AC_ARG_ENABLE(caliper,
[ --enable-caliper[=OPTS] Use caliper for profiling. [default=no] [OPTS=no/yes]], ,
[enable_caliper="no"])

if test "$enable_caliper" = "yes"; then
AC_SUBST([LIBCALIPER], ["-lcaliper"])
AC_DEFINE([HAVE_CALIPER], [1], [Define if you have libcaliper])
AC_DEFINE([PROFILING], [1], [Compile in profiling hooks])
fi

AC_MSG_CHECKING(--enable-python argument)
AC_ARG_ENABLE(python,
[ --enable-python[=OPTS] Include Python bindings. [default=yes] [OPTS=no/yes]], ,
Expand Down Expand Up @@ -123,6 +117,18 @@ LX_FIND_MPI
AM_CONDITIONAL([HAVE_MPI], [test "$have_C_mpi" = yes])
AX_CODE_COVERAGE

AC_ARG_ENABLE(caliper,
[ --enable-caliper[=OPTS] Use caliper for profiling. [default=no] [OPTS=no/yes]], ,
[enable_caliper="no"])

if test "$enable_caliper" = "yes"; then
PKG_CHECK_MODULES([CALIPER], [caliper], [], [])
CFLAGS="${CFLAGS} ${CALIPER_CFLAGS} "
LIBS="${LIBS} ${CALIPER_LIBS} -lrt "
AC_DEFINE([HAVE_CALIPER], [1], [Define if you have libcaliper])
fi


##
# Embedded libev
##
Expand Down
5 changes: 5 additions & 0 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,11 @@ int main (int argc, char *argv[])
cali_begin_int_byname ("flux.tid", syscall(SYS_gettid));
cali_begin_string_byname ("binary", argv[0]);
cali_begin_int_byname ("flux.rank", ctx.rank);
// TODO: this is a stopgap until we have better control over
// instrumemtation in child processes. If we want to see what children
// that load libflux are up to, this should be disabled
unsetenv ("CALI_SERVICES_ENABLE");
unsetenv ("CALI_CONFIG_PROFILE");
#endif

/* Create directory for sockets, and a subdirectory specific
Expand Down
2 changes: 1 addition & 1 deletion src/common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ libflux_internal_la_LIBADD = \
$(builddir)/libsubprocess/libsubprocess.la \
$(builddir)/libcompat/libcompat.la \
$(LIBMUNGE) $(JSON_LIBS) $(ZMQ_LIBS) $(LIBPTHREAD) $(LIBUTIL) \
$(LIBDL) $(LIBRT) $(LIBCALIPER)
$(LIBDL) $(LIBRT) $(CALIPER_LIBS)
libflux_internal_la_LDFLAGS = $(san_ld_zdef_flag)

lib_LTLIBRARIES = libflux-core.la libflux-optparse.la
Expand Down
48 changes: 32 additions & 16 deletions src/common/libflux/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct dispatch {
#if HAVE_CALIPER
cali_id_t prof_msg_type;
cali_id_t prof_msg_topic;
cali_id_t prof_msg_dispatch;
#endif
};

Expand Down Expand Up @@ -154,12 +155,15 @@ static struct dispatch *dispatch_get (flux_t h)
fastpath_init (&d->norm);
fastpath_init (&d->group);
#if HAVE_CALIPER
d->prof_msg_type = cali_create_attribute("flux.message.type",
CALI_TYPE_STRING,
CALI_ATTR_SKIP_EVENTS);
d->prof_msg_topic = cali_create_attribute("flux.message.topic",
d->prof_msg_type = cali_create_attribute ("flux.message.type",
CALI_TYPE_STRING,
CALI_ATTR_DEFAULT);
CALI_ATTR_SKIP_EVENTS);
d->prof_msg_topic = cali_create_attribute ("flux.message.topic",
CALI_TYPE_STRING,
CALI_ATTR_SKIP_EVENTS);
d->prof_msg_dispatch = cali_create_attribute ("flux.message.dispatch",
CALI_TYPE_BOOL,
CALI_ATTR_DEFAULT);
#endif
flux_aux_set (h, "flux::dispatch", d, dispatch_destroy);
}
Expand Down Expand Up @@ -553,8 +557,10 @@ static int delete_items_zlist (zlist_t *l, item_test_f item_test,
return rc;
}

static void handle_cb (flux_reactor_t *r, flux_watcher_t *hw,
int revents, void *arg)
static void handle_cb (flux_reactor_t *r,
flux_watcher_t *hw,
int revents,
void *arg)
{
struct dispatch *d = arg;
flux_msg_t *msg = NULL;
Expand All @@ -573,17 +579,20 @@ static void handle_cb (flux_reactor_t *r, flux_watcher_t *hw,
goto done;
}

const char * topic;
flux_msg_get_topic(msg, &topic);
const char *topic;
flux_msg_get_topic (msg, &topic);
/* Add any new handlers here, making handler creation
* safe to call during handlers list traversal below.
*/
if (transfer_items_zlist (d->handlers_new, d->handlers) < 0)
goto done;

#if defined(HAVE_CALIPER)
cali_begin_string(d->prof_msg_type, flux_msg_typestr(type));
cali_begin_string(d->prof_msg_topic, topic);
cali_begin_string (d->prof_msg_type, flux_msg_typestr (type));
cali_begin_string (d->prof_msg_topic, topic);
cali_begin (d->prof_msg_dispatch);
cali_end (d->prof_msg_topic);
cali_end (d->prof_msg_type);
#endif

if ((flux_flags_get (d->h) & FLUX_O_COPROC))
Expand All @@ -592,19 +601,24 @@ static void handle_cb (flux_reactor_t *r, flux_watcher_t *hw,
match = dispatch_message (d, msg, type);

#if defined(HAVE_CALIPER)
cali_end(d->prof_msg_topic);
cali_end(d->prof_msg_type);
cali_begin_string (d->prof_msg_type, flux_msg_typestr (type));
cali_begin_string (d->prof_msg_topic, topic);
cali_end (d->prof_msg_dispatch);
cali_end (d->prof_msg_topic);
cali_end (d->prof_msg_type);
#endif

if (match < 0)
goto done;
/* Destroy handlers here, making handler destruction
* safe to call during handlers list traversal above.
*/
if (delete_items_zlist (d->handlers_new, item_test_destroyed,
if (delete_items_zlist (d->handlers_new,
item_test_destroyed,
(flux_free_f)free_msg_handler) < 0)
goto done;
if (delete_items_zlist (d->handlers, item_test_destroyed,
if (delete_items_zlist (d->handlers,
item_test_destroyed,
(flux_free_f)free_msg_handler) < 0)
goto done;
/* Message matched nothing.
Expand All @@ -618,7 +632,9 @@ static void handle_cb (flux_reactor_t *r, flux_watcher_t *hw,
} else if (flux_flags_get (d->h) & FLUX_O_TRACE) {
const char *topic = NULL;
(void)flux_msg_get_topic (msg, &topic);
fprintf (stderr, "nomatch: %s '%s'\n", flux_msg_typestr (type),
fprintf (stderr,
"nomatch: %s '%s'\n",
flux_msg_typestr (type),
topic ? topic : "");
}
}
Expand Down
152 changes: 149 additions & 3 deletions src/common/libflux/handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
#include <sys/epoll.h>
#include <poll.h>
#include <czmq.h>
#if HAVE_CALIPER
#include <caliper/cali.h>
#include <sys/syscall.h>
#endif

#include "handle.h"
#include "reactor.h"
Expand All @@ -44,6 +48,22 @@
#include "src/common/libutil/log.h"
#include "src/common/libutil/msglist.h"

#if HAVE_CALIPER
struct profiling_context {
int initialized;
cali_id_t msg_type;
cali_id_t msg_seq;
cali_id_t msg_topic;
cali_id_t msg_sender;
cali_id_t msg_rpc;
cali_id_t msg_rpc_nodeid;
cali_id_t msg_rpc_resp_expected;
cali_id_t msg_action;
cali_id_t msg_match_type;
cali_id_t msg_match_tag;
cali_id_t msg_match_glob;
};
#endif

struct flux_handle_struct {
const struct flux_handle_ops *ops;
Expand All @@ -60,8 +80,111 @@ struct flux_handle_struct {
void *fatal_arg;
bool fatality;
int usecount;
#if HAVE_CALIPER
struct profiling_context prof;
#endif
};

#if HAVE_CALIPER
void profiling_context_init (struct profiling_context* prof)
{
prof->msg_type = cali_create_attribute ("flux.message.type",
CALI_TYPE_STRING,
CALI_ATTR_DEFAULT | CALI_ATTR_ASVALUE);
prof->msg_seq = cali_create_attribute ("flux.message.seq",
CALI_TYPE_INT,
CALI_ATTR_SKIP_EVENTS);
prof->msg_topic = cali_create_attribute ("flux.message.topic",
CALI_TYPE_STRING,
CALI_ATTR_DEFAULT | CALI_ATTR_ASVALUE);
prof->msg_sender = cali_create_attribute ("flux.message.sender",
CALI_TYPE_STRING,
CALI_ATTR_SKIP_EVENTS);
// if flux.message.rpc is set, we're inside an RPC, it will be set to a
// type, single or multi
prof->msg_rpc = cali_create_attribute ("flux.message.rpc",
CALI_TYPE_STRING,
CALI_ATTR_SKIP_EVENTS);
prof->msg_rpc_nodeid = cali_create_attribute ("flux.message.rpc.nodeid",
CALI_TYPE_INT,
CALI_ATTR_SKIP_EVENTS);
prof->msg_rpc_resp_expected =
cali_create_attribute ("flux.message.response_expected",
CALI_TYPE_INT,
CALI_ATTR_SKIP_EVENTS);
prof->msg_action = cali_create_attribute ("flux.message.action",
CALI_TYPE_STRING,
CALI_ATTR_DEFAULT | CALI_ATTR_ASVALUE);
prof->msg_match_type = cali_create_attribute ("flux.message.match.type",
CALI_TYPE_INT,
CALI_ATTR_SKIP_EVENTS);
prof->msg_match_tag = cali_create_attribute ("flux.message.match.tag",
CALI_TYPE_INT,
CALI_ATTR_SKIP_EVENTS);
prof->msg_match_glob = cali_create_attribute ("flux.message.match.glob",
CALI_TYPE_STRING,
CALI_ATTR_SKIP_EVENTS);
prof->initialized=1;
}

static void profiling_msg_snapshot (flux_t h,
const flux_msg_t *msg,
int flags,
const char *msg_action)
{
cali_id_t attributes[3];
const void * data[3];
size_t size[3];

// This can get called before the handle is really ready
if(! h->prof.initialized) return;

int len = 0;

if (msg_action) {
attributes[len] = h->prof.msg_action;
data[len] = msg_action;
size[len] = strlen(msg_action);
++len;
}

int type;
flux_msg_get_type (msg, &type);
const char *msg_type = flux_msg_typestr (type);
if (msg_type) {
attributes[len] = h->prof.msg_type;
data[len] = msg_type;
size[len] = strlen(msg_type);
++len;
}

const char *msg_topic;
if (type != FLUX_MSGTYPE_KEEPALIVE)
flux_msg_get_topic (msg, &msg_topic);
else
msg_topic = "NONE";
/* attributes[len] = h->prof.msg_topic; */
/* data[len] = msg_topic; */
/* size[len] = strlen(msg_topic); */
/* ++len; */

if (type == FLUX_MSGTYPE_EVENT) {
uint32_t seq;
flux_msg_get_seq (msg, &seq);
cali_begin_int (h->prof.msg_seq, seq);
}
cali_push_snapshot (CALI_SCOPE_PROCESS | CALI_SCOPE_THREAD,
len /* n_entries */,
attributes /* event_attributes */,
data /* event_data */,
size /* event_size */);
if (type == FLUX_MSGTYPE_EVENT)
cali_end (h->prof.msg_seq);
}


#endif

static char *find_file_r (const char *name, const char *dirpath)
{
DIR *dir;
Expand Down Expand Up @@ -206,12 +329,14 @@ flux_t flux_open (const char *uri, int flags)
goto done;
}
h->dso = dso;
#if HAVE_CALIPER
profiling_context_init(&h->prof);
#endif
done:
if (scheme)
free (scheme);
return h;
}

void flux_close (flux_t h)
{
int saved_errno = errno;
Expand Down Expand Up @@ -444,6 +569,9 @@ int flux_send (flux_t h, const flux_msg_t *msg, int flags)
flux_msg_fprint (stderr, msg);
if (h->ops->send (h->impl, msg, flags) < 0)
goto fatal;
#if HAVE_CALIPER
profiling_msg_snapshot(h, msg, flags, "send");
#endif
return 0;
fatal:
FLUX_FATAL (h);
Expand Down Expand Up @@ -513,8 +641,8 @@ flux_msg_t *flux_recv (flux_t h, struct flux_match match, int flags)
int saved_errno;

flags |= h->flags;
if (!(flags & FLUX_O_NONBLOCK) && (flags & FLUX_O_COPROC)
&& flux_sleep_on (h, match) < 0) {
if (!(flags & FLUX_O_NONBLOCK) && (flags & FLUX_O_COPROC) &&
flux_sleep_on (h, match) < 0) {
if (errno != EINVAL)
goto fatal;
errno = 0;
Expand All @@ -541,6 +669,24 @@ flux_msg_t *flux_recv (flux_t h, struct flux_match match, int flags)
if (defer_requeue (&l, h) < 0)
goto fatal;
defer_destroy (&l);
#if HAVE_CALIPER
cali_begin_int (h->prof.msg_match_type, match.typemask);
cali_begin_int (h->prof.msg_match_tag, match.matchtag);
cali_begin_string (h->prof.msg_match_glob,
match.topic_glob ? match.topic_glob : "NONE");
char *sender = NULL;
flux_msg_get_route_first (msg, &sender);
if (sender)
cali_begin_string (h->prof.msg_sender, sender);
profiling_msg_snapshot (h, msg, flags, "recv");
if (sender)
cali_end (h->prof.msg_sender);
cali_end (h->prof.msg_match_type);
cali_end (h->prof.msg_match_tag);
cali_end (h->prof.msg_match_glob);

free (sender);
#endif
return msg;
fatal:
saved_errno = errno;
Expand Down
Loading

0 comments on commit f977ec8

Please sign in to comment.