diff --git a/core/environment.c b/core/environment.c index 9dc074772..a9e10e9f4 100644 --- a/core/environment.c +++ b/core/environment.c @@ -106,6 +106,11 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st * @brief Initialize the federation-specific parts of the environment struct. */ static void environment_init_federated(environment_t* env, int num_is_present_fields) { +#ifdef FEDERATED_NDT_ENABLED + // FIXME: Create a queue saving tags instead of events. For now, ndt_q stores + // dummy events. + env->ndt_q = pqueue_tag_init(10); +#endif // FEDERATED_NDT_ENABLED #ifdef FEDERATED_DECENTRALIZED env->_lf_intended_tag_fields = (tag_t**) calloc(num_is_present_fields, sizeof(tag_t*)); LF_ASSERT(env->_lf_intended_tag_fields, "Out of memory"); @@ -113,7 +118,7 @@ static void environment_init_federated(environment_t* env, int num_is_present_fi #endif } -void environment_init_tags( environment_t *env, instant_t start_time, interval_t duration) { +void environment_init_tags(environment_t *env, instant_t start_time, interval_t duration) { env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; tag_t stop_tag = FOREVER_TAG_INITIALIZER; @@ -165,6 +170,9 @@ void environment_free(environment_t* env) { pqueue_free(env->event_q); pqueue_free(env->recycle_q); pqueue_free(env->next_q); + #ifdef FEDERATED_NDT_ENABLED + pqueue_tag_free(env->ndt_q); + #endif // FEDERATED_NDT_ENABLED environment_free_threaded(env); environment_free_single_threaded(env); diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index fdc234ced..6833ec9e9 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -99,6 +99,8 @@ void usage(int argc, const char* argv[]) { lf_print(" clock sync attempt (default is 10). Applies to 'init' and 'on'.\n"); lf_print(" -a, --auth Turn on HMAC authentication options.\n"); lf_print(" -t, --tracing Turn on tracing.\n"); + lf_print(" -v, --version The minimum required version of Lingua Franca."); + lf_print(" --ndt Turn on ndt optimization.\n"); lf_print("Command given:"); for (int i = 0; i < argc; i++) { @@ -171,7 +173,10 @@ int process_clock_sync_args(int argc, const char* argv[]) { int process_args(int argc, const char* argv[]) { for (int i = 1; i < argc; i++) { - if (strcmp(argv[i], "-i") == 0 || strcmp(argv[i], "--id") == 0) { + if (strcmp(argv[i], "-v") == 0 || strcmp(argv[i], "--version") == 0) { + lf_print("%s", version_info); + return 0; + } else if (strcmp(argv[i], "-i") == 0 || strcmp(argv[i], "--id") == 0) { if (argc < i + 2) { lf_print_error("--id needs a string argument."); usage(argc, argv); @@ -232,6 +237,8 @@ int process_args(int argc, const char* argv[]) { rti.authentication_enabled = true; } else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) { rti.base.tracing_enabled = true; + } else if (strcmp(argv[i], "--ndt") == 0) { + rti.ndt_enabled = true; } else if (strcmp(argv[i], " ") == 0) { // Tolerate spaces continue; @@ -239,7 +246,7 @@ int process_args(int argc, const char* argv[]) { lf_print_error("Unrecognized command-line argument: %s", argv[i]); usage(argc, argv); return 0; - } + } } if (rti.base.number_of_scheduling_nodes == 0) { lf_print_error("--number_of_federates needs a valid positive integer argument."); diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index a6554195e..b70a86b6e 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -56,6 +56,8 @@ void initialize_scheduling_node(scheduling_node_t* e, uint16_t id) { e->downstream = NULL; e->num_downstream = 0; e->mode = REALTIME; + e->has_physical_action = false; + e->enable_ndt = false; invalidate_min_delays_upstream(e); } @@ -298,6 +300,25 @@ static void _update_min_delays_upstream(scheduling_node_t* end, scheduling_node_ } } +bool check_physical_action_of_transitive_downstreams(scheduling_node_t* e, bool visited[]) { + if (visited[e->id] || e->state == NOT_CONNECTED) { + return false; + } + + visited[e->id] = true; + + for (int i = 0; i < e->num_downstream; i++) { + if (check_physical_action_of_transitive_downstreams(rti_common->scheduling_nodes[e->downstream[i]], visited)) { + return true; + } + } + if (e->has_physical_action) { + return true; + } else { + return false; + } +} + void update_min_delays_upstream(scheduling_node_t* node) { // Check whether cached result is valid. if (node->min_delays == NULL) { diff --git a/core/federated/RTI/rti_common.h b/core/federated/RTI/rti_common.h index d71751a98..2f9840eb9 100644 --- a/core/federated/RTI/rti_common.h +++ b/core/federated/RTI/rti_common.h @@ -65,6 +65,9 @@ typedef struct scheduling_node_t { int* downstream; // Array of downstream scheduling node ids. int num_downstream; // Size of the array of downstream scheduling nodes. execution_mode_t mode; // FAST or REALTIME. + bool is_in_cycle; + bool has_physical_action; + bool enable_ndt; minimum_delay_t* min_delays; // Array of minimum delays from upstream nodes, not including this node. size_t num_min_delays; // Size of min_delays array. int flags; // Or of IS_IN_ZERO_DELAY_CYCLE, IS_IN_CYCLE @@ -273,5 +276,9 @@ void invalidate_min_delays_upstream(scheduling_node_t* node); */ void free_scheduling_nodes(scheduling_node_t** scheduling_nodes, uint16_t number_of_scheduling_nodes); +bool check_cycle(scheduling_node_t* e, int target_id, bool visited[]); + +bool check_physical_action_of_transitive_downstreams(scheduling_node_t* e, bool visited[]); + #endif // RTI_COMMON_H #endif // STANDALONE_RTI || LF_ENCLAVES diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 2fce8b1bf..5194e4254 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -286,6 +286,90 @@ void update_federate_next_event_tag_locked(uint16_t federate_id, tag_t next_even update_scheduling_node_next_event_tag_locked(&(fed->enclave), next_event_tag); } +// /** +// * Update the cycle information of every federate. +// */ +// void update_cycle_information() { +// bool *visited = (bool *)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); +// for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { +// scheduling_node_t* target_node = rti_remote->base.scheduling_nodes[i]; +// if (check_cycle(target_node, i, visited)) { +// target_node->is_in_cycle = true; +// LF_PRINT_DEBUG("There is a cycle including federate %d.", i); +// } else { +// target_node->is_in_cycle = false; +// LF_PRINT_DEBUG("There is no cycle including federate %d.", i); +// } +// for (int j = 0; j < rti_remote->base.number_of_scheduling_nodes; j++) { +// visited[j] = false; +// } +// } +// free(visited); +// } + +/** + * Determine whether or not to use the NDT messages. +*/ +void determine_the_ndt_condition() { + bool *visited = (bool *)calloc(rti_remote->base.number_of_scheduling_nodes, sizeof(bool)); + for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) { + scheduling_node_t* target_node = rti_remote->base.scheduling_nodes[i]; + if (is_in_cycle(target_node) || check_physical_action_of_transitive_downstreams(target_node, visited)) { + target_node->enable_ndt = false; + LF_PRINT_DEBUG("There is a cycle including federate %d or a transitive downstream federate of" + " the federate %d has a physical action.", i, i); + } else { + target_node->enable_ndt = true; + LF_PRINT_DEBUG("Enable NDT messages for federate %d.", i); + } + for (int j = 0; j < rti_remote->base.number_of_scheduling_nodes; j++) { + visited[j] = false; + } + } + free(visited); +} + +void send_upstream_next_downstream_tag(federate_info_t* fed, tag_t next_event_tag) { + if (!rti_remote->ndt_enabled) { + return; + } + // The RTI receives next_event_tag from the federated fed. + // It has to send NDT messages to the upstream federates of fed + // if the LTC message from an upstream federate is ealrier than the next_event_tag. + size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t); + unsigned char buffer[message_length]; + buffer[0] = MSG_TYPE_NEXT_DOWNSTREAM_TAG; + encode_int64(next_event_tag.time, &(buffer[1])); + encode_int32((int32_t)next_event_tag.microstep, &(buffer[1 + sizeof(int64_t)])); + + // FIXME: Send NDT to transitive upstreams either. + + // Also, the RTI has to check the sparsity of a federate and determine whether + // it sends NDT to it or not. + for (int i = 0; i < fed->enclave.num_upstream; i++) { + int upstream_id = fed->enclave.upstream[i]; + // scheduling_node_t* upstream_node = rti_remote->base.scheduling_nodes[upstream_id]; + federate_info_t* upstream_federate = GET_FED_INFO(upstream_id); + + if (!upstream_federate->enclave.enable_ndt) { + LF_PRINT_DEBUG("Do not send the NDT to federate %d", i); + continue; + } + + if (lf_tag_compare(upstream_federate->enclave.completed, next_event_tag) < 0 && + lf_tag_compare(upstream_federate->enclave.next_event, next_event_tag) <= 0) { + // Send next downstream tag to upstream federates that do not complete at next_event_tag + LF_PRINT_LOG("RTI sending the next downstream event message (NDT) " PRINTF_TAG "to federate %u.", + next_event_tag.time - start_time, next_event_tag.microstep, upstream_id); + if (rti_remote->base.tracing_enabled) { + tracepoint_rti_to_federate(rti_remote->base.trace, send_NDT, upstream_id, &next_event_tag); + } + write_to_socket_errexit(upstream_federate->socket, message_length, buffer, + "RTI failed to send MSG_TYPE_NEXT_DOWNSTREAM_TAG message to federate %d.", upstream_id); + } + } +} + void handle_port_absent_message(federate_info_t* sending_federate, unsigned char* buffer) { size_t message_size = sizeof(uint16_t) + sizeof(uint16_t) + sizeof(int64_t) + sizeof(uint32_t); @@ -539,6 +623,10 @@ void handle_next_event_tag(federate_info_t* fed) { fed->enclave.id, intended_tag ); + // If fed cannot get the grant of the intended tag, send NDTs to its upstream federates. + if (lf_tag_compare(fed->enclave.last_granted, intended_tag) < 0) { + send_upstream_next_downstream_tag(fed, intended_tag); + } lf_mutex_unlock(&rti_mutex); } @@ -1232,147 +1320,209 @@ int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* clie return (int32_t)fed_id; } -int receive_connection_information(int socket_id, uint16_t fed_id) { - LF_PRINT_DEBUG("RTI waiting for MSG_TYPE_NEIGHBOR_STRUCTURE from federate %d.", fed_id); - unsigned char connection_info_header[MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE]; +int receive_set_up_information(int socket_id, uint16_t fed_id) { + LF_PRINT_DEBUG("RTI waiting for MSG_TYPE_NEIGHBOR_STRUCTURE, MSG_TYPE_PHYSICAL_ACTION, and " + "MSG_TYPE_UDP_PORT from federate %d.", fed_id); + + bool waiting_connection_information = true; + bool waiting_physical_action_information = true; + bool waiting_udp_message_and_set_up_clock_sync = true; + + if (!rti_remote->ndt_enabled) { + waiting_physical_action_information = false; + } + + // Buffer for incoming messages. + // This does not constrain the message size because messages + // are forwarded piece by piece. + unsigned char buffer[FED_COM_BUFFER_SIZE]; + + // Listen for messages from the federate. + while (waiting_connection_information + || waiting_physical_action_information + || waiting_udp_message_and_set_up_clock_sync) { + // Read no more than one byte to get the message type. + ssize_t bytes_read = read_from_socket(socket_id, 1, buffer); + if (bytes_read < 1) { + // Socket is closed + lf_print_warning("RTI: Socket to federate %d is closed.", fed_id); + return 0; + } + LF_PRINT_DEBUG("RTI: Received message type %u from federate %d.", buffer[0], fed_id); + switch(buffer[0]) { + case MSG_TYPE_NEIGHBOR_STRUCTURE: + if (handle_connection_information(socket_id, (uint16_t)fed_id)) { + waiting_connection_information = false; + } else { + LF_PRINT_DEBUG("RTI failed to handle MSG_TYPE_NEIGHBOR_STRUCTURE message header from federate %d.", fed_id); + return 0; + } + break; + case MSG_TYPE_PHYSICAL_ACTION: + if (handle_physical_action_information(socket_id, (uint16_t)fed_id)) { + waiting_physical_action_information = false; + } else { + LF_PRINT_DEBUG("RTI failed to handle MSG_TYPE_NEIGHBOR_STRUCTURE message header from federate %d.", fed_id); + return 0; + } + break; + case MSG_TYPE_UDP_PORT: + if (handle_udp_message_and_set_up_clock_sync(socket_id, (uint16_t)fed_id)) { + waiting_udp_message_and_set_up_clock_sync = false; + } else { + LF_PRINT_DEBUG("RTI failed to handle MSG_TYPE_NEIGHBOR_STRUCTURE message header from federate %d.", fed_id); + return 0; + } + break; + default: + lf_print_error("RTI was expecting a MSG_TYPE_NEIGHBOR_STRUCTURE, MSG_TYPE_PHYSICAL_ACTION, or MSG_TYPE_UDP_PORT " + "message from federate %d. Got %u instead. Rejecting federate.", fed_id, buffer[0]); + send_reject(socket_id, UNEXPECTED_MESSAGE); + return 0; + } + } + return 1; +} + +int handle_connection_information(int socket_id, uint16_t fed_id) { + LF_PRINT_DEBUG("RTI received MSG_TYPE_NEIGHBOR_STRUCTURE from federate %d.", fed_id); + unsigned char connection_info_header[MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE - 1]; read_from_socket_errexit( socket_id, - MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE, + MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE - 1, connection_info_header, "RTI failed to read MSG_TYPE_NEIGHBOR_STRUCTURE message header from federate %d.", fed_id ); - if (connection_info_header[0] != MSG_TYPE_NEIGHBOR_STRUCTURE) { - lf_print_error("RTI was expecting a MSG_TYPE_UDP_PORT message from federate %d. Got %u instead. " - "Rejecting federate.", fed_id, connection_info_header[0]); - send_reject(socket_id, UNEXPECTED_MESSAGE); - return 0; - } else { - federate_info_t* fed = GET_FED_INFO(fed_id); - // Read the number of upstream and downstream connections - fed->enclave.num_upstream = extract_int32(&(connection_info_header[1])); - fed->enclave.num_downstream = extract_int32(&(connection_info_header[1 + sizeof(int32_t)])); - LF_PRINT_DEBUG( - "RTI got %d upstreams and %d downstreams from federate %d.", - fed->enclave.num_upstream, - fed->enclave.num_downstream, - fed_id); - - // Allocate memory for the upstream and downstream pointers - if (fed->enclave.num_upstream > 0) { - fed->enclave.upstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_upstream); - // Allocate memory for the upstream delay pointers - fed->enclave.upstream_delay = - (interval_t*)malloc( - sizeof(interval_t) * fed->enclave.num_upstream - ); - } else { - fed->enclave.upstream = (int*)NULL; - fed->enclave.upstream_delay = (interval_t*)NULL; - } - if (fed->enclave.num_downstream > 0) { - fed->enclave.downstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_downstream); - } else { - fed->enclave.downstream = (int*)NULL; - } - - size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) * - fed->enclave.num_upstream) + (sizeof(uint16_t) * fed->enclave.num_downstream); - unsigned char* connections_info_body = (unsigned char*)malloc(connections_info_body_size); - read_from_socket_errexit( - socket_id, - connections_info_body_size, - connections_info_body, - "RTI failed to read MSG_TYPE_NEIGHBOR_STRUCTURE message body from federate %d.", - fed_id + federate_info_t* fed = GET_FED_INFO(fed_id); + // Read the number of upstream and downstream connections + fed->enclave.num_upstream = extract_int32(&(connection_info_header[0])); + fed->enclave.num_downstream = extract_int32(&(connection_info_header[sizeof(int32_t)])); + LF_PRINT_DEBUG( + "RTI got %d upstreams and %d downstreams from federate %d.", + fed->enclave.num_upstream, + fed->enclave.num_downstream, + fed_id); + + // Allocate memory for the upstream and downstream pointers + fed->enclave.upstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_upstream); + fed->enclave.downstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_downstream); + + // Allocate memory for the upstream delay pointers + fed->enclave.upstream_delay = + (interval_t*)malloc( + sizeof(interval_t) * fed->enclave.num_upstream ); - // Keep track of where we are in the buffer - size_t message_head = 0; - // First, read the info about upstream federates - for (int i=0; ienclave.num_upstream; i++) { - fed->enclave.upstream[i] = extract_uint16(&(connections_info_body[message_head])); - message_head += sizeof(uint16_t); - fed->enclave.upstream_delay[i] = extract_int64(&(connections_info_body[message_head])); - message_head += sizeof(int64_t); - } + size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) * + fed->enclave.num_upstream) + (sizeof(uint16_t) * fed->enclave.num_downstream); + unsigned char* connections_info_body = (unsigned char*)malloc(connections_info_body_size); + read_from_socket_errexit( + socket_id, + connections_info_body_size, + connections_info_body, + "RTI failed to read MSG_TYPE_NEIGHBOR_STRUCTURE message body from federate %d.", + fed_id + ); - // Next, read the info about downstream federates - for (int i=0; ienclave.num_downstream; i++) { - fed->enclave.downstream[i] = extract_uint16(&(connections_info_body[message_head])); - message_head += sizeof(uint16_t); - } + // Keep track of where we are in the buffer + size_t message_head = 0; + // First, read the info about upstream federates + for (int i=0; ienclave.num_upstream; i++) { + fed->enclave.upstream[i] = extract_uint16(&(connections_info_body[message_head])); + LF_PRINT_DEBUG("fed->enclave.upstream[i] = %d", fed->enclave.upstream[i]); // Remove + message_head += sizeof(uint16_t); + fed->enclave.upstream_delay[i] = extract_int64(&(connections_info_body[message_head])); + LF_PRINT_DEBUG("fed->enclave.upstream_delay[i] = %ld", fed->enclave.upstream_delay[i]); // Remove + message_head += sizeof(int64_t); + } - free(connections_info_body); - return 1; + // Next, read the info about downstream federates + for (int i=0; ienclave.num_downstream; i++) { + fed->enclave.downstream[i] = extract_uint16(&(connections_info_body[message_head])); + LF_PRINT_DEBUG("fed->enclave.downstream[i] = %d", fed->enclave.downstream[i]); // Remove + message_head += sizeof(uint16_t); } + + free(connections_info_body); + return 1; +} + +int handle_physical_action_information(int socket_id, uint16_t fed_id) { + LF_PRINT_DEBUG("RTI received MSG_TYPE_PHYSICAL_ACTION from federate %d.", fed_id); + unsigned char response[sizeof(uint16_t)]; + read_from_socket_errexit(socket_id, sizeof(uint16_t) , response, + "RTI failed to read MSG_TYPE_PHYSICAL_ACTION message from federate %d.", fed_id); + + federate_info_t *fed = GET_FED_INFO(fed_id); + uint16_t federate_has_physical_action = extract_uint16(&(response[0])); + + LF_PRINT_DEBUG("RTI got MSG_TYPE_PHYSICAL_ACTION %u from federate %d.", federate_has_physical_action, fed_id); + + fed->enclave.has_physical_action = (bool) federate_has_physical_action; + + return 1; } -int receive_udp_message_and_set_up_clock_sync(int socket_id, uint16_t fed_id) { +int handle_udp_message_and_set_up_clock_sync(int socket_id, uint16_t fed_id) { // Read the MSG_TYPE_UDP_PORT message from the federate regardless of the status of // clock synchronization. This message will tell the RTI whether the federate // is doing clock synchronization, and if it is, what port to use for UDP. - LF_PRINT_DEBUG("RTI waiting for MSG_TYPE_UDP_PORT from federate %d.", fed_id); - unsigned char response[1 + sizeof(uint16_t)]; - read_from_socket_errexit(socket_id, 1 + sizeof(uint16_t) , response, + LF_PRINT_DEBUG("RTI received MSG_TYPE_UDP_PORT from federate %d.", fed_id); + unsigned char response[sizeof(uint16_t)]; + read_from_socket_errexit(socket_id, sizeof(uint16_t) , response, "RTI failed to read MSG_TYPE_UDP_PORT message from federate %d.", fed_id); - if (response[0] != MSG_TYPE_UDP_PORT) { - lf_print_error("RTI was expecting a MSG_TYPE_UDP_PORT message from federate %d. Got %u instead. " - "Rejecting federate.", fed_id, response[0]); - send_reject(socket_id, UNEXPECTED_MESSAGE); - return 0; - } else { - federate_info_t *fed = GET_FED_INFO(fed_id); - if (rti_remote->clock_sync_global_status >= clock_sync_init) {// If no initial clock sync, no need perform initial clock sync. - uint16_t federate_UDP_port_number = extract_uint16(&(response[1])); - - LF_PRINT_DEBUG("RTI got MSG_TYPE_UDP_PORT %u from federate %d.", federate_UDP_port_number, fed_id); - - // A port number of UINT16_MAX means initial clock sync should not be performed. - if (federate_UDP_port_number != UINT16_MAX) { - // Perform the initialization clock synchronization with the federate. - // Send the required number of messages for clock synchronization - for (int i=0; i < rti_remote->clock_sync_exchanges_per_interval; i++) { - // Send the RTI's current physical time T1 to the federate. - send_physical_clock(MSG_TYPE_CLOCK_SYNC_T1, fed, TCP); - - // Listen for reply message, which should be T3. - size_t message_size = 1 + sizeof(int32_t); - unsigned char buffer[message_size]; - read_from_socket_errexit(socket_id, message_size, buffer, - "Socket to federate %d unexpectedly closed.", fed_id); - if (buffer[0] == MSG_TYPE_CLOCK_SYNC_T3) { - int32_t fed_id = extract_int32(&(buffer[1])); - assert(fed_id > -1); - assert(fed_id < 65536); - LF_PRINT_DEBUG("RTI received T3 clock sync message from federate %d.", fed_id); - handle_physical_clock_sync_message(fed, TCP); - } else { - lf_print_error("Unexpected message %u from federate %d.", buffer[0], fed_id); - send_reject(socket_id, UNEXPECTED_MESSAGE); - return 0; - } + + federate_info_t *fed = GET_FED_INFO(fed_id); + if (rti_remote->clock_sync_global_status >= clock_sync_init) {// If no initial clock sync, no need perform initial clock sync. + uint16_t federate_UDP_port_number = extract_uint16(&(response[0])); + + LF_PRINT_DEBUG("RTI got MSG_TYPE_UDP_PORT %u from federate %d.", federate_UDP_port_number, fed_id); + + // A port number of UINT16_MAX means initial clock sync should not be performed. + if (federate_UDP_port_number != UINT16_MAX) { + // Perform the initialization clock synchronization with the federate. + // Send the required number of messages for clock synchronization + for (int i=0; i < rti_remote->clock_sync_exchanges_per_interval; i++) { + // Send the RTI's current physical time T1 to the federate. + send_physical_clock(MSG_TYPE_CLOCK_SYNC_T1, fed, TCP); + + // Listen for reply message, which should be T3. + size_t message_size = 1 + sizeof(int32_t); + unsigned char buffer[message_size]; + read_from_socket_errexit(socket_id, message_size, buffer, + "Socket to federate %d unexpectedly closed.", fed_id); + if (buffer[0] == MSG_TYPE_CLOCK_SYNC_T3) { + int32_t fed_id = extract_int32(&(buffer[1])); + assert(fed_id > -1); + assert(fed_id < 65536); + LF_PRINT_DEBUG("RTI received T3 clock sync message from federate %d.", fed_id); + handle_physical_clock_sync_message(fed, TCP); + } else { + lf_print_error("Unexpected message %u from federate %d.", buffer[0], fed_id); + send_reject(socket_id, UNEXPECTED_MESSAGE); + return 0; } - LF_PRINT_DEBUG("RTI finished initial clock synchronization with federate %d.", fed_id); } - if (rti_remote->clock_sync_global_status >= clock_sync_on) { // If no runtime clock sync, no need to set up the UDP port. - if (federate_UDP_port_number > 0) { - // Initialize the UDP_addr field of the federate struct - fed->UDP_addr.sin_family = AF_INET; - fed->UDP_addr.sin_port = htons(federate_UDP_port_number); - fed->UDP_addr.sin_addr = fed->server_ip_addr; - } - } else { - // Disable clock sync after initial round. - fed->clock_synchronization_enabled = false; - } - } else { // No clock synchronization at all. - // Clock synchronization is universally disabled via the clock-sync command-line parameter - // (-c off was passed to the RTI). - // Note that the federates are still going to send a MSG_TYPE_UDP_PORT message but with a payload (port) of -1. - fed->clock_synchronization_enabled = false; + LF_PRINT_DEBUG("RTI finished initial clock synchronization with federate %d.", fed_id); + } + if (rti_remote->clock_sync_global_status >= clock_sync_on) { // If no runtime clock sync, no need to set up the UDP port. + if (federate_UDP_port_number > 0) { + // Initialize the UDP_addr field of the federate struct + fed->UDP_addr.sin_family = AF_INET; + fed->UDP_addr.sin_port = htons(federate_UDP_port_number); + fed->UDP_addr.sin_addr = fed->server_ip_addr; + } + } else { + // Disable clock sync after initial round. + fed->clock_synchronization_enabled = false; } + } else { // No clock synchronization at all. + // Clock synchronization is universally disabled via the clock-sync command-line parameter + // (-c off was passed to the RTI). + // Note that the federates are still going to send a MSG_TYPE_UDP_PORT message but with a payload (port) of -1. + fed->clock_synchronization_enabled = false; } return 1; } @@ -1480,9 +1630,7 @@ void connect_to_federates(int socket_descriptor) { // The first message from the federate should contain its ID and the federation ID. int32_t fed_id = receive_and_check_fed_id_message(socket_id, (struct sockaddr_in*)&client_fd); - if (fed_id >= 0 - && receive_connection_information(socket_id, (uint16_t)fed_id) - && receive_udp_message_and_set_up_clock_sync(socket_id, (uint16_t)fed_id)) { + if (fed_id >= 0 && receive_set_up_information(socket_id, (uint16_t)fed_id)) { // Create a thread to communicate with the federate. // This has to be done after clock synchronization is finished @@ -1496,6 +1644,7 @@ void connect_to_federates(int socket_descriptor) { i--; } } + determine_the_ndt_condition(); // All federates have connected. LF_PRINT_DEBUG("All federates have connected to RTI."); @@ -1646,6 +1795,7 @@ void initialize_RTI(rti_remote_t *rti){ rti_remote->clock_sync_period_ns = MSEC(10); rti_remote->clock_sync_exchanges_per_interval = 10; rti_remote->authentication_enabled = false; + rti_remote->ndt_enabled = false; rti_remote->base.tracing_enabled = false; rti_remote->stop_in_progress = false; } diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index b3249ec30..bfe4370bf 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -35,6 +35,8 @@ ///////////////////////////////////////////// //// Data structures +static char version_info[] = "0.5.0"; // Simply use Lingua Franca version name for now. + typedef enum socket_type_t { TCP, UDP @@ -160,6 +162,12 @@ typedef struct rti_remote_t { * Boolean indicating that authentication is enabled. */ bool authentication_enabled; + + /** + * Boolean indicating that NDT message is enabled. + */ + bool ndt_enabled; + /** * Boolean indicating that a stop request is already in progress. */ @@ -213,6 +221,24 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty */ void update_federate_next_event_tag_locked(uint16_t federate_id, tag_t next_event_tag); +/** + * Update the cycle information of every federate. +*/ +void update_cycle_information(); + +/** + * Determine whether or not to use the NDT messages. +*/ +void determine_the_ndt_condition(); + +/** + * @brief Send the next downstream tag. + * + * @param fed The downstream federate. + * @param next_event_tag The next event tag from the downstream federate. +*/ +void send_upstream_next_downstream_tag(federate_info_t* fed, tag_t next_event_tag); + /** * Handle a port absent message being received rom a federate via the RIT. * @@ -414,14 +440,36 @@ void send_reject(int socket_id, unsigned char error_code); int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* client_fd); /** - * Listen for a MSG_TYPE_NEIGHBOR_STRUCTURE message, and upon receiving it, fill - * out the relevant information in the federate's struct. + * Listen for MSG_TYPE_NEIGHBOR_STRUCTURE, MSG_TYPE_PHYSICAL_ACTION, and + * MSG_TYPE_UDP_PORT messages, and upon receiving them, handle a payload + * of each message. + * @param socket_id The socket on which to listen. + * @param fed_id The federate ID. + * @return 1 for success, 0 for failure. +*/ +int receive_set_up_information(int socket_id, uint16_t fed_id); + +/** + * Handle a MSG_TYPE_NEIGHBOR_STRUCTURE message. Fill out + * the relevant information in the federate's struct. + * @param socket_id The socket on which to listen. + * @param fed_id The federate ID. + * @return 1 for success, 0 for failure. */ -int receive_connection_information(int socket_id, uint16_t fed_id); +int handle_connection_information(int socket_id, uint16_t fed_id); + +/** + * Handle a MSG_TYPE_PHYSICAL_ACTION message. Update the information of + * presence of a physical action. + * @param socket_id The socket on which to listen. + * @param fed_id The federate ID. + * @return 1 for success, 0 for failure. +*/ +int handle_physical_action_information(int socket_id, uint16_t fed_id); /** - * Listen for a MSG_TYPE_UDP_PORT message, and upon receiving it, set up - * clock synchronization and perform the initial clock synchronization. + * Handle a MSG_TYPE_UDP_PORT message, set up clock synchronization and + * perform the initial clock synchronization. * Initial clock synchronization is performed only if the MSG_TYPE_UDP_PORT message * payload is not UINT16_MAX. If it is also not 0, then this function sets * up to perform runtime clock synchronization using the UDP port number @@ -431,7 +479,7 @@ int receive_connection_information(int socket_id, uint16_t fed_id); * @param fed_id The federate ID. * @return 1 for success, 0 for failure. */ -int receive_udp_message_and_set_up_clock_sync(int socket_id, uint16_t fed_id); +int handle_udp_message_and_set_up_clock_sync(int socket_id, uint16_t fed_id); #ifdef __RTI_AUTH__ /** diff --git a/core/federated/federate.c b/core/federated/federate.c index c57553464..1c2c9c8d7 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -67,6 +67,9 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include // For secure random number generation. #include // For HMAC-based authentication of federates. #endif +#ifdef FEDERATED_NDT_ENABLED +#include "pqueue_tag.h" +#endif // FEDERATED_NDT_ENABLED // Global variables defined in tag.c: extern instant_t _lf_last_reported_unadjusted_physical_time_ns; @@ -420,6 +423,17 @@ int send_timed_message(environment_t* env, lf_mutex_unlock(&outbound_socket_mutex); return 0; } + + // Insert the intended tag into the ndt_q to send LTC to the RTI quickly. +#ifdef FEDERATED_NDT_ENABLED + if (pqueue_tag_size(env->ndt_q) != 0) { + // FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size + // is not enough to know whether this federate using the NDT optimization or not. + LF_PRINT_DEBUG("Insert NDT at the intended to send LTC and NET quickly."); + pqueue_tag_insert_if_no_match(env->ndt_q, current_message_intended_tag); + } +#endif // FEDERATED_NDT_ENABLED + // Trace the event when tracing is enabled if (message_type == MSG_TYPE_TAGGED_MESSAGE) { tracepoint_federate_to_rti(_fed.trace, send_TAGGED_MSG, _lf_my_fed_id, ¤t_message_intended_tag); @@ -1143,6 +1157,16 @@ void connect_to_rti(const char* hostname, int port) { // @see MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h send_neighbor_structure_to_RTI(_fed.socket_TCP_RTI); +#ifdef FEDERATED_NDT_ENABLED + // Send whether this federate has a physical action to the RTI. + uint16_t has_physical_action = _fed.min_delay_from_physical_action_to_federate_output != NEVER; + unsigned char physical_action_message_buffer[1 + sizeof(uint16_t)]; + physical_action_message_buffer[0] = MSG_TYPE_PHYSICAL_ACTION; + encode_uint16(has_physical_action, &(physical_action_message_buffer[1])); + write_to_socket_errexit(_fed.socket_TCP_RTI, 1 + sizeof(uint16_t), physical_action_message_buffer, + "Failed to send physical action info to RTI."); +#endif // FEDERATED_NDT_ENABLED + uint16_t udp_port = setup_clock_synchronization_with_rti(); // Write the returned port number to the RTI @@ -1392,10 +1416,6 @@ void send_port_absent_to_federate(environment_t* env, interval_t additional_dela unsigned short fed_ID) { assert(env != GLOBAL_ENVIRONMENT); - // Construct the message - size_t message_length = 1 + sizeof(port_ID) + sizeof(fed_ID) + sizeof(instant_t) + sizeof(microstep_t); - unsigned char buffer[message_length]; - // Apply the additional delay to the current tag and use that as the intended // tag of the outgoing message. Note that if there is delay on the connection, // then we cannot promise no message with tag = current_tag + delay because a @@ -1403,6 +1423,28 @@ void send_port_absent_to_federate(environment_t* env, interval_t additional_dela // message with a tag strictly less than current_tag + delay. tag_t current_message_intended_tag = lf_delay_strict(env->current_tag, additional_delay); + + // FIXME: If port absent messages are not used when there is no zero-delay cycle, + // This part is not needed as we don't apply the NDT optimization for cycles. +#ifdef FEDERATED_NDT_ENABLED + if (pqueue_tag_size(env->ndt_q) != 0 ) { + // FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size + // is not enough to know whether this federate using the NDT optimization or not. + tag_t earliest_ndt = pqueue_tag_peek(env->ndt_q)->tag; + if (lf_tag_compare(current_message_intended_tag, earliest_ndt) < 0) { + // No events exist in any downstream federates + LF_PRINT_DEBUG("The intended tag " PRINTF_TAG " is less than the earliest NDT " PRINTF_TAG "." + "Skip sending the port absent message.", + current_message_intended_tag.time - start_time, current_message_intended_tag.microstep, + earliest_ndt.time - start_time, earliest_ndt.microstep); + return; + } + } +#endif // FEDERATED_NDT_ENABLED + + // Construct the message + size_t message_length = 1 + sizeof(port_ID) + sizeof(fed_ID) + sizeof(instant_t) + sizeof(microstep_t); + unsigned char buffer[message_length]; LF_PRINT_LOG("Sending port " "absent for tag " PRINTF_TAG " for port %d to federate %d.", @@ -1910,11 +1952,32 @@ void _lf_logical_tag_complete(tag_t tag_to_send) { if (compare_with_last_tag >= 0) { return; } - LF_PRINT_LOG("Sending Logical Time Complete (LTC) " PRINTF_TAG " to the RTI.", + + environment_t *env; + _lf_get_environments(&env); + bool need_to_send_LTC = true; +#ifdef FEDERATED_NDT_ENABLED + if (pqueue_tag_size(env->ndt_q) != 0 ) { + // FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size + // is not enough to know whether this federate using the NDT optimization or not. + tag_t earliest_ndt = pqueue_tag_peek(env->ndt_q)->tag; + if (lf_tag_compare(tag_to_send, earliest_ndt) < 0) { + // No events exist in any downstream federates + LF_PRINT_DEBUG("The intended tag " PRINTF_TAG " is less than the earliest NDT " PRINTF_TAG "." + "Skip sending the logical tag complete.", + tag_to_send.time - start_time, tag_to_send.microstep, + earliest_ndt.time - start_time, earliest_ndt.microstep); + need_to_send_LTC = false; + } + } +#endif // FEDERATED_NDT_ENABLED + if (need_to_send_LTC) { + LF_PRINT_LOG("Sending Logical Tag Complete (LTC) " PRINTF_TAG " to the RTI.", tag_to_send.time - start_time, tag_to_send.microstep); - _lf_send_tag(MSG_TYPE_LOGICAL_TAG_COMPLETE, tag_to_send, true); - _fed.last_sent_LTC = tag_to_send; + _lf_send_tag(MSG_TYPE_LOGICAL_TAG_COMPLETE, tag_to_send, true); + _fed.last_sent_LTC = tag_to_send; + } } bool update_max_level(tag_t tag, bool is_provisional) { @@ -2143,6 +2206,7 @@ void handle_provisional_tag_advance_grant() { // TAG. In either case, we know that at the PTAG tag, all outputs // have either been sent or are absent, so we can send an LTC. // Send an LTC to indicate absent outputs. + LF_PRINT_DEBUG("Inside handle_provisional_tag_advance_grant"); _lf_logical_tag_complete(PTAG); // Nothing more to do. lf_mutex_unlock(&env->mutex); @@ -2337,6 +2401,43 @@ void handle_stop_request_message() { lf_mutex_unlock(&outbound_socket_mutex); } +#ifdef FEDERATED_NDT_ENABLED +/** + * Handle a MSG_TYPE_NEXT_DOWNSTREAM_MESSAGE from the RTI + * + * +*/ +void handle_next_downstream_tag() { + // FIXME: maybe the mutex lock is needed. Is it possible to remove an element + // simultaneously with this function? + size_t bytes_to_read = sizeof(instant_t) + sizeof(microstep_t); + unsigned char buffer[bytes_to_read]; + read_from_socket_errexit(_fed.socket_TCP_RTI, bytes_to_read, buffer, + "Failed to read next downstream tag from RTI."); + tag_t NDT = extract_tag(buffer); + + LF_PRINT_LOG("Received from RTI a MSG_TYPE_NEXT_DOWNSTREAM_TAG message with elapsed tag " PRINTF_TAG ".", + NDT.time - start_time, NDT.microstep); + // Trace the event when tracing is enabled + tracepoint_federate_from_rti(_fed.trace, receive_NDT, _lf_my_fed_id, &NDT); + + environment_t* env; + _lf_get_environments(&env); + + if (lf_tag_compare(env->current_tag, NDT) <= 0) { + // The current tag is less than or equal to the NDT and the tag doesn't exists + // in the ndt_q. Insert NDT to ndt_q. + pqueue_tag_insert_if_no_match(env->ndt_q, NDT); + } + if (lf_tag_compare(env->current_tag, NDT) > 0) { + // The current tag is greater than the NDT. Send the LTC with the NDT and + // push the current tag to ndt_q so that this federate notify the appropriate NET message. + _lf_send_tag(MSG_TYPE_LOGICAL_TAG_COMPLETE, NDT, true); + pqueue_tag_insert_if_no_match(env->ndt_q, env->current_tag); + } +} +#endif // FEDERATED_NDT_ENABLED + /** * Close sockets used to communicate with other federates, if they are open, * and send a MSG_TYPE_RESIGN message to the RTI. This implements the function @@ -2554,6 +2655,11 @@ void* listen_to_rti_TCP(void* args) { case MSG_TYPE_PORT_ABSENT: handle_port_absent_message(_fed.socket_TCP_RTI, -1); break; +#ifdef FEDERATED_NDT_ENABLED + case MSG_TYPE_NEXT_DOWNSTREAM_TAG: + handle_next_downstream_tag(); + break; +#endif // FEDERATED_NDT_ENABLED case MSG_TYPE_CLOCK_SYNC_T1: case MSG_TYPE_CLOCK_SYNC_T4: lf_print_error("Federate %d received unexpected clock sync message from RTI on TCP socket.", @@ -2742,10 +2848,31 @@ tag_t _lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply // This if statement does not fall through but rather returns. // NET is not bounded by physical time or has no downstream federates. // Normal case. - _lf_send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag, wait_for_reply); - _fed.last_sent_NET = tag; - LF_PRINT_LOG("Sent next event tag (NET) " PRINTF_TAG " to RTI.", - tag.time - start_time, tag.microstep); + + // If there is no downstream events that require the NET of the current tag, + // do not send the NET. + bool need_to_send_NET = true; +#ifdef FEDERATED_NDT_ENABLED + if (pqueue_tag_size(env->ndt_q) != 0 ) { + // FIXME: If the RTI changes the use of NDTs dynamically, merely checking the size + // is not enough to know whether this federate using the NDT optimization or not. + tag_t earliest_ndt = pqueue_tag_peek(env->ndt_q)->tag; + if (lf_tag_compare(tag, earliest_ndt) < 0) { + // No events exist in any downstream federates + LF_PRINT_DEBUG("The intended tag " PRINTF_TAG " is less than the earliest NDT " PRINTF_TAG "." + "Skip sending the next event tag.", + tag.time - start_time, tag.microstep, + earliest_ndt.time - start_time, earliest_ndt.microstep); + need_to_send_NET = false; + } + } +#endif // FEDERATED_NDT_ENABLED + if (need_to_send_NET) { + _lf_send_tag(MSG_TYPE_NEXT_EVENT_TAG, tag, wait_for_reply); + _fed.last_sent_NET = tag; + LF_PRINT_LOG("Sent next event tag (NET) " PRINTF_TAG " to RTI.", + tag.time - start_time, tag.microstep); + } if (!wait_for_reply) { LF_PRINT_LOG("Not waiting for reply to NET."); diff --git a/core/reactor_common.c b/core/reactor_common.c index eb31c761e..19febbac5 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -51,6 +51,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #endif #include "port.h" #include "pqueue.h" +#include "pqueue_tag.h" #include "reactor.h" #include "reactor_common.h" #include "tag.h" @@ -292,6 +293,16 @@ void _lf_start_time_step(environment_t *env) { } } +#ifdef FEDERATED_NDT_ENABLED + while (pqueue_tag_size(env->ndt_q) != 0 + && lf_tag_compare(pqueue_tag_peek(env->ndt_q)->tag, env->current_tag) < 0) { + // Remove elements of ndt_q with tag less than the current tag. + tag_t tag_to_remove = pqueue_tag_pop_tag(env->ndt_q); + LF_PRINT_DEBUG("Remove the tag " PRINTF_TAG " from the ndt_q is less than the current tag " PRINTF_TAG ". Remove it.", + tag_to_remove.time - start_time, tag_to_remove.microstep, + env->current_tag.time - start_time, env->current_tag.microstep); + } +#endif #ifdef FEDERATED_DECENTRALIZED for (int i = 0; i < env->is_present_fields_size; i++) { // FIXME: For now, an intended tag of (NEVER, 0) diff --git a/include/core/environment.h b/include/core/environment.h index d4852ddca..2ac58ac8d 100644 --- a/include/core/environment.h +++ b/include/core/environment.h @@ -38,6 +38,7 @@ #include "lf_types.h" #include "platform.h" +#include "pqueue_tag.h" #include "trace.h" // Forward declarations so that a pointers can appear in the environment struct. @@ -106,6 +107,9 @@ typedef struct environment_t { tag_t** _lf_intended_tag_fields; int _lf_intended_tag_fields_size; #endif // FEDERATED +#ifdef FEDERATED_NDT_ENABLED + pqueue_tag_t* ndt_q; +#endif // FEDERATED_NDT_ENABLED #ifdef LF_ENCLAVES // TODO: Consider dropping #ifdef enclave_info_t *enclave_info; #endif diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index 38001cc0b..cd9ae218d 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -685,6 +685,28 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define MSG_TYPE_NEIGHBOR_STRUCTURE 24 #define MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE 9 +/** + * Byte identifying a next downstream event tag (NDET) message sent from a downstream + * federate via the RTI in centralized coordination. + * The next eight bytes will be the timestamp. + * The next four bytes will be the microstep. + * This message from the RTI tells the federate the tag of the earliest event on the + * source federate's event queue. In other words, this federate only has to send LTC + * and NULL messages greater than or equal to this tag. If this federate has no its + * upstream federates, this federate also can skip sending NET messages that are + * greater than or equal to this tag. + */ +#define MSG_TYPE_NEXT_DOWNSTREAM_TAG 25 + +/** + * A message that informs the RTI about its presence of a physical action. The + * information is used for deciding whether the RTI sends NDT messages to the source + * federate's upstreams. + * The next four byte will be 1 if the source federate has a physical action + * and 0 otherwise. +*/ +#define MSG_TYPE_PHYSICAL_ACTION 26 + ///////////////////////////////////////////// //// Rejection codes diff --git a/include/core/trace.h b/include/core/trace.h index 598c669bf..fcee5efc8 100644 --- a/include/core/trace.h +++ b/include/core/trace.h @@ -98,6 +98,7 @@ typedef enum send_P2P_MSG, send_ADR_AD, send_ADR_QR, + send_NDT, // Receiving messages receive_ACK, receive_TIMESTAMP, @@ -119,6 +120,7 @@ typedef enum receive_P2P_MSG, receive_ADR_AD, receive_ADR_QR, + receive_NDT, receive_UNIDENTIFIED, NUM_EVENT_TYPES } trace_event_t; @@ -161,6 +163,7 @@ static const char *trace_event_names[] = { "Sending P2P_MSG", "Sending ADR_AD", "Sending ADR_QR", + "Sending NDT", // Receiving messages "Receiving ACK", "Receiving TIMESTAMP", @@ -182,6 +185,7 @@ static const char *trace_event_names[] = { "Receiving P2P_MSG", "Receiving ADR_AD", "Receiving ADR_QR", + "Receiving NDT", "Receiving UNIDENTIFIED", }; diff --git a/include/core/utils/pqueue.h b/include/core/utils/pqueue.h index edfd4968c..2df94fa5c 100644 --- a/include/core/utils/pqueue.h +++ b/include/core/utils/pqueue.h @@ -96,4 +96,14 @@ void print_reaction(void *reaction); */ void print_event(void *event); +// ********** NDT Priority Queue Support Start + +int tag_in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that); +int tag_matches(void* next, void* curr); +pqueue_pri_t get_ndtq_priority(void *a); +size_t get_ndtq_position(void *a); +void set_ndtq_position(void *a, size_t pos); +int ndt_node_matches(void* next, void* curr); +void print_tag(void *reaction); + #endif /* PQUEUE_H */ diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 1f7391f92..4b247b944 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master +rti-ndt diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index d9d44253b..6d65231b2 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -26,6 +26,7 @@ .NET { stroke: #118ab2; fill: #118ab2} \ .PTAG { stroke: #06d6a0; fill: #06d6a0} \ .TAG { stroke: #08a578; fill: #08a578} \ + .NDT { stroke: purple; fill: purple} \ .TIMESTAMP { stroke: grey; fill: grey } \ .FED_ID {stroke: #80DD99; fill: #80DD99 } \ .ADV {stroke-linecap="round" ; stroke: "red" ; fill: "red"} \ @@ -60,6 +61,7 @@ "Sending P2P_MSG": "P2P_MSG", "Sending ADR_AD": "ADR_AD", "Sending ADR_QR": "ADR_QR", + "Sending NDT": "NDT", "Receiving ACK": "ACK", "Receiving TIMESTAMP": "TIMESTAMP", "Receiving NET": "NET", @@ -80,6 +82,7 @@ "Receiving P2P_MSG": "P2P_MSG", "Receiving ADR_AD": "ADR_AD", "Receiving ADR_QR": "ADR_QR", + "Receiving NDT": "NDT", "Receiving UNIDENTIFIED": "UNIDENTIFIED", "Scheduler advancing time ends": "AdvLT" }