Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Update all stages to use new IngestControlMessage class instead of Morpheus ControlMessage #469

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e8dfd47
Add IngestControlMessage and tests
drobison00 Feb 18, 2025
48d965a
Checkpoint test updates, compartmentalization fixes
drobison00 Feb 18, 2025
2ac6d74
Checkpoint -- Merge
drobison00 Feb 18, 2025
7363007
Merge remote-tracking branch 'upstream/main' into devin_fea_ingest_cm
drobison00 Feb 18, 2025
10708a4
Add unit tests
drobison00 Feb 18, 2025
84658f6
Update CM
drobison00 Feb 18, 2025
249fd93
Remove transient files
drobison00 Feb 18, 2025
ccac726
Update BrokerSource to use ICM
drobison00 Feb 18, 2025
9243647
Checkpoint -- conversion working through multiprocess_stage
drobison00 Feb 19, 2025
85459cf
Small fix
drobison00 Feb 19, 2025
3ba1ab4
Checkpoint -- Full pipeline working, sans vdb_upload
drobison00 Feb 19, 2025
b9fe084
All stages working with ICM
drobison00 Feb 19, 2025
0b3cf87
Merge upstream/main
drobison00 Feb 19, 2025
95af5f1
Merge branch 'main' into devin_fea_icm_conversion
jdye64 Feb 20, 2025
9b47a09
Merge remote-tracking branch 'upstream/main' into devin_fea_icm_conve…
drobison00 Feb 20, 2025
c4bb297
Merge remote-tracking branch 'refs/remotes/origin/devin_fea_icm_conve…
drobison00 Feb 20, 2025
8f84fae
Unit test fixes
drobison00 Feb 20, 2025
36b9a0a
More unit test fixes
drobison00 Feb 20, 2025
dde00f0
More unit test fixes
drobison00 Feb 20, 2025
f0b7ed0
Remove duplicate table/chart task injection from click.py
drobison00 Feb 21, 2025
0ca20eb
Remove unused modules, unit test fixes, various other things
drobison00 Feb 21, 2025
1dc0c9f
Remove ExecutionMode for the moment
drobison00 Feb 21, 2025
e35d1b2
Updates to remove GPU based dependencies
drobison00 Feb 21, 2025
8ae25a8
Default CUDA_VISIBLE_DEVICES to -1 in ingest runtime container
drobison00 Feb 21, 2025
30284fc
Update IngestCM, make remove_task raise an error if task is not exis…
drobison00 Feb 21, 2025
8ffdd5b
Small changes to avoid dictionary change while iterating
drobison00 Feb 21, 2025
45bc34c
Update docker-compose.yaml
randerzander Feb 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions api/src/nv_ingest_api/primitives/control_message_task.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from uuid import UUID

from pydantic import BaseModel, Field, ConfigDict
from typing import Any, Dict
from typing import Any, Dict, Union


class ControlMessageTask(BaseModel):
model_config = ConfigDict(extra="forbid")

name: str
id: str
type: str
id: Union[str, UUID]
properties: Dict[str, Any] = Field(default_factory=dict)
53 changes: 50 additions & 3 deletions api/src/nv_ingest_api/primitives/ingest_control_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,46 @@
logger = logging.getLogger(__name__)


def remove_task_by_type(ctrl_msg, task: str):
    """
    Pop the first task of the requested type off a control message.

    The tasks returned by ``ctrl_msg.get_tasks()`` are scanned in order; the first
    one whose ``type`` equals ``task`` is removed from the message via
    ``ctrl_msg.remove_task`` (keyed by the task's ``id``), and the ``properties``
    of the removed task are handed back to the caller.

    Parameters
    ----------
    ctrl_msg : IngestControlMessage
        The control message to remove the task from.
    task : str
        The task type to look for.

    Returns
    -------
    dict
        The ``properties`` of the task that was removed.

    Raises
    ------
    ValueError
        If the control message holds no task of the given type. The error is
        logged before it is raised.
    """
    # Only the first match matters; stop scanning as soon as one is found.
    matching_task = next(
        (candidate for candidate in ctrl_msg.get_tasks() if candidate.type == task),
        None,
    )

    if matching_task is not None:
        # Removal is by unique id, not by type; any error from remove_task propagates.
        return ctrl_msg.remove_task(matching_task.id).properties

    err_msg = f"process_control_message: Task '{task}' not found in control message."
    logger.error(err_msg)
    raise ValueError(err_msg)


class IngestControlMessage:
"""
A control message class for ingesting tasks and managing associated metadata,
Expand Down Expand Up @@ -53,14 +93,18 @@ def has_task(self, task_id: str) -> bool:
"""
return task_id in self._tasks

def remove_task(self, task_id: str) -> None:
def remove_task(self, task_id: str) -> ControlMessageTask:
"""
Remove a task from the control message. Logs a warning if the task does not exist.
"""
if task_id in self._tasks:
_task = self._tasks[task_id]

del self._tasks[task_id]

return _task
else:
logger.warning(f"Attempted to remove non-existent task with id: {task_id}")
raise RuntimeError(f"Attempted to remove non-existent task with id: {task_id}")

def config(self, config: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -154,7 +198,8 @@ def filter_timestamp(self, regex_filter: str) -> Dict[str, datetime]:
Retrieve timestamps whose keys match the regex filter.
"""
pattern = re.compile(regex_filter)
return {key: ts for key, ts in self._timestamps.items() if pattern.search(key)}
timestamps_snapshot = self._timestamps.copy()
return {key: ts for key, ts in timestamps_snapshot.items() if pattern.search(key)}

def get_timestamp(self, key: str, fail_if_nonexist: bool = False) -> datetime:
"""
Expand Down Expand Up @@ -188,12 +233,14 @@ def set_timestamp(self, key: str, timestamp: Any) -> None:
"""
if isinstance(timestamp, datetime):
self._timestamps[key] = timestamp

elif isinstance(timestamp, str):
try:
dt = datetime.fromisoformat(timestamp)
self._timestamps[key] = dt
except ValueError as e:
raise ValueError(f"Invalid timestamp format: {timestamp}") from e

else:
raise ValueError("timestamp must be a datetime object or ISO format string")

Expand Down
13 changes: 0 additions & 13 deletions client/src/nv_ingest_client/cli/util/click.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@
from nv_ingest_client.primitives.tasks import StoreTask
from nv_ingest_client.primitives.tasks import VdbUploadTask
from nv_ingest_client.primitives.tasks.caption import CaptionTaskSchema
from nv_ingest_client.primitives.tasks.chart_extraction import ChartExtractionSchema
from nv_ingest_client.primitives.tasks.chart_extraction import ChartExtractionTask
from nv_ingest_client.primitives.tasks.dedup import DedupTaskSchema
from nv_ingest_client.primitives.tasks.embed import EmbedTaskSchema
from nv_ingest_client.primitives.tasks.extract import ExtractTaskSchema
from nv_ingest_client.primitives.tasks.filter import FilterTaskSchema
from nv_ingest_client.primitives.tasks.split import SplitTaskSchema
from nv_ingest_client.primitives.tasks.store import StoreEmbedTaskSchema
from nv_ingest_client.primitives.tasks.store import StoreTaskSchema
from nv_ingest_client.primitives.tasks.table_extraction import TableExtractionSchema
from nv_ingest_client.primitives.tasks.table_extraction import TableExtractionTask
from nv_ingest_client.primitives.tasks.vdb_upload import VdbUploadTaskSchema
from nv_ingest_client.util.util import generate_matching_files

Expand Down Expand Up @@ -115,15 +111,6 @@ def click_validate_task(ctx, param, value):
task_options = check_schema(ExtractTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}_{task_options.document_type}"
new_task = [(new_task_id, ExtractTask(**task_options.model_dump()))]

if task_options.extract_tables is True:
subtask_options = check_schema(TableExtractionSchema, {}, "table_data_extract", "{}")
new_task.append(("table_data_extract", TableExtractionTask(**subtask_options.model_dump())))

if task_options.extract_charts is True:
subtask_options = check_schema(ChartExtractionSchema, {}, "chart_data_extract", "{}")
new_task.append(("chart_data_extract", ChartExtractionTask(**subtask_options.model_dump())))

elif task_id == "store":
task_options = check_schema(StoreTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ services:
reservations:
devices:
- driver: nvidia
device_ids: ["1"]
device_ids: ["0"]
capabilities: [gpu]
runtime: nvidia
profiles:
Expand Down Expand Up @@ -165,7 +165,7 @@ services:
cap_add:
- sys_nice
environment:
- CUDA_VISIBLE_DEVICES=0
- CUDA_VISIBLE_DEVICES=-1
- EMBEDDING_NIM_MODEL_NAME=${EMBEDDING_NIM_MODEL_NAME:-nvidia/llama-3.2-nv-embedqa-1b-v2}
- INGEST_LOG_LEVEL=DEFAULT
# Message client for development
Expand Down
5 changes: 3 additions & 2 deletions src/microservice_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import json

from morpheus.config import Config
from morpheus.config import Config, ExecutionMode
from morpheus.config import CppConfig
from morpheus.config import PipelineModes
from morpheus.utils.logger import configure_logging
Expand Down Expand Up @@ -88,9 +88,10 @@ def cli(
logging.basicConfig(level=log_level, format="%(asctime)s - %(levelname)s - %(message)s")
configure_logging(log_level=log_level)

CppConfig.set_should_use_cpp(use_cpp)
CppConfig.set_should_use_cpp(False)

morpheus_pipeline_config = Config()
morpheus_pipeline_config.execution_mode = ExecutionMode.CPU
morpheus_pipeline_config.debug = True if log_level == "DEBUG" else False
morpheus_pipeline_config.log_level = log_level
morpheus_pipeline_config.pipeline_batch_size = pipeline_batch_size
Expand Down
2 changes: 0 additions & 2 deletions src/nv_ingest/modules/extractors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from .docx_extractor import DocxExtractorLoaderFactory
from .pdf_extractor import PDFExtractorLoaderFactory

__all__ = [
"PDFExtractorLoaderFactory",
"DocxExtractorLoaderFactory",
]
Loading
Loading