Monitoring #1

Merged 2 commits on Oct 7, 2015
Changes from all commits
4 changes: 2 additions & 2 deletions ompi/mca/pml/monitoring/pml_monitoring.c
@@ -154,7 +154,7 @@ int mca_pml_monitoring_get_messages_count (const struct mca_base_pvar_t *pvar, v
     uint64_t *values = (uint64_t*) value;
     int i;
 
-    if(comm != &ompi_mpi_comm_world.comm)
+    if(comm != &ompi_mpi_comm_world.comm || NULL == messages_count)
         return OMPI_ERROR;
 
     for (i = 0 ; i < comm_size ; ++i) {
@@ -171,7 +171,7 @@ int mca_pml_monitoring_get_messages_size (const struct mca_base_pvar_t *pvar, vo
     uint64_t *values = (uint64_t*) value;
     int i;
 
-    if(comm != &ompi_mpi_comm_world.comm)
+    if(comm != &ompi_mpi_comm_world.comm || NULL == sent_data)
        return OMPI_ERROR;
 
     for (i = 0 ; i < comm_size ; ++i) {
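Both getters now fail fast: a tool that reads these pvars before the monitoring PML has allocated its counter arrays gets OMPI_ERROR instead of dereferencing NULL. A minimal sketch of the guard pattern (the standalone function is ours for illustration; the real getters live in pml_monitoring.c and copy from the module's global arrays):

    static int copy_counters(ompi_communicator_t *comm, uint64_t *dst,
                             const uint64_t *src, int comm_size)
    {
        int i;
        /* Only MPI_COMM_WORLD is monitored, and src stays NULL until the
           PML is enabled; reject both cases before touching memory. */
        if (comm != &ompi_mpi_comm_world.comm || NULL == src)
            return OMPI_ERROR;
        for (i = 0; i < comm_size; ++i)
            dst[i] = src[i];
        return OMPI_SUCCESS;
    }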
60 changes: 35 additions & 25 deletions ompi/mca/pml/monitoring/pml_monitoring_component.c
@@ -79,6 +79,29 @@ mca_pml_monitoring_notify_flush(struct mca_base_pvar_t *pvar, mca_base_pvar_even
     return OMPI_ERROR;
 }
 
+static int
+mca_pml_monitoring_messages_notify(mca_base_pvar_t *pvar,
+                                   mca_base_pvar_event_t event,
+                                   void *obj_handle,
+                                   int *count)
+{
+    switch (event) {
+    case MCA_BASE_PVAR_HANDLE_BIND:
+        /* Return the size of the communicator as the number of values */
+        *count = ompi_comm_size ((ompi_communicator_t *) obj_handle);
+    case MCA_BASE_PVAR_HANDLE_UNBIND:
+        return OMPI_SUCCESS;
+    case MCA_BASE_PVAR_HANDLE_START:
+        mca_pml_monitoring_current_state = mca_pml_monitoring_enabled;
+        return OMPI_SUCCESS;
+    case MCA_BASE_PVAR_HANDLE_STOP:
+        mca_pml_monitoring_current_state = 0;
+        return OMPI_SUCCESS;
+    }
+
+    return OMPI_ERROR;
+}
+
 int mca_pml_monitoring_enable(bool enable)
 {
     /* If we reach this point we were successful at hijacking the interface of
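The new mca_pml_monitoring_messages_notify replaces the comm_size_notify callback removed further down and adds run-time control: BIND reports the number of values (and intentionally falls through to UNBIND's success return), while START and STOP flip mca_pml_monitoring_current_state, so a tool can pause and resume counting. For reference, the tool-side MPI_T calls that fire each event look roughly like this (a sketch mirroring the test profiler added below; the session is assumed to come from MPI_T_pvar_session_create, and the communicator is passed the same way monitoring_prof.c passes it):

    MPI_Comm comm = MPI_COMM_WORLD;
    MPI_T_pvar_handle handle;
    int idx, count;

    MPI_T_pvar_get_index("pml_monitoring_messages_count",
                         MPI_T_PVAR_CLASS_SIZE, &idx);
    MPI_T_pvar_handle_alloc(session, idx, comm, &handle, &count); /* BIND   */
    MPI_T_pvar_start(session, handle);                            /* START  */
    MPI_T_pvar_stop(session, handle);                             /* STOP   */
    MPI_T_pvar_handle_free(session, &handle);                     /* UNBIND */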
@@ -92,6 +115,18 @@ int mca_pml_monitoring_enable(bool enable)
                                  mca_pml_monitoring_get_flush, mca_pml_monitoring_set_flush,
                                  mca_pml_monitoring_notify_flush, &mca_pml_monitoring_component);
 
+    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_count", "Number of messages "
+                                 "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
+                                 MCA_BASE_VAR_TYPE_UNSIGNED_LONG, NULL, MPI_T_BIND_MPI_COMM,
+                                 MCA_BASE_PVAR_FLAG_READONLY,
+                                 mca_pml_monitoring_get_messages_count, NULL, mca_pml_monitoring_messages_notify, NULL);
+
+    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_size", "Size of messages "
+                                 "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
+                                 MCA_BASE_VAR_TYPE_UNSIGNED_LONG, NULL, MPI_T_BIND_MPI_COMM,
+                                 MCA_BASE_PVAR_FLAG_READONLY,
+                                 mca_pml_monitoring_get_messages_size, NULL, mca_pml_monitoring_messages_notify, NULL);
+
     return pml_selected_module.pml_enable(enable);
 }
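Relative to the registrations deleted from component_register() below, these differ in three ways: they now run at enable() time, once the monitoring PML has actually hijacked the selected PML; the value type widens from UNSIGNED_INT to UNSIGNED_LONG to match the uint64_t counters the getters copy out; and the CONTINUOUS flag is dropped, so the START/STOP events handled above really gate collection. A reader sizes its buffer from the count returned at BIND time (continuing the sketch above; malloc from <stdlib.h>):

    uint64_t *values = malloc(count * sizeof(uint64_t));
    if (MPI_SUCCESS != MPI_T_pvar_read(session, handle, values)) {
        /* pvar missing, or the monitoring PML is not enabled */
    }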

@@ -104,20 +139,6 @@ static int mca_pml_monitoring_component_open(void)
     return OMPI_SUCCESS;
 }
 
-static int
-mca_pml_monitoring_comm_size_notify(mca_base_pvar_t *pvar,
-                                    mca_base_pvar_event_t event,
-                                    void *obj_handle,
-                                    int *count)
-{
-    if (MCA_BASE_PVAR_HANDLE_BIND == event) {
-        /* Return the size of the communicator as the number of values */
-        *count = ompi_comm_size ((ompi_communicator_t *) obj_handle);
-    }
-
-    return OMPI_SUCCESS;
-}
-
 static int mca_pml_monitoring_component_close(void)
 {
     if( NULL != mca_pml_monitoring_current_filename ) {
@@ -198,17 +219,6 @@ static int mca_pml_monitoring_component_register(void)
                                   OPAL_INFO_LVL_4,
                                   MCA_BASE_VAR_SCOPE_READONLY, &mca_pml_monitoring_enabled);
 
-    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_count", "Number of messages "
-                                 "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
-                                 MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM,
-                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS,
-                                 mca_pml_monitoring_get_messages_count, NULL, mca_pml_monitoring_comm_size_notify, NULL);
-
-    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_size", "Size of messages "
-                                 "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
-                                 MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM,
-                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS,
-                                 mca_pml_monitoring_get_messages_size, NULL, mca_pml_monitoring_comm_size_notify, NULL);
     return OMPI_SUCCESS;
 }

5 changes: 5 additions & 0 deletions test/monitoring/Makefile.am
@@ -14,8 +14,13 @@
 # of 'make check'
 if PROJECT_OMPI
 noinst_PROGRAMS = monitoring_test
+lib_LTLIBRARIES = monitoring_prof.la
 
 monitoring_test_SOURCES = monitoring_test.c
 monitoring_test_LDFLAGS = $(WRAPPER_EXTRA_LDFLAGS)
 monitoring_test_LDADD = $(top_builddir)/ompi/libmpi.la $(top_builddir)/opal/libopen-pal.la
+
+monitoring_prof_la_SOURCES = monitoring_prof.c
+monitoring_prof_la_LDFLAGS=-module -avoid-version -shared $(WRAPPER_EXTRA_LDFLAGS)
+monitoring_prof_la_LIBADD = $(top_builddir)/ompi/libmpi.la $(top_builddir)/opal/libopen-pal.la
 endif
253 changes: 253 additions & 0 deletions test/monitoring/monitoring_prof.c
@@ -0,0 +1,253 @@
/*
 * Copyright (c) 2013-2015 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2013-2015 Inria.  All rights reserved.
 * Copyright (c) 2013-2015 Bull SAS.  All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

/*
pml monitoring PMPI profiler
Designed by George Bosilca <bosilca@icl.utk.edu>, Emmanuel Jeannot <emmanuel.jeannot@inria.fr> and Guillaume Papauré <guillaume.papaure@bull.net>
Contact the authors for questions.
To be run as:
mpirun -np 4 -x LD_PRELOAD=ompi_install_dir/lib/monitoring_prof.so --mca pml_monitoring_enable 1 ./my_app
...
...
...
writing 4x4 matrix to monitoring_msg.mat
writing 4x4 matrix to monitoring_size.mat
writing 4x4 matrix to monitoring_avg.mat
*/

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h> /* PRIu64, used to print the uint64_t counters */

static MPI_T_pvar_session session;
static int comm_world_size;
static int comm_world_rank;

struct monitoring_result
{
    char * pvar_name;
    int pvar_idx;
    MPI_T_pvar_handle pvar_handle;
    uint64_t * vector;
};
typedef struct monitoring_result monitoring_result;

static monitoring_result counts;
static monitoring_result sizes;

static int write_mat(char *, uint64_t *, unsigned int);
static void init_monitoring_result(const char *, monitoring_result *);
static void start_monitoring_result(monitoring_result *);
static void stop_monitoring_result(monitoring_result *);
static void get_monitoring_result(monitoring_result *);
static void destroy_monitoring_result(monitoring_result *);

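/* Shim MPI_Init: after the real initialization, create an MPI_T session,
   locate both monitoring pvars, bind a handle on MPI_COMM_WORLD for each,
   and start counting before the application communicates. */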
int MPI_Init(int* argc, char*** argv)
{
    int result, MPIT_result;
    int provided;

    result = PMPI_Init(argc, argv);

    PMPI_Comm_size(MPI_COMM_WORLD, &comm_world_size);
    PMPI_Comm_rank(MPI_COMM_WORLD, &comm_world_rank);

    MPIT_result = MPI_T_init_thread(MPI_THREAD_SINGLE, &provided);
    if (MPIT_result != MPI_SUCCESS) {
        fprintf(stderr, "ERROR : failed to initialize MPI_T interface, so monitoring results cannot be collected: check your OpenMPI installation\n");
        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
    }

    MPIT_result = MPI_T_pvar_session_create(&session);
    if (MPIT_result != MPI_SUCCESS) {
        fprintf(stderr, "ERROR : failed to create MPI_T session, so monitoring results cannot be collected: check your OpenMPI installation\n");
        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
    }

    init_monitoring_result("pml_monitoring_messages_count", &counts);
    init_monitoring_result("pml_monitoring_messages_size", &sizes);

    start_monitoring_result(&counts);
    start_monitoring_result(&sizes);

    return result;
}

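/* Shim MPI_Finalize: stop and read both counters, gather each rank's row on
   rank 0, symmetrize and write the three matrices, then shut MPI_T down
   before the real finalize runs. */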
int MPI_Finalize(void)
{
    int result, MPIT_result;
    uint64_t * exchange_count_matrix = NULL;
    uint64_t * exchange_size_matrix = NULL;
    uint64_t * exchange_avg_size_matrix = NULL;

    if (0 == comm_world_rank) {
        exchange_count_matrix = (uint64_t *) malloc(comm_world_size * comm_world_size * sizeof(uint64_t));
        exchange_size_matrix = (uint64_t *) malloc(comm_world_size * comm_world_size * sizeof(uint64_t));
        /* calloc, not malloc: pairs that never exchanged a message keep an
           average of 0 instead of uninitialized garbage */
        exchange_avg_size_matrix = (uint64_t *) calloc(comm_world_size * comm_world_size, sizeof(uint64_t));
    }

    stop_monitoring_result(&counts);
    stop_monitoring_result(&sizes);

    get_monitoring_result(&counts);
    get_monitoring_result(&sizes);

    PMPI_Gather(counts.vector, comm_world_size, MPI_UNSIGNED_LONG, exchange_count_matrix, comm_world_size, MPI_UNSIGNED_LONG, 0, MPI_COMM_WORLD);
    PMPI_Gather(sizes.vector, comm_world_size, MPI_UNSIGNED_LONG, exchange_size_matrix, comm_world_size, MPI_UNSIGNED_LONG, 0, MPI_COMM_WORLD);

    if (0 == comm_world_rank) {
        int i, j;

        /* Symmetrize as profile2mat.pl does: average each (i,j) entry with
           its (j,i) counterpart */
        for (i = 0; i < comm_world_size; ++i) {
            for (j = i + 1; j < comm_world_size; ++j) {
                exchange_count_matrix[i * comm_world_size + j] = exchange_count_matrix[j * comm_world_size + i] = (exchange_count_matrix[i * comm_world_size + j] + exchange_count_matrix[j * comm_world_size + i]) / 2;
                exchange_size_matrix[i * comm_world_size + j] = exchange_size_matrix[j * comm_world_size + i] = (exchange_size_matrix[i * comm_world_size + j] + exchange_size_matrix[j * comm_world_size + i]) / 2;
                if (exchange_count_matrix[i * comm_world_size + j] != 0)
                    exchange_avg_size_matrix[i * comm_world_size + j] = exchange_avg_size_matrix[j * comm_world_size + i] = exchange_size_matrix[i * comm_world_size + j] / exchange_count_matrix[i * comm_world_size + j];
            }
        }

        write_mat("monitoring_msg.mat", exchange_count_matrix, comm_world_size);
        write_mat("monitoring_size.mat", exchange_size_matrix, comm_world_size);
        write_mat("monitoring_avg.mat", exchange_avg_size_matrix, comm_world_size);
    }

    free(exchange_count_matrix);
    free(exchange_size_matrix);
    free(exchange_avg_size_matrix);
    destroy_monitoring_result(&counts);
    destroy_monitoring_result(&sizes);

    MPIT_result = MPI_T_pvar_session_free(&session);
    if (MPIT_result != MPI_SUCCESS) {
        fprintf(stderr, "WARNING : failed to free MPI_T session, monitoring results may be impacted: check your OpenMPI installation\n");
    }

    MPIT_result = MPI_T_finalize();
    if (MPIT_result != MPI_SUCCESS) {
        fprintf(stderr, "WARNING : failed to finalize MPI_T interface, monitoring results may be impacted: check your OpenMPI installation\n");
    }

    result = PMPI_Finalize();

    return result;
}

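/* Look up a pvar by name, bind a handle on MPI_COMM_WORLD, check that it
   exposes one value per rank, and allocate the vector it will be read into. */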
void init_monitoring_result(const char * pvar_name, monitoring_result * res)
{
    int count;
    int MPIT_result;
    MPI_Comm comm_world = MPI_COMM_WORLD;

    res->pvar_name = strdup(pvar_name);

    MPIT_result = MPI_T_pvar_get_index(res->pvar_name, MPI_T_PVAR_CLASS_SIZE, &(res->pvar_idx));
    if (MPIT_result != MPI_SUCCESS) {
        fprintf(stderr, "ERROR : cannot find monitoring MPI_T \"%s\" pvar, check that you have the monitoring pml\n", pvar_name);
        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
    }

    MPIT_result = MPI_T_pvar_handle_alloc(session, res->pvar_idx, comm_world, &(res->pvar_handle), &count);
    if (MPIT_result != MPI_SUCCESS) {
        fprintf(stderr, "ERROR : failed to allocate handle on \"%s\" pvar, check that you have the monitoring pml\n", pvar_name);
        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
    }

    if (count != comm_world_size) {
        fprintf(stderr, "ERROR : COMM_WORLD has %d ranks but \"%s\" pvar contains %d values, check that you have the monitoring pml\n", comm_world_size, pvar_name, count);
        PMPI_Abort(MPI_COMM_WORLD, count);
    }

    res->vector = (uint64_t *) malloc(comm_world_size * sizeof(uint64_t));
}

void start_monitoring_result(monitoring_result * res)
{
    int MPIT_result;

    MPIT_result = MPI_T_pvar_start(session, res->pvar_handle);
    if (MPIT_result != MPI_SUCCESS) {
        fprintf(stderr, "ERROR : failed to start handle on \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
    }
}

void stop_monitoring_result(monitoring_result * res)
{
    int MPIT_result;

    MPIT_result = MPI_T_pvar_stop(session, res->pvar_handle);
    if (MPIT_result != MPI_SUCCESS) {
        fprintf(stderr, "ERROR : failed to stop handle on \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
    }
}

void get_monitoring_result(monitoring_result * res)
{
    int MPIT_result;

    MPIT_result = MPI_T_pvar_read(session, res->pvar_handle, res->vector);
    if (MPIT_result != MPI_SUCCESS) {
        fprintf(stderr, "ERROR : failed to read \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
    }
}

void destroy_monitoring_result(monitoring_result * res)
{
    int MPIT_result;

    MPIT_result = MPI_T_pvar_handle_free(session, &(res->pvar_handle));
    if (MPIT_result != MPI_SUCCESS) {
        fprintf(stderr, "ERROR : failed to free handle on \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
    }

    free(res->pvar_name);
    free(res->vector);
}

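/* Write one dim x dim matrix as text: one row per rank, space-separated. */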
int write_mat(char * filename, uint64_t * mat, unsigned int dim)
{
    FILE *matrix_file;
    int i, j;

    matrix_file = fopen(filename, "w");
    if (!matrix_file) {
        fprintf(stderr, "ERROR : failed to open \"%s\" file in write mode, check your permissions\n", filename);
        return -1;
    }

    printf("writing %ux%u matrix to %s\n", dim, dim, filename);

    for (i = 0; i < comm_world_size; ++i) {
        for (j = 0; j < comm_world_size - 1; ++j) {
            /* PRIu64: the entries are uint64_t, so plain %u would misread them */
            fprintf(matrix_file, "%" PRIu64 " ", mat[i * comm_world_size + j]);
        }
        fprintf(matrix_file, "%" PRIu64 "\n", mat[i * comm_world_size + j]);
    }
    fflush(matrix_file);
    fclose(matrix_file);

    return 0;
}