Skip to content

Commit

Permalink
Fix vdb_upload runtime error (#1643)
Browse files Browse the repository at this point in the history
* Add `ControlMessage` to the `accepted_types` for `InferenceStage` when in Python mode
* fix import of `CppTensorMemory`
* Set default value of `['rss']` for `--source_type` avoids issue where command line flag values are ignored.
* Fix bug in overrides of `config` fixture which prevented parameterization on the `use_cpp` fixture.
* fix type-o in config value `stop_after_rec` not `stop_after_sec`
* Ensure a default int value for `stop_after_rec` to avoid schema validation error
* Revert the default value for `--vector_db_resource_name` back to 'RSS', allowing the output of running this example to be used as the input for the RAG pipeline

Closes #1642
Closes #1645

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)
  - Eli Fajardo (https://github.com/efajardo-nv)

Approvers:
  - Yuchen Zhang (https://github.com/yuchenz427)
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1643
  • Loading branch information
dagardner-nv authored Apr 22, 2024
1 parent 883b804 commit 31d963a
Show file tree
Hide file tree
Showing 18 changed files with 37 additions and 33 deletions.
9 changes: 4 additions & 5 deletions examples/llm/rag/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@ pipeline option of `rag`:
```bash
export NGC_API_KEY=[YOUR_KEY_HERE]
NGC_API_KEY=${NGC_API_KEY} python examples/llm/main.py rag pipeline
python examples/llm/main.py rag pipeline
```

**Using OpenAI LLM models**

```bash
export OPENAI_API_KEY=[YOUR_KEY_HERE]
OPENAI_API_KEY=${OPENAI_API_KEY} python examples/llm/main.py rag pipeline
python examples/llm/main.py rag pipeline --llm_service=OpenAI --model_name=gpt-3.5-turbo
```

### Run example (Persistent Pipeline):
Expand All @@ -232,14 +232,14 @@ OPENAI_API_KEY=${OPENAI_API_KEY} python examples/llm/main.py rag pipeline

```bash
export NGC_API_KEY=[YOUR_KEY_HERE]
python examples/llm/main.py rag persistent
python examples/llm/main.py rag persistent
```

**Using OpenAI LLM models**

```bash
export OPENAI_API_KEY=[YOUR_KEY_HERE]
python examples/llm/main.py rag persistent
python examples/llm/main.py rag persistent
```

### Options:
Expand Down Expand Up @@ -273,4 +273,3 @@ The `rag` command has its own set of options and commands:

- `persistant`
- `pipeline`

4 changes: 2 additions & 2 deletions examples/llm/vdb_upload/module/rss_source_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class RSSSourcePipeSchema(BaseModel):
output_batch_size: int = 2048
request_timeout_sec: float = 2.0
run_indefinitely: bool = True
stop_after_sec: int = 0
stop_after_rec: int = 0
vdb_resource_name: str
web_scraper_config: Optional[Dict[Any, Any]] = None

Expand Down Expand Up @@ -130,7 +130,7 @@ def _rss_source_pipe(builder: mrc.Builder):
"cooldown_interval_sec": validated_config.cooldown_interval_sec,
"request_timeout_sec": validated_config.request_timeout_sec,
"interval_sec": validated_config.interval_sec,
"stop_after_sec": validated_config.stop_after_sec,
"stop_after_rec": validated_config.stop_after_rec,
}
rss_source_loader = RSSSourceLoaderFactory.get_instance("rss_source", {"rss_source": rss_source_config})

Expand Down
4 changes: 2 additions & 2 deletions examples/llm/vdb_upload/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def run():
@click.option("--source_type",
multiple=True,
type=click.Choice(['rss', 'filesystem'], case_sensitive=False),
default=[],
default=['rss'],
show_default=True,
help="The type of source to use. Can specify multiple times for different source types.")
@click.option(
Expand All @@ -128,7 +128,7 @@ def run():
@click.option(
"--vector_db_resource_name",
type=str,
default="VDBUploadExample",
default="RSS",
help="The identifier of the resource on which operations are to be performed in the vector database.",
)
@click.option(
Expand Down
4 changes: 2 additions & 2 deletions examples/llm/vdb_upload/vdb_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ vdb_pipeline:
output_batch_size: 2048 # Number of chunked documents per output batch
request_timeout_sec: 2.0
run_indefinitely: true
stop_after_sec: 0
stop_after_rec: 0
web_scraper_config:
chunk_overlap: 51
chunk_size: 512
Expand Down Expand Up @@ -300,4 +300,4 @@ vdb_pipeline:
dtype: FLOAT_VECTOR
description: Embedding vectors representing the data entry
dim: 384 # Size of the embeddings to store in the vector database
description: Collection schema for diverse data sources
description: Collection schema for diverse data sources
4 changes: 2 additions & 2 deletions examples/llm/vdb_upload/vdb_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def _build_default_rss_source(enable_cache,
"output_batch_size": 2048,
"cache_dir": "./.cache/http",
"cooldown_interval_sec": interval_secs,
"stop_after_sec": stop_after,
"stop_after_rec": stop_after or 0,
"enable_cache": enable_cache,
"enable_monitor": enable_monitors,
"feed_input": feed_inputs if feed_inputs else build_rss_urls(),
Expand Down Expand Up @@ -448,7 +448,7 @@ def build_final_config(vdb_conf_path,
interval_secs=60,
run_indefinitely=True,
stop_after=None,
vector_db_resource_name="VDBUploadExample",
vector_db_resource_name="RSS",
content_chunking_size=128,
rss_request_timeout_sec=30,
feed_inputs=build_rss_urls()))
Expand Down
2 changes: 1 addition & 1 deletion morpheus/modules/input/rss_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def fetch_feeds() -> MessageMeta:

records_emitted += df_size

if (0 < validated_config.stop_after_sec <= records_emitted):
if (0 < validated_config.stop_after_rec <= records_emitted):
stop_requested = True
logger.info("Stop limit reached... preparing to halt the source.")
break
Expand Down
2 changes: 1 addition & 1 deletion morpheus/modules/schemas/rss_source_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class RSSSourceSchema(BaseModel):
cooldown_interval_sec: int = 600
request_timeout_sec: float = 2.0
interval_sec: int = 600
stop_after_sec: int = 0
stop_after_rec: int = 0

class Config:
extra = "forbid"
5 changes: 4 additions & 1 deletion morpheus/stages/inference/inference_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ def accepted_types(self) -> typing.Tuple:
typing.Tuple
Tuple of input types.
"""
return (MultiInferenceMessage, )
if (self._build_cpp_node()):
return (MultiInferenceMessage, )

return (MultiInferenceMessage, ControlMessage)

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MultiResponseMessage)
Expand Down
2 changes: 1 addition & 1 deletion morpheus/stages/input/rss_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def __init__(self,
"rss_source": {
"feed_input": feed_input,
"interval_sec": interval_secs,
"stop_after_sec": stop_after,
"stop_after_rec": stop_after,
"run_indefinitely": run_indefinitely,
"batch_size": batch_size,
"enable_cache": enable_cache,
Expand Down
5 changes: 3 additions & 2 deletions morpheus/stages/preprocess/preprocess_fil_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import cudf

import morpheus._lib.messages as _messages
import morpheus._lib.stages as _stages
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
Expand All @@ -32,7 +33,6 @@
from morpheus.messages import MultiInferenceFILMessage
from morpheus.messages import MultiInferenceMessage
from morpheus.messages import MultiMessage
from morpheus.messages import TensorMemory as CppTensorMemory
from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -123,7 +123,8 @@ def process_control_message(x: ControlMessage, fea_len: int, fea_cols: typing.Li
seg_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32)
seg_ids[:, 2] = fea_len - 1

x.tensors(CppTensorMemory(count=count, tensors={"input__0": data, "seq_ids": seg_ids}))
# We need the C++ impl of TensorMemory until #1646 is resolved
x.tensors(_messages.TensorMemory(count=count, tensors={"input__0": data, "seq_ids": seg_ids}))
return x

@staticmethod
Expand Down
15 changes: 8 additions & 7 deletions morpheus/stages/preprocess/preprocess_nlp_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import cudf

import morpheus._lib.messages as _messages
import morpheus._lib.stages as _stages
from morpheus.cli.register_stage import register_stage
from morpheus.cli.utils import MorpheusRelativePath
Expand All @@ -35,7 +36,6 @@
from morpheus.messages import MultiInferenceMessage
from morpheus.messages import MultiInferenceNLPMessage
from morpheus.messages import MultiMessage
from morpheus.messages import TensorMemory as CppTensorMemory
from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage
from morpheus.utils.cudf_subword_helper import tokenize_text_series

Expand Down Expand Up @@ -204,13 +204,14 @@ def process_control_message(message: ControlMessage,

del text_series

# We need the C++ impl of TensorMemory until #1646 is resolved
message.tensors(
CppTensorMemory(count=tokenized.input_ids.shape[0],
tensors={
"input_ids": tokenized.input_ids,
"input_mask": tokenized.input_mask,
"seq_ids": tokenized.segment_ids
}))
_messages.TensorMemory(count=tokenized.input_ids.shape[0],
tensors={
"input_ids": tokenized.input_ids,
"input_mask": tokenized.input_mask,
"seq_ids": tokenized.segment_ids
}))

message.set_metadata("inference_memory_params", {"inference_type": "nlp"})
return message
Expand Down
2 changes: 1 addition & 1 deletion tests/examples/gnn_fraud_detection_pipeline/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def cuml_fixture(fail_missing: bool):


@pytest.fixture(name="config")
def config_fixture(config):
def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument
"""
The GNN fraud detection pipeline utilizes the "other" pipeline mode.
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/examples/log_parsing/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


@pytest.fixture(name="config")
def config_fixture(config):
def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument
"""
The log_parsing pipelie requires NLP mode. Set this here so all the tests don't need to set it themselves.
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/examples/ransomware_detection/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def dask_distributed(fail_missing: bool):


@pytest.fixture(name="config")
def config_fixture(config):
def config_fixture(config, use_cpp: bool): # pylint: disable=unused-argument
"""
The ransomware detection pipeline utilizes the FIL pipeline mode.
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/stages/test_preprocess_fil_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@


@pytest.fixture(name='config')
def fixture_config(config: Config):
def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument
config.feature_length = 1
config.fil = ConfigFIL()
config.fil.feature_columns = ["data"]
Expand Down
2 changes: 1 addition & 1 deletion tests/stages/test_preprocess_nlp_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@


@pytest.fixture(name='config')
def fixture_config(config: Config):
def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument
config.class_labels = [
"address",
"bank_acct",
Expand Down
2 changes: 1 addition & 1 deletion tests/test_add_classifications_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@


@pytest.fixture(name="config")
def config_fixture(config: Config):
def config_fixture(config: Config, use_cpp: bool): # pylint: disable=unused-argument
config.class_labels = ['frogs', 'lizards', 'toads']
yield config

Expand Down
2 changes: 1 addition & 1 deletion tests/test_add_scores_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@


@pytest.fixture(name='config')
def fixture_config(config: Config):
def fixture_config(config: Config, use_cpp: bool): # pylint: disable=unused-argument
config.class_labels = ['frogs', 'lizards', 'toads']
config.feature_length = 12
yield config
Expand Down

0 comments on commit 31d963a

Please sign in to comment.