Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/rmq naming caliper #98

Merged
merged 3 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,9 @@ jobs:
\"service-port\": ${RABBITMQ_PORT},
\"service-host\": \"${RABBITMQ_HOST}\",
\"rabbitmq-vhost\": \"/\",
\"rabbitmq-outbound-queue\": \"test-ci\",
\"rabbitmq-exchange\": \"ams-fanout\",
\"rabbitmq-routing-key\": \"training\"
\"rabbitmq-queue-physics\": \"test-ci\",
\"rabbitmq-exchange-training\": \"ams-fanout\",
\"rabbitmq-key-training\": \"training\"
},
\"update_surrogate\": false
},
Expand Down
7 changes: 4 additions & 3 deletions examples/ideal_gas/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ If you want to use RabbitMQ as database back-end you will have to perform additi
"service-port": 1234,
"service-host": "",
"rabbitmq-cert": "creds.pem",
"rabbitmq-inbound-queue": "test4",
"rabbitmq-outbound-queue": "test3"
"rabbitmq-queue-physics": "ams-data",
"rabbitmq-exchange-training": "ams-training",
"rabbitmq-key-training": "training"
}
```
`rabbitmq-cert` is where the TLS certificate is (absolute path ideally), `rabbitmq-queue-data` is the name of the queue used by AMS.
`rabbitmq-cert` is the path to the TLS certificate (ideally an absolute path), and `rabbitmq-queue-physics` is the name of the queue used by AMS.
For testing, you can use the credentials I have pre-generated to access a RabbitMQ server located in PDS; they are available here: `/usr/workspace/AMS/pds/rabbitmq/`.

Known issues (that I am working on):
Expand Down
36 changes: 30 additions & 6 deletions src/AMSWorkflow/ams/rmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,30 @@ def broadcast(self, message):

@dataclass
class AMSRMQConfiguration:
"""
This class parses the credentials used to connect
to the RabbitMQ server.
The expected format is the same as the one used by AMSlib.
{
"db" : {
"dbType" : "rmq",
"rmq_config" : {
"service-port": 0,
"service-host": "",
"rabbitmq-erlang-cookie": "",
"rabbitmq-name": "",
"rabbitmq-password": "",
"rabbitmq-user": "",
"rabbitmq-vhost": "",
"rabbitmq-cert": "",
"rabbitmq-queue-physics": "",
"rabbitmq-exchange-training": "",
"rabbitmq-key-training": ""
},
"update_surrogate": true|false
}
}
"""
service_port: int
service_host: str
rabbitmq_erlang_cookie: str
Expand All @@ -893,11 +917,11 @@ class AMSRMQConfiguration:
rabbitmq_user: str
rabbitmq_vhost: str
rabbitmq_cert: str
rabbitmq_outbound_queue: str
rabbitmq_queue_physics: str
rabbitmq_exchange_training: str = ""
rabbitmq_key_training: str = ""
rabbitmq_ml_submit_queue: str = ""
rabbitmq_ml_status_queue: str = ""
rabbitmq_exchange: str = "not-used"
rabbitmq_routing_key: str = ""

def __post_init__(self):
if not Path(self.rabbitmq_cert).exists():
Expand Down Expand Up @@ -926,9 +950,9 @@ def to_dict(self, AMSlib=False):
"rabbitmq-user": self.rabbitmq_user,
"rabbitmq-vhost": self.rabbitmq_vhost,
"rabbitmq-cert": self.rabbitmq_cert,
"rabbitmq-outbound-queue": self.rabbitmq_outbound_queue,
"rabbitmq-exchange": self.rabbitmq_exchange,
"rabbitmq-routing-key": self.rabbitmq_routing_key,
"rabbitmq-queue-physics": self.rabbitmq_queue_physics,
"rabbitmq-exchange-training": self.rabbitmq_exchange_training,
"rabbitmq-key-training": self.rabbitmq_key_training,
"rabbitmq-ml-submit-queue": self.rabbitmq_ml_submit_queue,
"rabbitmq-ml-status-queue": self.rabbitmq_ml_status_queue,
}
Expand Down
4 changes: 2 additions & 2 deletions src/AMSWorkflow/ams/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1011,8 +1011,8 @@ def from_cli(cls, args):
config.rabbitmq_user,
config.rabbitmq_password,
config.rabbitmq_cert,
config.rabbitmq_outbound_queue,
config.rabbitmq_ml_status_queue if args.update_rmq_models else None,
config.rabbitmq_queue_physics,
config.rabbitmq_exchange_training if args.update_rmq_models else None
)

def requires_model_update(self):
Expand Down
6 changes: 3 additions & 3 deletions src/AMSlib/AMS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,11 @@ class AMSWrap
std::string rmq_user = getEntry<std::string>(rmq_entry, "rabbitmq-user");
std::string rmq_vhost = getEntry<std::string>(rmq_entry, "rabbitmq-vhost");
std::string rmq_out_queue =
getEntry<std::string>(rmq_entry, "rabbitmq-outbound-queue");
getEntry<std::string>(rmq_entry, "rabbitmq-queue-physics");
std::string exchange =
getEntry<std::string>(rmq_entry, "rabbitmq-exchange");
getEntry<std::string>(rmq_entry, "rabbitmq-exchange-training");
std::string routing_key =
getEntry<std::string>(rmq_entry, "rabbitmq-routing-key");
getEntry<std::string>(rmq_entry, "rabbitmq-key-training");
bool update_surrogate = getEntry<bool>(entry, "update_surrogate");

// We allow connection to RabbitMQ without TLS certificate
Expand Down
27 changes: 16 additions & 11 deletions src/AMSlib/wf/basedb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
#ifndef __AMS_BASE_DB__
#define __AMS_BASE_DB__


#include <H5Ipublic.h>

#include <cstdint>
#include <experimental/filesystem>
#include <fstream>
Expand Down Expand Up @@ -38,6 +35,7 @@ namespace fs = std::experimental::filesystem;
#endif

#ifdef __ENABLE_HDF5__
#include <H5Ipublic.h>
#include <hdf5.h>
#define HDF5_ERROR(Eid) \
if (Eid < 0) { \
Expand All @@ -47,6 +45,9 @@ namespace fs = std::experimental::filesystem;
}
#endif

#ifdef __AMS_ENABLE_CALIPER__
#include <caliper/cali_macros.h>
#endif

#ifdef __ENABLE_RMQ__
#include <amqpcpp.h>
Expand Down Expand Up @@ -229,6 +230,7 @@ class csvDB final : public FileDB
inputs.size(),
outputs.size())

CALIPER(CALI_MARK_BEGIN("STORE_CSV");)
const size_t num_in = inputs.size();
const size_t num_out = outputs.size();

Expand All @@ -251,6 +253,7 @@ class csvDB final : public FileDB
}
fd << outputs[num_out - 1][i] << "\n";
}
CALIPER(CALI_MARK_END("STORE_CSV");)
}


Expand Down Expand Up @@ -1529,9 +1532,9 @@ class RMQPublisher
* "service-port": 31495,
* "service-host": "url.czapps.llnl.gov",
* "rabbitmq-cert": "tls-cert.crt",
* "rabbitmq-outbound-queue": "test3",
* "rabbitmq-exchange": "ams-fanout",
* "rabbitmq-routing-key": "training"
* "rabbitmq-queue-physics": "test3",
* "rabbitmq-exchange-training": "ams-fanout",
* "rabbitmq-key-training": "training"
* }
*
* The TLS certificate must be generated by the user and the absolute paths are preferred.
Expand All @@ -1546,15 +1549,15 @@ class RMQPublisher
* Therefore, we have two threads per MPI rank.
*
* Here, RMQInterface::publish() has access to internal RabbitMQ channels and can publish the message
* on the outbound queue (rabbitmq-outbound-queue in the JSON configuration).
* on the outbound queue (rabbitmq-queue-physics in the JSON configuration).
* Note that storing data this way is much faster than writing files, as a call to RabbitMQDB::store()
* is virtually free: the actual data sending takes place in a thread and does not slow down
* the main simulation (MPI).
*
* 2. Consuming data: The inbound queue (rabbitmq-inbound-queue in the JSON configuration) is the queue for incoming data. The
* RMQConsumer is listening on that queue for messages. In the AMSLib approach, that queue is used to communicate
* updates to rank regarding the ML surrrogate model. RMQConsumer will automatically populate a std::vector with all
* messages received since the execution of AMS started.
* 2. Consuming data: The exchange and the training key (rabbitmq-exchange-training and rabbitmq-key-training
* in the JSON configuration) are for incoming data. The RMQConsumer is listening on that exchange/key for messages.
* In the AMSLib approach, that exchange/key is used to communicate updates to ranks regarding the ML surrogate model.
* RMQConsumer will automatically populate a std::vector with all messages received since the execution of AMS started.
*
* Global note: Most calls dealing with RabbitMQ (to establish a RMQ connection, opening a channel, publish data etc)
* are asynchronous callbacks (similar to asyncio in Python or future in C++).
Expand Down Expand Up @@ -1660,6 +1663,7 @@ class RMQInterface
inputs.size(),
outputs.size())

CALIPER(CALI_MARK_BEGIN("STORE_RMQ");)
AMSMessage msg(_msg_tag, _rId, domain_name, num_elements, inputs, outputs);

if (!_publisher->connectionValid()) {
Expand All @@ -1676,6 +1680,7 @@ class RMQInterface
}
_publisher->publish(std::move(msg));
_msg_tag++;
CALIPER(CALI_MARK_END("STORE_RMQ");)
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/AMSlib/wf/hdf5db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ void hdf5DB::_store(size_t num_elements,
std::vector<TypeValue*>& outputs,
bool* predicate)
{
CALIPER(CALI_MARK_BEGIN("STORE_HDF5");)
if (isDouble<TypeValue>::default_value())
HDType = H5T_NATIVE_DOUBLE;
else
Expand Down Expand Up @@ -180,6 +181,7 @@ void hdf5DB::_store(size_t num_elements,
}

totalElements += num_elements;
CALIPER(CALI_MARK_END("STORE_HDF5");)
}


Expand Down
4 changes: 4 additions & 0 deletions src/AMSlib/wf/rmqdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ unsigned RMQPublisherHandler::unacknowledged() const

void RMQPublisherHandler::publish(AMSMessage&& msg)
{
CALIPER(CALI_MARK_BEGIN("RMQ_PUBLISH");)
{
const std::lock_guard<std::mutex> lock(_mutex);
_messages.push_back(msg);
Expand Down Expand Up @@ -724,6 +725,7 @@ void RMQPublisherHandler::publish(AMSMessage&& msg)
msg.id())
}
_nb_msg++;
CALIPER(CALI_MARK_END("RMQ_PUBLISH");)
}

void RMQPublisherHandler::onReady(AMQP::TcpConnection* connection)
Expand Down Expand Up @@ -982,6 +984,7 @@ bool RMQInterface::connect(std::string rmq_name,

void RMQInterface::restartPublisher()
{
CALIPER(CALI_MARK_BEGIN("RMQ_RESTART_PUBLISHER");)
std::vector<AMSMessage> messages = _publisher->getMsgBuffer();

AMSMessage& msg_min =
Expand All @@ -1008,6 +1011,7 @@ void RMQInterface::restartPublisher()
_rId, *_address, _cacert, _queue_sender, std::move(messages));
_publisher_thread = std::thread([&]() { _publisher->start(); });
connected = true;
CALIPER(CALI_MARK_END("RMQ_RESTART_PUBLISHER");)
}

void RMQInterface::close()
Expand Down
6 changes: 3 additions & 3 deletions tests/AMSlib/json_configs/rmq.json.in
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
"rabbitmq-user": "",
"rabbitmq-vhost": "",
"rabbitmq-cert": "",
"rabbitmq-outbound-queue": "",
"rabbitmq-exchange": "",
"rabbitmq-routing-key": ""
"rabbitmq-queue-physics": "",
"rabbitmq-exchange-training": "",
"rabbitmq-key-training": ""
},
"update_surrogate": false
},
Expand Down
2 changes: 1 addition & 1 deletion tests/AMSlib/verify_rmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def verify(
port = rmq_json["service-port"]
user = rmq_json["rabbitmq-user"]
password = rmq_json["rabbitmq-password"]
queue = rmq_json["rabbitmq-outbound-queue"]
queue = rmq_json["rabbitmq-queue-physics"]
cert = None
if "rabbitmq-cert" in rmq_json:
cert = rmq_json["rabbitmq-cert"]
Expand Down