@@ -68,6 +68,7 @@ static const char pmix_version_string[] = PMIX_VERSION;
68
68
#endif /* PMIX_ENABLE_DSTORE */
69
69
70
70
#include "pmix_client_ops.h"
71
+ #include "src/include/pmix_jobdata.h"
71
72
72
73
#define PMIX_MAX_RETRIES 10
73
74
@@ -186,7 +187,9 @@ static void job_data(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
186
187
return ;
187
188
}
188
189
/* decode it */
189
- pmix_client_process_nspace_blob (pmix_globals .myid .nspace , buf );
190
+ #if !(defined(PMIX_ENABLE_DSTORE ) && (PMIX_ENABLE_DSTORE == 1 ))
191
+ pmix_job_data_htable_store (nspace , buf );
192
+ #endif
190
193
cb -> status = PMIX_SUCCESS ;
191
194
cb -> active = false;
192
195
}
@@ -751,12 +754,27 @@ static void _peersfn(int sd, short args, void *cbdata)
751
754
pmix_cb_t * cb = (pmix_cb_t * )cbdata ;
752
755
pmix_status_t rc ;
753
756
char * * nsprocs = NULL , * * nsps = NULL , * * tmp ;
757
+ #if !(defined(PMIX_ENABLE_DSTORE ) && (PMIX_ENABLE_DSTORE == 1 ))
754
758
pmix_nspace_t * nsptr ;
755
759
pmix_nrec_t * nptr ;
760
+ #endif
756
761
size_t i ;
757
762
758
763
/* cycle across our known nspaces */
759
764
tmp = NULL ;
765
+ #if defined(PMIX_ENABLE_DSTORE ) && (PMIX_ENABLE_DSTORE == 1 )
766
+ if (PMIX_SUCCESS == (rc = pmix_dstore_fetch (cb -> nspace , PMIX_RANK_WILDCARD ,
767
+ cb -> key , & cb -> value ))) {
768
+
769
+ tmp = pmix_argv_split (cb -> value -> data .string , ',' );
770
+ for (i = 0 ; NULL != tmp [i ]; i ++ ) {
771
+ pmix_argv_append_nosize (& nsps , cb -> nspace );
772
+ pmix_argv_append_nosize (& nsprocs , tmp [i ]);
773
+ }
774
+ pmix_argv_free (tmp );
775
+ tmp = NULL ;
776
+ }
777
+ #else
760
778
PMIX_LIST_FOREACH (nsptr , & pmix_globals .nspaces , pmix_nspace_t ) {
761
779
if (0 == strncmp (nsptr -> nspace , cb -> nspace , PMIX_MAX_NSLEN )) {
762
780
/* cycle across the nodes in this nspace */
@@ -774,6 +792,7 @@ static void _peersfn(int sd, short args, void *cbdata)
774
792
}
775
793
}
776
794
}
795
+ #endif
777
796
if (0 == (i = pmix_argv_count (nsps ))) {
778
797
/* we don't know this nspace */
779
798
rc = PMIX_ERR_NOT_FOUND ;
@@ -1030,163 +1049,6 @@ static pmix_status_t recv_connect_ack(int sd)
1030
1049
return PMIX_SUCCESS ;
1031
1050
}
1032
1051
1033
- void pmix_client_process_nspace_blob (const char * nspace , pmix_buffer_t * bptr )
1034
- {
1035
- pmix_status_t rc ;
1036
- int32_t cnt ;
1037
- int rank ;
1038
- pmix_kval_t * kptr , * kp2 , kv ;
1039
- pmix_buffer_t buf2 ;
1040
- pmix_byte_object_t * bo ;
1041
- size_t nnodes , i , j ;
1042
- pmix_nspace_t * nsptr , * nsptr2 ;
1043
- pmix_nrec_t * nrec , * nr2 ;
1044
- char * * procs ;
1045
-
1046
- pmix_output_verbose (2 , pmix_globals .debug_output ,
1047
- "pmix: PROCESSING BLOB FOR NSPACE %s" , nspace );
1048
-
1049
- /* cycle across our known nspaces */
1050
- nsptr = NULL ;
1051
- PMIX_LIST_FOREACH (nsptr2 , & pmix_globals .nspaces , pmix_nspace_t ) {
1052
- if (0 == strcmp (nsptr2 -> nspace , nspace )) {
1053
- nsptr = nsptr2 ;
1054
- break ;
1055
- }
1056
- }
1057
- if (NULL == nsptr ) {
1058
- /* we don't know this nspace - add it */
1059
- nsptr = PMIX_NEW (pmix_nspace_t );
1060
- (void )strncpy (nsptr -> nspace , nspace , PMIX_MAX_NSLEN );
1061
- pmix_list_append (& pmix_globals .nspaces , & nsptr -> super );
1062
- }
1063
-
1064
- /* unpack any info structs provided */
1065
- cnt = 1 ;
1066
- kptr = PMIX_NEW (pmix_kval_t );
1067
- while (PMIX_SUCCESS == (rc = pmix_bfrop .unpack (bptr , kptr , & cnt , PMIX_KVAL ))) {
1068
- if (0 == strcmp (kptr -> key , PMIX_PROC_BLOB )) {
1069
- /* transfer the byte object for unpacking */
1070
- bo = & (kptr -> value -> data .bo );
1071
- PMIX_CONSTRUCT (& buf2 , pmix_buffer_t );
1072
- PMIX_LOAD_BUFFER (& buf2 , bo -> bytes , bo -> size );
1073
- PMIX_RELEASE (kptr );
1074
- /* start by unpacking the rank */
1075
- cnt = 1 ;
1076
- if (PMIX_SUCCESS != (rc = pmix_bfrop .unpack (& buf2 , & rank , & cnt , PMIX_INT ))) {
1077
- PMIX_ERROR_LOG (rc );
1078
- PMIX_DESTRUCT (& buf2 );
1079
- return ;
1080
- }
1081
- kp2 = PMIX_NEW (pmix_kval_t );
1082
- kp2 -> key = strdup (PMIX_RANK );
1083
- PMIX_VALUE_CREATE (kp2 -> value , 1 );
1084
- kp2 -> value -> type = PMIX_INT ;
1085
- kp2 -> value -> data .integer = rank ;
1086
- if (PMIX_SUCCESS != (rc = pmix_hash_store (& nsptr -> internal , rank , kp2 ))) {
1087
- PMIX_ERROR_LOG (rc );
1088
- }
1089
- PMIX_RELEASE (kp2 ); // maintain accounting
1090
- cnt = 1 ;
1091
- kp2 = PMIX_NEW (pmix_kval_t );
1092
- while (PMIX_SUCCESS == (rc = pmix_bfrop .unpack (& buf2 , kp2 , & cnt , PMIX_KVAL ))) {
1093
- /* this is data provided by a job-level exchange, so store it
1094
- * in the job-level data hash_table */
1095
- if (PMIX_SUCCESS != (rc = pmix_hash_store (& nsptr -> internal , rank , kp2 ))) {
1096
- PMIX_ERROR_LOG (rc );
1097
- }
1098
- PMIX_RELEASE (kp2 ); // maintain accounting
1099
- kp2 = PMIX_NEW (pmix_kval_t );
1100
- }
1101
- /* cleanup */
1102
- PMIX_DESTRUCT (& buf2 ); // releases the original kptr data
1103
- PMIX_RELEASE (kp2 );
1104
- } else if (0 == strcmp (kptr -> key , PMIX_MAP_BLOB )) {
1105
- /* transfer the byte object for unpacking */
1106
- bo = & (kptr -> value -> data .bo );
1107
- PMIX_CONSTRUCT (& buf2 , pmix_buffer_t );
1108
- PMIX_LOAD_BUFFER (& buf2 , bo -> bytes , bo -> size );
1109
- PMIX_RELEASE (kptr );
1110
- /* start by unpacking the number of nodes */
1111
- cnt = 1 ;
1112
- if (PMIX_SUCCESS != (rc = pmix_bfrop .unpack (& buf2 , & nnodes , & cnt , PMIX_SIZE ))) {
1113
- PMIX_ERROR_LOG (rc );
1114
- PMIX_DESTRUCT (& buf2 );
1115
- return ;
1116
- }
1117
- /* unpack the list of procs on each node */
1118
- for (i = 0 ; i < nnodes ; i ++ ) {
1119
- cnt = 1 ;
1120
- PMIX_CONSTRUCT (& kv , pmix_kval_t );
1121
- if (PMIX_SUCCESS != (rc = pmix_bfrop .unpack (& buf2 , & kv , & cnt , PMIX_KVAL ))) {
1122
- PMIX_ERROR_LOG (rc );
1123
- PMIX_DESTRUCT (& buf2 );
1124
- PMIX_DESTRUCT (& kv );
1125
- return ;
1126
- }
1127
- /* the name of the node is in the key, and the value is
1128
- * a comma-delimited list of procs on that node. See if we already
1129
- * have this node */
1130
- nrec = NULL ;
1131
- PMIX_LIST_FOREACH (nr2 , & nsptr -> nodes , pmix_nrec_t ) {
1132
- if (0 == strcmp (nr2 -> name , kv .key )) {
1133
- nrec = nr2 ;
1134
- break ;
1135
- }
1136
- }
1137
- if (NULL == nrec ) {
1138
- /* Create a node record and store that list */
1139
- nrec = PMIX_NEW (pmix_nrec_t );
1140
- nrec -> name = strdup (kv .key );
1141
- pmix_list_append (& nsptr -> nodes , & nrec -> super );
1142
- } else {
1143
- /* refresh the list */
1144
- if (NULL != nrec -> procs ) {
1145
- free (nrec -> procs );
1146
- }
1147
- }
1148
- nrec -> procs = strdup (kv .value -> data .string );
1149
- /* split the list of procs so we can store their
1150
- * individual location data */
1151
- procs = pmix_argv_split (nrec -> procs , ',' );
1152
- for (j = 0 ; NULL != procs [j ]; j ++ ) {
1153
- /* store the hostname for each proc - again, this is
1154
- * data obtained via a job-level exchange, so store it
1155
- * in the job-level data hash_table */
1156
- kp2 = PMIX_NEW (pmix_kval_t );
1157
- kp2 -> key = strdup (PMIX_HOSTNAME );
1158
- kp2 -> value = (pmix_value_t * )malloc (sizeof (pmix_value_t ));
1159
- kp2 -> value -> type = PMIX_STRING ;
1160
- kp2 -> value -> data .string = strdup (nrec -> name );
1161
- rank = strtol (procs [j ], NULL , 10 );
1162
- if (PMIX_SUCCESS != (rc = pmix_hash_store (& nsptr -> internal , rank , kp2 ))) {
1163
- PMIX_ERROR_LOG (rc );
1164
- }
1165
- PMIX_RELEASE (kp2 ); // maintain accounting
1166
- }
1167
- pmix_argv_free (procs );
1168
- PMIX_DESTRUCT (& kv );
1169
- }
1170
- /* cleanup */
1171
- PMIX_DESTRUCT (& buf2 ); // releases the original kptr data
1172
- } else {
1173
- /* this is job-level data, so just add it to that hash_table
1174
- * with the wildcard rank */
1175
- if (PMIX_SUCCESS != (rc = pmix_hash_store (& nsptr -> internal , PMIX_RANK_WILDCARD , kptr ))) {
1176
- PMIX_ERROR_LOG (rc );
1177
- }
1178
- /* maintain accounting - but note that the kptr remains
1179
- * alive and stored in the hash table! So we cannot reuse
1180
- * it for some other purpose */
1181
- PMIX_RELEASE (kptr );
1182
- }
1183
- kptr = PMIX_NEW (pmix_kval_t );
1184
- cnt = 1 ;
1185
- }
1186
- /* need to release the leftover kptr */
1187
- PMIX_RELEASE (kptr );
1188
- }
1189
-
1190
1052
static pmix_status_t usock_connect (struct sockaddr * addr , int * fd )
1191
1053
{
1192
1054
int sd = -1 ;
0 commit comments