diff --git a/examples/log_parsing/postprocessing.py b/examples/log_parsing/postprocessing.py index 72b1e34fc6..830ff9f7ef 100644 --- a/examples/log_parsing/postprocessing.py +++ b/examples/log_parsing/postprocessing.py @@ -13,6 +13,7 @@ # limitations under the License. import json +import logging import pathlib import typing from collections import defaultdict @@ -31,6 +32,8 @@ from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stage_schema import StageSchema +logger = logging.getLogger(f"morpheus.{__name__}") + @register_stage("log-postprocess", modes=[PipelineModes.NLP]) class LogParsingPostProcessingStage(SinglePortStage): @@ -117,17 +120,25 @@ def _postprocess(self, x: MultiResponseMessage): def __get_label_dicts(self, row): token_dict = defaultdict(str) confidence_dict = defaultdict(list) + new_label = None + new_confidence = None for label, confidence, token_id in zip(row["labels"], row["confidences"], row["token_ids"]): text_token = self._vocab_lookup[token_id] if text_token[:2] != "##" and text_token[0] != '.': # if not a subword use the current label, else use previous new_label = label new_confidence = confidence - if self._label_map[new_label] in token_dict: - token_dict[self._label_map[new_label]] = (token_dict[self._label_map[new_label]] + " " + text_token) + + if new_label is not None and new_confidence is not None: + if self._label_map[new_label] in token_dict: + token_dict[self._label_map[new_label]] = (token_dict[self._label_map[new_label]] + " " + text_token) + else: + token_dict[self._label_map[new_label]] = text_token + + confidence_dict[self._label_map[label]].append(new_confidence) else: - token_dict[self._label_map[new_label]] = text_token - confidence_dict[self._label_map[label]].append(new_confidence) + logger.warning("Ignoring unexecpected subword token: %s", text_token) + return token_dict, confidence_dict def __decode_cleanup(self, df): diff --git a/morpheus/utils/cudf_subword_helper.py b/morpheus/utils/cudf_subword_helper.py index 550100a054..c3af4f010d 100644 --- a/morpheus/utils/cudf_subword_helper.py +++ b/morpheus/utils/cudf_subword_helper.py @@ -150,6 +150,13 @@ def tokenize_text_series(vocab_hash_file: str, max_rows_tensor = len(text_ser) * 2 max_length = seq_len + # Preflight check to ensure that the input strings are not too long + if not truncation: + max_value_length = text_ser.str.len().max() + if max_value_length > max_length: + raise ValueError( + f"Input strings are too long ({max_value_length}) to be tokenized without truncation seq_len={seq_len}") + # Call the tokenizer tokenizer_output = tokenizer(text_ser, max_length=max_length, diff --git a/tests/conftest.py b/tests/conftest.py index 732dee996c..19b92df1b1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1044,6 +1044,18 @@ def string_collection_config_fixture(): yield load_json_file(filename="service/milvus_string_collection_conf.json") +@pytest.fixture(scope="session", name="bert_cased_hash") +def bert_cased_hash_fixture(): + from _utils import TEST_DIRS + yield os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-hash.txt') + + +@pytest.fixture(scope="session", name="bert_cased_vocab") +def bert_cased_vocab_fixture(): + from _utils import TEST_DIRS + yield os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-vocab.txt') + + @pytest.fixture(name="nemollm", scope='session') def nemollm_fixture(fail_missing: bool): """ diff --git a/tests/examples/log_parsing/test_log_parsing_pipe.py b/tests/examples/log_parsing/test_log_parsing_pipe.py index 4aac2d439b..0658dc879d 100755 --- a/tests/examples/log_parsing/test_log_parsing_pipe.py +++ b/tests/examples/log_parsing/test_log_parsing_pipe.py @@ -36,7 +36,11 @@ MODEL_MAX_BATCH_SIZE = 32 -def _run_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typing.List[types.ModuleType]): +def _run_pipeline(config: Config, + dataset_cudf: DatasetManager, + import_mod: typing.List[types.ModuleType], + bert_cased_hash: str, + bert_cased_vocab: str): """ Runs just the Log Parsing Pipeline """ @@ -47,8 +51,6 @@ def _run_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typi config.model_max_batch_size = MODEL_MAX_BATCH_SIZE config.feature_length = FEATURE_LENGTH - model_vocab_file = os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-vocab.txt') - vocab_hash_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-hash.txt') log_test_data_dir = os.path.join(TEST_DIRS.tests_data_dir, 'examples/log_parsing') # Not actually the real model config, just the subset that LogParsingPostProcessingStage uses @@ -67,7 +69,7 @@ def _run_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typi pipe.add_stage(DeserializeStage(config)) pipe.add_stage( PreprocessNLPStage(config, - vocab_hash_file=vocab_hash_file_name, + vocab_hash_file=bert_cased_hash, truncation=False, do_lower_case=False, stride=64, @@ -80,7 +82,7 @@ def _run_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typi force_convert_inputs=True)) pipe.add_stage( postprocessing_mod.LogParsingPostProcessingStage(config, - vocab_path=model_vocab_file, + vocab_path=bert_cased_vocab, model_config_path=model_config_file)) comp_stage = pipe.add_stage(CompareDataFrameStage(config, expected_df)) @@ -90,7 +92,11 @@ def _run_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typi assert_results(comp_stage.get_results()) -def _run_mocked_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typing.List[types.ModuleType]): +def _run_mocked_pipeline(config: Config, + dataset_cudf: DatasetManager, + import_mod: typing.List[types.ModuleType], + bert_cased_hash: str, + bert_cased_vocab: str): """ Runs the minibert pipeline and mocks the Triton Python interface """ @@ -122,7 +128,11 @@ def _run_mocked_pipeline(config: Config, dataset_cudf: DatasetManager, import_mo async_infer = mk_async_infer(inf_results) mock_triton_client.async_infer.side_effect = async_infer - return _run_pipeline(config, dataset_cudf, import_mod) + return _run_pipeline(config, + dataset_cudf, + import_mod, + bert_cased_hash=bert_cased_hash, + bert_cased_vocab=bert_cased_vocab) @pytest.mark.slow @@ -130,5 +140,13 @@ def _run_mocked_pipeline(config: Config, dataset_cudf: DatasetManager, import_mo os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'inference.py'), os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'postprocessing.py') ]) -def test_pipe(config: Config, dataset_cudf: DatasetManager, import_mod: typing.List[types.ModuleType]): - _run_mocked_pipeline(config, dataset_cudf, import_mod) +def test_pipe(config: Config, + dataset_cudf: DatasetManager, + import_mod: typing.List[types.ModuleType], + bert_cased_hash: str, + bert_cased_vocab: str): + _run_mocked_pipeline(config, + dataset_cudf, + import_mod, + bert_cased_hash=bert_cased_hash, + bert_cased_vocab=bert_cased_vocab) diff --git a/tests/examples/log_parsing/test_postprocessing.py b/tests/examples/log_parsing/test_postprocessing.py index c32ec5623d..c76c7dcab4 100644 --- a/tests/examples/log_parsing/test_postprocessing.py +++ b/tests/examples/log_parsing/test_postprocessing.py @@ -13,7 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os +import re import types import typing @@ -29,6 +31,11 @@ from morpheus.messages import TensorMemory +@pytest.fixture(scope='module', name="model_config_file") +def fixture_model_config_file(): + return os.path.join(TEST_DIRS.tests_data_dir, 'examples/log_parsing/log-parsing-config.json') + + def build_post_proc_message(dataset_cudf: DatasetManager, log_test_data_dir: str): input_file = os.path.join(TEST_DIRS.validation_data_dir, 'log-parsing-validation-data-input.csv') input_df = dataset_cudf[input_file] @@ -55,15 +62,15 @@ def build_post_proc_message(dataset_cudf: DatasetManager, log_test_data_dir: str @pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'postprocessing.py')) def test_log_parsing_post_processing_stage(config: Config, dataset_cudf: DatasetManager, - import_mod: typing.List[types.ModuleType]): + import_mod: typing.List[types.ModuleType], + bert_cased_vocab: str, + model_config_file: str): postprocessing_mod = import_mod - model_vocab_file = os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-vocab.txt') log_test_data_dir = os.path.join(TEST_DIRS.tests_data_dir, 'examples/log_parsing') - model_config_file = os.path.join(log_test_data_dir, 'log-parsing-config.json') stage = postprocessing_mod.LogParsingPostProcessingStage(config, - vocab_path=model_vocab_file, + vocab_path=bert_cased_vocab, model_config_path=model_config_file) post_proc_message = build_post_proc_message(dataset_cudf, log_test_data_dir) @@ -73,3 +80,40 @@ def test_log_parsing_post_processing_stage(config: Config, assert isinstance(out_meta, MessageMeta) DatasetManager.assert_compare_df(out_meta.df, expected_df) + + +@pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'postprocessing.py')) +def test_undefined_variable_error(caplog: pytest.LogCaptureFixture, + config: Config, + dataset_cudf: DatasetManager, + import_mod: typing.List[types.ModuleType], + bert_cased_vocab: str, + model_config_file: str): + """ + Test for undefined variable error, which occurrs when the first token_id is unexpected resulting in the `new_label` + and `new_confidence` variables being undefined. + """ + postprocessing_mod = import_mod + + log_test_data_dir = os.path.join(TEST_DIRS.tests_data_dir, 'examples/log_parsing') + + stage = postprocessing_mod.LogParsingPostProcessingStage(config, + vocab_path=bert_cased_vocab, + model_config_path=model_config_file) + + post_proc_message = build_post_proc_message(dataset_cudf, log_test_data_dir) + post_proc_message.get_tensor('input_ids')[0] = 27716.0 + + expected_log_re = re.compile(r"^Ignoring unexecpected subword token:.*") + + caplog.clear() + with caplog.at_level(logging.WARNING): + stage._postprocess(post_proc_message) + + logged_warning = False + for rec in caplog.records: + if rec.levelno == logging.WARNING and expected_log_re.match(rec.message) is not None: + logged_warning = True + break + + assert logged_warning, "Expected warning message not found in logs" diff --git a/tests/test_cli.py b/tests/test_cli.py index 93d11f1949..1f578e5990 100755 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -914,12 +914,11 @@ def test_pipeline_alias(self, config, callback_values): # pylint: disable=unuse assert config.mode == PipelineModes.NLP @pytest.mark.replace_callback('pipeline_nlp') - def test_pipeline_nlp_relative_paths(self, config, callback_values): + def test_pipeline_nlp_relative_paths(self, config, callback_values, bert_cased_hash: str): """ Ensure that the default paths in the nlp pipeline are valid when run from outside the morpheus repo """ - vocab_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-hash.txt') args = (GENERAL_ARGS + ['pipeline-nlp'] + FILE_SRC_ARGS + [ 'deserialize', 'preprocess', @@ -941,7 +940,7 @@ def test_pipeline_nlp_relative_paths(self, config, callback_values): # pylint: disable=unused-variable [file_source, deserialize, process_nlp, triton_inf, monitor, add_class, validation, serialize, to_file] = stages - assert process_nlp._vocab_hash_file == os.path.realpath(vocab_file_name) + assert process_nlp._vocab_hash_file == os.path.realpath(bert_cased_hash) @pytest.mark.replace_callback('pipeline_nlp') def test_pipeline_nlp_relative_path_precedence(self, config, callback_values, tmp_path): diff --git a/tests/utils/test_cudf_subword_helper.py b/tests/utils/test_cudf_subword_helper.py new file mode 100644 index 0000000000..3dc376a9a9 --- /dev/null +++ b/tests/utils/test_cudf_subword_helper.py @@ -0,0 +1,53 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import string + +import pytest + +import cudf + +from morpheus.utils.cudf_subword_helper import tokenize_text_series + + +@pytest.mark.parametrize("seq_length", [10, 256, 1024]) +@pytest.mark.parametrize("do_lower_case", [False, True]) +@pytest.mark.parametrize("add_special_tokens", [False, True]) +def test_needs_trunc_error(bert_cased_hash: str, seq_length: int, do_lower_case: bool, add_special_tokens: bool): + """ + Feeding the subword tokenizer with a string that is too long should raise an error rather than + a duplicate in the id list + """ + + short_string = string.ascii_lowercase[0:seq_length - 1] + + long_string = list(string.ascii_lowercase) + while len(long_string) <= seq_length: + long_string.extend(string.ascii_lowercase) + + long_string = "".join(long_string) + + series = cudf.Series([short_string, long_string]) + + # Infer the value of stride the same way that the PreprocessNLPStage does + stride = (seq_length // 2) + (seq_length // 4) + + with pytest.raises(ValueError): + tokenize_text_series(vocab_hash_file=bert_cased_hash, + do_lower_case=do_lower_case, + text_ser=series, + seq_len=seq_length, + truncation=False, + stride=stride, + add_special_tokens=add_special_tokens)