Removing support for MultiMessage from stages #1803

Merged

96 commits
ba506fa
Remove MultiMessage from AddScoresStage
yczhang-nv Jul 8, 2024
bb18d7e
Remove MultiMessage from AddScoresStage python impl & reformatting
yczhang-nv Jul 8, 2024
6500db8
fix test errors
yczhang-nv Jul 9, 2024
6402965
remove from preprocess_nlp cpp impl
yczhang-nv Jul 9, 2024
7a53076
remove from preprocess_nlp python impl
yczhang-nv Jul 9, 2024
f4f97bd
remove from preprocess_fil cpp impl
yczhang-nv Jul 9, 2024
9e8b2c5
remove from preprocess_fil python impl
yczhang-nv Jul 9, 2024
0bbadbc
remove from preprocess_ae impl
yczhang-nv Jul 9, 2024
a960468
remove from serialize stage impl
yczhang-nv Jul 9, 2024
5c45b62
remove from filter_detections impl
yczhang-nv Jul 9, 2024
6ea61b8
remove from filter_detections_controller
yczhang-nv Jul 10, 2024
81ccd6e
remove from generate_viz_frames
yczhang-nv Jul 10, 2024
fecd7e0
remove from mlflow_drift_stage
yczhang-nv Jul 10, 2024
4a572c9
remove from timeseries stage
yczhang-nv Jul 10, 2024
3e3ad14
remove from validation stage
yczhang-nv Jul 10, 2024
fa3d971
update deserialize stage
yczhang-nv Jul 10, 2024
30a76a2
fix some unit tests
yczhang-nv Jul 11, 2024
6629139
fix unit tests
yczhang-nv Jul 12, 2024
746530c
fix inference
yczhang-nv Jul 12, 2024
f0dbfc0
update fil stage
yczhang-nv Jul 15, 2024
46217dc
rollback to test triton_inference_stage
yczhang-nv Jul 15, 2024
72171fa
test cm for test_inference_stage
yczhang-nv Jul 15, 2024
3753707
passed test_triton_inference_stage
yczhang-nv Jul 17, 2024
b6509f0
fix
yczhang-nv Jul 17, 2024
bddaf5f
debugging test_dfp.py
yczhang-nv Jul 19, 2024
cc945c7
fix test_dfp.py
yczhang-nv Jul 22, 2024
b436176
fix test_phishing.py
yczhang-nv Jul 22, 2024
83e8367
Merge remote-tracking branch 'upstream/branch-24.10' into verify-and-…
yczhang-nv Jul 22, 2024
9ca9320
fix test
yczhang-nv Jul 22, 2024
55c75a6
remove some multimessage branches
yczhang-nv Jul 23, 2024
90ab421
fix ci
yczhang-nv Jul 23, 2024
94e4639
fix ci
yczhang-nv Jul 24, 2024
aa00fbb
fix naming
yczhang-nv Jul 25, 2024
e77a50a
fix ci
yczhang-nv Jul 25, 2024
d8b60d6
Merge branch 'branch-24.10' into complete-remove-multi-message
yczhang-nv Jul 25, 2024
4581346
fix CI
yczhang-nv Jul 26, 2024
8cd1a7d
fix CI
yczhang-nv Jul 26, 2024
66013af
fix CI
yczhang-nv Jul 26, 2024
554856c
test gpg
yczhang-nv Jul 29, 2024
dfff798
test gpg sign
yczhang-nv Jul 29, 2024
5940311
test gpg
yczhang-nv Jul 29, 2024
5b890d2
test gpg
yczhang-nv Jul 29, 2024
9977a6a
gix abp_pcap_detection
yczhang-nv Jul 29, 2024
9351310
Finalize CI
yczhang-nv Jul 29, 2024
10a01ff
rollback
yczhang-nv Jul 29, 2024
db17566
Merge remote-tracking branch 'upstream/branch-24.10' into complete-re…
yczhang-nv Aug 13, 2024
8d9ecf4
fix python checks
yczhang-nv Aug 13, 2024
fa816ff
fix typo
yczhang-nv Aug 13, 2024
5927b62
Merge remote-tracking branch 'upstream/branch-24.10' into complete-re…
yczhang-nv Aug 14, 2024
95725c8
fix ci
yczhang-nv Aug 14, 2024
c4b1cdf
Merge branch 'branch-24.10' into complete-remove-multi-message
yczhang-nv Aug 14, 2024
c596520
Merge branch 'branch-24.10' into complete-remove-multi-message
yczhang-nv Aug 15, 2024
24c3af0
Merge remote-tracking branch 'upstream/branch-24.10' into complete-re…
yczhang-nv Aug 27, 2024
d3e1b45
support casting TensorObject from Python to C++ for ControlMessage
yczhang-nv Aug 28, 2024
371f001
add overload to TensorObject
yczhang-nv Aug 28, 2024
ac94065
Update comment
yczhang-nv Aug 28, 2024
efd9937
Update comments
yczhang-nv Aug 28, 2024
dd0e0a0
Merge branch 'cast-python-tensor-memory-to-cpp-for-control-message' i…
yczhang-nv Aug 28, 2024
4218d74
fix comments
yczhang-nv Aug 29, 2024
08fb90e
fix comments
yczhang-nv Aug 29, 2024
41e9e2f
fic CI format
yczhang-nv Aug 29, 2024
b811206
fix format
yczhang-nv Aug 29, 2024
c2bf5d3
fix CI
yczhang-nv Aug 29, 2024
8033055
Fix CI
yczhang-nv Aug 29, 2024
f4be468
fix CI
yczhang-nv Aug 29, 2024
809bbb5
revert changes that break the build
yczhang-nv Aug 29, 2024
9c03ee9
fix CI
yczhang-nv Aug 29, 2024
66922a7
fix CI
yczhang-nv Aug 29, 2024
f5c16b0
Merge remote-tracking branch 'upstream/branch-24.10' into cast-python…
yczhang-nv Sep 6, 2024
dddd3c8
fix CI
yczhang-nv Sep 6, 2024
c4a7095
Merge remote-tracking branch 'upstream/branch-24.10' into complete-re…
yczhang-nv Sep 6, 2024
9f30383
revert format
yczhang-nv Sep 6, 2024
38e8f9c
Merge remote-tracking branch 'origin/cast-python-tensor-memory-to-cpp…
yczhang-nv Sep 6, 2024
32ec933
fix CI
yczhang-nv Sep 6, 2024
f43f10a
try to minimize CI errors
yczhang-nv Sep 6, 2024
c74f51f
Update ransomware pipeline to use ControlMessage
yczhang-nv Sep 6, 2024
0876026
Merge branch 'branch-24.10' into complete-remove-multi-message
yczhang-nv Sep 6, 2024
9ed5e32
Merge branch 'complete-remove-multi-message' of github.com:yczhang-nv…
yczhang-nv Sep 6, 2024
ae33cab
remove comments
yczhang-nv Sep 6, 2024
84d179d
fix CI
yczhang-nv Sep 6, 2024
b33e128
fix format
yczhang-nv Sep 6, 2024
1776efc
Merge branch 'branch-24.10' into complete-remove-multi-message
yczhang-nv Sep 7, 2024
a848845
fix doc
yczhang-nv Sep 9, 2024
4115cc9
fix merge conflict
yczhang-nv Sep 9, 2024
8201199
Revert "Merge remote-tracking branch 'origin/cast-python-tensor-memor…
yczhang-nv Sep 9, 2024
0dc0947
Revert "fic CI format"
yczhang-nv Sep 9, 2024
6656224
Revert "support casting TensorObject from Python to C++ for ControlMe…
yczhang-nv Sep 9, 2024
21e3d2f
fix revert error
yczhang-nv Sep 9, 2024
2023469
fix CI
yczhang-nv Sep 9, 2024
adbc5ca
fix CI
yczhang-nv Sep 9, 2024
6d65a6d
TensorMemory
yczhang-nv Sep 10, 2024
7d8a64b
Cleanup during review
mdemoret-nv Sep 10, 2024
8b0833d
Fixing formatting on pyi files
mdemoret-nv Sep 10, 2024
9a975c1
fix header and formatting issue
yczhang-nv Sep 10, 2024
91474a3
fix docstring
yczhang-nv Sep 10, 2024
ecb6766
fix CI
yczhang-nv Sep 11, 2024
25 changes: 16 additions & 9 deletions docs/source/developer_guide/guides/9_control_messages.md
@@ -79,7 +79,9 @@ msg_meta == retrieved_payload # True

### Conversion from `MultiMessage` to `ControlMessage`

Starting with version 24.06, the `MultiMessage` type will be deprecated, and all usage should transition to `ControlMessage`. Each `MultiMessage` functionality has a corresponding equivalent in `ControlMessage`, as illustrated below.
**The `MultiMessage` type was deprecated in 24.06 and has been completely removed in version 24.10.**

When upgrading to 24.10, all uses of `MultiMessage` need to be converted to `ControlMessage`. Each `MultiMessage` functionality has a corresponding equivalent in `ControlMessage`, as illustrated below.
```python
import cudf
from morpheus.messages import MessageMeta, MultiMessage, ControlMessage
data = cudf.DataFrame()
msg_meta = MessageMeta(data)
```

| **Functionality** | **MultiMessage** | **ControlMessage** |
| -------------------------------------------------------------- | ------------------------------------- | ------------------------------------------------------------------- |
| Initialization | `multi_msg = MultiMessage(msg_meta)` | `control_msg = ControlMessage()`<br>`control_msg.payload(msg_meta)` |
| Get columns from `cudf.DataFrame` | `multi_msg.get_meta(col_name)` | `control_msg.payload().get_data(col_name)` |
| Set columns values to `cudf.DataFrame` | `multi_msg.set_meta(col_name, value)` | `control_msg.payload().set_data(col_name, value)` |
| Get sliced `cudf.DataFrame` for given start and stop positions | `multi_msg.get_slice(start, stop)` | `control_msg.payload().get_slice(start, stop)` |
| Copy the `cudf.DataFrame` for given ranges of rows | `multi_msg.copy_ranges(ranges)` | `control_msg.payload().copy_ranges(ranges)` |
| **Functionality** | **MultiMessage** | **ControlMessage** |
| -------------------------------------------------------------- | ------------------------------------------ | ------------------------------------------------------------------- |
| Initialization | `multi_msg = MultiMessage(msg_meta)` | `control_msg = ControlMessage()`<br>`control_msg.payload(msg_meta)` |
| Get `cudf.DataFrame` | `multi_msg.get_meta()` | `control_msg.payload().get_data()` |
| Get columns from `cudf.DataFrame` | `multi_msg.get_meta(col_name)` | `control_msg.payload().get_data(col_name)` |
| Set columns values to `cudf.DataFrame` | `multi_msg.set_meta(col_name, value)` | `control_msg.payload().set_data(col_name, value)` |
| Get sliced `cudf.DataFrame` for given start and stop positions | `multi_msg.get_slice(start, stop)` | `control_msg.payload().get_slice(start, stop)` |
| Copy the `cudf.DataFrame` for given ranges of rows | `multi_msg.copy_ranges(ranges)` | `control_msg.payload().copy_ranges(ranges)` |
| | **MultiTensorMessage** | **ControlMessage** |
| Get the inference tensor `cupy.ndarray` | `multi_tensor_msg.tensor()` | `control_msg.tensors()` |
| Get a specific inference tensor | `multi_tensor_msg.get_tensor(tensor_name)` | `control_msg.tensors().get_tensor(tensor_name)` |


Note that the `get_slice()` and `copy_ranges()` functions in `ControlMessage` return the `MessageMeta` after slicing, whereas these functions in `MultiMessage` return a new `MultiMessage` instance.
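
As a quick illustration of the mapping above, here is a minimal before/after sketch. The DataFrame contents and the column name `"col"` are hypothetical; the calls themselves are the ones listed in the table.

```python
import cudf

from morpheus.messages import ControlMessage, MessageMeta

# Hypothetical example data; any cudf DataFrame works here.
df = cudf.DataFrame({"col": [1, 2, 3]})
msg_meta = MessageMeta(df)

# Before (removed in 24.10):
#   multi_msg = MultiMessage(msg_meta)
#   values = multi_msg.get_meta("col")
#   multi_msg.set_meta("col", values + 1)

# After:
control_msg = ControlMessage()
control_msg.payload(msg_meta)

values = control_msg.payload().get_data("col")      # get a column
control_msg.payload().set_data("col", values + 1)   # set a column
sliced = control_msg.payload().get_slice(0, 2)      # returns a MessageMeta
```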
36 changes: 19 additions & 17 deletions examples/abp_pcap_detection/abp_pcap_preprocessing.py
@@ -21,14 +21,12 @@

import cudf

import morpheus._lib.messages as _messages
from morpheus.cli.register_stage import register_stage
from morpheus.common import TypeId
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import InferenceMemoryFIL
from morpheus.messages import MultiInferenceFILMessage
from morpheus.messages import MultiInferenceMessage
from morpheus.messages import MultiMessage
from morpheus.messages import ControlMessage
from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage


@@ -81,20 +79,22 @@ def supports_cpp_node(self):
return False

@staticmethod
def pre_process_batch(x: MultiMessage, fea_len: int, fea_cols: typing.List[str],
req_cols: typing.List[str]) -> MultiInferenceFILMessage:
def pre_process_batch(msg: ControlMessage, fea_len: int, fea_cols: typing.List[str],
req_cols: typing.List[str]) -> ControlMessage:
meta = msg.payload()
# Converts the int flags field into a binary string
flags_bin_series = x.get_meta("flags").to_pandas().apply(lambda x: format(int(x), "05b"))
flags_bin_series = meta.get_data("flags").to_pandas().apply(lambda x: format(int(x), "05b"))

# Expand binary string into an array
df = cudf.DataFrame(np.vstack(flags_bin_series.str.findall("[0-1]")).astype("int8"), index=x.get_meta().index)
df = cudf.DataFrame(np.vstack(flags_bin_series.str.findall("[0-1]")).astype("int8"),
index=meta.get_data().index)

# adding [ack, psh, rst, syn, fin] details from the binary flag
rename_cols_dct = {0: "ack", 1: "psh", 2: "rst", 3: "syn", 4: "fin"}
df = df.rename(columns=rename_cols_dct)

df["flags_bin"] = flags_bin_series
df["timestamp"] = x.get_meta("timestamp").astype("int64")
df["timestamp"] = meta.get_data("timestamp").astype("int64")

def round_time_kernel(timestamp, rollup_time, secs):
for i, time in enumerate(timestamp):
@@ -113,8 +113,8 @@ def round_time_kernel(timestamp, rollup_time, secs):
df["rollup_time"] = cudf.to_datetime(df["rollup_time"], unit="us").dt.strftime("%Y-%m-%d %H:%M")

# creating flow_id "src_ip:src_port=dst_ip:dst_port"
df["flow_id"] = (x.get_meta("src_ip") + ":" + x.get_meta("src_port").astype("str") + "=" +
x.get_meta("dest_ip") + ":" + x.get_meta("dest_port").astype("str"))
df["flow_id"] = (meta.get_data("src_ip") + ":" + meta.get_data("src_port").astype("str") + "=" +
meta.get_data("dest_ip") + ":" + meta.get_data("dest_port").astype("str"))
agg_dict = {
"ack": "sum",
"psh": "sum",
@@ -125,7 +125,7 @@ def round_time_kernel(timestamp, rollup_time, secs):
"flow_id": "count",
}

df["data_len"] = x.get_meta("data_len").astype("int16")
df["data_len"] = meta.get_data("data_len").astype("int16")

# group by operation
grouped_df = df.groupby(["rollup_time", "flow_id"]).agg(agg_dict)
@@ -175,22 +175,24 @@ def round_time_kernel(timestamp, rollup_time, secs):
count = data.shape[0]

for col in req_cols:
x.set_meta(col, merged_df[col])
meta.set_data(col, merged_df[col])

del merged_df

seq_ids = cp.zeros((count, 3), dtype=cp.uint32)
seq_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + count, dtype=cp.uint32)
seq_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32)
seq_ids[:, 2] = fea_len - 1

# Create the inference memory. Keep in mind count here could be > than input count
memory = InferenceMemoryFIL(count=count, input__0=data, seq_ids=seq_ids)
memory = _messages.InferenceMemoryFIL(count=count, input__0=data, seq_ids=seq_ids)

infer_message = MultiInferenceFILMessage.from_message(x, memory=memory)
infer_message = ControlMessage(msg)
infer_message.payload(meta)
infer_message.tensors(memory)

return infer_message

def _get_preprocess_fn(self) -> typing.Callable[[MultiMessage], MultiInferenceMessage]:
def _get_preprocess_fn(self) -> typing.Callable[[ControlMessage], ControlMessage]:
return partial(AbpPcapPreprocessingStage.pre_process_batch,
fea_len=self._fea_length,
fea_cols=self.features,
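
Condensing the change above: the stage no longer builds a `MultiInferenceFILMessage`; it attaches an `InferenceMemoryFIL` to a `ControlMessage` instead, and `seq_ids` now starts at 0 because batches are no longer offset into a shared DataFrame. A minimal sketch of the new pattern, assuming `msg` is the incoming `ControlMessage` and `data` is the prepared feature array (the helper name is hypothetical):

```python
import cupy as cp

import morpheus._lib.messages as _messages
from morpheus.messages import ControlMessage


def build_fil_inference_message(msg: ControlMessage, data: cp.ndarray, fea_len: int) -> ControlMessage:
    count = data.shape[0]

    # seq_ids rows now index from 0 within the batch; under MultiMessage they
    # started at x.mess_offset.
    seq_ids = cp.zeros((count, 3), dtype=cp.uint32)
    seq_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32)
    seq_ids[:, 2] = fea_len - 1

    memory = _messages.InferenceMemoryFIL(count=count, input__0=data, seq_ids=seq_ids)

    # Copy task/metadata from the upstream message, then attach payload + tensors.
    infer_message = ControlMessage(msg)
    infer_message.payload(msg.payload())
    infer_message.tensors(memory)
    return infer_message
```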
4 changes: 1 addition & 3 deletions examples/llm/agents/common.py
@@ -21,7 +21,6 @@
from langchain.llms.openai import OpenAI

from morpheus.config import Config
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
@@ -67,8 +66,7 @@ def build_common_pipeline(config: Config, pipe: LinearPipeline, task_payload: di
Construct the elements of the pipeline common to the simple and kafka agent pipelines.
This method should be called after the source stage has been set.
"""
pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=task_payload))
pipe.add_stage(DeserializeStage(config, task_type="llm_engine", task_payload=task_payload))

pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

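
The same one-line change recurs in the completion and RAG pipelines below: `DeserializeStage` now always emits `ControlMessage`, so the `message_type` argument disappears. A sketch, assuming `pipe`, `config`, and `task_payload` are set up as in these examples:

```python
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage

# Before (24.06): the output type had to be requested explicitly.
# pipe.add_stage(DeserializeStage(config, message_type=ControlMessage,
#                                 task_type="llm_engine", task_payload=task_payload))

# After (24.10): ControlMessage is the only output type, so the kwarg is gone.
pipe.add_stage(DeserializeStage(config, task_type="llm_engine", task_payload=task_payload))
```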
4 changes: 1 addition & 3 deletions examples/llm/completion/pipeline.py
@@ -20,7 +20,6 @@
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.io.deserializers import read_file_to_df
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
@@ -116,8 +115,7 @@ def pipeline(num_threads: int,

pipe.set_source(InMemorySourceStage(config, dataframes=[source_df], repeat=repeat_count))

pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))
pipe.add_stage(DeserializeStage(config, task_type="llm_engine", task_payload=completion_task))

pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

4 changes: 1 addition & 3 deletions examples/llm/rag/standalone_pipeline.py
@@ -18,7 +18,6 @@

from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
@@ -114,8 +113,7 @@ def standalone(num_threads,

pipe.set_source(InMemorySourceStage(config, dataframes=source_dfs, repeat=repeat_count))

pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))
pipe.add_stage(DeserializeStage(config, task_type="llm_engine", task_payload=completion_task))

pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

66 changes: 33 additions & 33 deletions examples/log_parsing/inference.py
@@ -19,12 +19,11 @@
import tritonclient.grpc as tritonclient
from scipy.special import softmax

import morpheus._lib.messages as _messages
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import MultiInferenceMessage
from morpheus.messages import MultiInferenceNLPMessage
from morpheus.messages import MultiResponseMessage
from morpheus.messages import ControlMessage
from morpheus.messages import TensorMemory
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
@@ -58,28 +57,27 @@ class TritonInferenceLogParsing(TritonInferenceWorker):
Determines whether a logits calculation is needed for the value returned by the Triton inference response.
"""

def build_output_message(self, x: MultiInferenceMessage) -> MultiResponseMessage:
seq_ids = cp.zeros((x.count, 3), dtype=cp.uint32)
seq_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + x.count, dtype=cp.uint32)
seq_ids[:, 2] = x.get_tensor('seq_ids')[:, 2]
def build_output_message(self, msg: ControlMessage) -> ControlMessage:
seq_ids = cp.zeros((msg.tensors().count, 3), dtype=cp.uint32)
seq_ids[:, 0] = cp.arange(0, msg.tensors().count, dtype=cp.uint32)
seq_ids[:, 2] = msg.tensors().get_tensor('seq_ids')[:, 2]

memory = TensorMemory(
count=x.count,
memory = _messages.TensorMemory(
count=msg.tensors().count,
tensors={
'confidences': cp.zeros((x.count, self._inputs[list(self._inputs.keys())[0]].shape[1])),
'labels': cp.zeros((x.count, self._inputs[list(self._inputs.keys())[0]].shape[1])),
'input_ids': cp.zeros((x.count, x.get_tensor('input_ids').shape[1])),
'confidences': cp.zeros((msg.tensors().count, self._inputs[list(self._inputs.keys())[0]].shape[1])),
'labels': cp.zeros((msg.tensors().count, self._inputs[list(self._inputs.keys())[0]].shape[1])),
'input_ids': cp.zeros((msg.tensors().count, msg.tensors().get_tensor('input_ids').shape[1])),
'seq_ids': seq_ids
})

return MultiResponseMessage(meta=x.meta,
mess_offset=x.mess_offset,
mess_count=x.mess_count,
memory=memory,
offset=0,
count=x.count)
resp = ControlMessage(msg)
resp.payload(msg.payload())
resp.tensors(memory)

return resp

def _build_response(self, batch: MultiInferenceMessage, result: tritonclient.InferResult) -> TensorMemory:
def _build_response(self, batch: ControlMessage, result: tritonclient.InferResult) -> TensorMemory:

outputs = {output.mapped_name: result.as_numpy(output.name) for output in self._outputs.values()}
outputs = {key: softmax(val, axis=2) for key, val in outputs.items()}
@@ -140,44 +138,46 @@ def supports_cpp_node(self) -> bool:
return False

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MultiResponseMessage)
schema.output_schema.set_type(ControlMessage)

@staticmethod
def _convert_one_response(output: MultiResponseMessage, inf: MultiInferenceNLPMessage,
res: TensorMemory) -> MultiResponseMessage:
memory = output.memory
def _convert_one_response(output: ControlMessage, inf: ControlMessage, res: TensorMemory) -> ControlMessage:
memory = output.tensors()

out_seq_ids = memory.get_tensor('seq_ids')
input_ids = memory.get_tensor('input_ids')
confidences = memory.get_tensor('confidences')
labels = memory.get_tensor('labels')

seq_ids = inf.get_id_tensor()
seq_ids = inf.tensors().get_tensor('seq_ids')

seq_offset = seq_ids[0, 0].item() - output.mess_offset
seq_count = (seq_ids[-1, 0].item() + 1 - seq_offset) - output.mess_offset
seq_offset = seq_ids[0, 0].item()
seq_count = seq_ids[-1, 0].item() + 1 - seq_offset

input_ids[inf.offset:inf.count + inf.offset, :] = inf.get_tensor('input_ids')
out_seq_ids[inf.offset:inf.count + inf.offset, :] = seq_ids
input_ids[0:inf.tensors().count, :] = inf.tensors().get_tensor('input_ids')
out_seq_ids[0:inf.tensors().count, :] = seq_ids

resp_confidences = res.get_tensor('confidences')
resp_labels = res.get_tensor('labels')

# Two scenarios:
if (inf.mess_count == inf.count):
if (inf.payload().count == inf.tensors().count):
assert seq_count == res.count
confidences[inf.offset:inf.offset + inf.count, :] = resp_confidences
labels[inf.offset:inf.offset + inf.count, :] = resp_labels
confidences[0:inf.tensors().count, :] = resp_confidences
labels[0:inf.tensors().count, :] = resp_labels
else:
assert inf.count == res.count
assert inf.tensors().count == res.count

mess_ids = seq_ids[:, 0].get().tolist()

for i, idx in enumerate(mess_ids):
confidences[idx, :] = cp.maximum(confidences[idx, :], resp_confidences[i, :])
labels[idx, :] = cp.maximum(labels[idx, :], resp_labels[i, :])

return MultiResponseMessage.from_message(inf, memory=memory, offset=inf.offset, count=inf.mess_count)
resp = ControlMessage(inf)
resp.payload(inf.payload())
resp.tensors(memory)
return resp

def _get_inference_worker(self, inf_queue: ProducerConsumerQueue) -> TritonInferenceLogParsing:
return TritonInferenceLogParsing(inf_queue=inf_queue,
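
The subtle part of this diff is the index bookkeeping in `_convert_one_response`: with `ControlMessage`, tensor rows are addressed relative to the start of the batch rather than shifted by `mess_offset`/`offset`. A small self-contained sketch of the invariant, with hypothetical numbers:

```python
import cupy as cp

# Hypothetical batch: 4 tensor rows covering 3 payload rows (payload row 1
# was split across two inference windows). Columns are [row, start, stop].
seq_ids = cp.asarray([[0, 0, 0], [1, 0, 0], [1, 0, 0], [2, 0, 0]], dtype=cp.uint32)

seq_offset = seq_ids[0, 0].item()                    # 0: batches now start at row 0
seq_count = seq_ids[-1, 0].item() + 1 - seq_offset   # 3 payload rows covered

# Under MultiMessage both values were corrected by output.mess_offset; with
# ControlMessage no offset correction is needed.
assert (seq_offset, seq_count) == (0, 3)
```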
15 changes: 7 additions & 8 deletions examples/log_parsing/postprocessing.py
@@ -27,8 +27,8 @@
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus.messages import MultiResponseMessage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stage_schema import StageSchema

@@ -77,18 +77,17 @@ def supports_cpp_node(self):
return False

def accepted_types(self) -> typing.Tuple:
return (MultiResponseMessage, )
return (ControlMessage, )

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MessageMeta)

def _postprocess(self, x: MultiResponseMessage):

infer_pdf = pd.DataFrame(x.get_tensor('seq_ids').get()).astype(int)
def _postprocess(self, msg: ControlMessage):
infer_pdf = pd.DataFrame(msg.tensors().get_tensor('seq_ids').get()).astype(int)
infer_pdf.columns = ["doc", "start", "stop"]
infer_pdf["confidences"] = x.get_tensor('confidences').tolist()
infer_pdf["labels"] = x.get_tensor('labels').tolist()
infer_pdf["token_ids"] = x.get_tensor('input_ids').tolist()
infer_pdf["confidences"] = msg.tensors().get_tensor('confidences').tolist()
infer_pdf["labels"] = msg.tensors().get_tensor('labels').tolist()
infer_pdf["token_ids"] = msg.tensors().get_tensor('input_ids').tolist()

infer_pdf["confidences"] = infer_pdf.apply(lambda row: row["confidences"][row["start"]:row["stop"]], axis=1)

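
For completeness, the tensor-access pattern `_postprocess` now relies on, as a standalone sketch. It assumes a `ControlMessage` whose tensors were attached by the inference stage above; the tensor names follow the log-parsing example.

```python
import pandas as pd

from morpheus.messages import ControlMessage


def tensors_to_frame(msg: ControlMessage) -> pd.DataFrame:
    # msg.tensors() returns the TensorMemory attached upstream; get_tensor()
    # looks up a named cupy array, and .get() copies it to host memory.
    tensors = msg.tensors()
    frame = pd.DataFrame(tensors.get_tensor("seq_ids").get()).astype(int)
    frame.columns = ["doc", "start", "stop"]
    frame["confidences"] = tensors.get_tensor("confidences").tolist()
    return frame
```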