Dev #188

Merged
merged 25 commits into from
Dec 18, 2024
Commits
91c988e
Merge pull request #186 from ORNL/main
renan-souza Dec 13, 2024
9d7e003
adding llm example
renan-souza Dec 13, 2024
c82d3ba
Major changes to flowcept_torch
renan-souza Dec 16, 2024
b04fe08
Improving llm search exampel
renan-souza Dec 17, 2024
64f13b6
Changes to improve llm tracking
renan-souza Dec 17, 2024
1f7db22
Code reformat
renan-souza Dec 17, 2024
ed2794e
Changes to ml decorator tests
renan-souza Dec 17, 2024
a57d2a8
Installing mldev deps
renan-souza Dec 17, 2024
23ab18b
reducing tensorboard test sizes
renan-souza Dec 17, 2024
524bde1
Fixing multiple doc inserter init bug
renan-souza Dec 17, 2024
40f4952
Changes in DBDAO and modularizing llm wf
renan-souza Dec 17, 2024
8128207
Typo
renan-souza Dec 17, 2024
402f274
renaming example file
renan-souza Dec 17, 2024
d470ffc
Removing unused test
renan-souza Dec 18, 2024
e0481a8
Removing unused test
renan-souza Dec 18, 2024
1ff89dc
mongo singleton tests
renan-souza Dec 18, 2024
53f01af
mongo singleton tests
renan-souza Dec 18, 2024
849c4ad
Adding ml_dev to Dockerfile
renan-souza Dec 18, 2024
3083ca8
Fix dockerfile
renan-souza Dec 18, 2024
3fe02e1
Removing ml deps from Dockerfile. GitHub actions won't work due to im…
renan-souza Dec 18, 2024
2cd97a1
Adding simple perceptron example
renan-souza Dec 18, 2024
6a176fb
fix run examples
renan-souza Dec 18, 2024
3e74494
Removing prints from example
renan-souza Dec 18, 2024
badb390
Code reformat
renan-souza Dec 18, 2024
abd9d4f
Merge pull request #187 from ORNL/ml_loops
renan-souza Dec 18, 2024
2 changes: 1 addition & 1 deletion .github/workflows/create-release-n-publish.yml
Expand Up @@ -121,7 +121,7 @@ jobs:
run: make services

- name: Test with pytest
run: pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
run: pytest

- name: Test notebooks
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/run-tests-kafka.yml
Expand Up @@ -58,7 +58,7 @@ jobs:
make tests

- name: Test notebooks
run: pytest --ignore=notebooks/zambeze.ipynb --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb
run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb

- name: Stop services
run: docker compose -f deployment/compose-kafka.yml down
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/run-tests-py11.yml
Expand Up @@ -78,7 +78,7 @@ jobs:
export MQ_TYPE=kafka
export MQ_PORT=9092
# Ignoring heavy tests. They are executed with Kafka in another GH Action.
pytest --ignore=tests/decorator_tests/ml_tests --ignore=tests/adapters/test_tensorboard.py
pytest

- name: Stop services
run: docker compose -f deployment/compose-kafka.yml down
Expand Down
7 changes: 3 additions & 4 deletions .github/workflows/run-tests-simple.yml
@@ -1,4 +1,4 @@
name: (Without Mongo) Unit, integration, and notebook tests
name: (Without Mongo) Simple Tests
on:
push:
schedule:
Expand Down Expand Up @@ -35,7 +35,7 @@ jobs:
run: python -m pip install --upgrade pip

- name: Test examples
run: bash .github/workflows/run_examples.sh examples false # with mongo
run: bash .github/workflows/run_examples.sh examples false # without mongo

- name: Install all dependencies
run: |
Expand All @@ -46,8 +46,7 @@ jobs:
run: pip list

- name: Test with pytest and redis
run: |
make tests
run: make tests

- name: Test notebooks with pytest and redis
run: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore="notebooks/dask_from_CLI.ipynb" --ignore="notebooks/analytics.ipynb"
Expand Down
39 changes: 34 additions & 5 deletions .github/workflows/run_examples.sh
Expand Up @@ -4,13 +4,31 @@
set -e
set -o pipefail

# Display usage/help message
usage() {
echo -e "\nUsage: $0 <examples_dir> <with_mongo>\n"
echo "Arguments:"
echo " examples_dir Path to the examples directory (Mandatory)"
echo " with_mongo Boolean flag (true/false) indicating whether to include MongoDB support (Mandatory)"
echo -e "\nExample:"
echo " $0 examples true"
echo " $0 examples false"
exit 1
}

# Check if the required arguments are provided
if [[ -z "$1" || -z "$2" ]]; then
echo "Error: Missing mandatory arguments!"
usage
fi

# Function to run tests with common steps
run_test() {
test_path="${EXAMPLES_DIR}/${1}_example.py"
test_type="$1"
with_mongo="$2"
echo "Test type=${test_type}"
echo "Running $test_path"
echo "Starting $test_path"

pip uninstall flowcept -y > /dev/null 2>&1 || true # Ignore errors during uninstall

Expand All @@ -29,17 +47,28 @@ run_test() {
elif [[ "$test_type" =~ "tensorboard" ]]; then
echo "Installing tensorboard"
pip install .[tensorboard] > /dev/null 2>&1
elif [[ "$test_type" =~ "single_layer_perceptron" ]]; then
echo "Installing ml_dev dependencies"
pip install .[ml_dev] > /dev/null 2>&1
elif [[ "$test_type" =~ "llm_complex" ]]; then
echo "Installing ml_dev dependencies"
pip install .[ml_dev]
echo "Defining python path for llm_complex..."
export PYTHONPATH=$PYTHONPATH:${EXAMPLES_DIR}/llm_complex
echo $PYTHONPATH
fi

# Run the test and capture output
echo "Running $test_path ..."
python "$test_path" | tee output.log

echo "Ok, ran $test_path."
# Check for errors in the output
if grep -iq "error" output.log; then
echo "Test failed! See output.log for details."
echo "Test $test_path failed! See output.log for details."
exit 1
fi

echo "Great, no errors running $test_path."

# Clean up the log file
rm output.log
}
Expand All @@ -51,7 +80,7 @@ echo "Using examples directory: $EXAMPLES_DIR"
echo "With Mongo? ${WITH_MONGO}"

# Define the test cases
tests=("instrumented_simple" "instrumented_loop" "dask" "mlflow" "tensorboard")
tests=("instrumented_simple" "instrumented_loop" "dask" "mlflow" "tensorboard" "single_layer_perceptron" "llm_complex/llm_main")

# Iterate over the tests and run them
for test_ in "${tests[@]}"; do
Expand Down
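
Reviewer note on the pass/fail rule in `run_examples.sh`: the script fails an example when the interpreter exits non-zero (via `set -e` and `pipefail`) or when the captured stdout contains the word "error" in any letter case. The same rule can be sketched in Python as below. The function names `output_has_error` and `run_example` are hypothetical and not part of the repository; this is an illustration of the check, not a replacement for the script.

```python
import subprocess
import sys


def output_has_error(text: str) -> bool:
    """Mirror `grep -iq "error"`: a case-insensitive substring match."""
    return "error" in text.lower()


def run_example(script_path: str) -> bool:
    """Run one example script; True means clean exit and no 'error' in stdout."""
    result = subprocess.run(
        [sys.executable, script_path],
        stdout=subprocess.PIPE,  # only stdout is inspected, as with `| tee output.log`
        text=True,
    )
    return result.returncode == 0 and not output_has_error(result.stdout)
```

One consequence of a plain substring match is that harmless output such as `error_rate=0.1` also counts as a failure, so examples run this way should avoid printing that word.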
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -20,3 +20,5 @@ test.py
time.txt
tmp/
deployment/data
**/*output_data*
examples/llm_complex/input_data
4 changes: 3 additions & 1 deletion Makefile
Expand Up @@ -11,6 +11,7 @@ help:
@printf "\033[32mtests\033[0m run unit tests with pytest\n"
@printf "\033[32mtests-in-container\033[0m run unit tests with pytest inside Flowcept's container\n"
@printf "\033[32mtests-in-container-mongo\033[0m run unit tests inside container with MongoDB\n"
@printf "\033[32mtests-in-container-kafka\033[0m run unit tests inside container with Kafka and MongoDB\n"
@printf "\033[32mtests-all\033[0m run all unit tests with pytest, including long-running ones\n"
@printf "\033[32mtests-notebooks\033[0m test the notebooks using pytest\n"
@printf "\033[32mclean\033[0m remove cache directories and Sphinx build output\n"
Expand Down Expand Up @@ -43,6 +44,7 @@ clean:
find . -type d -name "mlruns" -exec rm -rf {} \; 2>/dev/null || true
find . -type d -name "__pycache__" -exec rm -rf {} \; 2>/dev/null || true
find . -type d -name "*tfevents*" -exec rm -rf {} \; 2>/dev/null || true
find . -type d -name "*output_data*" -exec rm -rf {} \; 2>/dev/null || true
# sphinx-build -M clean docs docs/_build This needs to be fixed.

# Build the HTML documentation using Sphinx
Expand Down Expand Up @@ -88,7 +90,7 @@ liveness:
# Run unit tests using pytest
.PHONY: tests
tests:
pytest --ignore=tests/decorator_tests/ml_tests/llm_tests
pytest

.PHONY: tests-notebooks
tests-notebooks:
Expand Down
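
Reviewer note on the new `clean` rule: `find . -type d -name "*output_data*" -exec rm -rf {} \;` deletes every directory whose name contains `output_data`, and the trailing `|| true` hides the errors `find` reports when it tries to descend into a directory it has just removed. A rough Python equivalent, assuming a hypothetical helper named `remove_matching_dirs`, could look like this:

```python
import shutil
from pathlib import Path


def remove_matching_dirs(root: str, pattern: str = "*output_data*") -> list[str]:
    """Delete directories under `root` whose name matches `pattern`; return removed paths."""
    removed = []
    # Collect and sort matches first, so parents are handled before their children.
    for path in sorted(Path(root).rglob(pattern)):
        # Files that match the pattern, and children already removed with a parent, are skipped.
        if path.is_dir():
            shutil.rmtree(path, ignore_errors=True)
            removed.append(str(path))
    return removed
```

Regular files that happen to match the pattern are left alone, which matches the `-type d` filter in the Makefile rule.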
2 changes: 1 addition & 1 deletion deployment/Dockerfile
Expand Up @@ -20,7 +20,7 @@ RUN export FLOWCEPT_SETTINGS_PATH=$(realpath resources/sample_settings.yaml) \
RUN conda create -n flowcept python=3.11.10 -y \
&& echo "conda activate flowcept" >> ~/.bashrc

# The following command is an overkill and will install many things you might not need. Please see pyproject.toml and modify deployment/Dockerfile in case you do not need to install "all" dependencies.
# The following command is overkill and installs many things you might not need. Modify this Dockerfile if you do not need to install "all" dependencies.
RUN conda run -n flowcept pip install -e .[all]

CMD ["bash"]
2 changes: 1 addition & 1 deletion examples/dask_example.py
Expand Up @@ -46,7 +46,7 @@ def sum_list(values):

# Closing Dask and Flowcept
client.close() # This is to avoid generating errors
cluster.close() # This calls are needed closeouts to inform of workflow conclusion.
cluster.close()  # This call is a required closeout that signals the workflow has concluded.

# Optionally: flowcept.stop()

Expand Down
56 changes: 56 additions & 0 deletions examples/instrumented_loop_unmanaged_example.py
@@ -0,0 +1,56 @@
import multiprocessing
import random
from time import sleep

from flowcept import Flowcept, FlowceptLoop

if __name__ == '__main__':
    # Start the interceptor in this process; its id is reused as the workflow id below.
    interceptor_id = Flowcept.start_instrumentation_interceptor()

    # Run persistence in a separate process; the event tells it when to stop.
    event = multiprocessing.Event()
    process1 = multiprocessing.Process(target=Flowcept.start_persistence, args=(interceptor_id, event))
    process1.start()
    sleep(1)
    # Run loop
    loop = FlowceptLoop(range(max_iterations := 3), workflow_id=interceptor_id)
    for item in loop:
        loss = random.random()
        sleep(0.05)
        print(item, loss)
        # The following is optional, in case you want to capture values generated inside the loop.
        loop.end_iter({"item": item, "loss": loss})

    Flowcept.stop_instrumentation_interceptor()

    event.set()
    process1.join()

    docs = Flowcept.db.query(filter={"workflow_id": interceptor_id})
    for d in docs:
        print(d)
    # assert len(docs) == max_iterations+1  # The whole loop itself is a task

#
#
# @staticmethod
# def start_instrumentation_interceptor():
# instance = InstrumentationInterceptor.get_instance()
# instance_id = id(instance)
# instance.start(bundle_exec_id=instance_id)
# return instance_id
#
# @staticmethod
# def stop_instrumentation_interceptor():
# instance = InstrumentationInterceptor.get_instance()
# instance.stop()
#
# @staticmethod
# def start_persistence(interceptor_id, event):
# from flowcept.flowceptor.consumers.document_inserter import DocumentInserter
# inserter = DocumentInserter(
# check_safe_stops=True,
# bundle_exec_id=interceptor_id,
# ).start()
# event.wait()
# inserter.stop()
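
Reviewer note on the shutdown handshake in this example: the persistence worker is started first, the instrumented loop runs, and only then does the main code call `event.set()` followed by `join()`, so the worker stops after all iterations have been produced. The sketch below shows that ordering in isolation. It uses `threading` instead of `multiprocessing` to stay self-contained (both `Event` classes expose `set()` and `wait()`), and `consumer` and `run_handshake` are hypothetical stand-ins, not Flowcept code.

```python
import threading


def consumer(stop_event: threading.Event, log: list) -> None:
    """Stand-in for a persistence worker: start, wait for the signal, shut down."""
    log.append("consumer started")
    stop_event.wait()  # Block until the producer signals that it is done.
    log.append("consumer stopped")


def run_handshake() -> list:
    log = []
    stop_event = threading.Event()
    worker = threading.Thread(target=consumer, args=(stop_event, log))
    worker.start()

    for i in range(3):  # Stand-in for the instrumented loop.
        log.append(f"iteration {i}")

    stop_event.set()  # Tell the worker that no more work is coming.
    worker.join()  # Wait for a clean shutdown before reading results.
    return log
```

Because the event is set only after the loop finishes, the "consumer stopped" entry is always the last one in the log, which is the property the example relies on before it queries the database.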
1 change: 1 addition & 0 deletions examples/instrumented_simple_example.py
Expand Up @@ -18,4 +18,5 @@ def mult_two(n):
print(o2)
docs = Flowcept.db.query(filter={"workflow_id": Flowcept.current_workflow_id})
print(len(docs))
assert len(docs) == 2

149 changes: 149 additions & 0 deletions examples/llm_complex/llm_dataprep.py
@@ -0,0 +1,149 @@
from time import time

import torch
import os

from torch.utils.data import Subset
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from datasets import load_dataset


# Define a function to yield tokens from the dataset
def yield_tokens(tokenizer, data_iter):
    for item in data_iter:
        if len(item["text"]):
            yield tokenizer(item["text"])


# Define a function to process the raw text and convert it to tensors
def data_process(tokenizer, vocab, raw_text_iter):
    data = [
        torch.tensor(
            [vocab[token] for token in tokenizer(item["text"])],
            dtype=torch.long,
        )
        for item in raw_text_iter
    ]
    return torch.cat(tuple(filter(lambda t: t.numel() > 0, data)))


def get_dataset_ref(campaign_id, train_data, val_data, test_data):
    dataset_ref = f"{campaign_id}_train_shape_{train_data.shape}_val_shape_{val_data.shape}_test_shape_{test_data.shape}"
    return dataset_ref

def get_wiki_text_dataset(data_dir):
    # Load the preprocessed WikiText2 tensors from disk
    t0 = time()
    train_data = torch.load(os.path.join(data_dir, "train_data.tensor"))
    val_data = torch.load(os.path.join(data_dir, "val_data.tensor"))
    test_data = torch.load(os.path.join(data_dir, "test_data.tensor"))
    t1 = time()
    t_disk_load = t1 - t0

    try:
        if torch.cuda.is_available():
            # PyTorch names the NVIDIA backend "cuda"; "gpu" is not a valid device type.
            device = torch.device("cuda")
        elif torch.backends.mps.is_available():
            device = torch.device("mps")
        else:
            device = torch.device("cpu")

        t2 = time()
        t_device_available = t2 - t1
        train_data = train_data.to(device)
        val_data = val_data.to(device)
        test_data = test_data.to(device)
        t_gpu_load = time() - t2
    except Exception as e:
        # Chain the original exception so the root cause stays visible.
        raise Exception("Couldn't send data to device") from e

    print("Train data", train_data.shape)
    print("Validation data", val_data.shape)
    print("Test data", test_data.shape)
    return (
        train_data,
        val_data,
        test_data,
        t_disk_load,
        t_device_available,
        t_gpu_load,
    )

def save_workflow(ntokens, train_data, val_data, test_data, dataset_ref, subset_size=None, tokenizer_type=None, campaign_id=None):
    from flowcept import WorkflowObject, Flowcept
    config = {
        "subset_size": subset_size,
        "tokenizer_type": tokenizer_type
    }
    dataset_prep_wf = WorkflowObject()
    dataset_prep_wf.used = config
    dataset_prep_wf.campaign_id = campaign_id
    dataset_prep_wf.name = "generate_wikitext_dataset"

    dataset_prep_wf.generated = {
        "ntokens": ntokens,
        "dataset_ref": dataset_ref,
        "train_data_shape": list(train_data.shape),
        "val_data_shape": list(val_data.shape),
        "test_data_shape": list(test_data.shape),
    }
    Flowcept.db.insert_or_update_workflow(dataset_prep_wf)
    print(dataset_prep_wf)
    return dataset_prep_wf.workflow_id, dataset_ref


def dataprep_workflow(
    data_dir="input_data",
    tokenizer_type="basic_english",  # spacy, moses, toktok, revtok, subword
    subset_size=None,
    campaign_id=None,
):
    os.makedirs(data_dir, exist_ok=True)

    print("Downloading dataset")
    dataset = load_dataset("wikitext", "wikitext-2-v1")
    print(f"Ok, now saving it into {data_dir}")
    dataset.save_to_disk(os.path.join(data_dir, "wikitext-2-v1.data"))

    test_dataset = dataset["test"]
    train_dataset = dataset["train"]
    validation_dataset = dataset["validation"]

    if subset_size is not None and subset_size > 0:
        test_dataset = Subset(test_dataset, range(subset_size))
        train_dataset = Subset(train_dataset, range(subset_size))
        validation_dataset = Subset(validation_dataset, range(subset_size))

    # Build the vocabulary from the training dataset
    tokenizer = get_tokenizer(tokenizer_type)
    vocab = build_vocab_from_iterator(yield_tokens(tokenizer, train_dataset))
    vocab.set_default_index(vocab["<unk>"])
    ntokens = len(vocab)

    # Process the train, validation, and test datasets
    train_data = data_process(tokenizer, vocab, train_dataset)
    val_data = data_process(tokenizer, vocab, validation_dataset)
    test_data = data_process(tokenizer, vocab, test_dataset)

    train_data_path = os.path.join(data_dir, "train_data.tensor")
    val_data_path = os.path.join(data_dir, "val_data.tensor")
    test_data_path = os.path.join(data_dir, "test_data.tensor")

    torch.save(train_data, train_data_path)
    torch.save(val_data, val_data_path)
    torch.save(test_data, test_data_path)

    print(f"Saved files in {data_dir}. Now running some asserts.")

    train_data_loaded = torch.load(train_data_path)
    val_data_loaded = torch.load(val_data_path)
    test_data_loaded = torch.load(test_data_path)

    # torch.equal compares shape and values in one call instead of iterating element by element.
    assert torch.equal(train_data, train_data_loaded)
    assert torch.equal(val_data, val_data_loaded)
    assert torch.equal(test_data, test_data_loaded)

    dataset_ref = get_dataset_ref(campaign_id, train_data, val_data, test_data)
    # save_workflow returns (workflow_id, dataset_ref), so unpack it to keep wf_id a plain id.
    wf_id, dataset_ref = save_workflow(ntokens, train_data, val_data, test_data, dataset_ref, subset_size=subset_size, tokenizer_type=tokenizer_type, campaign_id=campaign_id)
    return wf_id, dataset_ref, ntokens
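
Reviewer note on the vocabulary and encoding steps in `dataprep_workflow`: the file builds a vocabulary from the training split, sets the `<unk>` id as the default for unknown tokens, and then flattens each split into one sequence of ids while dropping empty records. The pure-Python sketch below illustrates that flow without torchtext. `build_vocab` and `encode` are hypothetical helpers, and the id ordering (most frequent token first, with id 0 reserved for `<unk>`) is an assumption made for the illustration, not a statement about torchtext's behavior.

```python
from collections import Counter
from typing import Callable, Iterable

UNK = "<unk>"


def build_vocab(token_lists: Iterable[list[str]]) -> dict[str, int]:
    """Map each token to an integer id; id 0 is reserved for UNK (assumption)."""
    counts = Counter(tok for tokens in token_lists for tok in tokens)
    vocab = {UNK: 0}
    for token, _ in counts.most_common():
        vocab.setdefault(token, len(vocab))
    return vocab


def encode(records: Iterable[dict], tokenizer: Callable[[str], list[str]], vocab: dict[str, int]) -> list[int]:
    """Flatten records into one id sequence; empty texts contribute nothing."""
    ids = []
    for item in records:
        if not item["text"]:
            continue
        # Unknown tokens fall back to the UNK id, like a vocabulary default index.
        ids.extend(vocab.get(tok, vocab[UNK]) for tok in tokenizer(item["text"]))
    return ids
```

A small usage example under the same assumptions:

```python
records = [{"text": "the cat sat"}, {"text": ""}, {"text": "the dog"}]
vocab = build_vocab(str.split(r["text"]) for r in records if r["text"])
print(encode([{"text": "the bird"}], str.split, vocab))  # "bird" is unseen, so it maps to the UNK id
```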
