Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add nightly tests #63

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .github/workflows/nightly_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: Nightly Tests

on:
schedule:
# Runs at 2 AM UTC every day
- cron: '0 2 * * *'

jobs:
nightly-tests:
runs-on: ubuntu-latest

steps:
- name: Checkout Code
uses: actions/checkout@v2

- name: Setup Python Environment
uses: actions/setup-python@v2
with:
python-version: '3.10.14'

- name: Install Chakra
run: |
pip install .

- name: Install PARAM
run: |
git clone https://github.com/facebookresearch/param.git
cd param/train/compute/python/
git checkout c83ce8429110a86549c40fec5a01acbd9fbd54a4
pip install .

- name: Install Dependencies
run: |
pip install -r requirements-dev.txt

- name: Run Unit Tests
run: |
python -m pytest -vv tests
2 changes: 1 addition & 1 deletion .github/workflows/python_lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- name: Setup Python Environment
uses: actions/setup-python@v2
with:
python-version: '3.8'
python-version: '3.10.14'

- name: Install Dependencies
run: |
Expand Down
35 changes: 35 additions & 0 deletions .github/workflows/python_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: Python Unit Tests

on: pull_request

jobs:
python-tests:
runs-on: ubuntu-latest

steps:
- name: Checkout Code
uses: actions/checkout@v2

- name: Setup Python Environment
uses: actions/setup-python@v2
with:
python-version: '3.10.14'

- name: Install Chakra
run: |
pip install .

- name: Install PARAM
run: |
git clone https://github.com/facebookresearch/param.git
cd param/train/compute/python/
git checkout c83ce8429110a86549c40fec5a01acbd9fbd54a4
pip install .

- name: Install Dependencies
run: |
pip install -r requirements-dev.txt

- name: Run Unit Tests
run: |
python -m pytest -vv tests
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pyre-check==0.9.19
pyright==1.1.359
pytest==8.1.1
ruff==0.3.5
6 changes: 5 additions & 1 deletion src/converter/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ def main() -> None:
converter = PyTorchConverter(args.input_filename, args.output_filename, logger)
converter.convert()
else:
logger.error(f"{args.input_type} unsupported")
supported_types = ["Text", "PyTorch"]
logger.error(
f"The input type '{args.input_type}' is not supported. "
f"Supported types are: {', '.join(supported_types)}."
)
sys.exit(1)
except Exception:
traceback.print_exc()
Expand Down
56 changes: 37 additions & 19 deletions src/converter/pytorch_node.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python3

import traceback
from enum import Enum
from typing import Any, Dict, List, Optional

Expand All @@ -22,22 +21,34 @@ class PyTorchNode:
data_deps (List[PyTorchNode]): List of data-dependent parent nodes.
children (List[PyTorchNode]): List of child nodes.
gpu_children (List[PyTorchNode]): List of GPU-specific child nodes.
record_param_comms_node (Optional['PyTorchNode']): Corresponding record_param_comms node.
nccl_node (Optional['PyTorchNode']): Corresponding NCCL node.
id (int): Unique identifier of the node.
record_param_comms_node (Optional[PyTorchNode]): Corresponding
record_param_comms node.
nccl_node (Optional[PyTorchNode]): Corresponding NCCL node.
id (str): Identifier of the node.
name (str): Name of the node.
parent (int): Control dependencies identifier.
inputs (Dict[str, Any]): Input data including values, shapes, and types.
outputs (Dict[str, Any]): Output data including values, shapes, and types.
parent (Any): Parent of the node.
inputs (Any): Inputs of the node.
outputs (Any): Outputs of the node.
inclusive_dur (Optional[float]): Inclusive duration of the node.
exclusive_dur (float): Exclusive duration of the node.
ts (Optional[float]): Timestamp of the node.
inter_thread_dep (Any): Inter-thread dependency of the node.
cat (Any): Category of the node.
stream (Any): Stream associated with the node.
"""

SUPPORTED_VERSIONS = ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4"]

def __init__(self, schema: str, node_data: Dict[str, Any]) -> None:
"""
Initializes a PyTorchNode object using the node data and schema version provided.
Initializes a PyTorchNode object using the node data and schema version
provided.

Args:
schema (str): The schema version based on which the node will be initialized.
node_data (Dict[str, Any]): Dictionary containing the data of the PyTorch node.
schema (str): The schema version based on which the node will be
initialized.
node_data (Dict[str, Any]): Dictionary containing the data of the
PyTorch node.
"""
self.schema = schema
self.data_deps: List["PyTorchNode"] = []
Expand Down Expand Up @@ -71,15 +82,14 @@ def parse_data(self, node_data: Dict[str, Any]) -> None:
Args:
node_data (Dict[str, Any]): The node data to be parsed.
"""
supported_versions = ["1.0.2-chakra.0.0.4", "1.0.3-chakra.0.0.4"]
if self.schema in supported_versions:
if self.schema in self.SUPPORTED_VERSIONS:
if self.schema == "1.0.2-chakra.0.0.4" or self.schema == "1.0.3-chakra.0.0.4":
self._parse_data_1_0_3_chakra_0_0_4(node_data)
else:
raise ValueError(
f"Unsupported schema version '{self.schema}'. Please check "
f"if the schema version is in the list of supported versions: "
f"{supported_versions}"
f"{self.SUPPORTED_VERSIONS}"
)

def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None:
Expand All @@ -88,8 +98,6 @@ def _parse_data_1_0_3_chakra_0_0_4(self, node_data: Dict[str, Any]) -> None:
self.parent = node_data["ctrl_deps"]
self.inputs = node_data["inputs"]
self.outputs = node_data["outputs"]

# TODO: should be added as attributes
self.inclusive_dur = node_data.get("inclusive_dur")
self.exclusive_dur = node_data.get("exclusive_dur", 0)
self.ts = node_data.get("ts")
Expand Down Expand Up @@ -155,7 +163,8 @@ def add_gpu_child(self, gpu_child_node: "PyTorchNode") -> None:
Adds a child GPU node for this node.

Args:
gpu_child_node (Optional[PyTorchNode]): The child GPU node to be added.
gpu_child_node (Optional[PyTorchNode]): The child GPU node to be
added.
"""
self.gpu_children.append(gpu_child_node)

Expand All @@ -164,7 +173,8 @@ def is_record_param_comms_op(self) -> bool:
Checks if the node is a record_param_comms operator.

Returns:
bool: True if the node is a record_param_comms operator, False otherwise.
bool: True if the node is a record_param_comms operator, False
otherwise.
"""
return "record_param_comms" in self.name

Expand Down Expand Up @@ -234,4 +244,12 @@ def get_data_type_size(data_type: str) -> int:
try:
return data_type_size_map[data_type]
except KeyError as e:
raise ValueError(f"Unsupported data type: {data_type}") from e
traceback_str = traceback.format_exc()
raise ValueError(
f"Unsupported data type: {data_type}. The data_type_size_map "
f"dictionary is used for mapping the number of bytes for a "
f"given tensor data type. This dictionary may be incomplete. "
f"Please update the data_type_size_map or report this issue "
f"to the maintainer by creating an issue. Traceback:\n"
f"{traceback_str}"
) from e
4 changes: 1 addition & 3 deletions src/converter/pytorch_tensor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#!/usr/bin/env python3

from typing import List


Expand Down Expand Up @@ -29,7 +27,7 @@ def is_valid(self) -> bool:
Checks if the tensor data is valid.

Returns:
bool: True if tensor_data is a list of exactly five integers,
bool: True if tensor_data is a list of exactly six integers,
False otherwise.
"""
return (
Expand Down
104 changes: 104 additions & 0 deletions src/trace_link/kineto_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from typing import Any, Dict, Optional

from param_bench.train.compute.python.tools.execution_trace import Node as PyTorchOperator


class KinetoOperator:
"""
Represents a single operator in a Kineto trace by default, with fields primarily sourced
from the Kineto traces. In addition to the default fields from Kineto traces, additional
fields have been introduced for postprocessing purposes. These additional fields facilitate
the correlation of PyTorch operators and the enforcement of dependencies among them,
enhancing trace analysis and utility.

Attributes:
op_dict (Dict[str, Any]): Dictionary containing the operator data.
category (str): Category of the operator.
name (str): Name of the operator.
phase (Optional[str]): Phase of the operator.
inclusive_dur (int): Inclusive duration of the operator in microseconds.
exclusive_dur (int): Exclusive duration of the operator in microseconds.
timestamp (int): Timestamp of the operator in microseconds.
external_id (str): External ID associated with the operator.
ev_idx (str): Event index associated with the operator.
tid (int): Thread ID associated with the operator.
pytorch_op (Optional[PyTorchOperator]): Associated PyTorch operator.
parent_pytorch_op_id (Optional[int]): ID of the parent PyTorch operator.
inter_thread_dep (Optional[int]): ID of the latest CPU node from other
threads before the gap.
stream (Optional[int]): Stream ID associated with the operator.
rf_id (Optional[int]): Record function ID.
correlation (int): Correlation ID used to link CUDA runtime operations
with their GPU counterparts.
"""

def __init__(self, kineto_op: Dict[str, Any]) -> None:
"""
Initializes a new instance of the KinetoOperator class.

Args:
kineto_op (Dict[str, Any]): The dictionary representing the
operator data.
"""
self.op_dict: Dict[str, Any] = kineto_op
self.category: str = kineto_op.get("cat", "")
self.name: str = kineto_op.get("name", "")
self.phase: Optional[str] = kineto_op.get("ph")
self.inclusive_dur: int = kineto_op.get("dur", 0)
self.exclusive_dur: int = kineto_op.get("dur", 0)
self.timestamp: int = kineto_op.get("ts", 0)
self.external_id: str = kineto_op.get("args", {}).get("External id", "")
self.ev_idx: str = kineto_op.get("args", {}).get("Ev Idx", "")
self.tid: int = kineto_op.get("tid", 0)
self.pytorch_op: Optional[PyTorchOperator] = None
self.parent_pytorch_op_id: Optional[int] = None
self.inter_thread_dep: Optional[int] = None
self.stream: Optional[int] = kineto_op.get("args", {}).get("stream")
self.rf_id: Optional[int] = kineto_op.get("args", {}).get("Record function id")
self.correlation: int = kineto_op.get("args", {}).get("correlation", -1)

def __repr__(self) -> str:
"""
Represent the KinetoOperator as a string.

Returns:
str: A string representation of the KinetoOperator.
"""
return (
f"KinetoOperator(category={self.category}, name={self.name}, phase={self.phase}, "
f"inclusive_dur={self.inclusive_dur}, exclusive_dur={self.exclusive_dur}, "
f"timestamp={self.timestamp}, external_id={self.external_id}, ev_idx={self.ev_idx}, "
f"tid={self.tid}, parent_pytorch_op_id={self.parent_pytorch_op_id}, "
f"inter_thread_dep={self.inter_thread_dep}, stream={self.stream}, rf_id={self.rf_id}, "
f"correlation={self.correlation})"
)

def is_valid(
self,
category: str,
name_exception: str = "ProfilerStep",
phase: Optional[str] = None,
) -> bool:
"""
Checks if the operator matches specified filtering criteria.

Comment (TODO):
This is legacy code from a previous implementation. Ideally, we should merge this logic
into trace_linker.py. The purpose of is_valid is ambiguous, and it is unclear whether
the function is essential. However, we keep it as it is to avoid breaking downstream
tools. After properly setting up CI/CD pipelines and testing, we can consider removing it.

Args:
category (str): The category to check against.
name_exception (str): A name to exclude in the check.
phase (Optional[str]): The phase to check against, if any.

Returns:
bool: True if the operator matches the criteria, False otherwise.
"""
return (
self.category is not None
and name_exception not in self.name
and self.category == category
and (phase is None or self.phase == phase)
)
Loading
Loading