From e26646e637c2e5f9056385aaa96777bee6ab92fc Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 1 Sep 2024 12:42:27 +0200 Subject: [PATCH 1/8] Adding qwenvl2 and its data recorder --- examples/camera/.gitignore | 1 + examples/camera/README.md | 9 + examples/camera/dataflow.yml | 20 ++ examples/camera/run.rs | 98 +++++++++ examples/vlm/.gitignore | 1 + examples/vlm/README.md | 1 + examples/vlm/dataflow.yml | 31 +++ examples/vlm/run.rs | 98 +++++++++ node-hub/dora-qwenvl/README.md | 3 + node-hub/dora-qwenvl/dora_qwenvl/__init__.py | 11 + node-hub/dora-qwenvl/dora_qwenvl/main.py | 145 +++++++++++++ node-hub/dora-qwenvl/pyproject.toml | 30 +++ .../dora-qwenvl/tests/test_dora_qwenvl.py | 9 + node-hub/dora-rerun/src/main.rs | 2 +- node-hub/llama-factory-recorder/README.md | 3 + .../llama_factory_recorder/__init__.py | 11 + .../llama_factory_recorder/main.py | 193 ++++++++++++++++++ .../llama-factory-recorder/pyproject.toml | 23 +++ .../tests/test_llama_factory_recorder.py | 9 + 19 files changed, 697 insertions(+), 1 deletion(-) create mode 100644 examples/camera/.gitignore create mode 100644 examples/camera/README.md create mode 100644 examples/camera/dataflow.yml create mode 100644 examples/camera/run.rs create mode 100644 examples/vlm/.gitignore create mode 100644 examples/vlm/README.md create mode 100644 examples/vlm/dataflow.yml create mode 100644 examples/vlm/run.rs create mode 100644 node-hub/dora-qwenvl/README.md create mode 100644 node-hub/dora-qwenvl/dora_qwenvl/__init__.py create mode 100644 node-hub/dora-qwenvl/dora_qwenvl/main.py create mode 100644 node-hub/dora-qwenvl/pyproject.toml create mode 100644 node-hub/dora-qwenvl/tests/test_dora_qwenvl.py create mode 100644 node-hub/llama-factory-recorder/README.md create mode 100644 node-hub/llama-factory-recorder/llama_factory_recorder/__init__.py create mode 100644 node-hub/llama-factory-recorder/llama_factory_recorder/main.py create mode 100644 node-hub/llama-factory-recorder/pyproject.toml create mode 100644 node-hub/llama-factory-recorder/tests/test_llama_factory_recorder.py diff --git a/examples/camera/.gitignore b/examples/camera/.gitignore new file mode 100644 index 000000000..eede66d83 --- /dev/null +++ b/examples/camera/.gitignore @@ -0,0 +1 @@ +*.pt \ No newline at end of file diff --git a/examples/camera/README.md b/examples/camera/README.md new file mode 100644 index 000000000..1899f8bb7 --- /dev/null +++ b/examples/camera/README.md @@ -0,0 +1,9 @@ +# Quick example on how to use a camera + +Make sure to have, dora and pip installed. + +```bash +dora up +dora build dataflow.yml +dora start dataflow.yml +``` diff --git a/examples/camera/dataflow.yml b/examples/camera/dataflow.yml new file mode 100644 index 000000000..35a589daf --- /dev/null +++ b/examples/camera/dataflow.yml @@ -0,0 +1,20 @@ +nodes: + - id: camera + build: pip install ../../node-hub/opencv-video-capture + path: opencv-video-capture + inputs: + tick: dora/timer/millis/20 + outputs: + - image + env: + CAPTURE_PATH: 0 + IMAGE_WIDTH: 640 + IMAGE_HEIGHT: 480 + + - id: plot + build: pip install ../../node-hub/opencv-plot + path: opencv-plot + inputs: + image: + source: camera/image + queue_size: 1 diff --git a/examples/camera/run.rs b/examples/camera/run.rs new file mode 100644 index 000000000..b575234d2 --- /dev/null +++ b/examples/camera/run.rs @@ -0,0 +1,98 @@ +use dora_core::{get_pip_path, get_python_path, run}; +use dora_tracing::set_up_tracing; +use eyre::{bail, ContextCompat, WrapErr}; +use std::path::Path; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + set_up_tracing("python-dataflow-runner")?; + + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + run( + get_python_path().context("Could not get python binary")?, + &["-m", "venv", "../.env"], + None, + ) + .await + .context("failed to create venv")?; + let venv = &root.join("examples").join(".env"); + std::env::set_var( + "VIRTUAL_ENV", + venv.to_str().context("venv path not valid unicode")?, + ); + let orig_path = std::env::var("PATH")?; + // bin folder is named Scripts on windows. + // 🤦‍♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 + let venv_bin = if cfg!(windows) { + venv.join("Scripts") + } else { + venv.join("bin") + }; + + if cfg!(windows) { + std::env::set_var( + "PATH", + format!( + "{};{orig_path}", + venv_bin.to_str().context("venv path not valid unicode")? + ), + ); + } else { + std::env::set_var( + "PATH", + format!( + "{}:{orig_path}", + venv_bin.to_str().context("venv path not valid unicode")? + ), + ); + } + + run( + get_pip_path().context("Could not get pip binary")?, + &["install", "maturin"], + Some(venv), + ) + .await + .context("pip install maturin failed")?; + + run( + "maturin", + &["develop"], + Some(&root.join("apis").join("python").join("node")), + ) + .await + .context("maturin develop failed")?; + + let dataflow = Path::new("dataflow.yml"); + run_dataflow(dataflow).await?; + + Ok(()) +} + +async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + + // First build the dataflow (install requirements) + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--") + .arg("daemon") + .arg("--run-dataflow") + .arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + Ok(()) +} diff --git a/examples/vlm/.gitignore b/examples/vlm/.gitignore new file mode 100644 index 000000000..eede66d83 --- /dev/null +++ b/examples/vlm/.gitignore @@ -0,0 +1 @@ +*.pt \ No newline at end of file diff --git a/examples/vlm/README.md b/examples/vlm/README.md new file mode 100644 index 000000000..ab6eec8c4 --- /dev/null +++ b/examples/vlm/README.md @@ -0,0 +1 @@ +# Quick example on using a VLM with dora-rs diff --git a/examples/vlm/dataflow.yml b/examples/vlm/dataflow.yml new file mode 100644 index 000000000..ac12cce7a --- /dev/null +++ b/examples/vlm/dataflow.yml @@ -0,0 +1,31 @@ +nodes: + - id: camera + build: pip install ../../node-hub/opencv-video-capture + path: opencv-video-capture + inputs: + tick: dora/timer/millis/20 + outputs: + - image + env: + CAPTURE_PATH: 0 + IMAGE_WIDTH: 640 + IMAGE_HEIGHT: 480 + + - id: dora-qwenvl + build: pip install -e ../../node-hub/dora-qwenvl + path: dora-qwenvl + inputs: + image: + source: camera/image + queue_size: 1 + outputs: + - text + + - id: plot + build: pip install ../../node-hub/opencv-plot + path: opencv-plot + inputs: + image: + source: camera/image + queue_size: 1 + text: dora-qwenvl/text diff --git a/examples/vlm/run.rs b/examples/vlm/run.rs new file mode 100644 index 000000000..b575234d2 --- /dev/null +++ b/examples/vlm/run.rs @@ -0,0 +1,98 @@ +use dora_core::{get_pip_path, get_python_path, run}; +use dora_tracing::set_up_tracing; +use eyre::{bail, ContextCompat, WrapErr}; +use std::path::Path; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + set_up_tracing("python-dataflow-runner")?; + + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + run( + get_python_path().context("Could not get python binary")?, + &["-m", "venv", "../.env"], + None, + ) + .await + .context("failed to create venv")?; + let venv = &root.join("examples").join(".env"); + std::env::set_var( + "VIRTUAL_ENV", + venv.to_str().context("venv path not valid unicode")?, + ); + let orig_path = std::env::var("PATH")?; + // bin folder is named Scripts on windows. + // 🤦‍♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 + let venv_bin = if cfg!(windows) { + venv.join("Scripts") + } else { + venv.join("bin") + }; + + if cfg!(windows) { + std::env::set_var( + "PATH", + format!( + "{};{orig_path}", + venv_bin.to_str().context("venv path not valid unicode")? + ), + ); + } else { + std::env::set_var( + "PATH", + format!( + "{}:{orig_path}", + venv_bin.to_str().context("venv path not valid unicode")? + ), + ); + } + + run( + get_pip_path().context("Could not get pip binary")?, + &["install", "maturin"], + Some(venv), + ) + .await + .context("pip install maturin failed")?; + + run( + "maturin", + &["develop"], + Some(&root.join("apis").join("python").join("node")), + ) + .await + .context("maturin develop failed")?; + + let dataflow = Path::new("dataflow.yml"); + run_dataflow(dataflow).await?; + + Ok(()) +} + +async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + + // First build the dataflow (install requirements) + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--") + .arg("daemon") + .arg("--run-dataflow") + .arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + Ok(()) +} diff --git a/node-hub/dora-qwenvl/README.md b/node-hub/dora-qwenvl/README.md new file mode 100644 index 000000000..88f4e564e --- /dev/null +++ b/node-hub/dora-qwenvl/README.md @@ -0,0 +1,3 @@ +# Dora QwenVL2 node + +Experimental node for using a VLM within dora. diff --git a/node-hub/dora-qwenvl/dora_qwenvl/__init__.py b/node-hub/dora-qwenvl/dora_qwenvl/__init__.py new file mode 100644 index 000000000..ac3cbef9f --- /dev/null +++ b/node-hub/dora-qwenvl/dora_qwenvl/__init__.py @@ -0,0 +1,11 @@ +import os + +# Define the path to the README file relative to the package directory +readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.md") + +# Read the content of the README file +try: + with open(readme_path, "r", encoding="utf-8") as f: + __doc__ = f.read() +except FileNotFoundError: + __doc__ = "README file not found." diff --git a/node-hub/dora-qwenvl/dora_qwenvl/main.py b/node-hub/dora-qwenvl/dora_qwenvl/main.py new file mode 100644 index 000000000..904d29266 --- /dev/null +++ b/node-hub/dora-qwenvl/dora_qwenvl/main.py @@ -0,0 +1,145 @@ +import os +from dora import Node +from transformers import Qwen2VLForConditionalGeneration, AutoProcessor +from qwen_vl_utils import process_vision_info +import numpy as np +import pyarrow as pa +from PIL import Image + +DEFAULT_PATH = "Qwen/Qwen2-VL-2B-Instruct" +CUSTOM_MODEL_PATH = os.getenv("CUSTOM_MODEL_PATH", DEFAULT_PATH) +DEFAULT_QUESTION = os.getenv( + "DEFAULT_QUESTION", + "Describe this image", +) + +# default: Load the model on the available device(s) +model = Qwen2VLForConditionalGeneration.from_pretrained( + PATH, + torch_dtype="auto", + device_map="auto", + attn_implementation="flash_attention_2", +) + +# default processer +processor = AutoProcessor.from_pretrained(DEFAULT_PATH) + + +def generate(image: np.array, question): + """ + Generate the response to the question given the image using Qwen2 model. + """ + image = Image.fromarray(image) + + messages = [ + { + "role": "user", + "content": [ + { + "type": "image", + "image": image, + }, + {"type": "text", "text": question}, + ], + } + ] + + # Preparation for inference + text = processor.apply_chat_template( + messages, tokenize=False, add_generation_prompt=True + ) + image_inputs, video_inputs = process_vision_info(messages) + inputs = processor( + text=[text], + images=image_inputs, + videos=video_inputs, + padding=True, + return_tensors="pt", + ) + inputs = inputs.to("cuda") + + # Inference: Generation of the output + generated_ids = model.generate(**inputs, max_new_tokens=128) + generated_ids_trimmed = [ + out_ids[len(in_ids) :] + for in_ids, out_ids in zip(inputs.input_ids, generated_ids) + ] + output_text = processor.batch_decode( + generated_ids_trimmed, + skip_special_tokens=True, + clean_up_tokenization_spaces=False, + ) + return output_text[0] + + +def main(): + node = Node() + + question = DEFAULT_QUESTION + frame = None + pa.array([]) # initialize pyarrow array + + for event in node: + event_type = event["type"] + + if event_type == "INPUT": + event_id = event["id"] + + if event_id == "image": + storage = event["value"] + metadata = event["metadata"] + encoding = metadata["encoding"] + width = metadata["width"] + height = metadata["height"] + + if encoding == "bgr8": + channels = 3 + storage_type = np.uint8 + elif encoding == "rgb8": + channels = 3 + storage_type = np.uint8 + else: + raise RuntimeError(f"Unsupported image encoding: {encoding}") + + frame = ( + storage.to_numpy() + .astype(storage_type) + .reshape((height, width, channels)) + ) + if encoding == "bgr8": + frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB) + elif encoding == "rgb8": + pass + else: + raise RuntimeError(f"Unsupported image encoding: {encoding}") + + elif event_id == "tick": + if frame is None: + continue + response = generate(frame, question) + node.send_output( + "text", + pa.array([response]), + metadata, + ) + + elif event_id == "text": + text = event["value"][0].as_py() + if text != "": + question = text + if frame is None: + continue + # set the max number of tiles in `max_num` + response = generate(frame, question) + node.send_output( + "text", + pa.array([response]), + metadata, + ) + + elif event_type == "ERROR": + raise RuntimeError(event["error"]) + + +if __name__ == "__main__": + main() diff --git a/node-hub/dora-qwenvl/pyproject.toml b/node-hub/dora-qwenvl/pyproject.toml new file mode 100644 index 000000000..cbd5e1319 --- /dev/null +++ b/node-hub/dora-qwenvl/pyproject.toml @@ -0,0 +1,30 @@ +[tool.poetry] +name = "dora-qwenvl" +version = "0.3.6-rc0" +authors = [ + "Haixuan Xavier Tao ", + "Enzo Le Van ", +] +description = "Dora Node for VLM" +readme = "README.md" + +packages = [{ include = "dora_qwenvl" }] + +[tool.poetry.dependencies] +python = "^3.7" +dora-rs = "^0.3.6" +numpy = "< 2.0.0" +torch = "^2.4.0" +torchvision = "^0.19" +transformers = { git = "https://github.com/huggingface/transformers" } +qwen-vl-utils = "^0.0.2" +accelerate = "^0.33" +flash-attention = "^2.6.1" + + +[tool.poetry.scripts] +dora-qwenvl = "dora_qwenvl.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" diff --git a/node-hub/dora-qwenvl/tests/test_dora_qwenvl.py b/node-hub/dora-qwenvl/tests/test_dora_qwenvl.py new file mode 100644 index 000000000..4105676f4 --- /dev/null +++ b/node-hub/dora-qwenvl/tests/test_dora_qwenvl.py @@ -0,0 +1,9 @@ +import pytest + + +def test_import_main(): + from dora_qwenvl.main import main + + # Check that everything is working, and catch dora Runtime Exception as we're not running in a dora dataflow. + with pytest.raises(RuntimeError): + main() diff --git a/node-hub/dora-rerun/src/main.rs b/node-hub/dora-rerun/src/main.rs index a3b9b895d..b16272d99 100644 --- a/node-hub/dora-rerun/src/main.rs +++ b/node-hub/dora-rerun/src/main.rs @@ -104,7 +104,7 @@ fn main() -> Result<()> { rec.log(id.as_str(), &image) .context("could not log image")?; - } else if id.as_str().contains("textlog") { + } else if id.as_str().contains("text") { let buffer: StringArray = data.to_data().into(); buffer.iter().try_for_each(|string| -> Result<()> { if let Some(str) = string { diff --git a/node-hub/llama-factory-recorder/README.md b/node-hub/llama-factory-recorder/README.md new file mode 100644 index 000000000..9279a6d6b --- /dev/null +++ b/node-hub/llama-factory-recorder/README.md @@ -0,0 +1,3 @@ +# Dora Llama factory recorder + +Experimental node for recording for training llama based model. diff --git a/node-hub/llama-factory-recorder/llama_factory_recorder/__init__.py b/node-hub/llama-factory-recorder/llama_factory_recorder/__init__.py new file mode 100644 index 000000000..ac3cbef9f --- /dev/null +++ b/node-hub/llama-factory-recorder/llama_factory_recorder/__init__.py @@ -0,0 +1,11 @@ +import os + +# Define the path to the README file relative to the package directory +readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.md") + +# Read the content of the README file +try: + with open(readme_path, "r", encoding="utf-8") as f: + __doc__ = f.read() +except FileNotFoundError: + __doc__ = "README file not found." diff --git a/node-hub/llama-factory-recorder/llama_factory_recorder/main.py b/node-hub/llama-factory-recorder/llama_factory_recorder/main.py new file mode 100644 index 000000000..aa8deb103 --- /dev/null +++ b/node-hub/llama-factory-recorder/llama_factory_recorder/main.py @@ -0,0 +1,193 @@ +import os +import json +from dora import Node +import numpy as np +import pyarrow as pa +from PIL import Image +from pathlib import Path + +DEFAULT_QUESTION = os.getenv( + "DEFAULT_QUESTION", + "Describe this image", +) +ENTRY_NAME = "dora_demo" +LLAMA_FACTORY_ROOT_PATH = Path(os.getenv("LLAMA_FACTORY_ROOT_PATH")) / "data" + + +# If JSON already exists, append incremental suffix to avoid overwriting +if (LLAMA_FACTORY_ROOT_PATH / ENTRY_NAME).exists(): + i = 1 + while (LLAMA_FACTORY_ROOT_PATH / f"{ENTRY_NAME}_{i}.json").exists(): + i += 1 + ENTRY_NAME = f"{ENTRY_NAME}_{i}" + + +DEFAULT_RECORD_IMAGE_ROOT_PATH = LLAMA_FACTORY_ROOT_PATH / ENTRY_NAME +DEFAULT_RECORD_JSON_PATH = LLAMA_FACTORY_ROOT_PATH / (ENTRY_NAME + ".json") + + +def write_dict_to_json(file_path, key: str, new_data): + """ + Writes a dictionary to a JSON file. If the file already contains a list of entries, + the new data will be appended to that list. Otherwise, it will create a new list. + + Parameters: + - file_path: str, the path to the JSON file. + - new_data: dict, the dictionary to add to the JSON file. + """ + try: + # Open the JSON file and load its content + with open(file_path, "r+", encoding="utf-8") as file: + try: + data = json.load(file) + except json.JSONDecodeError: + data = {} + + data[key] = new_data + # Write the updated data back to the file + file.seek(0) + json.dump(data, file, indent=4, ensure_ascii=False) + file.truncate() + + except FileNotFoundError: + # If the file doesn't exist, create it and write the new data as a list + with open(file_path, "w", encoding="utf-8") as file: + json.dump({key: new_data}, file, indent=4, ensure_ascii=False) + + +write_dict_to_json( + LLAMA_FACTORY_ROOT_PATH / "dataset_info.json", + ENTRY_NAME, + { + "file_name": ENTRY_NAME + ".json", + "formatting": "sharegpt", + "columns": {"messages": "messages", "images": "images"}, + "tags": { + "role_tag": "role", + "content_tag": "content", + "user_tag": "user", + "assistant_tag": "assistant", + }, + }, +) + + +def save_image_and_add_to_json( + image_array, root_path, llama_root_path, jsonl_file, messages +): + """ + Saves an image from a NumPy array and adds a new JSON object as a line to a JSONL file. + The function generates a sequential numeric image filename starting from 0 and + follows the provided template structure. + + Parameters: + - image_array: numpy.ndarray, the image data as a NumPy array. + - root_path: str, the root directory where the image will be saved. + - jsonl_file: str, the path to the JSONL file. + - messages: list of dicts, each containing 'content' and 'role'. + + The image is saved as a PNG file, and the JSONL entry includes the 'messages' and 'images' keys. + """ + + # Create the root directory if it doesn't exist + os.makedirs(llama_root_path / root_path, exist_ok=True) + + # Get the current image ID by counting existing files + image_id = len( + [ + name + for name in os.listdir(llama_root_path / root_path) + if os.path.isfile(os.path.join(llama_root_path / root_path, name)) + ] + ) + + # Define the image filename + image_filename = f"{image_id}.png" + image_path = os.path.join(root_path, image_filename) + + # Save the image + image = Image.fromarray(image_array) + image.save(llama_root_path / image_path) + + # Create the JSON entry with 'messages' and 'images' + new_entry = {"messages": messages, "images": [image_path]} + + # Add the entry to the JSONL file with UTF-8 encoding + with open(jsonl_file, "a", encoding="utf-8") as f: + json_line = json.dumps(new_entry) + f.write(json_line + "\n") + + +def main(): + pa.array([]) # initialize pyarrow array + node = Node() + + question = DEFAULT_QUESTION + frame = None + + for event in node: + event_type = event["type"] + + if event_type == "INPUT": + event_id = event["id"] + + if event_id == "image": + storage = event["value"] + metadata = event["metadata"] + encoding = metadata["encoding"] + width = metadata["width"] + height = metadata["height"] + + if encoding == "bgr8": + channels = 3 + storage_type = np.uint8 + elif encoding == "rgb8": + channels = 3 + storage_type = np.uint8 + else: + raise RuntimeError(f"Unsupported image encoding: {encoding}") + + frame = ( + storage.to_numpy() + .astype(storage_type) + .reshape((height, width, channels)) + ) + if encoding == "bgr8": + frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB) + elif encoding == "rgb8": + pass + else: + raise RuntimeError(f"Unsupported image encoding: {encoding}") + + elif event_id == "text": + text = event["value"][0].as_py() + if text != "": + question = text + elif event_id == "ground_truth": + if frame is None: + continue + ground_truth = event["value"][0].as_py() + + messages = [ + {"content": "" + question, "role": "user"}, + { + "content": ground_truth, + "role": "assistant", + }, + ] + + save_image_and_add_to_json( + image_array=frame, + root_path=ENTRY_NAME, + llama_root_path=LLAMA_FACTORY_ROOT_PATH, + jsonl_file=DEFAULT_RECORD_JSON_PATH, + messages=messages, + ) + node.send_output( + "text", + pa.array([ground_truth]), + metadata, + ) + + elif event_type == "ERROR": + raise RuntimeError(event["error"]) diff --git a/node-hub/llama-factory-recorder/pyproject.toml b/node-hub/llama-factory-recorder/pyproject.toml new file mode 100644 index 000000000..34a55c0ab --- /dev/null +++ b/node-hub/llama-factory-recorder/pyproject.toml @@ -0,0 +1,23 @@ +[tool.poetry] +name = "llama-factory-recorder" +version = "0.3.6-rc0" +authors = [ + "Haixuan Xavier Tao ", + "Enzo Le Van ", +] +description = "Dora Node for VLM" +readme = "README.md" + +packages = [{ include = "llama_factory_recorder" }] + +[tool.poetry.dependencies] +python = "^3.7" +dora-rs = "^0.3.6" +pillow = "^10.4.0" + +[tool.poetry.scripts] +llama-factory-recorder = "llama_factory_recorder.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" diff --git a/node-hub/llama-factory-recorder/tests/test_llama_factory_recorder.py b/node-hub/llama-factory-recorder/tests/test_llama_factory_recorder.py new file mode 100644 index 000000000..34dee34c3 --- /dev/null +++ b/node-hub/llama-factory-recorder/tests/test_llama_factory_recorder.py @@ -0,0 +1,9 @@ +import pytest + + +def test_import_main(): + from llama_factory_recorder.main import main + + # Check that everything is working, and catch dora Runtime Exception as we're not running in a dora dataflow. + with pytest.raises(RuntimeError): + main() From 69e31712f515e30e5869a6318e29a4b6d0f6078b Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 2 Sep 2024 18:09:47 +0200 Subject: [PATCH 2/8] Minor fix --- node-hub/dora-qwenvl/dora_qwenvl/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node-hub/dora-qwenvl/dora_qwenvl/main.py b/node-hub/dora-qwenvl/dora_qwenvl/main.py index 904d29266..65112584c 100644 --- a/node-hub/dora-qwenvl/dora_qwenvl/main.py +++ b/node-hub/dora-qwenvl/dora_qwenvl/main.py @@ -15,7 +15,7 @@ # default: Load the model on the available device(s) model = Qwen2VLForConditionalGeneration.from_pretrained( - PATH, + CUSTOM_MODEL_PATH, torch_dtype="auto", device_map="auto", attn_implementation="flash_attention_2", @@ -118,7 +118,7 @@ def main(): continue response = generate(frame, question) node.send_output( - "text", + "tick", pa.array([response]), metadata, ) From 02c1fef2de14dfdd7af22a9cf53278f89bc95ed1 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 3 Sep 2024 10:09:38 +0200 Subject: [PATCH 3/8] Update keyboard version --- node-hub/dora-keyboard/pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node-hub/dora-keyboard/pyproject.toml b/node-hub/dora-keyboard/pyproject.toml index 29b9308aa..83b7c615f 100644 --- a/node-hub/dora-keyboard/pyproject.toml +++ b/node-hub/dora-keyboard/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dora-keyboard" -version = "0.3.5" +version = "0.3.6" authors = [ "Haixuan Xavier Tao ", "Enzo Le Van ", @@ -13,7 +13,7 @@ readme = "README.md" packages = [{ include = "dora_keyboard" }] [tool.poetry.dependencies] -dora-rs = "0.3.5" +dora-rs = "^0.3.6" numpy = "< 2.0.0" pyarrow = ">= 5.0.0" pynput = "^1.7.6" From 4456aec6d869706465a7c64f9e05533720243c2c Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 4 Sep 2024 08:29:44 +0200 Subject: [PATCH 4/8] Fix flash_attn naming --- node-hub/dora-qwenvl/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node-hub/dora-qwenvl/pyproject.toml b/node-hub/dora-qwenvl/pyproject.toml index cbd5e1319..d5ca3b7b3 100644 --- a/node-hub/dora-qwenvl/pyproject.toml +++ b/node-hub/dora-qwenvl/pyproject.toml @@ -19,7 +19,7 @@ torchvision = "^0.19" transformers = { git = "https://github.com/huggingface/transformers" } qwen-vl-utils = "^0.0.2" accelerate = "^0.33" -flash-attention = "^2.6.1" +flash_attn = "^2.6.1" [tool.poetry.scripts] From 3a85f2f5e07a1c6d38fc281abc03f13d9f3546ab Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 10 Sep 2024 11:07:05 +0200 Subject: [PATCH 5/8] quick example to show that the queue size seems to be one of the latest data instead of being the latest data. There might be a queue somewhere that is creating this descrepenc.y --- tests/queue_size_latest_data/dataflow.yaml | 12 ++++++++++++ tests/queue_size_latest_data/receive_data.py | 17 +++++++++++++++++ tests/queue_size_latest_data/send_data.py | 10 ++++++++++ 3 files changed, 39 insertions(+) create mode 100644 tests/queue_size_latest_data/dataflow.yaml create mode 100644 tests/queue_size_latest_data/receive_data.py create mode 100644 tests/queue_size_latest_data/send_data.py diff --git a/tests/queue_size_latest_data/dataflow.yaml b/tests/queue_size_latest_data/dataflow.yaml new file mode 100644 index 000000000..7393b7d83 --- /dev/null +++ b/tests/queue_size_latest_data/dataflow.yaml @@ -0,0 +1,12 @@ +nodes: + - id: send_data + path: ./send_data.py + outputs: + - data + + - id: receive_data_with_sleep + path: ./receive_data.py + inputs: + tick: + source: send_data/data + queue_size: 1 diff --git a/tests/queue_size_latest_data/receive_data.py b/tests/queue_size_latest_data/receive_data.py new file mode 100644 index 000000000..c53f031a2 --- /dev/null +++ b/tests/queue_size_latest_data/receive_data.py @@ -0,0 +1,17 @@ +from dora import Node +import time + + +node = Node() +time.sleep(5) +for event in node: + event_type = event["type"] + + if event_type == "INPUT": + event_id = event["id"] + send_time = event["value"].to_pylist()[0] + print("Duration: ", send_time - time.time(), flush=True) + assert ( + send_time - time.time() < 1.2 + ), f"Duration: {send_time - time.time()} should be less than 1 as we should always pull latest data." + time.sleep(1) diff --git a/tests/queue_size_latest_data/send_data.py b/tests/queue_size_latest_data/send_data.py new file mode 100644 index 000000000..475ae3e79 --- /dev/null +++ b/tests/queue_size_latest_data/send_data.py @@ -0,0 +1,10 @@ +from dora import Node +import time +import pyarrow as pa + +node = Node() + +for i in range(10): + now = time.time() + node.send_output("data", pa.array([now])) + time.sleep(1) From 804763082f663a321fdbcc1b98d9ee4c8378b449 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 10 Sep 2024 11:55:57 +0200 Subject: [PATCH 6/8] Adding example of dora not getting the latest data when a queue_size is set --- Cargo.lock | 10 +++++ Cargo.toml | 1 + .../dataflow.yaml | 0 .../receive_data.py | 0 .../send_data.py | 3 +- .../queue_size_latest_data_rust/dataflow.yaml | 12 +++++ .../receive_data/Cargo.toml | 15 +++++++ .../receive_data/README.md | 3 ++ .../receive_data/src/main.rs | 45 +++++++++++++++++++ 9 files changed, 88 insertions(+), 1 deletion(-) rename tests/{queue_size_latest_data => queue_size_latest_data_python}/dataflow.yaml (100%) rename tests/{queue_size_latest_data => queue_size_latest_data_python}/receive_data.py (100%) rename tests/{queue_size_latest_data => queue_size_latest_data_python}/send_data.py (63%) create mode 100644 tests/queue_size_latest_data_rust/dataflow.yaml create mode 100644 tests/queue_size_latest_data_rust/receive_data/Cargo.toml create mode 100644 tests/queue_size_latest_data_rust/receive_data/README.md create mode 100644 tests/queue_size_latest_data_rust/receive_data/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index e8644be9f..34a29375b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7902,6 +7902,16 @@ dependencies = [ "tungstenite 0.20.1", ] +[[package]] +name = "receive_data" +version = "0.3.6" +dependencies = [ + "chrono", + "dora-node-api", + "eyre", + "uhlc", +] + [[package]] name = "redox_syscall" version = "0.3.5" diff --git a/Cargo.toml b/Cargo.toml index ab2948a4d..2df1916be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "examples/multiple-daemons/node", "examples/multiple-daemons/operator", "examples/multiple-daemons/sink", + "tests/queue_size_latest_data_rust/receive_data", "libraries/arrow-convert", "libraries/communication-layer/*", "libraries/core", diff --git a/tests/queue_size_latest_data/dataflow.yaml b/tests/queue_size_latest_data_python/dataflow.yaml similarity index 100% rename from tests/queue_size_latest_data/dataflow.yaml rename to tests/queue_size_latest_data_python/dataflow.yaml diff --git a/tests/queue_size_latest_data/receive_data.py b/tests/queue_size_latest_data_python/receive_data.py similarity index 100% rename from tests/queue_size_latest_data/receive_data.py rename to tests/queue_size_latest_data_python/receive_data.py diff --git a/tests/queue_size_latest_data/send_data.py b/tests/queue_size_latest_data_python/send_data.py similarity index 63% rename from tests/queue_size_latest_data/send_data.py rename to tests/queue_size_latest_data_python/send_data.py index 475ae3e79..1123a5aa7 100644 --- a/tests/queue_size_latest_data/send_data.py +++ b/tests/queue_size_latest_data_python/send_data.py @@ -1,10 +1,11 @@ from dora import Node import time import pyarrow as pa +import numpy as np node = Node() for i in range(10): now = time.time() - node.send_output("data", pa.array([now])) + node.send_output("data", pa.array([np.uint64(now)])) time.sleep(1) diff --git a/tests/queue_size_latest_data_rust/dataflow.yaml b/tests/queue_size_latest_data_rust/dataflow.yaml new file mode 100644 index 000000000..631e07c3f --- /dev/null +++ b/tests/queue_size_latest_data_rust/dataflow.yaml @@ -0,0 +1,12 @@ +nodes: + - id: send_data + path: ../queue_size_latest_data_python/send_data.py + outputs: + - data + + - id: receive_data_with_sleep + path: ../../target/release/receive_data + inputs: + tick: + source: send_data/data + queue_size: 1 diff --git a/tests/queue_size_latest_data_rust/receive_data/Cargo.toml b/tests/queue_size_latest_data_rust/receive_data/Cargo.toml new file mode 100644 index 000000000..37fb78ef0 --- /dev/null +++ b/tests/queue_size_latest_data_rust/receive_data/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "receive_data" +edition = "2021" +version.workspace = true +description.workspace = true +documentation.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" +uhlc = "0.5.1" +chrono = "0.4" diff --git a/tests/queue_size_latest_data_rust/receive_data/README.md b/tests/queue_size_latest_data_rust/receive_data/README.md new file mode 100644 index 000000000..9aa46d909 --- /dev/null +++ b/tests/queue_size_latest_data_rust/receive_data/README.md @@ -0,0 +1,3 @@ +# Print received inputs in the terminal + +Check example at [examples/speech-to-text](examples/speech-to-text) diff --git a/tests/queue_size_latest_data_rust/receive_data/src/main.rs b/tests/queue_size_latest_data_rust/receive_data/src/main.rs new file mode 100644 index 000000000..444c0d3de --- /dev/null +++ b/tests/queue_size_latest_data_rust/receive_data/src/main.rs @@ -0,0 +1,45 @@ +use std::{ + thread::sleep, + time::{Duration, Instant}, +}; + +use chrono::Utc; +use dora_node_api::{ + self, + arrow::{ + array::{AsArray, PrimitiveArray}, + datatypes::{Int64Type, UInt64Type}, + temporal_conversions::EPOCH_DAYS_FROM_CE, + }, + dora_core::config::DataId, + DoraNode, Metadata, +}; +use uhlc::system_time_clock; + +fn main() -> eyre::Result<()> { + let mut printed_error = String::new(); + let (node, mut events) = DoraNode::init_from_env()?; + + sleep(Duration::from_secs(5)); + while let Some(event) = events.recv() { + match event { + dora_node_api::Event::Input { + id: _, + metadata: _, + data, + } => { + let data: &PrimitiveArray = data.as_primitive(); + let time: &[u64] = data.values(); + let now = Utc::now(); + let timestamp: u64 = now.timestamp() as u64; + println!("Time Difference: {:?}", timestamp - time[0]); + assert!( + timestamp - time[0] < 2, + "Time difference should be less than 2 seconds as data is sent every seconds" + ); + } + _ => {} + } + } + Ok(()) +} From c961e594024f36740df59f7a264e2394803c9e3a Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 10 Sep 2024 14:22:01 +0200 Subject: [PATCH 7/8] Adding additional information and adding tests into CI --- .github/workflows/ci.yml | 7 +++++++ Cargo.lock | 1 - tests/queue_size_latest_data_python/receive_data.py | 3 +++ .../receive_data/Cargo.toml | 1 - .../receive_data/src/main.rs | 12 +++++------- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d3cba5b90..f4d64d0d8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -316,6 +316,10 @@ jobs: dora stop --name ci-rust-dynamic --grace-duration 5s dora destroy + cargo build -p receive_data --release + # Run tests + dora start ../tests/queue_size_latest_data_rust/dataflow.yaml + - name: "Test CLI (Python)" timeout-minutes: 30 # fail-fast by using bash shell explictly @@ -345,6 +349,9 @@ jobs: dora stop --name ci-python-dynamic --grace-duration 5s dora destroy + # Run tests + dora start ../tests/queue_size_latest_data_python/dataflow.yaml + - name: "Test CLI (C)" timeout-minutes: 30 # fail-fast by using bash shell explictly diff --git a/Cargo.lock b/Cargo.lock index 34a29375b..02e3fd1fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7909,7 +7909,6 @@ dependencies = [ "chrono", "dora-node-api", "eyre", - "uhlc", ] [[package]] diff --git a/tests/queue_size_latest_data_python/receive_data.py b/tests/queue_size_latest_data_python/receive_data.py index c53f031a2..32e559e89 100644 --- a/tests/queue_size_latest_data_python/receive_data.py +++ b/tests/queue_size_latest_data_python/receive_data.py @@ -3,7 +3,10 @@ node = Node() + +# Voluntarily sleep for 5 seconds to ensure that the node is dropping the oldest input time.sleep(5) + for event in node: event_type = event["type"] diff --git a/tests/queue_size_latest_data_rust/receive_data/Cargo.toml b/tests/queue_size_latest_data_rust/receive_data/Cargo.toml index 37fb78ef0..a19105143 100644 --- a/tests/queue_size_latest_data_rust/receive_data/Cargo.toml +++ b/tests/queue_size_latest_data_rust/receive_data/Cargo.toml @@ -11,5 +11,4 @@ license.workspace = true [dependencies] dora-node-api = { workspace = true, features = ["tracing"] } eyre = "0.6.8" -uhlc = "0.5.1" chrono = "0.4" diff --git a/tests/queue_size_latest_data_rust/receive_data/src/main.rs b/tests/queue_size_latest_data_rust/receive_data/src/main.rs index 444c0d3de..baac35392 100644 --- a/tests/queue_size_latest_data_rust/receive_data/src/main.rs +++ b/tests/queue_size_latest_data_rust/receive_data/src/main.rs @@ -8,19 +8,17 @@ use dora_node_api::{ self, arrow::{ array::{AsArray, PrimitiveArray}, - datatypes::{Int64Type, UInt64Type}, - temporal_conversions::EPOCH_DAYS_FROM_CE, + datatypes::UInt64Type, }, - dora_core::config::DataId, - DoraNode, Metadata, + DoraNode, }; -use uhlc::system_time_clock; fn main() -> eyre::Result<()> { - let mut printed_error = String::new(); - let (node, mut events) = DoraNode::init_from_env()?; + let (_node, mut events) = DoraNode::init_from_env()?; + // Voluntarily sleep for 5 seconds to ensure that the node is dropping the oldest input sleep(Duration::from_secs(5)); + while let Some(event) = events.recv() { match event { dora_node_api::Event::Input { From f687c6491d5e3478d4281c525a038b3a6561af34 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 10 Sep 2024 14:32:27 +0200 Subject: [PATCH 8/8] Fix CI path --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f4d64d0d8..5a95c6a31 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -318,7 +318,7 @@ jobs: cargo build -p receive_data --release # Run tests - dora start ../tests/queue_size_latest_data_rust/dataflow.yaml + dora start tests/queue_size_latest_data_rust/dataflow.yaml - name: "Test CLI (Python)" timeout-minutes: 30