Skip to content

Commit 59a443a

Browse files
Pthreads manager version working
1 parent eff5c14 commit 59a443a

File tree

4 files changed

+27
-25
lines changed

4 files changed

+27
-25
lines changed

examples/basic/basic.cpp

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44
#include <random>
55
#include <hicr/backends/hwloc/memoryManager.hpp>
66
#include <hicr/backends/hwloc/topologyManager.hpp>
7-
#include <hicr/backends/mpi/instanceManager.hpp>
8-
#include <hicr/backends/mpi/communicationManager.hpp>
9-
#include <hicr/backends/mpi/memoryManager.hpp>
7+
#include <hicr/backends/pthreads/instanceManager.hpp>
8+
#include <hicr/backends/pthreads/communicationManager.hpp>
109
#include <hicr/backends/pthreads/computeManager.hpp>
1110
#include <hicr/backends/pthreads/communicationManager.hpp>
1211
#include <hicr/backends/boost/computeManager.hpp>
@@ -22,7 +21,7 @@ int main(int argc, char *argv[])
2221
// Creating HWloc topology object
2322
hwloc_topology_t hwlocTopologyObject;
2423

25-
// Reserving memory for hwloc
24+
// Reserving memory for hwloc
2625
hwloc_topology_init(&hwlocTopologyObject);
2726

2827
// Initializing host (CPU) topology manager
@@ -46,13 +45,15 @@ int main(int argc, char *argv[])
4645
// Grabbing first compute resource for computing incoming RPCs
4746
auto computeResource = *computeResources.begin();
4847

49-
// Getting MPI managers
50-
auto instanceManager = HiCR::backend::mpi::InstanceManager::createDefault(&argc, &argv);
51-
auto mpiCommunicationManager = std::make_shared<HiCR::backend::mpi::CommunicationManager>();
52-
auto mpiMemoryManager = std::make_shared<HiCR::backend::mpi::MemoryManager>();
53-
auto pthreadsComputeManager = std::make_shared<HiCR::backend::pthreads::ComputeManager>();
54-
auto boostComputeManager = std::make_shared<HiCR::backend::boost::ComputeManager>();
55-
auto hwlocMemoryManager = std::make_shared<HiCR::backend::hwloc::MemoryManager>(&hwlocTopologyObject);
48+
// Creating pthreads manager core
49+
HiCR::backend::pthreads::Core core(1);
50+
51+
// Getting managers
52+
auto instanceManager = std::make_shared<HiCR::backend::pthreads::InstanceManager>(core);
53+
auto communicationManager = std::make_shared<HiCR::backend::pthreads::CommunicationManager>(core);
54+
auto workerComputeManager = std::make_shared<HiCR::backend::pthreads::ComputeManager>();
55+
auto taskComputeManager = std::make_shared<HiCR::backend::boost::ComputeManager>();
56+
auto memoryManager = std::make_shared<HiCR::backend::hwloc::MemoryManager>(&hwlocTopologyObject);
5657

5758
// Creating taskr object
5859
nlohmann::json taskrConfig;
@@ -61,10 +62,10 @@ int main(int argc, char *argv[])
6162
taskrConfig["Minimum Active Task Workers"] = 1; // Have at least one worker active at all times
6263
taskrConfig["Service Worker Count"] = 1; // Have one dedicated service workers at all times to listen for incoming messages
6364
taskrConfig["Make Task Workers Run Services"] = false; // Workers will check for meta messages in between executions
64-
auto taskr = std::make_unique<taskr::Runtime>(boostComputeManager.get(), pthreadsComputeManager.get(), computeResources, taskrConfig);
65+
auto taskr = std::make_unique<taskr::Runtime>(taskComputeManager.get(), workerComputeManager.get(), computeResources, taskrConfig);
6566

6667
// Instantiate RPC Engine
67-
auto rpcEngine = std::make_shared<HiCR::frontend::RPCEngine>(*mpiCommunicationManager, *instanceManager, *mpiMemoryManager, *pthreadsComputeManager, bufferMemorySpace, computeResource);
68+
auto rpcEngine = std::make_shared<HiCR::frontend::RPCEngine>(*communicationManager, *instanceManager, *memoryManager, *workerComputeManager, bufferMemorySpace, computeResource);
6869

6970
// Initialize RPC Engine
7071
rpcEngine->initialize();
@@ -118,7 +119,7 @@ int main(int argc, char *argv[])
118119
// Checking I have the correct number of instances (one per replica)
119120
if (instanceManager->getInstances().size() != replicasRequired)
120121
{
121-
fprintf(stderr, "Error: %lu MPI instances provided, but %lu partition replicas were requested\n", instanceManager->getInstances().size(), replicasRequired);
122+
fprintf(stderr, "Error: %lu instances provided, but %lu partition replicas were requested\n", instanceManager->getInstances().size(), replicasRequired);
122123
instanceManager->abort(-1);
123124
}
124125

@@ -143,18 +144,18 @@ int main(int argc, char *argv[])
143144
// This allows for flexibility to choose in which devices to place the payload and coordination buffers
144145
for (const auto& edge : hllm.getDeployment().getEdges())
145146
{
146-
edge->setPayloadCommunicationManager(mpiCommunicationManager.get());
147-
edge->setPayloadMemoryManager(mpiMemoryManager.get());
147+
edge->setPayloadCommunicationManager(communicationManager.get());
148+
edge->setPayloadMemoryManager(memoryManager.get());
148149
edge->setPayloadMemorySpace(bufferMemorySpace);
149150

150-
edge->setCoordinationCommunicationManager(mpiCommunicationManager.get());
151-
edge->setCoordinationMemoryManager(mpiMemoryManager.get());
151+
edge->setCoordinationCommunicationManager(communicationManager.get());
152+
edge->setCoordinationMemoryManager(memoryManager.get());
152153
edge->setCoordinationMemorySpace(bufferMemorySpace);
153154
}
154155

155156
// Setting managers for partition-wise control messaging
156-
hllm.getDeployment().getControlBuffer().communicationManager = mpiCommunicationManager.get();
157-
hllm.getDeployment().getControlBuffer().memoryManager = mpiMemoryManager.get();
157+
hllm.getDeployment().getControlBuffer().communicationManager = communicationManager.get();
158+
hllm.getDeployment().getControlBuffer().memoryManager = memoryManager.get();
158159
hllm.getDeployment().getControlBuffer().memorySpace = bufferMemorySpace;
159160

160161
// Declaring the hLLM tasks for the application
@@ -167,7 +168,7 @@ int main(int argc, char *argv[])
167168

168169
// Create output
169170
responseOutput = request + std::string(" [Processed]");
170-
const auto responseMemSlot = mpiMemoryManager->registerLocalMemorySlot(bufferMemorySpace, responseOutput.data(), responseOutput.size() + 1);
171+
const auto responseMemSlot = memoryManager->registerLocalMemorySlot(bufferMemorySpace, responseOutput.data(), responseOutput.size() + 1);
171172

172173
// printf("[Basic Example] Returning response: '%s'\n", responseOutput.c_str());
173174
task->setOutput("Response", responseMemSlot);

include/hllm/edge/input.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ class Input final : public Base
2727
_dataChannelSizesBuffer = _edgeConfig.getCoordinationMemoryManager()->allocateLocalMemorySlot(_edgeConfig.getCoordinationMemorySpace(), sizesBufferSize);
2828

2929
// Allocating payload buffer as a local memory slot
30-
_dataChannelPayloadBuffer = _edgeConfig.getPayloadMemoryManager()->allocateLocalMemorySlot(_edgeConfig.getPayloadMemorySpace(), _edgeConfig.getBufferSize());
30+
auto payloadBufferSize = HiCR::channel::variableSize::SPSC::Consumer::getPayloadBufferSize(_edgeConfig.getBufferSize());
31+
_dataChannelPayloadBuffer = _edgeConfig.getPayloadMemoryManager()->allocateLocalMemorySlot(_edgeConfig.getPayloadMemorySpace(), payloadBufferSize);
3132

3233
///// Allocating additional local buffers required for the consumer medata channel
3334

include/hllm/edge/output.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ class Output final : public Base
7676
auto messageMetadataMemorySlot = _edgeConfig.getCoordinationMemoryManager()->registerLocalMemorySlot(_edgeConfig.getCoordinationMemorySpace(), (void*)&message.getMetadata(), sizeof(Message::metadata_t));
7777
_metadataChannel->push(messageMetadataMemorySlot);
7878

79-
_edgeConfig.getPayloadMemoryManager()->freeLocalMemorySlot(messagePayloadMemorySlot);
80-
_edgeConfig.getCoordinationMemoryManager()->freeLocalMemorySlot(messageMetadataMemorySlot);
79+
_edgeConfig.getPayloadMemoryManager()->deregisterLocalMemorySlot(messagePayloadMemorySlot);
80+
_edgeConfig.getCoordinationMemoryManager()->deregisterLocalMemorySlot(messageMetadataMemorySlot);
8181
}
8282

8383
private:

0 commit comments

Comments
 (0)