Partially fix issue #37 (#51)

Merged (2 commits) on Feb 27, 2024
34 changes: 34 additions & 0 deletions pyproject.toml
@@ -1,3 +1,37 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[project]
name = "ams-wf"
version = "1.0"
requires-python = ">=3.9"
classifiers = [
"Development Status :: 3 - Alpha",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python",
"Programming Language :: Python :: 3 :: Only",
]
dependencies = [
"argparse",
"kosh>=3.0.1",
"pika>=1.3.0",
"numpy>=1.2.0"
]

[project.scripts]
AMSBroker = "ams_wf.AMSBroker:main"
AMSDBStage = "ams_wf.AMSDBStage:main"
AMSOrchestrator = "ams_wf.AMSOrchestrator:main"
AMSStore = "ams_wf.AMSStore:main"
AMSTrain = "ams_wf.AMSTrain:main"

[project.urls]
"Homepage" = "https://github.com/LLNL/AMS/"

[tool.setuptools]
packages = ["ams_wf", "ams"]

# Black formatting
[tool.black]
line-length = 120
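Note on the packaging change above: the `[project.scripts]` table maps each console script to a `module:function` target, so every listed module (e.g. `ams_wf.AMSBroker`) is expected to expose a module-level `main()` callable, which pip turns into an executable of the same name. A minimal sketch of that contract is shown below; the argument handling is purely hypothetical and only illustrates the shape such a module takes.

```python
# Hypothetical sketch of an entry-point module referenced by [project.scripts].
# Only the module-level main() is required by the packaging layout; the body
# is illustrative and not taken from the AMS sources.
import argparse


def main():
    parser = argparse.ArgumentParser(description="illustrative AMS workflow entry point")
    parser.add_argument("--config", help="hypothetical option, for demonstration only")
    args = parser.parse_args()
    print(f"running with config={args.config}")


if __name__ == "__main__":
    main()
```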
5 changes: 3 additions & 2 deletions src/AMSWorkflow/CMakeLists.txt
@@ -4,6 +4,7 @@ add_subdirectory(ams_wf)
add_subdirectory(ams)

configure_file("setup.py" "${CMAKE_CURRENT_BINARY_DIR}/setup.py" COPYONLY)
configure_file("${CMAKE_HOME_DIRECTORY}/pyproject.toml" "${CMAKE_CURRENT_BINARY_DIR}/pyproject.toml" COPYONLY)

file(GLOB_RECURSE pyfiles *.py ams_wf/*.py ams/*.py)

@@ -15,8 +16,8 @@ else()
set(_pip_args "--user")
endif()

message(WARNING "AMS Python Source files are ${pyfiles}")
message(WARNING "AMS Python built cmd is : ${Python_EXECUTABLE} -m pip install ${_pip_args} ${AMS_PY_APP}")
message(STATUS "AMS Python Source files are ${pyfiles}")
message(STATUS "AMS Python built cmd is : ${Python_EXECUTABLE} -m pip install ${_pip_args} ${AMS_PY_APP}")

add_custom_target(PyAMS ALL
COMMAND ${Python_EXECUTABLE} -m pip install ${_pip_args} ${AMS_PY_APP}
44 changes: 34 additions & 10 deletions src/AMSWorkflow/ams/stage.py
@@ -17,6 +17,7 @@
from typing import Callable, List, Tuple
import struct
import signal
import os

import numpy as np

@@ -92,10 +93,15 @@ class Task(ABC):
the staging mechanism.
"""

def __init__(self):
self.statistics = {"datasize" : 0, "duration" : 0}

@abstractmethod
def __call__(self):
pass

def stats(self):
return self.statistics

class ForwardTask(Task):
"""
@@ -112,7 +118,7 @@ def __init__(self, i_queue, o_queue, callback):
"""
initializes a ForwardTask class with the queues and the callback.
"""

super().__init__()
if not isinstance(callback, Callable):
raise TypeError(f"{callback} argument is not Callable")

@@ -142,6 +148,7 @@ def __call__(self):
the output to the output queue. In the case of receiving a 'termination' messages informs
the tasks waiting on the output queues about the terminations and returns from the function.
"""
start = time.time()

while True:
# This is a blocking call
@@ -152,9 +159,14 @@
elif item.is_process():
inputs, outputs = self._action(item.data())
self.o_queue.put(QueueMessage(MessageType.Process, DataBlob(inputs, outputs)))
self.statistics["datasize"] += (inputs.nbytes + outputs.nbytes)
elif item.is_new_model():
# This is not handled yet
continue

end = time.time()
self.statistics["duration"] = end - start
print(f"Spend {end - start} at {self.__class__.__name__} ({self.statistics})")
return


@@ -171,6 +183,7 @@ class FSLoaderTask(Task):
"""

def __init__(self, o_queue, loader, pattern):
super().__init__()
self.o_queue = o_queue
self.pattern = pattern
self.loader = loader
@@ -193,10 +206,13 @@ def __call__(self):
output_batches = np.array_split(output_data, num_batches)
for j, (i, o) in enumerate(zip(input_batches, output_batches)):
self.o_queue.put(QueueMessage(MessageType.Process, DataBlob(i, o)))
self.statistics["datasize"] += (input_data.nbytes + output_data.nbytes)

self.o_queue.put(QueueMessage(MessageType.Terminate, None))

end = time.time()
print(f"Spend {end - start} at {self.__class__.__name__}")
self.statistics["duration"] += (end - start)
print(f"Spend {end - start} at {self.__class__.__name__} ({self.statistics})")


class RMQMessage(object):
@@ -343,7 +359,8 @@ class RMQLoaderTask(Task):
prefetch_count: Number of messages prefected by RMQ (impact performance)
"""

def __init__(self, o_queue, credentials, cacert, rmq_queue, prefetch_count=1):
def __init__(self, o_queue, credentials, cacert, rmq_queue, prefetch_count = 1):
super().__init__()
self.o_queue = o_queue
self.credentials = credentials
self.cacert = cacert
@@ -387,6 +404,8 @@ def callback_message(self, ch, basic_deliver, properties, body):
input_batches = np.array_split(input_data, num_batches)
output_batches = np.array_split(output_data, num_batches)

self.statistics["datasize"] += (input_data.nbytes + output_data.nbytes)

for j, (i, o) in enumerate(zip(input_batches, output_batches)):
self.o_queue.put(QueueMessage(MessageType.Process, DataBlob(i, o)))

@@ -397,15 +416,14 @@ def handler(signum, frame):
print(f"Received SIGNUM={signum} for {name}[pid={pid}]: stopping process")
self.rmq_consumer.stop()
self.o_queue.put(QueueMessage(MessageType.Terminate, None))
print(f"Spend {self.total_time} at {self.__class__.__name__}")
self.statistics["duration"] += (end - start)
print(f"Spend {self.total_time} at {self.__class__.__name__} ({self.statistics})")

return handler

def __call__(self):
"""
Busy loop of reading all files matching the pattern and creating
'100' batches which will be pushed on the queue. Upon reading all files
the Task pushes a 'Terminate' message to the queue and returns.
Busy loop of consuming messages from RMQ queue
"""
self.rmq_consumer.run()

@@ -426,6 +444,7 @@ def __init__(self, i_queue, o_queue, writer_cls, out_dir):
initializes the writer task to read data from the i_queue write them using
the writer_cls and store the data in the out_dir.
"""
super().__init__()
self.data_writer_cls = writer_cls
self.out_dir = out_dir
self.i_queue = i_queue
@@ -470,7 +489,9 @@ def __call__(self):
break

end = time.time()
print(f"Spend {end - start} {total_bytes_written} at {self.__class__.__name__}")
self.statistics["datasize"] = total_bytes_written
self.statistics["duration"] += (end - start)
print(f"Spend {end - start} {total_bytes_written} at {self.__class__.__name__} ({self.statistics})")


class PushToStore(Task):
@@ -491,7 +512,7 @@ def __init__(self, i_queue, ams_config, db_path, store):
is not under db_path, it copies the file to this location and if store defined
it makes the kosh-store aware about the existence of the file.
"""

super().__init__()
self.ams_config = ams_config
self.i_queue = i_queue
self.dir = Path(db_path).absolute()
@@ -521,9 +542,12 @@ def __call__(self):

if self._store:
db_store.add_candidates([str(dest_file)])

self.statistics["datasize"] += os.stat(src_fn).st_size

end = time.time()
print(f"Spend {end - start} at {self.__class__.__name__}")
self.statistics["duration"] += (end - start)
print(f"Spend {end - start} at {self.__class__.__name__} ({self.statistics})")


class Pipeline(ABC):
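The stage.py changes all follow one pattern: the `Task` base class now owns a `statistics` dictionary (`datasize` in bytes, `duration` in seconds) and exposes it through `stats()`, every subclass calls `super().__init__()`, and each `__call__` accumulates into the dictionary before reporting it. A condensed, self-contained sketch of that pattern follows; the toy `DoubleTask` and the in-memory queues are illustrative stand-ins, not code from the PR.

```python
import time
from abc import ABC, abstractmethod
from queue import Queue

import numpy as np


class Task(ABC):
    """Mirrors the PR: every task tracks the bytes it processed and its wall-clock time."""

    def __init__(self):
        self.statistics = {"datasize": 0, "duration": 0}

    @abstractmethod
    def __call__(self):
        pass

    def stats(self):
        return self.statistics


class DoubleTask(Task):
    """Toy task (not in the PR) that doubles arrays pulled from an input queue."""

    def __init__(self, i_queue, o_queue):
        super().__init__()
        self.i_queue = i_queue
        self.o_queue = o_queue

    def __call__(self):
        start = time.time()
        while True:
            item = self.i_queue.get()
            if item is None:  # stand-in for a Terminate message
                self.o_queue.put(None)
                break
            result = item * 2
            self.o_queue.put(result)
            self.statistics["datasize"] += item.nbytes + result.nbytes
        self.statistics["duration"] += time.time() - start


if __name__ == "__main__":
    i_q, o_q = Queue(), Queue()
    for _ in range(3):
        i_q.put(np.ones(1024))
    i_q.put(None)  # terminate
    task = DoubleTask(i_q, o_q)
    task()
    print(task.stats())  # e.g. {'datasize': 49152, 'duration': ...}
```

Collecting `task.stats()` after each worker finishes is what the new per-task reports enable across `ForwardTask`, `FSLoaderTask`, `RMQLoaderTask`, and the writer and store tasks.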
22 changes: 1 addition & 21 deletions src/AMSWorkflow/setup.py
@@ -5,24 +5,4 @@

import setuptools

setuptools.setup(
name="ams-wf",
version="1.0",
packages=["ams_wf", "ams"],
install_requires=["argparse", "kosh>=3.0.1", "pika>=1.3.0", "numpy>=1.2.0"],
entry_points={
"console_scripts": [
"AMSBroker=ams_wf.AMSBroker:main",
"AMSDBStage=ams_wf.AMSDBStage:main",
"AMSOrchestrator=ams_wf.AMSOrchestrator:main",
"AMSStore=ams_wf.AMSStore:main",
"AMSTrain=ams_wf.AMSTrain:main",
]
},
classifiers=[
"Development Status :: 3 - Alpha",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python",
"Programming Language :: Python :: 3 :: Only",
],
)
setuptools.setup()
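With the metadata now in pyproject.toml, setup.py shrinks to a bare `setuptools.setup()` shim, and modern setuptools picks up the name, version, dependencies, and entry points from pyproject.toml at install time. One way to sanity-check an install (not part of the PR) is to query the registered console scripts via `importlib.metadata`:

```python
# Illustrative check, not from the PR: list the AMS console scripts that
# pip/setuptools registered from the [project.scripts] table.
import sys
from importlib.metadata import entry_points

if sys.version_info >= (3, 10):
    eps = entry_points(group="console_scripts")
else:  # Python 3.9 (the project's minimum) returns a mapping keyed by group
    eps = entry_points().get("console_scripts", [])

wanted = {"AMSBroker", "AMSDBStage", "AMSOrchestrator", "AMSStore", "AMSTrain"}
for ep in eps:
    if ep.name in wanted:
        print(f"{ep.name} -> {ep.value}")
```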
2 changes: 1 addition & 1 deletion src/AMSlib/ml/surrogate.hpp
@@ -394,7 +394,7 @@ class SurrogateModel

bool is_DeltaUQ() { return _is_DeltaUQ; }

void update(std::string new_path)
void update(const std::string& new_path)
{
/* This function updates the underlying torch model,
* with a new one pointed at location modelPath. The previous
1 change: 0 additions & 1 deletion tests/AMSlib/ams_update_model.cpp
@@ -85,7 +85,6 @@ int main(int argc, char *argv[])
char *data_type = argv[2];
char *zero_model = argv[3];
char *one_model = argv[4];
char *swap;

AMSResourceType resource = AMSResourceType::HOST;
if (use_device == 1) {