Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DeserializeStage to ensure output messages correctly contain the correct rows for each batch #2015

Merged
merged 7 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions python/morpheus/morpheus/_lib/src/messages/meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,14 +540,12 @@ TensorIndex SlicedMessageMeta::count() const

TableInfo SlicedMessageMeta::get_info() const
{
return this->m_data->get_info().get_slice(m_start, m_stop, m_column_names);
return get_info(m_column_names);
}

TableInfo SlicedMessageMeta::get_info(const std::string& col_name) const
{
auto full_info = this->m_data->get_info();

return full_info.get_slice(m_start, m_stop, {col_name});
return get_info(std::vector<std::string>{{col_name}});
}

TableInfo SlicedMessageMeta::get_info(const std::vector<std::string>& column_names) const
Expand Down
28 changes: 9 additions & 19 deletions python/morpheus/morpheus/_lib/src/stages/deserialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
#include "morpheus/stages/deserialize.hpp"

#include "morpheus/messages/control.hpp" // for ControlMessage
#include "morpheus/messages/meta.hpp" // for MessageMeta, SlicedMessageMeta
#include "morpheus/objects/table_info.hpp" // for TableInfo
#include "morpheus/types.hpp" // for TensorIndex
#include "morpheus/utilities/cudf_util.hpp" // for CudfHelper
#include "morpheus/utilities/json_types.hpp" // for PythonByteContainer
#include "morpheus/utilities/python_util.hpp" // for show_warning_message
#include "morpheus/utilities/string_util.hpp" // for MORPHEUS_CONCAT_STR
Expand All @@ -36,23 +39,6 @@

namespace morpheus {

void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
TensorIndex start,
TensorIndex stop,
control_message_task_t* task,
std::shared_ptr<ControlMessage>& windowed_message)
{
auto sliced_meta = std::make_shared<SlicedMessageMeta>(incoming_message, start, stop);
auto message = std::make_shared<ControlMessage>();
message->payload(sliced_meta);
if (task)
{
message->add_task(task->first, task->second);
}

windowed_message.swap(message);
}

DeserializeStage::subscribe_fn_t DeserializeStage::build_operator()
{
return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
Expand Down Expand Up @@ -89,9 +75,13 @@ DeserializeStage::subscribe_fn_t DeserializeStage::build_operator()
{
std::shared_ptr<ControlMessage> windowed_message = std::make_shared<ControlMessage>();

auto sliced_meta = std::make_shared<SlicedMessageMeta>(
auto sliced_meta = SlicedMessageMeta(
incoming_message, i, std::min(i + this->m_batch_size, incoming_message->count()));
windowed_message->payload(sliced_meta);
auto sliced_info = sliced_meta.get_info();

// This unforuntately requires grabbing the GIL and is a work-around for issue #2018
auto new_meta = MessageMeta::create_from_python(CudfHelper::table_from_table_info(sliced_info));
windowed_message->payload(new_meta);

auto task = m_task.get();
if (task)
Expand Down
46 changes: 46 additions & 0 deletions tests/morpheus/pipeline/test_file_in_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,25 @@
import numpy as np
import pytest

import cudf

from _utils import TEST_DIRS
from _utils import assert_path_exists
from _utils import assert_results
from _utils.dataset_manager import DatasetManager
from morpheus.common import FileTypes
from morpheus.common import TypeId
from morpheus.config import Config
from morpheus.config import CppConfig
from morpheus.io.deserializers import read_file_to_df
from morpheus.io.serializers import write_df_to_file
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus.pipeline import LinearPipeline
from morpheus.pipeline.stage_decorator import stage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
Expand Down Expand Up @@ -300,3 +307,42 @@ def test_file_rw_serialize_deserialize_multi_segment_pipe(tmp_path: pathlib.Path
# Somehow 0.7 ends up being 0.7000000000000001
output_data = np.around(output_data, 2)
assert output_data.tolist() == input_data.tolist()


@pytest.mark.slow
@pytest.mark.parametrize("use_get_set_data", [False, True])
def test_sliced_meta_nulls(config: Config, use_get_set_data: bool):
"""
Test reproduces Morpheus issue #2011
Issue occurrs when the length of the dataframe is larger than the pipeline batch size
"""
config.pipeline_batch_size = 256

input_df = cudf.DataFrame({"a": range(1024)})
expected_df = cudf.DataFrame({"a": range(1024), "copy": range(1024)})

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, dataframes=[input_df]))
pipe.add_stage(DeserializeStage(config))

@stage(execution_modes=(config.execution_mode, ), needed_columns={"copy": TypeId.INT64})
def copy_col(msg: ControlMessage) -> ControlMessage:
meta = msg.payload()

if use_get_set_data:
a_col = meta.get_data('a')
assert len(a_col) <= config.pipeline_batch_size
meta.set_data("copy", a_col)
else:
with meta.mutable_dataframe() as df:
assert len(df) <= config.pipeline_batch_size
df['copy'] = df['a']

return msg

pipe.add_stage(copy_col(config))
pipe.add_stage(SerializeStage(config))
cmp_stage = pipe.add_stage(CompareDataFrameStage(config, compare_df=expected_df))
pipe.run()

assert_results(cmp_stage.get_results())
Loading