diff --git a/ompi/mca/pml/monitoring/pml_monitoring.c b/ompi/mca/pml/monitoring/pml_monitoring.c
index 14cac945b77..c058e21e8d3 100644
--- a/ompi/mca/pml/monitoring/pml_monitoring.c
+++ b/ompi/mca/pml/monitoring/pml_monitoring.c
@@ -154,7 +154,7 @@ int mca_pml_monitoring_get_messages_count (const struct mca_base_pvar_t *pvar, v
     uint64_t *values = (uint64_t*) value;
     int i;
 
-    if(comm != &ompi_mpi_comm_world.comm)
+    if(comm != &ompi_mpi_comm_world.comm || NULL == messages_count)
         return OMPI_ERROR;
 
     for (i = 0 ; i < comm_size ; ++i) {
@@ -171,7 +171,7 @@ int mca_pml_monitoring_get_messages_size (const struct mca_base_pvar_t *pvar, vo
     uint64_t *values = (uint64_t*) value;
     int i;
 
-    if(comm != &ompi_mpi_comm_world.comm)
+    if(comm != &ompi_mpi_comm_world.comm || NULL == sent_data)
         return OMPI_ERROR;
 
     for (i = 0 ; i < comm_size ; ++i) {
diff --git a/ompi/mca/pml/monitoring/pml_monitoring_component.c b/ompi/mca/pml/monitoring/pml_monitoring_component.c
index 3d94ee5dfa9..fdcb325fe88 100644
--- a/ompi/mca/pml/monitoring/pml_monitoring_component.c
+++ b/ompi/mca/pml/monitoring/pml_monitoring_component.c
@@ -79,6 +79,29 @@ mca_pml_monitoring_notify_flush(struct mca_base_pvar_t *pvar, mca_base_pvar_even
     return OMPI_ERROR;
 }
 
+static int
+mca_pml_monitoring_messages_notify(mca_base_pvar_t *pvar,
+                                   mca_base_pvar_event_t event,
+                                   void *obj_handle,
+                                   int *count)
+{
+    switch (event) {
+    case MCA_BASE_PVAR_HANDLE_BIND:
+        /* Return the size of the communicator as the number of values */
+        *count = ompi_comm_size ((ompi_communicator_t *) obj_handle);
+    case MCA_BASE_PVAR_HANDLE_UNBIND:
+        return OMPI_SUCCESS;
+    case MCA_BASE_PVAR_HANDLE_START:
+        mca_pml_monitoring_current_state = mca_pml_monitoring_enabled;
+        return OMPI_SUCCESS;
+    case MCA_BASE_PVAR_HANDLE_STOP:
+        mca_pml_monitoring_current_state = 0;
+        return OMPI_SUCCESS;
+    }
+
+    return OMPI_ERROR;
+}
+
 int mca_pml_monitoring_enable(bool enable)
 {
     /* If we reach this point we were succesful at hijacking the interface of
@@ -92,6 +115,18 @@ int mca_pml_monitoring_enable(bool enable)
                                  mca_pml_monitoring_get_flush, mca_pml_monitoring_set_flush,
                                  mca_pml_monitoring_notify_flush, &mca_pml_monitoring_component);
 
+    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_count", "Number of messages "
+                                 "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
+                                 MCA_BASE_VAR_TYPE_UNSIGNED_LONG, NULL, MPI_T_BIND_MPI_COMM,
+                                 MCA_BASE_PVAR_FLAG_READONLY,
+                                 mca_pml_monitoring_get_messages_count, NULL, mca_pml_monitoring_messages_notify, NULL);
+
+    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_size", "Size of messages "
+                                 "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
+                                 MCA_BASE_VAR_TYPE_UNSIGNED_LONG, NULL, MPI_T_BIND_MPI_COMM,
+                                 MCA_BASE_PVAR_FLAG_READONLY,
+                                 mca_pml_monitoring_get_messages_size, NULL, mca_pml_monitoring_messages_notify, NULL);
+
     return pml_selected_module.pml_enable(enable);
 }
 
@@ -104,20 +139,6 @@ static int mca_pml_monitoring_component_open(void)
     return OMPI_SUCCESS;
 }
 
-static int
-mca_pml_monitoring_comm_size_notify(mca_base_pvar_t *pvar,
-                                    mca_base_pvar_event_t event,
-                                    void *obj_handle,
-                                    int *count)
-{
-    if (MCA_BASE_PVAR_HANDLE_BIND == event) {
-        /* Return the size of the communicator as the number of values */
-        *count = ompi_comm_size ((ompi_communicator_t *) obj_handle);
-    }
-
-    return OMPI_SUCCESS;
-}
-
 static int mca_pml_monitoring_component_close(void)
 {
     if( NULL != mca_pml_monitoring_current_filename ) {
@@ -198,17 +219,6 @@ static int mca_pml_monitoring_component_register(void)
                                           OPAL_INFO_LVL_4,
                                           MCA_BASE_VAR_SCOPE_READONLY, &mca_pml_monitoring_enabled);
 
-    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_count", "Number of messages "
-                                 "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
-                                 MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM,
-                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS,
-                                 mca_pml_monitoring_get_messages_count, NULL, mca_pml_monitoring_comm_size_notify, NULL);
-
-    (void)mca_base_pvar_register("ompi", "pml", "monitoring", "messages_size", "Size of messages "
-                                 "sent to each peer in a communicator", OPAL_INFO_LVL_4, MPI_T_PVAR_CLASS_SIZE,
-                                 MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, MPI_T_BIND_MPI_COMM,
-                                 MCA_BASE_PVAR_FLAG_READONLY | MCA_BASE_PVAR_FLAG_CONTINUOUS,
-                                 mca_pml_monitoring_get_messages_size, NULL, mca_pml_monitoring_comm_size_notify, NULL);
 
     return OMPI_SUCCESS;
 }
diff --git a/test/monitoring/Makefile.am b/test/monitoring/Makefile.am
index db85187bf90..94959bdf77c 100644
--- a/test/monitoring/Makefile.am
+++ b/test/monitoring/Makefile.am
@@ -14,8 +14,13 @@
 # of 'make check'
 if PROJECT_OMPI
     noinst_PROGRAMS = monitoring_test
+    lib_LTLIBRARIES = monitoring_prof.la
     monitoring_test_SOURCES = monitoring_test.c
     monitoring_test_LDFLAGS = $(WRAPPER_EXTRA_LDFLAGS)
     monitoring_test_LDADD = $(top_builddir)/ompi/libmpi.la $(top_builddir)/opal/libopen-pal.la
+
+    monitoring_prof_la_SOURCES = monitoring_prof.c
+    monitoring_prof_la_LDFLAGS=-module -avoid-version -shared $(WRAPPER_EXTRA_LDFLAGS)
+    monitoring_prof_la_LIBADD = $(top_builddir)/ompi/libmpi.la $(top_builddir)/opal/libopen-pal.la
 
 endif
diff --git a/test/monitoring/monitoring_prof.c b/test/monitoring/monitoring_prof.c
new file mode 100644
index 00000000000..ab3d0a47ba4
--- /dev/null
+++ b/test/monitoring/monitoring_prof.c
@@ -0,0 +1,253 @@
+/*
+ * Copyright (c) 2013-2015 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2013-2015 Inria.  All rights reserved.
+ * Copyright (c) 2013-2015 Bull SAS.  All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+/*
+pml monitoring PMPI profiler
+
+Designed by George Bosilca, Emmanuel Jeannot and Guillaume Papauré
+Contact the authors for questions.
+
+To be run as:
+
+mpirun -np 4 -x LD_PRELOAD=ompi_install_dir/lib/monitoring_prof.so --mca pml_monitoring_enable 1 ./my_app
+
+...
+...
+...
+
+writing 4x4 matrix to monitoring_msg.mat
+writing 4x4 matrix to monitoring_size.mat
+writing 4x4 matrix to monitoring_avg.mat
+
+*/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <mpi.h>
+
+static MPI_T_pvar_session session;
+static int comm_world_size;
+static int comm_world_rank;
+
+struct monitoring_result
+{
+    char * pvar_name;
+    int pvar_idx;
+    MPI_T_pvar_handle pvar_handle;
+    uint64_t * vector;
+};
+typedef struct monitoring_result monitoring_result;
+
+static monitoring_result counts;
+static monitoring_result sizes;
+
+static int write_mat(char *, uint64_t *, unsigned int);
+static void init_monitoring_result(const char *, monitoring_result *);
+static void start_monitoring_result(monitoring_result *);
+static void stop_monitoring_result(monitoring_result *);
+static void get_monitoring_result(monitoring_result *);
+static void destroy_monitoring_result(monitoring_result *);
+
+int MPI_Init(int* argc, char*** argv)
+{
+    int result, MPIT_result;
+    int provided;
+
+    result = PMPI_Init(argc, argv);
+
+    PMPI_Comm_size(MPI_COMM_WORLD, &comm_world_size);
+    PMPI_Comm_rank(MPI_COMM_WORLD, &comm_world_rank);
+
+    MPIT_result = MPI_T_init_thread(MPI_THREAD_SINGLE, &provided);
+    if (MPIT_result != MPI_SUCCESS) {
+        fprintf(stderr, "ERROR : failed to initialize MPI_T interface, so monitoring results cannot be gathered: check your OpenMPI installation\n");
+        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+
+    MPIT_result = MPI_T_pvar_session_create(&session);
+    if (MPIT_result != MPI_SUCCESS) {
+        fprintf(stderr, "ERROR : failed to create MPI_T session, so monitoring results cannot be gathered: check your OpenMPI installation\n");
+        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+
+    init_monitoring_result("pml_monitoring_messages_count", &counts);
+    init_monitoring_result("pml_monitoring_messages_size", &sizes);
+
+    start_monitoring_result(&counts);
+    start_monitoring_result(&sizes);
+
+    return result;
+}
+
+int MPI_Finalize(void)
+{
+    int result, MPIT_result;
+    uint64_t * exchange_count_matrix = NULL;
+    uint64_t * exchange_size_matrix = NULL;
+    uint64_t * exchange_avg_size_matrix = NULL;
+
+    if (0 == comm_world_rank) {
+        exchange_count_matrix = (uint64_t *) malloc(comm_world_size * comm_world_size * sizeof(uint64_t));
+        exchange_size_matrix = (uint64_t *) malloc(comm_world_size * comm_world_size * sizeof(uint64_t));
+        exchange_avg_size_matrix = (uint64_t *) calloc(comm_world_size * comm_world_size, sizeof(uint64_t));
+    }
+
+    stop_monitoring_result(&counts);
+    stop_monitoring_result(&sizes);
+
+    get_monitoring_result(&counts);
+    get_monitoring_result(&sizes);
+
+    PMPI_Gather(counts.vector, comm_world_size, MPI_UNSIGNED_LONG, exchange_count_matrix, comm_world_size, MPI_UNSIGNED_LONG, 0, MPI_COMM_WORLD);
+    PMPI_Gather(sizes.vector, comm_world_size, MPI_UNSIGNED_LONG, exchange_size_matrix, comm_world_size, MPI_UNSIGNED_LONG, 0, MPI_COMM_WORLD);
+
+    if (0 == comm_world_rank) {
+        int i, j;
+
+        //Get the same matrix as profile2mat.pl
+        for (i = 0; i < comm_world_size; ++i) {
+            for (j = i + 1; j < comm_world_size; ++j) {
+                exchange_count_matrix[i * comm_world_size + j] = exchange_count_matrix[j * comm_world_size + i] = (exchange_count_matrix[i * comm_world_size + j] + exchange_count_matrix[j * comm_world_size + i]) / 2;
+                exchange_size_matrix[i * comm_world_size + j] = exchange_size_matrix[j * comm_world_size + i] = (exchange_size_matrix[i * comm_world_size + j] + exchange_size_matrix[j * comm_world_size + i]) / 2;
+                if (exchange_count_matrix[i * comm_world_size + j] != 0)
+                    exchange_avg_size_matrix[i * comm_world_size + j] = exchange_avg_size_matrix[j * comm_world_size + i] = exchange_size_matrix[i * comm_world_size + j] / exchange_count_matrix[i * comm_world_size + j];
+            }
+        }
+
+        write_mat("monitoring_msg.mat", exchange_count_matrix, comm_world_size);
+        write_mat("monitoring_size.mat", exchange_size_matrix, comm_world_size);
+        write_mat("monitoring_avg.mat", exchange_avg_size_matrix, comm_world_size);
+    }
+
+    free(exchange_count_matrix);
+    free(exchange_size_matrix);
+    free(exchange_avg_size_matrix);
+    destroy_monitoring_result(&counts);
+    destroy_monitoring_result(&sizes);
+
+    MPIT_result = MPI_T_pvar_session_free(&session);
+    if (MPIT_result != MPI_SUCCESS) {
+        fprintf(stderr, "WARNING : failed to free MPI_T session, monitoring results may be impacted: check your OpenMPI installation\n");
+    }
+
+    MPIT_result = MPI_T_finalize();
+    if (MPIT_result != MPI_SUCCESS) {
+        fprintf(stderr, "WARNING : failed to finalize MPI_T interface, monitoring results may be impacted: check your OpenMPI installation\n");
+    }
+
+    result = PMPI_Finalize();
+
+    return result;
+}
+
+void init_monitoring_result(const char * pvar_name, monitoring_result * res)
+{
+    int count;
+    int MPIT_result;
+    MPI_Comm comm_world = MPI_COMM_WORLD;
+
+    res->pvar_name = strdup(pvar_name);
+
+    MPIT_result = MPI_T_pvar_get_index(res->pvar_name, MPI_T_PVAR_CLASS_SIZE, &(res->pvar_idx));
+    if (MPIT_result != MPI_SUCCESS) {
+        fprintf(stderr, "ERROR : cannot find monitoring MPI_T \"%s\" pvar, check that you have monitoring pml\n", pvar_name);
+        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+
+    MPIT_result = MPI_T_pvar_handle_alloc(session, res->pvar_idx, comm_world, &(res->pvar_handle), &count);
+    if (MPIT_result != MPI_SUCCESS) {
+        fprintf(stderr, "ERROR : failed to allocate handle on \"%s\" pvar, check that you have monitoring pml\n", pvar_name);
+        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+
+    if (count != comm_world_size) {
+        fprintf(stderr, "ERROR : COMM_WORLD has %d ranks but \"%s\" pvar contains %d values, check that you have monitoring pml\n", comm_world_size, pvar_name, count);
+        PMPI_Abort(MPI_COMM_WORLD, count);
+    }
+
+    res->vector = (uint64_t *) malloc(comm_world_size * sizeof(uint64_t));
+}
+
+void start_monitoring_result(monitoring_result * res)
+{
+    int MPIT_result;
+
+    MPIT_result = MPI_T_pvar_start(session, res->pvar_handle);
+    if (MPIT_result != MPI_SUCCESS) {
+        fprintf(stderr, "ERROR : failed to start handle on \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
+        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+}
+
+void stop_monitoring_result(monitoring_result * res)
+{
+    int MPIT_result;
+
+    MPIT_result = MPI_T_pvar_stop(session, res->pvar_handle);
+    if (MPIT_result != MPI_SUCCESS) {
+        fprintf(stderr, "ERROR : failed to stop handle on \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
+        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+}
+
+void get_monitoring_result(monitoring_result * res)
+{
+    int MPIT_result;
+
+    MPIT_result = MPI_T_pvar_read(session, res->pvar_handle, res->vector);
+    if (MPIT_result != MPI_SUCCESS) {
+        fprintf(stderr, "ERROR : failed to read \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
+        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+}
+
+void destroy_monitoring_result(monitoring_result * res)
+{
+    int MPIT_result;
+
+    MPIT_result = MPI_T_pvar_handle_free(session, &(res->pvar_handle));
+    if (MPIT_result != MPI_SUCCESS) {
+        fprintf(stderr, "ERROR : failed to free handle on \"%s\" pvar, check that you have enabled the monitoring pml\n", res->pvar_name);
+        PMPI_Abort(MPI_COMM_WORLD, MPIT_result);
+    }
+
+    free(res->pvar_name);
+    free(res->vector);
+}
+
+int write_mat(char * filename, uint64_t * mat, unsigned int dim)
+{
+    FILE *matrix_file;
+    int i, j;
+
+    matrix_file = fopen(filename, "w");
+    if (!matrix_file) {
+        fprintf(stderr, "ERROR : failed to open \"%s\" file in write mode, check your permissions\n", filename);
+        return -1;
+    }
+
+    printf("writing %ux%u matrix to %s\n", dim, dim, filename);
+
+    for (i = 0; i < comm_world_size; ++i) {
+        for (j = 0; j < comm_world_size - 1; ++j) {
+            fprintf(matrix_file, "%lu ", (unsigned long) mat[i * comm_world_size + j]);
+        }
+        fprintf(matrix_file, "%lu\n", (unsigned long) mat[i * comm_world_size + j]);
+    }
+    fflush(matrix_file);
+    fclose(matrix_file);
+
+    return 0;
+}