diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d3cba5b9..5a95c6a3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -316,6 +316,10 @@ jobs: dora stop --name ci-rust-dynamic --grace-duration 5s dora destroy + cargo build -p receive_data --release + # Run tests + dora start tests/queue_size_latest_data_rust/dataflow.yaml + - name: "Test CLI (Python)" timeout-minutes: 30 # fail-fast by using bash shell explictly @@ -345,6 +349,9 @@ jobs: dora stop --name ci-python-dynamic --grace-duration 5s dora destroy + # Run tests + dora start ../tests/queue_size_latest_data_python/dataflow.yaml + - name: "Test CLI (C)" timeout-minutes: 30 # fail-fast by using bash shell explictly diff --git a/Cargo.lock b/Cargo.lock index e8644be9..02e3fd1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7902,6 +7902,15 @@ dependencies = [ "tungstenite 0.20.1", ] +[[package]] +name = "receive_data" +version = "0.3.6" +dependencies = [ + "chrono", + "dora-node-api", + "eyre", +] + [[package]] name = "redox_syscall" version = "0.3.5" diff --git a/Cargo.toml b/Cargo.toml index ab2948a4..2df1916b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "examples/multiple-daemons/node", "examples/multiple-daemons/operator", "examples/multiple-daemons/sink", + "tests/queue_size_latest_data_rust/receive_data", "libraries/arrow-convert", "libraries/communication-layer/*", "libraries/core", diff --git a/examples/camera/.gitignore b/examples/camera/.gitignore new file mode 100644 index 00000000..eede66d8 --- /dev/null +++ b/examples/camera/.gitignore @@ -0,0 +1 @@ +*.pt \ No newline at end of file diff --git a/examples/camera/README.md b/examples/camera/README.md new file mode 100644 index 00000000..1899f8bb --- /dev/null +++ b/examples/camera/README.md @@ -0,0 +1,9 @@ +# Quick example on how to use a camera + +Make sure to have dora and pip installed. 
+ +```bash +dora up +dora build dataflow.yml +dora start dataflow.yml +``` diff --git a/examples/camera/dataflow.yml b/examples/camera/dataflow.yml new file mode 100644 index 00000000..35a589da --- /dev/null +++ b/examples/camera/dataflow.yml @@ -0,0 +1,20 @@ +nodes: + - id: camera + build: pip install ../../node-hub/opencv-video-capture + path: opencv-video-capture + inputs: + tick: dora/timer/millis/20 + outputs: + - image + env: + CAPTURE_PATH: 0 + IMAGE_WIDTH: 640 + IMAGE_HEIGHT: 480 + + - id: plot + build: pip install ../../node-hub/opencv-plot + path: opencv-plot + inputs: + image: + source: camera/image + queue_size: 1 diff --git a/examples/camera/run.rs b/examples/camera/run.rs new file mode 100644 index 00000000..b575234d --- /dev/null +++ b/examples/camera/run.rs @@ -0,0 +1,98 @@ +use dora_core::{get_pip_path, get_python_path, run}; +use dora_tracing::set_up_tracing; +use eyre::{bail, ContextCompat, WrapErr}; +use std::path::Path; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + set_up_tracing("python-dataflow-runner")?; + + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + run( + get_python_path().context("Could not get python binary")?, + &["-m", "venv", "../.env"], + None, + ) + .await + .context("failed to create venv")?; + let venv = &root.join("examples").join(".env"); + std::env::set_var( + "VIRTUAL_ENV", + venv.to_str().context("venv path not valid unicode")?, + ); + let orig_path = std::env::var("PATH")?; + // bin folder is named Scripts on windows. + // 🤦‍♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 + let venv_bin = if cfg!(windows) { + venv.join("Scripts") + } else { + venv.join("bin") + }; + + if cfg!(windows) { + std::env::set_var( + "PATH", + format!( + "{};{orig_path}", + venv_bin.to_str().context("venv path not valid unicode")? 
+ ), + ); + } else { + std::env::set_var( + "PATH", + format!( + "{}:{orig_path}", + venv_bin.to_str().context("venv path not valid unicode")? + ), + ); + } + + run( + get_pip_path().context("Could not get pip binary")?, + &["install", "maturin"], + Some(venv), + ) + .await + .context("pip install maturin failed")?; + + run( + "maturin", + &["develop"], + Some(&root.join("apis").join("python").join("node")), + ) + .await + .context("maturin develop failed")?; + + let dataflow = Path::new("dataflow.yml"); + run_dataflow(dataflow).await?; + + Ok(()) +} + +async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + + // First build the dataflow (install requirements) + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--") + .arg("daemon") + .arg("--run-dataflow") + .arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + Ok(()) +} diff --git a/examples/vlm/.gitignore b/examples/vlm/.gitignore new file mode 100644 index 00000000..eede66d8 --- /dev/null +++ b/examples/vlm/.gitignore @@ -0,0 +1 @@ +*.pt \ No newline at end of file diff --git a/examples/vlm/README.md b/examples/vlm/README.md new file mode 100644 index 00000000..ab6eec8c --- /dev/null +++ b/examples/vlm/README.md @@ -0,0 +1 @@ +# Quick example on using a VLM with dora-rs diff --git a/examples/vlm/dataflow.yml b/examples/vlm/dataflow.yml new file mode 100644 index 00000000..ac12cce7 --- /dev/null +++ b/examples/vlm/dataflow.yml @@ -0,0 +1,31 @@ +nodes: + - id: camera + build: pip install ../../node-hub/opencv-video-capture + path: opencv-video-capture + inputs: + tick: dora/timer/millis/20 + outputs: + - image + env: 
+ CAPTURE_PATH: 0 + IMAGE_WIDTH: 640 + IMAGE_HEIGHT: 480 + + - id: dora-qwenvl + build: pip install -e ../../node-hub/dora-qwenvl + path: dora-qwenvl + inputs: + image: + source: camera/image + queue_size: 1 + outputs: + - text + + - id: plot + build: pip install ../../node-hub/opencv-plot + path: opencv-plot + inputs: + image: + source: camera/image + queue_size: 1 + text: dora-qwenvl/text diff --git a/examples/vlm/run.rs b/examples/vlm/run.rs new file mode 100644 index 00000000..b575234d --- /dev/null +++ b/examples/vlm/run.rs @@ -0,0 +1,98 @@ +use dora_core::{get_pip_path, get_python_path, run}; +use dora_tracing::set_up_tracing; +use eyre::{bail, ContextCompat, WrapErr}; +use std::path::Path; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + set_up_tracing("python-dataflow-runner")?; + + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + run( + get_python_path().context("Could not get python binary")?, + &["-m", "venv", "../.env"], + None, + ) + .await + .context("failed to create venv")?; + let venv = &root.join("examples").join(".env"); + std::env::set_var( + "VIRTUAL_ENV", + venv.to_str().context("venv path not valid unicode")?, + ); + let orig_path = std::env::var("PATH")?; + // bin folder is named Scripts on windows. + // 🤦‍♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 + let venv_bin = if cfg!(windows) { + venv.join("Scripts") + } else { + venv.join("bin") + }; + + if cfg!(windows) { + std::env::set_var( + "PATH", + format!( + "{};{orig_path}", + venv_bin.to_str().context("venv path not valid unicode")? + ), + ); + } else { + std::env::set_var( + "PATH", + format!( + "{}:{orig_path}", + venv_bin.to_str().context("venv path not valid unicode")? 
+ ), + ); + } + + run( + get_pip_path().context("Could not get pip binary")?, + &["install", "maturin"], + Some(venv), + ) + .await + .context("pip install maturin failed")?; + + run( + "maturin", + &["develop"], + Some(&root.join("apis").join("python").join("node")), + ) + .await + .context("maturin develop failed")?; + + let dataflow = Path::new("dataflow.yml"); + run_dataflow(dataflow).await?; + + Ok(()) +} + +async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + + // First build the dataflow (install requirements) + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--") + .arg("daemon") + .arg("--run-dataflow") + .arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + Ok(()) +} diff --git a/node-hub/dora-keyboard/pyproject.toml b/node-hub/dora-keyboard/pyproject.toml index 29b9308a..83b7c615 100644 --- a/node-hub/dora-keyboard/pyproject.toml +++ b/node-hub/dora-keyboard/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dora-keyboard" -version = "0.3.5" +version = "0.3.6" authors = [ "Haixuan Xavier Tao ", "Enzo Le Van ", @@ -13,7 +13,7 @@ readme = "README.md" packages = [{ include = "dora_keyboard" }] [tool.poetry.dependencies] -dora-rs = "0.3.5" +dora-rs = "^0.3.6" numpy = "< 2.0.0" pyarrow = ">= 5.0.0" pynput = "^1.7.6" diff --git a/node-hub/dora-qwenvl/README.md b/node-hub/dora-qwenvl/README.md new file mode 100644 index 00000000..88f4e564 --- /dev/null +++ b/node-hub/dora-qwenvl/README.md @@ -0,0 +1,3 @@ +# Dora QwenVL2 node + +Experimental node for using a VLM within dora. 
diff --git a/node-hub/dora-qwenvl/dora_qwenvl/__init__.py b/node-hub/dora-qwenvl/dora_qwenvl/__init__.py new file mode 100644 index 00000000..ac3cbef9 --- /dev/null +++ b/node-hub/dora-qwenvl/dora_qwenvl/__init__.py @@ -0,0 +1,11 @@ +import os + +# Define the path to the README file relative to the package directory +readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.md") + +# Read the content of the README file +try: + with open(readme_path, "r", encoding="utf-8") as f: + __doc__ = f.read() +except FileNotFoundError: + __doc__ = "README file not found." diff --git a/node-hub/dora-qwenvl/dora_qwenvl/main.py b/node-hub/dora-qwenvl/dora_qwenvl/main.py new file mode 100644 index 00000000..65112584 --- /dev/null +++ b/node-hub/dora-qwenvl/dora_qwenvl/main.py @@ -0,0 +1,145 @@ +import os +from dora import Node +from transformers import Qwen2VLForConditionalGeneration, AutoProcessor +from qwen_vl_utils import process_vision_info +import numpy as np +import pyarrow as pa +from PIL import Image + +DEFAULT_PATH = "Qwen/Qwen2-VL-2B-Instruct" +CUSTOM_MODEL_PATH = os.getenv("CUSTOM_MODEL_PATH", DEFAULT_PATH) +DEFAULT_QUESTION = os.getenv( + "DEFAULT_QUESTION", + "Describe this image", +) + +# default: Load the model on the available device(s) +model = Qwen2VLForConditionalGeneration.from_pretrained( + CUSTOM_MODEL_PATH, + torch_dtype="auto", + device_map="auto", + attn_implementation="flash_attention_2", +) + +# default processer +processor = AutoProcessor.from_pretrained(DEFAULT_PATH) + + +def generate(image: np.array, question): + """ + Generate the response to the question given the image using Qwen2 model. 
+ """ + image = Image.fromarray(image) + + messages = [ + { + "role": "user", + "content": [ + { + "type": "image", + "image": image, + }, + {"type": "text", "text": question}, + ], + } + ] + + # Preparation for inference + text = processor.apply_chat_template( + messages, tokenize=False, add_generation_prompt=True + ) + image_inputs, video_inputs = process_vision_info(messages) + inputs = processor( + text=[text], + images=image_inputs, + videos=video_inputs, + padding=True, + return_tensors="pt", + ) + inputs = inputs.to("cuda") + + # Inference: Generation of the output + generated_ids = model.generate(**inputs, max_new_tokens=128) + generated_ids_trimmed = [ + out_ids[len(in_ids) :] + for in_ids, out_ids in zip(inputs.input_ids, generated_ids) + ] + output_text = processor.batch_decode( + generated_ids_trimmed, + skip_special_tokens=True, + clean_up_tokenization_spaces=False, + ) + return output_text[0] + + +def main(): + node = Node() + + question = DEFAULT_QUESTION + frame = None + pa.array([]) # initialize pyarrow array + + for event in node: + event_type = event["type"] + + if event_type == "INPUT": + event_id = event["id"] + + if event_id == "image": + storage = event["value"] + metadata = event["metadata"] + encoding = metadata["encoding"] + width = metadata["width"] + height = metadata["height"] + + if encoding == "bgr8": + channels = 3 + storage_type = np.uint8 + elif encoding == "rgb8": + channels = 3 + storage_type = np.uint8 + else: + raise RuntimeError(f"Unsupported image encoding: {encoding}") + + frame = ( + storage.to_numpy() + .astype(storage_type) + .reshape((height, width, channels)) + ) + if encoding == "bgr8": + frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB) + elif encoding == "rgb8": + pass + else: + raise RuntimeError(f"Unsupported image encoding: {encoding}") + + elif event_id == "tick": + if frame is None: + continue + response = generate(frame, question) + node.send_output( + "tick", + pa.array([response]), + metadata, + ) + + 
elif event_id == "text": + text = event["value"][0].as_py() + if text != "": + question = text + if frame is None: + continue + # set the max number of tiles in `max_num` + response = generate(frame, question) + node.send_output( + "text", + pa.array([response]), + metadata, + ) + + elif event_type == "ERROR": + raise RuntimeError(event["error"]) + + +if __name__ == "__main__": + main() diff --git a/node-hub/dora-qwenvl/pyproject.toml b/node-hub/dora-qwenvl/pyproject.toml new file mode 100644 index 00000000..d5ca3b7b --- /dev/null +++ b/node-hub/dora-qwenvl/pyproject.toml @@ -0,0 +1,30 @@ +[tool.poetry] +name = "dora-qwenvl" +version = "0.3.6-rc0" +authors = [ + "Haixuan Xavier Tao ", + "Enzo Le Van ", +] +description = "Dora Node for VLM" +readme = "README.md" + +packages = [{ include = "dora_qwenvl" }] + +[tool.poetry.dependencies] +python = "^3.7" +dora-rs = "^0.3.6" +numpy = "< 2.0.0" +torch = "^2.4.0" +torchvision = "^0.19" +transformers = { git = "https://github.com/huggingface/transformers" } +qwen-vl-utils = "^0.0.2" +accelerate = "^0.33" +flash_attn = "^2.6.1" + + +[tool.poetry.scripts] +dora-qwenvl = "dora_qwenvl.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" diff --git a/node-hub/dora-qwenvl/tests/test_dora_qwenvl.py b/node-hub/dora-qwenvl/tests/test_dora_qwenvl.py new file mode 100644 index 00000000..4105676f --- /dev/null +++ b/node-hub/dora-qwenvl/tests/test_dora_qwenvl.py @@ -0,0 +1,9 @@ +import pytest + + +def test_import_main(): + from dora_qwenvl.main import main + + # Check that everything is working, and catch dora Runtime Exception as we're not running in a dora dataflow. 
+ with pytest.raises(RuntimeError): + main() diff --git a/node-hub/dora-rerun/src/main.rs b/node-hub/dora-rerun/src/main.rs index a3b9b895..b16272d9 100644 --- a/node-hub/dora-rerun/src/main.rs +++ b/node-hub/dora-rerun/src/main.rs @@ -104,7 +104,7 @@ fn main() -> Result<()> { rec.log(id.as_str(), &image) .context("could not log image")?; - } else if id.as_str().contains("textlog") { + } else if id.as_str().contains("text") { let buffer: StringArray = data.to_data().into(); buffer.iter().try_for_each(|string| -> Result<()> { if let Some(str) = string { diff --git a/node-hub/llama-factory-recorder/README.md b/node-hub/llama-factory-recorder/README.md new file mode 100644 index 00000000..9279a6d6 --- /dev/null +++ b/node-hub/llama-factory-recorder/README.md @@ -0,0 +1,3 @@ +# Dora Llama factory recorder + +Experimental node for recording for training llama based model. diff --git a/node-hub/llama-factory-recorder/llama_factory_recorder/__init__.py b/node-hub/llama-factory-recorder/llama_factory_recorder/__init__.py new file mode 100644 index 00000000..ac3cbef9 --- /dev/null +++ b/node-hub/llama-factory-recorder/llama_factory_recorder/__init__.py @@ -0,0 +1,11 @@ +import os + +# Define the path to the README file relative to the package directory +readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.md") + +# Read the content of the README file +try: + with open(readme_path, "r", encoding="utf-8") as f: + __doc__ = f.read() +except FileNotFoundError: + __doc__ = "README file not found." 
diff --git a/node-hub/llama-factory-recorder/llama_factory_recorder/main.py b/node-hub/llama-factory-recorder/llama_factory_recorder/main.py new file mode 100644 index 00000000..aa8deb10 --- /dev/null +++ b/node-hub/llama-factory-recorder/llama_factory_recorder/main.py @@ -0,0 +1,193 @@ +import os +import json +from dora import Node +import numpy as np +import pyarrow as pa +from PIL import Image +from pathlib import Path + +DEFAULT_QUESTION = os.getenv( + "DEFAULT_QUESTION", + "Describe this image", +) +ENTRY_NAME = "dora_demo" +LLAMA_FACTORY_ROOT_PATH = Path(os.getenv("LLAMA_FACTORY_ROOT_PATH")) / "data" + + +# If JSON already exists, append incremental suffix to avoid overwriting +if (LLAMA_FACTORY_ROOT_PATH / ENTRY_NAME).exists(): + i = 1 + while (LLAMA_FACTORY_ROOT_PATH / f"{ENTRY_NAME}_{i}.json").exists(): + i += 1 + ENTRY_NAME = f"{ENTRY_NAME}_{i}" + + +DEFAULT_RECORD_IMAGE_ROOT_PATH = LLAMA_FACTORY_ROOT_PATH / ENTRY_NAME +DEFAULT_RECORD_JSON_PATH = LLAMA_FACTORY_ROOT_PATH / (ENTRY_NAME + ".json") + + +def write_dict_to_json(file_path, key: str, new_data): + """ + Writes a dictionary to a JSON file. If the file already contains a list of entries, + the new data will be appended to that list. Otherwise, it will create a new list. + + Parameters: + - file_path: str, the path to the JSON file. + - new_data: dict, the dictionary to add to the JSON file. 
+ """ + try: + # Open the JSON file and load its content + with open(file_path, "r+", encoding="utf-8") as file: + try: + data = json.load(file) + except json.JSONDecodeError: + data = {} + + data[key] = new_data + # Write the updated data back to the file + file.seek(0) + json.dump(data, file, indent=4, ensure_ascii=False) + file.truncate() + + except FileNotFoundError: + # If the file doesn't exist, create it and write the new data as a list + with open(file_path, "w", encoding="utf-8") as file: + json.dump({key: new_data}, file, indent=4, ensure_ascii=False) + + +write_dict_to_json( + LLAMA_FACTORY_ROOT_PATH / "dataset_info.json", + ENTRY_NAME, + { + "file_name": ENTRY_NAME + ".json", + "formatting": "sharegpt", + "columns": {"messages": "messages", "images": "images"}, + "tags": { + "role_tag": "role", + "content_tag": "content", + "user_tag": "user", + "assistant_tag": "assistant", + }, + }, +) + + +def save_image_and_add_to_json( + image_array, root_path, llama_root_path, jsonl_file, messages +): + """ + Saves an image from a NumPy array and adds a new JSON object as a line to a JSONL file. + The function generates a sequential numeric image filename starting from 0 and + follows the provided template structure. + + Parameters: + - image_array: numpy.ndarray, the image data as a NumPy array. + - root_path: str, the root directory where the image will be saved. + - jsonl_file: str, the path to the JSONL file. + - messages: list of dicts, each containing 'content' and 'role'. + + The image is saved as a PNG file, and the JSONL entry includes the 'messages' and 'images' keys. 
+ """ + + # Create the root directory if it doesn't exist + os.makedirs(llama_root_path / root_path, exist_ok=True) + + # Get the current image ID by counting existing files + image_id = len( + [ + name + for name in os.listdir(llama_root_path / root_path) + if os.path.isfile(os.path.join(llama_root_path / root_path, name)) + ] + ) + + # Define the image filename + image_filename = f"{image_id}.png" + image_path = os.path.join(root_path, image_filename) + + # Save the image + image = Image.fromarray(image_array) + image.save(llama_root_path / image_path) + + # Create the JSON entry with 'messages' and 'images' + new_entry = {"messages": messages, "images": [image_path]} + + # Add the entry to the JSONL file with UTF-8 encoding + with open(jsonl_file, "a", encoding="utf-8") as f: + json_line = json.dumps(new_entry) + f.write(json_line + "\n") + + +def main(): + pa.array([]) # initialize pyarrow array + node = Node() + + question = DEFAULT_QUESTION + frame = None + + for event in node: + event_type = event["type"] + + if event_type == "INPUT": + event_id = event["id"] + + if event_id == "image": + storage = event["value"] + metadata = event["metadata"] + encoding = metadata["encoding"] + width = metadata["width"] + height = metadata["height"] + + if encoding == "bgr8": + channels = 3 + storage_type = np.uint8 + elif encoding == "rgb8": + channels = 3 + storage_type = np.uint8 + else: + raise RuntimeError(f"Unsupported image encoding: {encoding}") + + frame = ( + storage.to_numpy() + .astype(storage_type) + .reshape((height, width, channels)) + ) + if encoding == "bgr8": + frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB) + elif encoding == "rgb8": + pass + else: + raise RuntimeError(f"Unsupported image encoding: {encoding}") + + elif event_id == "text": + text = event["value"][0].as_py() + if text != "": + question = text + elif event_id == "ground_truth": + if frame is None: + continue + ground_truth = event["value"][0].as_py() + + messages = [ + {"content": "" 
+ question, "role": "user"}, + { + "content": ground_truth, + "role": "assistant", + }, + ] + + save_image_and_add_to_json( + image_array=frame, + root_path=ENTRY_NAME, + llama_root_path=LLAMA_FACTORY_ROOT_PATH, + jsonl_file=DEFAULT_RECORD_JSON_PATH, + messages=messages, + ) + node.send_output( + "text", + pa.array([ground_truth]), + metadata, + ) + + elif event_type == "ERROR": + raise RuntimeError(event["error"]) diff --git a/node-hub/llama-factory-recorder/pyproject.toml b/node-hub/llama-factory-recorder/pyproject.toml new file mode 100644 index 00000000..34a55c0a --- /dev/null +++ b/node-hub/llama-factory-recorder/pyproject.toml @@ -0,0 +1,23 @@ +[tool.poetry] +name = "llama-factory-recorder" +version = "0.3.6-rc0" +authors = [ + "Haixuan Xavier Tao ", + "Enzo Le Van ", +] +description = "Dora Node for VLM" +readme = "README.md" + +packages = [{ include = "llama_factory_recorder" }] + +[tool.poetry.dependencies] +python = "^3.7" +dora-rs = "^0.3.6" +pillow = "^10.4.0" + +[tool.poetry.scripts] +llama-factory-recorder = "llama_factory_recorder.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" diff --git a/node-hub/llama-factory-recorder/tests/test_llama_factory_recorder.py b/node-hub/llama-factory-recorder/tests/test_llama_factory_recorder.py new file mode 100644 index 00000000..34dee34c --- /dev/null +++ b/node-hub/llama-factory-recorder/tests/test_llama_factory_recorder.py @@ -0,0 +1,9 @@ +import pytest + + +def test_import_main(): + from llama_factory_recorder.main import main + + # Check that everything is working, and catch dora Runtime Exception as we're not running in a dora dataflow. 
+    with pytest.raises(RuntimeError):
+        main()
diff --git a/tests/queue_size_latest_data_python/dataflow.yaml b/tests/queue_size_latest_data_python/dataflow.yaml
new file mode 100644
index 00000000..7393b7d8
--- /dev/null
+++ b/tests/queue_size_latest_data_python/dataflow.yaml
@@ -0,0 +1,12 @@
+nodes:
+  - id: send_data
+    path: ./send_data.py
+    outputs:
+      - data
+
+  - id: receive_data_with_sleep
+    path: ./receive_data.py
+    inputs:
+      tick:
+        source: send_data/data
+        queue_size: 1
diff --git a/tests/queue_size_latest_data_python/receive_data.py b/tests/queue_size_latest_data_python/receive_data.py
new file mode 100644
index 00000000..32e559e8
--- /dev/null
+++ b/tests/queue_size_latest_data_python/receive_data.py
@@ -0,0 +1,27 @@
+from dora import Node
+import time
+
+# The sender publishes `np.uint64(time.time())`, i.e. whole seconds, so up to one
+# second of the measured age is truncation error. Allow for it on top of the 1.2 s
+# budget (1 s send period + slack) for the real age of the newest sample.
+MAX_AGE_SECONDS = 2.2
+
+node = Node()
+
+# Voluntarily sleep for 5 seconds to ensure that the node is dropping the oldest input
+time.sleep(5)
+
+for event in node:
+    event_type = event["type"]
+
+    if event_type == "INPUT":
+        event_id = event["id"]
+        send_time = event["value"].to_pylist()[0]
+        # Age of the sample is receive time minus send time. With the operands
+        # swapped the value is never positive and the assertion can never fail.
+        duration = time.time() - send_time
+        print("Duration: ", duration, flush=True)
+        assert (
+            duration < MAX_AGE_SECONDS
+        ), f"Duration: {duration} should be less than {MAX_AGE_SECONDS} as we should always pull latest data."
+        time.sleep(1)
diff --git a/tests/queue_size_latest_data_python/send_data.py b/tests/queue_size_latest_data_python/send_data.py
new file mode 100644
index 00000000..1123a5aa
--- /dev/null
+++ b/tests/queue_size_latest_data_python/send_data.py
@@ -0,0 +1,11 @@
+from dora import Node
+import time
+import pyarrow as pa
+import numpy as np
+
+node = Node()
+
+for i in range(10):
+    now = time.time()
+    node.send_output("data", pa.array([np.uint64(now)]))
+    time.sleep(1)
diff --git a/tests/queue_size_latest_data_rust/dataflow.yaml b/tests/queue_size_latest_data_rust/dataflow.yaml
new file mode 100644
index 00000000..631e07c3
--- /dev/null
+++ b/tests/queue_size_latest_data_rust/dataflow.yaml
@@ -0,0 +1,12 @@
+nodes:
+  - id: send_data
+    path: ../queue_size_latest_data_python/send_data.py
+    outputs:
+      - data
+
+  - id: receive_data_with_sleep
+    path: ../../target/release/receive_data
+    inputs:
+      tick:
+        source: send_data/data
+        queue_size: 1
diff --git a/tests/queue_size_latest_data_rust/receive_data/Cargo.toml b/tests/queue_size_latest_data_rust/receive_data/Cargo.toml
new file mode 100644
index 00000000..a1910514
--- /dev/null
+++ b/tests/queue_size_latest_data_rust/receive_data/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "receive_data"
+edition = "2021"
+version.workspace = true
+description.workspace = true
+documentation.workspace = true
+license.workspace = true
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+dora-node-api = { workspace = true, features = ["tracing"] }
+eyre = "0.6.8"
+chrono = "0.4"
diff --git a/tests/queue_size_latest_data_rust/receive_data/README.md b/tests/queue_size_latest_data_rust/receive_data/README.md
new file mode 100644
index 00000000..9aa46d90
--- /dev/null
+++ b/tests/queue_size_latest_data_rust/receive_data/README.md
@@ -0,0 +1,3 @@
+# Print received inputs in the terminal
+
+Check example at [examples/speech-to-text](examples/speech-to-text)
diff --git
a/tests/queue_size_latest_data_rust/receive_data/src/main.rs b/tests/queue_size_latest_data_rust/receive_data/src/main.rs
new file mode 100644
index 00000000..baac3539
--- /dev/null
+++ b/tests/queue_size_latest_data_rust/receive_data/src/main.rs
@@ -0,0 +1,45 @@
+use std::{thread::sleep, time::Duration};
+
+use chrono::Utc;
+use dora_node_api::{
+    self,
+    arrow::{
+        array::{AsArray, PrimitiveArray},
+        datatypes::UInt64Type,
+    },
+    DoraNode,
+};
+use eyre::ContextCompat;
+
+/// Checks that a slow consumer with `queue_size: 1` only ever sees fresh inputs.
+///
+/// Every input is read as a `u64` Unix timestamp in whole seconds and compared with
+/// the current time. The node sleeps before polling so that inputs pile up; if stale
+/// inputs were still delivered, the age check below would trip.
+fn main() -> eyre::Result<()> {
+    let (_node, mut events) = DoraNode::init_from_env()?;
+
+    // Voluntarily sleep for 5 seconds to ensure that the node is dropping the oldest input
+    sleep(Duration::from_secs(5));
+
+    while let Some(event) = events.recv() {
+        // Only inputs carry a timestamp; every other event kind is ignored.
+        if let dora_node_api::Event::Input { data, .. } = event {
+            let data: &PrimitiveArray<UInt64Type> = data.as_primitive();
+            let sent = *data
+                .values()
+                .first()
+                .context("received an input without a timestamp")?;
+            let now = u64::try_from(Utc::now().timestamp())?;
+            // Saturate so that a sender clock slightly ahead of ours reads as age 0
+            // instead of underflowing the unsigned subtraction.
+            let age = now.saturating_sub(sent);
+            println!("Time Difference: {:?}", age);
+            assert!(
+                age < 2,
+                "Time difference should be less than 2 seconds as data is sent every seconds"
+            );
+        }
+    }
+    Ok(())
+}