From fed4a555ca760cbc502f7282a504b508b8abb347 Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Sun, 11 Feb 2024 18:22:19 -0700 Subject: [PATCH] Recover parts of original federated tests --- .../launcher/FedLauncherGenerator.java | 15 +-- test/RustRti/src/federated/Absent.lf | 46 +++++++ .../src/federated/BroadcastFeedback.lf | 33 +++++ .../BroadcastFeedbackWithHierarchy.lf | 40 ++++++ test/RustRti/src/federated/ChainWithDelay.lf | 20 +++ test/RustRti/src/federated/DistributedBank.lf | 24 ++++ .../federated/DistributedBankToMultiport.lf | 33 +++++ .../RustRti/src/federated/DistributedCount.lf | 41 ++++++ .../src/federated/DistributedDoublePort.lf | 52 ++++++++ .../src/federated/DistributedInterleaved.lf | 44 +++++++ .../src/federated/DistributedLoopedAction.lf | 62 +++++++++ .../DistributedLoopedPhysicalAction.lf | 84 +++++++++++++ .../src/federated/DistributedMultiport.lf | 48 +++++++ .../federated/DistributedMultiportToBank.lf | 41 ++++++ .../federated/DistributedMultiportToken.lf | 46 +++++++ .../src/federated/DistributedNetworkOrder.lf | 75 +++++++++++ .../DistributedPhysicalActionUpstream.lf | 60 +++++++++ .../DistributedPhysicalActionUpstreamLong.lf | 88 +++++++++++++ test/RustRti/src/federated/DistributedStop.lf | 118 ++++++++++++++++++ test/RustRti/src/lib/Count.lf | 11 ++ test/RustRti/src/lib/FileLevelPreamble.lf | 12 ++ test/RustRti/src/lib/FileReader.txt | 1 + test/RustRti/src/lib/GenDelay.lf | 21 ++++ test/RustRti/src/lib/Imported.lf | 14 +++ test/RustRti/src/lib/ImportedAgain.lf | 15 +++ test/RustRti/src/lib/ImportedComposition.lf | 22 ++++ test/RustRti/src/lib/InternalDelay.lf | 15 +++ test/RustRti/src/lib/LoopedActionSender.lf | 36 ++++++ test/RustRti/src/lib/PassThrough.lf | 11 ++ test/RustRti/src/lib/Test.lf | 15 +++ test/RustRti/src/lib/TestCount.lf | 34 +++++ test/RustRti/src/lib/TestCountMultiport.lf | 41 ++++++ 32 files changed, 1208 insertions(+), 10 deletions(-) create mode 100644 test/RustRti/src/federated/Absent.lf create mode 100644 test/RustRti/src/federated/BroadcastFeedback.lf create mode 100644 test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf create mode 100644 test/RustRti/src/federated/ChainWithDelay.lf create mode 100644 test/RustRti/src/federated/DistributedBank.lf create mode 100644 test/RustRti/src/federated/DistributedBankToMultiport.lf create mode 100644 test/RustRti/src/federated/DistributedCount.lf create mode 100644 test/RustRti/src/federated/DistributedDoublePort.lf create mode 100644 test/RustRti/src/federated/DistributedInterleaved.lf create mode 100644 test/RustRti/src/federated/DistributedLoopedAction.lf create mode 100644 test/RustRti/src/federated/DistributedLoopedPhysicalAction.lf create mode 100644 test/RustRti/src/federated/DistributedMultiport.lf create mode 100644 test/RustRti/src/federated/DistributedMultiportToBank.lf create mode 100644 test/RustRti/src/federated/DistributedMultiportToken.lf create mode 100644 test/RustRti/src/federated/DistributedNetworkOrder.lf create mode 100644 test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf create mode 100644 test/RustRti/src/federated/DistributedPhysicalActionUpstreamLong.lf create mode 100644 test/RustRti/src/federated/DistributedStop.lf create mode 100644 test/RustRti/src/lib/Count.lf create mode 100644 test/RustRti/src/lib/FileLevelPreamble.lf create mode 100644 test/RustRti/src/lib/FileReader.txt create mode 100644 test/RustRti/src/lib/GenDelay.lf create mode 100644 test/RustRti/src/lib/Imported.lf create mode 100644 test/RustRti/src/lib/ImportedAgain.lf create mode 100644 test/RustRti/src/lib/ImportedComposition.lf create mode 100644 test/RustRti/src/lib/InternalDelay.lf create mode 100644 test/RustRti/src/lib/LoopedActionSender.lf create mode 100644 test/RustRti/src/lib/PassThrough.lf create mode 100644 test/RustRti/src/lib/Test.lf create mode 100644 test/RustRti/src/lib/TestCount.lf create mode 100644 test/RustRti/src/lib/TestCountMultiport.lf diff --git a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java index 757cb4487a..175f610826 100644 --- a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java +++ b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java @@ -319,7 +319,7 @@ public void doGenerateForRustRTI(List federates, RtiConfig rti // Launch the RTI in the foreground. if (host.equals("localhost") || host.equals("0.0.0.0")) { // FIXME: the paths below will not work on Windows - shCode.append(getLaunchCodeForRustRti()).append("\n"); + shCode.append(getLaunchCodeForRustRti(Integer.toString(federates.size()))).append("\n"); } else { // Start the RTI on the remote machine - Not supported yet for Rust RTI. } @@ -329,10 +329,7 @@ public void doGenerateForRustRTI(List federates, RtiConfig rti for (FederateInstance federate : federates) { var buildConfig = getBuildConfig(federate, fileConfig, messageReporter); if (federate.isRemote) { - Path fedRelSrcGenPath = - fileConfig.getOutPath().relativize(fileConfig.getSrcGenPath()).resolve(federate.name); - if (distCode.isEmpty()) distCode.append(distHeader).append("\n"); - String logFileName = String.format("log/%s_%s.log", fileConfig.name, federate.name); + if (distCode.isEmpty()) distCode.append(distHeader).append("\n"); distCode.append(getDistCode(rtiConfig.getDirectory(), federate)).append("\n"); shCode .append(getFedRemoteLaunchCode(rtiConfig.getDirectory(), federate, federateIndex++)) @@ -543,7 +540,8 @@ private String getLaunchCode(String rtiLaunchCode) { "sleep 1"); } - private String getLaunchCodeForRustRti() { + private String getLaunchCodeForRustRti(String numberOfFederates) { + String launchCodeWithoutLogging = new String("cargo run -- -i ${FEDERATION_ID} -n "+ numberOfFederates + " -c init &"); return String.join( "\n", "echo \"#### Launching the Rust runtime infrastructure (RTI).\"", @@ -561,10 +559,7 @@ private String getLaunchCodeForRustRti() { " FIRST_RUST_RTI_PATH=${FIRST_RUST_RTI_REMOTE_PATH[0]%/*}", " cd ${FIRST_RUST_RTI_PATH}; cd ../", "fi", - "cargo run -- -i ${FEDERATION_ID} \\", - "-n 2 \\", - "-c init \\", - "&", + launchCodeWithoutLogging, "# Store the PID of the RTI", "RTI=$!", "# Wait for the RTI to boot up before", diff --git a/test/RustRti/src/federated/Absent.lf b/test/RustRti/src/federated/Absent.lf new file mode 100644 index 0000000000..7130210cf3 --- /dev/null +++ b/test/RustRti/src/federated/Absent.lf @@ -0,0 +1,46 @@ +target C { + tracing: true, + timeout: 100 ms +} + +reactor Sender { + output out1: int + output out2: int + timer t(0, 20 ms) + state c: int = 1 + + reaction(t) -> out1, out2 {= + if (self->c % 2 != 0) { + lf_set(out1, self->c); + } else { + lf_set(out2, self->c); + } + self->c++; + =} +} + +reactor Receiver { + input in1: int + input in2: int + + reaction(in1) {= + lf_print("Received %d on in1", in1->value); + if (in1->value % 2 == 0) { + lf_print_error_and_exit("********* Expected an odd integer!"); + } + =} + + reaction(in2) {= + lf_print("Received %d on in2", in2->value); + if (in2->value % 2 != 0) { + lf_print_error_and_exit("********* Expected an even integer!"); + } + =} +} + +federated reactor(d: time = 1 ms) { + s = new Sender() + r = new Receiver() + s.out1 -> r.in1 + s.out2 -> r.in2 +} diff --git a/test/RustRti/src/federated/BroadcastFeedback.lf b/test/RustRti/src/federated/BroadcastFeedback.lf new file mode 100644 index 0000000000..66a93c275b --- /dev/null +++ b/test/RustRti/src/federated/BroadcastFeedback.lf @@ -0,0 +1,33 @@ +/** This tests an output that is broadcast back to a multiport input of a bank. */ +target C { + timeout: 1 sec, + build-type: RelWithDebInfo +} + +reactor SenderAndReceiver { + output out: int + input[2] in: int + state received: bool = false + + reaction(startup) -> out {= + lf_set(out, 42); + =} + + reaction(in) {= + if (in[0]->is_present && in[1]->is_present && in[0]->value == 42 && in[1]->value == 42) { + lf_print("SUCCESS"); + self->received = true; + } + =} + + reaction(shutdown) {= + if (!self->received == true) { + lf_print_error_and_exit("Failed to receive broadcast"); + } + =} +} + +federated reactor { + s = new[2] SenderAndReceiver() + (s.out)+ -> s.in +} diff --git a/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf b/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf new file mode 100644 index 0000000000..114e42cfd7 --- /dev/null +++ b/test/RustRti/src/federated/BroadcastFeedbackWithHierarchy.lf @@ -0,0 +1,40 @@ +/** This tests an output that is broadcast back to a multiport input of a bank. */ +target C { + timeout: 1 sec +} + +reactor SenderAndReceiver { + output out: int + input[2] in: int + state received: bool = false + + r = new Receiver() + in -> r.in + + reaction(startup) -> out {= + lf_set(out, 42); + =} +} + +reactor Receiver { + input[2] in: int + state received: bool = false + + reaction(in) {= + if (in[0]->is_present && in[1]->is_present && in[0]->value == 42 && in[1]->value == 42) { + lf_print("SUCCESS"); + self->received = true; + } + =} + + reaction(shutdown) {= + if (!self->received == true) { + lf_print_error_and_exit("Failed to receive broadcast"); + } + =} +} + +federated reactor { + s = new[2] SenderAndReceiver() + (s.out)+ -> s.in +} diff --git a/test/RustRti/src/federated/ChainWithDelay.lf b/test/RustRti/src/federated/ChainWithDelay.lf new file mode 100644 index 0000000000..dea606bf51 --- /dev/null +++ b/test/RustRti/src/federated/ChainWithDelay.lf @@ -0,0 +1,20 @@ +/** + * Demonstration that monotonic NET hypothesis is invalid. + * + * @author Edward A. Lee + */ +target C { + timeout: 3 msec +} + +import Count from "../lib/Count.lf" +import InternalDelay from "../lib/InternalDelay.lf" +import TestCount from "../lib/TestCount.lf" + +federated reactor { + c = new Count(period = 1 msec) + i = new InternalDelay(delay = 500 usec) + t = new TestCount(num_inputs=3) + c.out -> i.in + i.out -> t.in +} diff --git a/test/RustRti/src/federated/DistributedBank.lf b/test/RustRti/src/federated/DistributedBank.lf new file mode 100644 index 0000000000..65a6f871c2 --- /dev/null +++ b/test/RustRti/src/federated/DistributedBank.lf @@ -0,0 +1,24 @@ +// Check bank of federates. +target C { + timeout: 1 sec, + coordination: centralized +} + +reactor Node(bank_index: int = 0) { + timer t(0, 100 msec) + state count: int = 0 + + reaction(t) {= + lf_print("Hello world %d.", self->count++); + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("Timer reactions did not execute."); + } + =} +} + +federated reactor DistributedBank { + n = new[2] Node() +} diff --git a/test/RustRti/src/federated/DistributedBankToMultiport.lf b/test/RustRti/src/federated/DistributedBankToMultiport.lf new file mode 100644 index 0000000000..d73b0959fd --- /dev/null +++ b/test/RustRti/src/federated/DistributedBankToMultiport.lf @@ -0,0 +1,33 @@ +// Check multiport to bank connections between federates. +target C { + timeout: 3 sec +} + +import Count from "../lib/Count.lf" + +reactor Destination { + input[2] in: int + state count: int = 1 + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + lf_print("Received %d.", in[i]->value); + if (self->count != in[i]->value) { + lf_print_error_and_exit("Expected %d.", self->count); + } + } + self->count++; + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor { + s = new[2] Count() + d = new Destination() + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedCount.lf b/test/RustRti/src/federated/DistributedCount.lf new file mode 100644 index 0000000000..fb1c86904e --- /dev/null +++ b/test/RustRti/src/federated/DistributedCount.lf @@ -0,0 +1,41 @@ +/** + * Test a particularly simple form of a distributed deterministic system where a federation that + * receives timestamped messages has only those messages as triggers. Therefore, no additional + * coordination of the advancement of time (HLA or Ptides) is needed. + * @author Edward A. Lee + */ +target C { + timeout: 5 sec, + coordination: centralized +} + +import Count from "../lib/Count.lf" + +reactor Print { + input in: int + state c: int = 1 + + reaction(in) {= + interval_t elapsed_time = lf_time_logical_elapsed(); + lf_print("At time " PRINTF_TIME ", received %d", elapsed_time, in->value); + if (in->value != self->c) { + lf_print_error_and_exit("Expected to receive %d.", self->c); + } + if (elapsed_time != MSEC(200) + SEC(1) * (self->c - 1) ) { + lf_print_error_and_exit("Expected received time to be " PRINTF_TIME ".", MSEC(200) * self->c); + } + self->c++; + =} + + reaction(shutdown) {= + if (self->c != 6) { + lf_print_error_and_exit("Expected to receive 5 items."); + } + =} +} + +federated reactor DistributedCount(offset: time = 200 msec) { + c = new Count() + p = new Print() + c.out -> p.in after offset +} diff --git a/test/RustRti/src/federated/DistributedDoublePort.lf b/test/RustRti/src/federated/DistributedDoublePort.lf new file mode 100644 index 0000000000..ec0a6d0b1d --- /dev/null +++ b/test/RustRti/src/federated/DistributedDoublePort.lf @@ -0,0 +1,52 @@ +/** + * Test the case for when two upstream federates send messages to a downstream federate on two + * different ports. One message should carry a microstep delay relative to the other message. + * + * @author Soroush Bateni + */ +target C { + timeout: 900 msec, + coordination: centralized +} + +import Count from "../lib/Count.lf" + +reactor CountMicrostep { + state count: int = 1 + output out: int + logical action act: int + timer t(0, 1 sec) + + reaction(t) -> act {= + lf_schedule_int(act, 0, self->count++); + =} + + reaction(act) -> out {= + lf_set(out, act->value); + =} +} + +reactor Print { + input in: int + input in2: int + + reaction(in, in2) {= + interval_t elapsed_time = lf_time_logical_elapsed(); + lf_print("At tag " PRINTF_TAG ", received in = %d and in2 = %d.", elapsed_time, lf_tag().microstep, in->value, in2->value); + if (in->is_present && in2->is_present) { + lf_print_error_and_exit("ERROR: invalid logical simultaneity."); + } + =} + + reaction(shutdown) {= + lf_print("SUCCESS: messages were at least one microstep apart."); + =} +} + +federated reactor DistributedDoublePort { + c = new Count() + cm = new CountMicrostep() + p = new Print() + c.out -> p.in // Indicating a 'logical' connection. + cm.out -> p.in2 +} diff --git a/test/RustRti/src/federated/DistributedInterleaved.lf b/test/RustRti/src/federated/DistributedInterleaved.lf new file mode 100644 index 0000000000..dc212daf17 --- /dev/null +++ b/test/RustRti/src/federated/DistributedInterleaved.lf @@ -0,0 +1,44 @@ +// Check multiport to bank connections between federates. +target C { + timeout: 3 sec +} + +reactor Count(offset: time = 0, period: time = 1 sec) { + state count: int = 1 + output[4] out: int + timer t(offset, period) + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + lf_set(out[i], self->count++); + } + =} +} + +reactor Destination { + input[2] in: int + state count: int = 0 + + reaction(in) {= + lf_print("Received %d.", in[0]->value); + lf_print("Received %d.", in[1]->value); + // Because the connection is interleaved, the difference between the + // two inputs should be 2, not 1. + if (in[1]->value - in[0]->value != 2) { + lf_print_error_and_exit("Expected a difference of two."); + } + self->count++; + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor { + s = new Count() + d = new[2] Destination() + s.out -> interleaved(d.in) +} diff --git a/test/RustRti/src/federated/DistributedLoopedAction.lf b/test/RustRti/src/federated/DistributedLoopedAction.lf new file mode 100644 index 0000000000..88418f84d1 --- /dev/null +++ b/test/RustRti/src/federated/DistributedLoopedAction.lf @@ -0,0 +1,62 @@ +/** + * Test a sender-receiver network system that relies on microsteps being taken into account. + * + * @author Soroush Bateni + */ +target C { + logging: LOG, + timeout: 1 sec +} + +import Sender from "../lib/LoopedActionSender.lf" + +reactor Receiver(take_a_break_after: int = 10, break_interval: time = 400 msec) { + input in: int + state received_messages: int = 0 + state total_received_messages: int = 0 + state breaks: int = 0 + timer t(0, 10 msec) // This will impact the performance + + // but forces the logical time to advance Comment this line for a more sensible log output. + reaction(in) {= + lf_print("At tag " PRINTF_TAG " received value %d.", + lf_time_logical_elapsed(), + lf_tag().microstep, + in->value); + self->total_received_messages++; + if (in->value != self->received_messages++) { + lf_print_error("Expected %d", self->received_messages - 1); + // exit(1); + } + if (lf_time_logical_elapsed() != self->breaks * self->break_interval) { + lf_print_error("Received messages at an incorrect time: " PRINTF_TIME, lf_time_logical_elapsed()); + // exit(2); + } + + if (self->received_messages == self->take_a_break_after) { + // Sender is taking a break; + self->breaks++; + self->received_messages = 0; + } + =} + + reaction(t) {= + // Do nothing + =} + + reaction(shutdown) {= + if (self->breaks != 3 || + (self->total_received_messages != ((SEC(1)/self->break_interval)+1) * self->take_a_break_after) + ) { + lf_print_error_and_exit("Did not receive enough messages."); + } + printf("SUCCESS: Successfully received all messages from the sender.\n"); + =} +} + +federated reactor DistributedLoopedAction { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/DistributedLoopedPhysicalAction.lf b/test/RustRti/src/federated/DistributedLoopedPhysicalAction.lf new file mode 100644 index 0000000000..b30ce210e3 --- /dev/null +++ b/test/RustRti/src/federated/DistributedLoopedPhysicalAction.lf @@ -0,0 +1,84 @@ +/** + * Test a sender-receiver network system that is similar to DistributedLoopedAction, but it uses a + * physical action rather than a logical action. This also demonstrates the advance-message-interval + * coordination option. This specifies the time period between Time Advance Notice (TAN) messages + * sent to the RTI (a form of null message that must be sent because of the physical action). The + * presence of this option also silences a warning about having a physical action that triggers an + * output. + * + * @author Soroush Bateni + */ +target C { + timeout: 1 sec, + tracing: true, + // Silences warning. + coordination-options: { + advance-message-interval: 10 msec + } +} + +reactor Sender(take_a_break_after: int = 10, break_interval: time = 550 msec) { + output out: int + physical action act + state sent_messages: int = 0 + + reaction(startup, act) -> act, out {= + // Send a message on out + lf_set(out, self->sent_messages); + self->sent_messages++; + if (self->sent_messages < self->take_a_break_after) { + lf_schedule(act, 0); + } else { + // Take a break + self->sent_messages = 0; + lf_schedule(act, self->break_interval); + } + =} +} + +reactor Receiver(take_a_break_after: int = 10, break_interval: time = 550 msec) { + input in: int + state received_messages: int = 0 + state total_received_messages: int = 0 + state breaks: int = 0 + timer t(0, 10 msec) // This will impact the performance + + // but forces the logical time to advance Comment this line for a more sensible log output. + reaction(in) {= + tag_t current_tag = lf_tag(); + lf_print("At tag " PRINTF_TAG " received %d.", + current_tag.time - lf_time_start(), + current_tag.microstep, + in->value); + self->total_received_messages++; + if (in->value != self->received_messages++) { + lf_print_error_and_exit("Expected %d.", self->received_messages - 1); + } + + if (self->received_messages == self->take_a_break_after) { + // Sender is taking a break; + self->breaks++; + self->received_messages = 0; + } + =} + + reaction(t) {= + // Do nothing + =} + + reaction(shutdown) {= + if (self->breaks < 2 || + (self->total_received_messages < ((SEC(1)/self->break_interval)+1) * self->take_a_break_after) + ) { + lf_print_error_and_exit("Test failed. Breaks: %d, Messages: %d.", self->breaks, self->total_received_messages); + } + lf_print("SUCCESS: Successfully received all messages from the sender."); + =} +} + +federated reactor DistributedLoopedPhysicalAction { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/DistributedMultiport.lf b/test/RustRti/src/federated/DistributedMultiport.lf new file mode 100644 index 0000000000..44a04c4654 --- /dev/null +++ b/test/RustRti/src/federated/DistributedMultiport.lf @@ -0,0 +1,48 @@ +// Check multiport connections between federates. +target C { + timeout: 1 sec, + coordination: centralized +} + +reactor Source(width: int = 2) { + output[width] out: int + timer t(0, 100 msec) + state count: int = 0 + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + lf_set(out[i], self->count++); + } + =} +} + +reactor Destination(width: int = 3) { + input[width] in: int + state count: int = 0 + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + if (in[i]->is_present) { + tag_t now = lf_tag(); + lf_print("Received %d at channel %d at tag " PRINTF_TAG, in[i]->value, i, + now.time - lf_time_start(), now.microstep + ); + if (in[i]->value != self->count++) { + lf_print_error_and_exit("Expected %d.", self->count - 1); + } + } + } + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor DistributedMultiport { + s = new Source(width=4) + d = new Destination(width=4) + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedMultiportToBank.lf b/test/RustRti/src/federated/DistributedMultiportToBank.lf new file mode 100644 index 0000000000..d8171de51e --- /dev/null +++ b/test/RustRti/src/federated/DistributedMultiportToBank.lf @@ -0,0 +1,41 @@ +// Check multiport to bank connections between federates. +target C { + timeout: 1 sec +} + +reactor Source { + output[2] out: int + timer t(0, 100 msec) + state count: int = 0 + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + lf_set(out[i], self->count); + } + self->count++; + =} +} + +reactor Destination { + input in: int + state count: int = 0 + + reaction(in) {= + lf_print("Received %d.", in->value); + if (self->count++ != in->value) { + lf_print_error_and_exit("Expected %d.", self->count - 1); + } + =} + + reaction(shutdown) {= + if (self->count == 0) { + lf_print_error_and_exit("No data received."); + } + =} +} + +federated reactor DistributedMultiportToBank { + s = new Source() + d = new[2] Destination() + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedMultiportToken.lf b/test/RustRti/src/federated/DistributedMultiportToken.lf new file mode 100644 index 0000000000..547fe651d9 --- /dev/null +++ b/test/RustRti/src/federated/DistributedMultiportToken.lf @@ -0,0 +1,46 @@ +// Check multiport connections between federates where the message is carried by a Token (in this +// case, with an array of char). +target C { + timeout: 1 sec, + coordination: centralized +} + +reactor Source { + output[4] out: char* + timer t(0, 200 msec) + state count: int = 0 + + reaction(t) -> out {= + for (int i = 0; i < out_width; i++) { + // With NULL, 0 arguments, snprintf tells us how many bytes are needed. + // Add one for the null terminator. + int length = snprintf(NULL, 0, "Hello %d", self->count) + 1; + // Dynamically allocate memory for the output. + SET_NEW_ARRAY(out[i], length); + // Populate the output string and increment the count. + snprintf(out[i]->value, length, "Hello %d", self->count++); + lf_print("MessageGenerator: At time " PRINTF_TIME ", send message: %s.", + lf_time_logical_elapsed(), + out[i]->value + ); + } + =} +} + +reactor Destination { + input[4] in: char* + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + if (in[i]->is_present) { + lf_print("Received %s.", in[i]->value); + } + } + =} +} + +federated reactor DistributedMultiportToken { + s = new Source() + d = new Destination() + s.out -> d.in +} diff --git a/test/RustRti/src/federated/DistributedNetworkOrder.lf b/test/RustRti/src/federated/DistributedNetworkOrder.lf new file mode 100644 index 0000000000..b1413c11b1 --- /dev/null +++ b/test/RustRti/src/federated/DistributedNetworkOrder.lf @@ -0,0 +1,75 @@ +/** + * This is a test for send_timed_message, which is an internal API. + * + * This test sends a second message at time 5 msec that has the same intended tag as a message that + * it had previously sent at time 0 msec. This results in a warning, but the message microstep is + * incremented and correctly received one microstep later. + * + * @author Soroush Bateni + */ +target C { + timeout: 1 sec, + build-type: RelWithDebInfo // Release with debug info +} + +preamble {= + #ifdef __cplusplus + extern "C" { + #endif + #include "federate.h" + #ifdef __cplusplus + } + #endif +=} + +reactor Sender { + output out: int + timer t(0, 1 msec) + + reaction(t) -> out {= + int payload = 1; + if (lf_time_logical_elapsed() == 0LL) { + lf_send_tagged_message(self->base.environment, MSEC(10), MSG_TYPE_TAGGED_MESSAGE, 0, 1, "federate 1", sizeof(int), + (unsigned char*)&payload); + } else if (lf_time_logical_elapsed() == MSEC(5)) { + payload = 2; + lf_send_tagged_message(self->base.environment, MSEC(5), MSG_TYPE_TAGGED_MESSAGE, 0, 1, "federate 1", sizeof(int), + (unsigned char*)&payload); + } + =} +} + +reactor Receiver { + input in: int + state success: int = 0 + + reaction(in) {= + tag_t current_tag = lf_tag(); + if (current_tag.time == (lf_time_start() + MSEC(10))) { + if (current_tag.microstep == 0 && in->value == 1) { + self->success++; + } else if (current_tag.microstep == 1 && in->value == 2) { + self->success++; + } + } + printf("Received %d at tag " PRINTF_TAG ".\n", + in->value, + lf_time_logical_elapsed(), + lf_tag().microstep); + =} + + reaction(shutdown) {= + if (self->success != 2) { + fprintf(stderr, "ERROR: Failed to receive messages.\n"); + exit(1); + } + printf("SUCCESS.\n"); + =} +} + +federated reactor DistributedNetworkOrder { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf b/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf new file mode 100644 index 0000000000..3a85c9b3d1 --- /dev/null +++ b/test/RustRti/src/federated/DistributedPhysicalActionUpstream.lf @@ -0,0 +1,60 @@ +/** + * Test that a rapidly produced physical action in an upstream federate can be properly handled in + * federated execution. + */ +target C { + timeout: 10 secs, + coordination-options: { + advance-message-interval: 30 msec + } +} + +import PassThrough from "../lib/PassThrough.lf" +import TestCount from "../lib/TestCount.lf" + +preamble {= + extern int _counter; + void callback(void *a); + void* take_time(void* a); +=} + +reactor WithPhysicalAction { + preamble {= + int _counter = 1; + void callback(void *a) { + lf_schedule_int(a, 0, _counter++); + } + // Simulate time passing before a callback occurs. + void* take_time(void* a) { + while (_counter < 15) { + instant_t sleep_time = MSEC(10); + lf_sleep(sleep_time); + callback(a); + } + return NULL; + } + =} + + output out: int + state thread_id: lf_thread_t = 0 + physical action act(0): int + + reaction(startup) -> act {= + // start new thread, provide callback + lf_thread_create(&self->thread_id, &take_time, act); + =} + + reaction(act) -> out {= + lf_set(out, act->value); + =} +} + +federated reactor { + a = new WithPhysicalAction() + m1 = new PassThrough() + m2 = new PassThrough() + test = new TestCount(num_inputs=14) + a.out -> m1.in + m1.out -> m2.in + m2.out -> test.in +} diff --git a/test/RustRti/src/federated/DistributedPhysicalActionUpstreamLong.lf b/test/RustRti/src/federated/DistributedPhysicalActionUpstreamLong.lf new file mode 100644 index 0000000000..99154b1ad2 --- /dev/null +++ b/test/RustRti/src/federated/DistributedPhysicalActionUpstreamLong.lf @@ -0,0 +1,88 @@ +/** + * Test that a rapidly produced physical action in an upstream federate can be properly handled in a + * long chain of federates. + */ +target C { + timeout: 1 sec, + coordination-options: { + advance-message-interval: 500 usec + } +} + +import PassThrough from "../lib/PassThrough.lf" +import TestCount from "../lib/TestCount.lf" + +preamble {= + extern int _counter; + void callback(void *a); + void* take_time(void* a); +=} + +reactor WithPhysicalAction { + preamble {= + int _counter = 1; + void callback(void *a) { + lf_schedule_int(a, 0, _counter++); + } + // Simulate time passing before a callback occurs. + void* take_time(void* a) { + while (_counter < 20) { + instant_t sleep_time = USEC(50); + lf_sleep(sleep_time); + callback(a); + } + return NULL; + } + =} + output out: int + state thread_id: lf_thread_t = 0 + physical action act(0): int + + reaction(startup) -> act {= + // start new thread, provide callback + lf_thread_create(&self->thread_id, &take_time, act); + =} + + reaction(act) -> out {= + lf_set(out, act->value); + =} +} + +federated reactor { + a = new WithPhysicalAction() + test = new TestCount(num_inputs=19) + + passThroughs1 = new PassThrough() + passThroughs2 = new PassThrough() + passThroughs3 = new PassThrough() + passThroughs4 = new PassThrough() + passThroughs5 = new PassThrough() + passThroughs6 = new PassThrough() + passThroughs7 = new PassThrough() + passThroughs8 = new PassThrough() + passThroughs9 = new PassThrough() + passThroughs10 = new PassThrough() + + a.out, + passThroughs1.out, + passThroughs2.out, + passThroughs3.out, + passThroughs4.out, + passThroughs5.out, + passThroughs6.out, + passThroughs7.out, + passThroughs8.out, + passThroughs9.out, + passThroughs10.out -> + passThroughs1.in, + passThroughs2.in, + passThroughs3.in, + passThroughs4.in, + passThroughs5.in, + passThroughs6.in, + passThroughs7.in, + passThroughs8.in, + passThroughs9.in, + passThroughs10.in, + test.in +} diff --git a/test/RustRti/src/federated/DistributedStop.lf b/test/RustRti/src/federated/DistributedStop.lf new file mode 100644 index 0000000000..6e8796d90b --- /dev/null +++ b/test/RustRti/src/federated/DistributedStop.lf @@ -0,0 +1,118 @@ +/** + * Test for lf_request_stop() in federated execution with centralized coordination. + * + * @author Soroush Bateni + */ +target C + +reactor Sender { + output out: int + timer t(0, 1 usec) + logical action act + state reaction_invoked_correctly: bool = false + + reaction(t, act) -> out, act {= + lf_print("Sending 42 at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_set(out, 42); + if (lf_tag().microstep == 0) { + // Instead of having a separate reaction + // for 'act' like Stop.lf, we trigger the + // same reaction to test lf_request_stop() being + // called multiple times + lf_schedule(act, 0); + } + if (lf_time_logical_elapsed() == USEC(1)) { + // Call lf_request_stop() both at (1 usec, 0) and + // (1 usec, 1) + lf_print("Requesting stop at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_request_stop(); + } + + tag_t _1usec1 = (tag_t) { .time = USEC(1) + lf_time_start(), .microstep = 1u }; + if (lf_tag_compare(lf_tag(), _1usec1) == 0) { + // The reaction was invoked at (1 usec, 1) as expected + self->reaction_invoked_correctly = true; + } else if (lf_tag_compare(lf_tag(), _1usec1) > 0) { + // The reaction should not have been invoked at tags larger than (1 usec, 1) + lf_print_error_and_exit("ERROR: Invoked reaction(t, act) at tag bigger than shutdown."); + } + =} + + reaction(shutdown) {= + if (lf_time_logical_elapsed() != USEC(1) || + lf_tag().microstep != 1) { + lf_print_error_and_exit("ERROR: Sender failed to stop the federation in time. " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } else if (self->reaction_invoked_correctly == false) { + lf_print_error_and_exit("ERROR: Sender reaction(t, act) was not invoked at (1 usec, 1). " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } + lf_print("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + =} +} + +reactor Receiver( + // Used in the decentralized variant of the test + stp_offset: time = 10 msec) { + input in: int + state reaction_invoked_correctly: bool = false + + reaction(in) {= + lf_print("Received %d at " PRINTF_TAG ".", + in->value, + lf_time_logical_elapsed(), + lf_tag().microstep); + if (lf_time_logical_elapsed() == USEC(1)) { + lf_print("Requesting stop at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_request_stop(); + // The receiver should receive a message at tag + // (1 usec, 1) and trigger this reaction + self->reaction_invoked_correctly = true; + } + + tag_t _1usec1 = (tag_t) { .time = USEC(1) + lf_time_start(), .microstep = 1u }; + if (lf_tag_compare(lf_tag(), _1usec1) > 0) { + self->reaction_invoked_correctly = false; + } + =} + + reaction(shutdown) {= + // Sender should have requested stop earlier than the receiver. + // Therefore, the shutdown events must occur at (1000, 0) on the + // receiver. + if (lf_time_logical_elapsed() != USEC(1) || + lf_tag().microstep != 1) { + lf_print_error_and_exit("Receiver failed to stop the federation at the right time. " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } else if (self->reaction_invoked_correctly == false) { + lf_print_error_and_exit("Receiver reaction(in) was not invoked the correct number of times. " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } + lf_print("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + =} +} + +federated reactor DistributedStop { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/lib/Count.lf b/test/RustRti/src/lib/Count.lf new file mode 100644 index 0000000000..ee3953b021 --- /dev/null +++ b/test/RustRti/src/lib/Count.lf @@ -0,0 +1,11 @@ +target C + +reactor Count(offset: time = 0, period: time = 1 sec) { + state count: int = 1 + output out: int + timer t(offset, period) + + reaction(t) -> out {= + lf_set(out, self->count++); + =} +} diff --git a/test/RustRti/src/lib/FileLevelPreamble.lf b/test/RustRti/src/lib/FileLevelPreamble.lf new file mode 100644 index 0000000000..11067d5e63 --- /dev/null +++ b/test/RustRti/src/lib/FileLevelPreamble.lf @@ -0,0 +1,12 @@ +/** Test for ensuring that file-level preambles are inherited when a file is imported. */ +target C + +preamble {= + #define FOO 2 +=} + +reactor FileLevelPreamble { + reaction(startup) {= + printf("FOO: %d\n", FOO); + =} +} diff --git a/test/RustRti/src/lib/FileReader.txt b/test/RustRti/src/lib/FileReader.txt new file mode 100644 index 0000000000..5e1c309dae --- /dev/null +++ b/test/RustRti/src/lib/FileReader.txt @@ -0,0 +1 @@ +Hello World \ No newline at end of file diff --git a/test/RustRti/src/lib/GenDelay.lf b/test/RustRti/src/lib/GenDelay.lf new file mode 100644 index 0000000000..8f21c3de1b --- /dev/null +++ b/test/RustRti/src/lib/GenDelay.lf @@ -0,0 +1,21 @@ +target C + +preamble {= + typedef int message_t; +=} + +reactor Source { + output out: message_t + + reaction(startup) -> out {= + lf_set(out, 42); + =} +} + +reactor Sink { + input in: message_t + + reaction(in) {= + lf_print("Received %d at time %lld", in->value, lf_time_logical_elapsed()); + =} +} diff --git a/test/RustRti/src/lib/Imported.lf b/test/RustRti/src/lib/Imported.lf new file mode 100644 index 0000000000..85d0a2b493 --- /dev/null +++ b/test/RustRti/src/lib/Imported.lf @@ -0,0 +1,14 @@ +// This is used by the test for the ability to import a reactor definition that itself imports a +// reactor definition. +target C + +import ImportedAgain from "./ImportedAgain.lf" + +reactor Imported { + input x: int + a = new ImportedAgain() + + reaction(x) -> a.x {= + lf_set(a.x, x->value); + =} +} diff --git a/test/RustRti/src/lib/ImportedAgain.lf b/test/RustRti/src/lib/ImportedAgain.lf new file mode 100644 index 0000000000..6870526b95 --- /dev/null +++ b/test/RustRti/src/lib/ImportedAgain.lf @@ -0,0 +1,15 @@ +// This is used by the test for the ability to import a reactor definition that itself imports a +// reactor definition. +target C + +reactor ImportedAgain { + input x: int + + reaction(x) {= + printf("Received: %d.\n", x->value); + if (x->value != 42) { + printf("ERROR: Expected input to be 42. Got: %d.\n", x->value); + exit(1); + } + =} +} diff --git a/test/RustRti/src/lib/ImportedComposition.lf b/test/RustRti/src/lib/ImportedComposition.lf new file mode 100644 index 0000000000..e5524f3d22 --- /dev/null +++ b/test/RustRti/src/lib/ImportedComposition.lf @@ -0,0 +1,22 @@ +// This is used by the test for the ability to import a reactor definition that itself imports a +// reactor definition. +target C + +reactor Gain { + input x: int + output y: int + + reaction(x) -> y {= + lf_set(y, x->value * 2); + =} +} + +reactor ImportedComposition { + input x: int + output y: int + g1 = new Gain() + g2 = new Gain() + x -> g1.x after 10 msec + g1.y -> g2.x after 30 msec + g2.y -> y after 15 msec +} diff --git a/test/RustRti/src/lib/InternalDelay.lf b/test/RustRti/src/lib/InternalDelay.lf new file mode 100644 index 0000000000..fb7124a4ec --- /dev/null +++ b/test/RustRti/src/lib/InternalDelay.lf @@ -0,0 +1,15 @@ +target C + +reactor InternalDelay(delay: time = 10 msec) { + input in: int + output out: int + logical action d: int + + reaction(in) -> d {= + lf_schedule_int(d, self->delay, in->value); + =} + + reaction(d) -> out {= + lf_set(out, d->value); + =} +} diff --git a/test/RustRti/src/lib/LoopedActionSender.lf b/test/RustRti/src/lib/LoopedActionSender.lf new file mode 100644 index 0000000000..e9ea36f40a --- /dev/null +++ b/test/RustRti/src/lib/LoopedActionSender.lf @@ -0,0 +1,36 @@ +/** + * A sender reactor that outputs integers in superdense time. + * + * @author Soroush Bateni + */ +target C + +/** + * @param take_a_break_after: Indicates how many messages are sent in consecutive superdense time + * @param break_interval: Determines how long the reactor should take a break after sending + * take_a_break_after messages. + */ +reactor Sender(take_a_break_after: int = 10, break_interval: time = 400 msec) { + output out: int + logical action act + state sent_messages: int = 0 + + reaction(startup, act) -> act, out {= + // Send a message on out + /* printf("At tag (%lld, %u) sending value %d.\n", + lf_time_logical_elapsed(), + lf_tag().microstep, + self->sent_messages + ); */ + lf_set(out, self->sent_messages); + lf_print("Sender sent %d.", self->sent_messages); + self->sent_messages++; + if (self->sent_messages < self->take_a_break_after) { + lf_schedule(act, 0); + } else { + // Take a break + self->sent_messages=0; + lf_schedule(act, self->break_interval); + } + =} +} diff --git a/test/RustRti/src/lib/PassThrough.lf b/test/RustRti/src/lib/PassThrough.lf new file mode 100644 index 0000000000..389905489a --- /dev/null +++ b/test/RustRti/src/lib/PassThrough.lf @@ -0,0 +1,11 @@ +/** Forward the integer input on `in` to the output port `out`. */ +target C + +reactor PassThrough { + input in: int + output out: int + + reaction(in) -> out {= + lf_set(out, in->value); + =} +} diff --git a/test/RustRti/src/lib/Test.lf b/test/RustRti/src/lib/Test.lf new file mode 100644 index 0000000000..69e4f79b2c --- /dev/null +++ b/test/RustRti/src/lib/Test.lf @@ -0,0 +1,15 @@ +target C + +reactor TestDouble(expected: double[] = {1.0, 1.0, 1.0, 1.0}) { + input in: double + state count: int = 0 + + reaction(in) {= + printf("Received: %f\n", in->value); + if (in->value != self->expected[self->count]) { + printf("ERROR: Expected %f.\n", self->expected[self->count]); + exit(1); + } + self->count++; + =} +} diff --git a/test/RustRti/src/lib/TestCount.lf b/test/RustRti/src/lib/TestCount.lf new file mode 100644 index 0000000000..e4fbb82b02 --- /dev/null +++ b/test/RustRti/src/lib/TestCount.lf @@ -0,0 +1,34 @@ +/** + * Test that a counting sequence of inputs starts with the specified start parameter value, + * increments by the specified stride, and receives the specified number of inputs. + * + * @param start The starting value for the expected inputs. Default is 1. + * @param stride The increment for the inputs. Default is 1. + * @param num_inputs The number of inputs expected. Default is 1. + */ +target C + +reactor TestCount(start: int = 1, stride: int = 1, num_inputs: int = 1) { + state count: int = start + state inputs_received: int = 0 + input in: int + + reaction(in) {= + lf_print("Received %d.", in->value); + if (in->value != self->count) { + lf_print_error_and_exit("Expected %d.", self->count); + } + self->count += self->stride; + self->inputs_received++; + =} + + reaction(shutdown) {= + lf_print("Shutdown invoked."); + if (self->inputs_received != self->num_inputs) { + lf_print_error_and_exit("Expected to receive %d inputs, but got %d.", + self->num_inputs, + self->inputs_received + ); + } + =} +} diff --git a/test/RustRti/src/lib/TestCountMultiport.lf b/test/RustRti/src/lib/TestCountMultiport.lf new file mode 100644 index 0000000000..a0b0db294d --- /dev/null +++ b/test/RustRti/src/lib/TestCountMultiport.lf @@ -0,0 +1,41 @@ +/** + * Test that a counting sequence of inputs starts with the specified start parameter value, + * increments by the specified stride, and receives the specified number of inputs. This version has + * a multiport input, and each input is expected to be present and incremented over the previous + * input. + * + * @param start The starting value for the expected inputs. Default is 1. + * @param stride The increment for the inputs. Default is 1. + * @param num_inputs The number of inputs expected on each channel. Default is 1. + */ +target C + +reactor TestCountMultiport(start: int = 1, stride: int = 1, num_inputs: int = 1, width: int = 2) { + state count: int = start + state inputs_received: int = 0 + input[width] in: int + + reaction(in) {= + for (int i = 0; i < in_width; i++) { + if (!in[i]->is_present) { + lf_print_error_and_exit("No input on channel %d.", i); + } + lf_print("Received %d on channel %d.", in[i]->value, i); + if (in[i]->value != self->count) { + lf_print_error_and_exit("Expected %d.", self->count); + } + self->count += self->stride; + } + self->inputs_received++; + =} + + reaction(shutdown) {= + lf_print("Shutdown invoked."); + if (self->inputs_received != self->num_inputs) { + lf_print_error_and_exit("Expected to receive %d inputs, but only got %d.", + self->num_inputs, + self->inputs_received + ); + } + =} +}