Skip to content

Commit

Permalink
Fix log parsing undefined variable and duplicate sequence id errors (#…
Browse files Browse the repository at this point in the history
…1809)

* Fixes two related bugs which occur when running the `log_parsing` example with unexpected data.
* Fix error in `LogParsingPostProcessingStage::__get_label_dicts` where there is a possibility that the `new_label` and `new_confidence` variables are undefined.

* Fix error where the `tokenize_text_series` method can return a sequence_ids list with duplicate ids when a text series is passed in and one of the values has a length longer than `seq_len` and `truncation=False`, this prevents a down-stream error in the inference stage
```
RuntimeError: Inconsistent ID column. Last element in 'seq_ids' tensor, [1], must not extend beyond last message, [0]
```
Not sure if this is the best place to catch this error.



Closes #1765

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1809
  • Loading branch information
dagardner-nv committed Jul 25, 2024
1 parent 79ed546 commit a527ee7
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 20 deletions.
19 changes: 15 additions & 4 deletions examples/log_parsing/postprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import json
import logging
import pathlib
import typing
from collections import defaultdict
Expand All @@ -31,6 +32,8 @@
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stage_schema import StageSchema

logger = logging.getLogger(f"morpheus.{__name__}")


@register_stage("log-postprocess", modes=[PipelineModes.NLP])
class LogParsingPostProcessingStage(SinglePortStage):
Expand Down Expand Up @@ -117,17 +120,25 @@ def _postprocess(self, x: MultiResponseMessage):
def __get_label_dicts(self, row):
token_dict = defaultdict(str)
confidence_dict = defaultdict(list)
new_label = None
new_confidence = None
for label, confidence, token_id in zip(row["labels"], row["confidences"], row["token_ids"]):
text_token = self._vocab_lookup[token_id]
if text_token[:2] != "##" and text_token[0] != '.':
# if not a subword use the current label, else use previous
new_label = label
new_confidence = confidence
if self._label_map[new_label] in token_dict:
token_dict[self._label_map[new_label]] = (token_dict[self._label_map[new_label]] + " " + text_token)

if new_label is not None and new_confidence is not None:
if self._label_map[new_label] in token_dict:
token_dict[self._label_map[new_label]] = (token_dict[self._label_map[new_label]] + " " + text_token)
else:
token_dict[self._label_map[new_label]] = text_token

confidence_dict[self._label_map[label]].append(new_confidence)
else:
token_dict[self._label_map[new_label]] = text_token
confidence_dict[self._label_map[label]].append(new_confidence)
logger.warning("Ignoring unexecpected subword token: %s", text_token)

return token_dict, confidence_dict

def __decode_cleanup(self, df):
Expand Down
7 changes: 7 additions & 0 deletions morpheus/utils/cudf_subword_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ def tokenize_text_series(vocab_hash_file: str,
max_rows_tensor = len(text_ser) * 2
max_length = seq_len

# Preflight check to ensure that the input strings are not too long
if not truncation:
max_value_length = text_ser.str.len().max()
if max_value_length > max_length:
raise ValueError(
f"Input strings are too long ({max_value_length}) to be tokenized without truncation seq_len={seq_len}")

# Call the tokenizer
tokenizer_output = tokenizer(text_ser,
max_length=max_length,
Expand Down
12 changes: 12 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,18 @@ def string_collection_config_fixture():
yield load_json_file(filename="service/milvus_string_collection_conf.json")


@pytest.fixture(scope="session", name="bert_cased_hash")
def bert_cased_hash_fixture():
from _utils import TEST_DIRS
yield os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-hash.txt')


@pytest.fixture(scope="session", name="bert_cased_vocab")
def bert_cased_vocab_fixture():
from _utils import TEST_DIRS
yield os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-vocab.txt')


@pytest.fixture(name="nemollm", scope='session')
def nemollm_fixture(fail_missing: bool):
"""
Expand Down
36 changes: 27 additions & 9 deletions tests/examples/log_parsing/test_log_parsing_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
MODEL_MAX_BATCH_SIZE = 32


def _run_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typing.List[types.ModuleType]):
def _run_pipeline(config: Config,
dataset_cudf: DatasetManager,
import_mod: typing.List[types.ModuleType],
bert_cased_hash: str,
bert_cased_vocab: str):
"""
Runs just the Log Parsing Pipeline
"""
Expand All @@ -47,8 +51,6 @@ def _run_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typi
config.model_max_batch_size = MODEL_MAX_BATCH_SIZE
config.feature_length = FEATURE_LENGTH

model_vocab_file = os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-vocab.txt')
vocab_hash_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-hash.txt')
log_test_data_dir = os.path.join(TEST_DIRS.tests_data_dir, 'examples/log_parsing')

# Not actually the real model config, just the subset that LogParsingPostProcessingStage uses
Expand All @@ -67,7 +69,7 @@ def _run_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typi
pipe.add_stage(DeserializeStage(config))
pipe.add_stage(
PreprocessNLPStage(config,
vocab_hash_file=vocab_hash_file_name,
vocab_hash_file=bert_cased_hash,
truncation=False,
do_lower_case=False,
stride=64,
Expand All @@ -80,7 +82,7 @@ def _run_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typi
force_convert_inputs=True))
pipe.add_stage(
postprocessing_mod.LogParsingPostProcessingStage(config,
vocab_path=model_vocab_file,
vocab_path=bert_cased_vocab,
model_config_path=model_config_file))

comp_stage = pipe.add_stage(CompareDataFrameStage(config, expected_df))
Expand All @@ -90,7 +92,11 @@ def _run_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typi
assert_results(comp_stage.get_results())


def _run_mocked_pipeline(config: Config, dataset_cudf: DatasetManager, import_mod: typing.List[types.ModuleType]):
def _run_mocked_pipeline(config: Config,
dataset_cudf: DatasetManager,
import_mod: typing.List[types.ModuleType],
bert_cased_hash: str,
bert_cased_vocab: str):
"""
Runs the minibert pipeline and mocks the Triton Python interface
"""
Expand Down Expand Up @@ -122,13 +128,25 @@ def _run_mocked_pipeline(config: Config, dataset_cudf: DatasetManager, import_mo
async_infer = mk_async_infer(inf_results)
mock_triton_client.async_infer.side_effect = async_infer

return _run_pipeline(config, dataset_cudf, import_mod)
return _run_pipeline(config,
dataset_cudf,
import_mod,
bert_cased_hash=bert_cased_hash,
bert_cased_vocab=bert_cased_vocab)


@pytest.mark.slow
@pytest.mark.import_mod([
os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'inference.py'),
os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'postprocessing.py')
])
def test_pipe(config: Config, dataset_cudf: DatasetManager, import_mod: typing.List[types.ModuleType]):
_run_mocked_pipeline(config, dataset_cudf, import_mod)
def test_pipe(config: Config,
dataset_cudf: DatasetManager,
import_mod: typing.List[types.ModuleType],
bert_cased_hash: str,
bert_cased_vocab: str):
_run_mocked_pipeline(config,
dataset_cudf,
import_mod,
bert_cased_hash=bert_cased_hash,
bert_cased_vocab=bert_cased_vocab)
52 changes: 48 additions & 4 deletions tests/examples/log_parsing/test_postprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import re
import types
import typing

Expand All @@ -29,6 +31,11 @@
from morpheus.messages import TensorMemory


@pytest.fixture(scope='module', name="model_config_file")
def fixture_model_config_file():
return os.path.join(TEST_DIRS.tests_data_dir, 'examples/log_parsing/log-parsing-config.json')


def build_post_proc_message(dataset_cudf: DatasetManager, log_test_data_dir: str):
input_file = os.path.join(TEST_DIRS.validation_data_dir, 'log-parsing-validation-data-input.csv')
input_df = dataset_cudf[input_file]
Expand All @@ -55,15 +62,15 @@ def build_post_proc_message(dataset_cudf: DatasetManager, log_test_data_dir: str
@pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'postprocessing.py'))
def test_log_parsing_post_processing_stage(config: Config,
dataset_cudf: DatasetManager,
import_mod: typing.List[types.ModuleType]):
import_mod: typing.List[types.ModuleType],
bert_cased_vocab: str,
model_config_file: str):
postprocessing_mod = import_mod

model_vocab_file = os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-vocab.txt')
log_test_data_dir = os.path.join(TEST_DIRS.tests_data_dir, 'examples/log_parsing')
model_config_file = os.path.join(log_test_data_dir, 'log-parsing-config.json')

stage = postprocessing_mod.LogParsingPostProcessingStage(config,
vocab_path=model_vocab_file,
vocab_path=bert_cased_vocab,
model_config_path=model_config_file)

post_proc_message = build_post_proc_message(dataset_cudf, log_test_data_dir)
Expand All @@ -73,3 +80,40 @@ def test_log_parsing_post_processing_stage(config: Config,

assert isinstance(out_meta, MessageMeta)
DatasetManager.assert_compare_df(out_meta.df, expected_df)


@pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'postprocessing.py'))
def test_undefined_variable_error(caplog: pytest.LogCaptureFixture,
config: Config,
dataset_cudf: DatasetManager,
import_mod: typing.List[types.ModuleType],
bert_cased_vocab: str,
model_config_file: str):
"""
Test for undefined variable error, which occurrs when the first token_id is unexpected resulting in the `new_label`
and `new_confidence` variables being undefined.
"""
postprocessing_mod = import_mod

log_test_data_dir = os.path.join(TEST_DIRS.tests_data_dir, 'examples/log_parsing')

stage = postprocessing_mod.LogParsingPostProcessingStage(config,
vocab_path=bert_cased_vocab,
model_config_path=model_config_file)

post_proc_message = build_post_proc_message(dataset_cudf, log_test_data_dir)
post_proc_message.get_tensor('input_ids')[0] = 27716.0

expected_log_re = re.compile(r"^Ignoring unexecpected subword token:.*")

caplog.clear()
with caplog.at_level(logging.WARNING):
stage._postprocess(post_proc_message)

logged_warning = False
for rec in caplog.records:
if rec.levelno == logging.WARNING and expected_log_re.match(rec.message) is not None:
logged_warning = True
break

assert logged_warning, "Expected warning message not found in logs"
5 changes: 2 additions & 3 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,12 +914,11 @@ def test_pipeline_alias(self, config, callback_values): # pylint: disable=unuse
assert config.mode == PipelineModes.NLP

@pytest.mark.replace_callback('pipeline_nlp')
def test_pipeline_nlp_relative_paths(self, config, callback_values):
def test_pipeline_nlp_relative_paths(self, config, callback_values, bert_cased_hash: str):
"""
Ensure that the default paths in the nlp pipeline are valid when run from outside the morpheus repo
"""

vocab_file_name = os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-hash.txt')
args = (GENERAL_ARGS + ['pipeline-nlp'] + FILE_SRC_ARGS + [
'deserialize',
'preprocess',
Expand All @@ -941,7 +940,7 @@ def test_pipeline_nlp_relative_paths(self, config, callback_values):
# pylint: disable=unused-variable
[file_source, deserialize, process_nlp, triton_inf, monitor, add_class, validation, serialize, to_file] = stages

assert process_nlp._vocab_hash_file == os.path.realpath(vocab_file_name)
assert process_nlp._vocab_hash_file == os.path.realpath(bert_cased_hash)

@pytest.mark.replace_callback('pipeline_nlp')
def test_pipeline_nlp_relative_path_precedence(self, config, callback_values, tmp_path):
Expand Down
53 changes: 53 additions & 0 deletions tests/utils/test_cudf_subword_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import string

import pytest

import cudf

from morpheus.utils.cudf_subword_helper import tokenize_text_series


@pytest.mark.parametrize("seq_length", [10, 256, 1024])
@pytest.mark.parametrize("do_lower_case", [False, True])
@pytest.mark.parametrize("add_special_tokens", [False, True])
def test_needs_trunc_error(bert_cased_hash: str, seq_length: int, do_lower_case: bool, add_special_tokens: bool):
"""
Feeding the subword tokenizer with a string that is too long should raise an error rather than
a duplicate in the id list
"""

short_string = string.ascii_lowercase[0:seq_length - 1]

long_string = list(string.ascii_lowercase)
while len(long_string) <= seq_length:
long_string.extend(string.ascii_lowercase)

long_string = "".join(long_string)

series = cudf.Series([short_string, long_string])

# Infer the value of stride the same way that the PreprocessNLPStage does
stride = (seq_length // 2) + (seq_length // 4)

with pytest.raises(ValueError):
tokenize_text_series(vocab_hash_file=bert_cased_hash,
do_lower_case=do_lower_case,
text_ser=series,
seq_len=seq_length,
truncation=False,
stride=stride,
add_special_tokens=add_special_tokens)

0 comments on commit a527ee7

Please sign in to comment.