diff --git a/morpheus/_lib/include/morpheus/io/serializers.hpp b/morpheus/_lib/include/morpheus/io/serializers.hpp index e5b297b896..b63a4e6e7f 100644 --- a/morpheus/_lib/include/morpheus/io/serializers.hpp +++ b/morpheus/_lib/include/morpheus/io/serializers.hpp @@ -25,12 +25,14 @@ namespace morpheus { -std::string df_to_csv(const TableInfo& tbl, bool include_header); +std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col = true); -void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header); +void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col = true); -std::string df_to_json(const TableInfo& tbl); +// Note the include_index_col is currently being ignored in both versions of `df_to_json` due to a known issue in +// Pandas: https://github.com/pandas-dev/pandas/issues/37600 +std::string df_to_json(const TableInfo& tbl, bool include_index_col = true); -void df_to_json(const TableInfo& tbl, std::ostream& out_stream); +void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col = true); } // namespace morpheus diff --git a/morpheus/_lib/include/morpheus/stages/write_to_file.hpp b/morpheus/_lib/include/morpheus/stages/write_to_file.hpp index 71b1b9b12f..5ac09e7842 100644 --- a/morpheus/_lib/include/morpheus/stages/write_to_file.hpp +++ b/morpheus/_lib/include/morpheus/stages/write_to_file.hpp @@ -49,7 +49,8 @@ class WriteToFileStage : public srf::pysrf::PythonNode m_write_func; }; @@ -81,7 +83,8 @@ struct WriteToFileStageInterfaceProxy const std::string &name, const std::string &filename, const std::string &mode = "w", - FileTypes file_type = FileTypes::Auto); + FileTypes file_type = FileTypes::Auto, + bool include_index_col = true); }; #pragma GCC visibility pop diff --git a/morpheus/_lib/src/io/serializers.cpp b/morpheus/_lib/src/io/serializers.cpp index 51781dc8bd..313ea4bf30 100644 --- a/morpheus/_lib/src/io/serializers.cpp +++ b/morpheus/_lib/src/io/serializers.cpp @@ -15,8 +15,12 @@ * limitations under the License. */ +#include +#include +#include #include +#include #include #include #include @@ -24,6 +28,7 @@ #include #include +#include #include #include @@ -75,21 +80,33 @@ class OStreamSink : public cudf::io::data_sink size_t m_bytest_written{0}; }; -std::string df_to_csv(const TableInfo& tbl, bool include_header) +std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col) { // Create an ostringstream and use that with the overload accepting an ostream std::ostringstream out_stream; - df_to_csv(tbl, out_stream, include_header); + df_to_csv(tbl, out_stream, include_header, include_index_col); return out_stream.str(); } -void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header) +void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col) { + auto column_names = tbl.get_column_names(); + cudf::size_type start_col = 1; + if (include_index_col) + { + start_col = 0; + column_names.insert(column_names.begin(), ""s); // insert the id column + } + + std::vector col_idexes(column_names.size()); + std::iota(col_idexes.begin(), col_idexes.end(), start_col); + auto tbl_view = tbl.get_view().select(col_idexes); + OStreamSink sink(out_stream); auto destination = cudf::io::sink_info(&sink); - auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl.get_view()) + auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl_view) .include_header(include_header) .true_value("True"s) .false_value("False"s); @@ -97,8 +114,6 @@ void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_head cudf::io::table_metadata metadata{}; if (include_header) { - auto column_names = tbl.get_column_names(); - column_names.insert(column_names.begin(), ""s); // insert the id column metadata.column_names = column_names; options_builder = options_builder.metadata(&metadata); } @@ -106,7 +121,7 @@ void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_head cudf::io::write_csv(options_builder.build(), rmm::mr::get_current_device_resource()); } -std::string df_to_json(const TableInfo& tbl) +std::string df_to_json(const TableInfo& tbl, bool include_index_col) { std::string results; // no cpp impl for to_json, instead python module converts to pandas and calls to_json @@ -116,7 +131,7 @@ std::string df_to_json(const TableInfo& tbl) auto df = tbl.as_py_object(); auto buffer = StringIO(); - py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true); + py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true, "index"_a = include_index_col); df.attr("to_json")(buffer, **kwargs); buffer.attr("seek")(0); @@ -127,11 +142,11 @@ std::string df_to_json(const TableInfo& tbl) return results; } -void df_to_json(const TableInfo& tbl, std::ostream& out_stream) +void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col) { // Unlike df_to_csv, we use the ostream overload to call the string overload because there is no C++ implementation // of to_json - std::string output = df_to_json(tbl); + std::string output = df_to_json(tbl, include_index_col); // Now write the contents to the stream out_stream.write(output.data(), output.size()); diff --git a/morpheus/_lib/src/python_modules/stages.cpp b/morpheus/_lib/src/python_modules/stages.cpp index f5a9cb71a6..d502822cb2 100644 --- a/morpheus/_lib/src/python_modules/stages.cpp +++ b/morpheus/_lib/src/python_modules/stages.cpp @@ -167,8 +167,9 @@ PYBIND11_MODULE(stages, m) py::arg("builder"), py::arg("name"), py::arg("filename"), - py::arg("mode") = "w", - py::arg("file_type") = 0); // Setting this to FileTypes::AUTO throws a conversion error at runtime + py::arg("mode") = "w", + py::arg("file_type") = 0, // Setting this to FileTypes::AUTO throws a conversion error at runtime + py::arg("include_index_col") = true); #ifdef VERSION_INFO m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO); diff --git a/morpheus/_lib/src/stages/write_to_file.cpp b/morpheus/_lib/src/stages/write_to_file.cpp index 45f3fb40c8..dba85e8cb1 100644 --- a/morpheus/_lib/src/stages/write_to_file.cpp +++ b/morpheus/_lib/src/stages/write_to_file.cpp @@ -27,9 +27,13 @@ namespace morpheus { // Component public implementations // ************ WriteToFileStage **************************** // -WriteToFileStage::WriteToFileStage(const std::string &filename, std::ios::openmode mode, FileTypes file_type) : +WriteToFileStage::WriteToFileStage(const std::string &filename, + std::ios::openmode mode, + FileTypes file_type, + bool include_index_col) : PythonNode(base_t::op_factory_from_sub_fn(build_operator())), - m_is_first(true) + m_is_first(true), + m_include_index_col(include_index_col) { if (file_type == FileTypes::Auto) { @@ -59,13 +63,13 @@ WriteToFileStage::WriteToFileStage(const std::string &filename, std::ios::openmo void WriteToFileStage::write_json(WriteToFileStage::sink_type_t &msg) { // Call df_to_json passing our fstream - df_to_json(msg->get_info(), m_fstream); + df_to_json(msg->get_info(), m_fstream, m_include_index_col); } void WriteToFileStage::write_csv(WriteToFileStage::sink_type_t &msg) { // Call df_to_csv passing our fstream - df_to_csv(msg->get_info(), m_fstream, m_is_first); + df_to_csv(msg->get_info(), m_fstream, m_is_first, m_include_index_col); } void WriteToFileStage::close() @@ -102,7 +106,8 @@ std::shared_ptr> WriteToFileStageInterfac const std::string &name, const std::string &filename, const std::string &mode, - FileTypes file_type) + FileTypes file_type, + bool include_index_col) { std::ios::openmode fsmode = std::ios::out; @@ -138,7 +143,7 @@ std::shared_ptr> WriteToFileStageInterfac throw std::runtime_error(std::string("Unsupported file mode. Must choose either 'w' or 'a'. Mode: ") + mode); } - auto stage = builder.construct_object(name, filename, fsmode, file_type); + auto stage = builder.construct_object(name, filename, fsmode, file_type, include_index_col); return stage; } diff --git a/morpheus/cli.py b/morpheus/cli.py index 98315d3ef0..5e23804edd 100644 --- a/morpheus/cli.py +++ b/morpheus/cli.py @@ -1321,6 +1321,12 @@ def validate(ctx: click.Context, **kwargs): @click.command(short_help="Write all messages to a file", **command_kwargs) @click.option('--filename', type=click.Path(writable=True), required=True, help="The file to write to") @click.option('--overwrite', is_flag=True, help="Whether or not to overwrite the target file") +@click.option('--include-index-col', + 'include_index_col', + default=True, + type=bool, + help=("Includes dataframe's index column in the output " + "Note: this currently only works for CSV file output")) @prepare_command() def to_file(ctx: click.Context, **kwargs): diff --git a/morpheus/io/serializers.py b/morpheus/io/serializers.py index 64bcd58bde..8bf8996cfd 100644 --- a/morpheus/io/serializers.py +++ b/morpheus/io/serializers.py @@ -19,7 +19,10 @@ import cudf -def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) -> typing.List[str]: +def df_to_csv(df: cudf.DataFrame, + include_header=False, + strip_newline=False, + include_index_col=True) -> typing.List[str]: """ Serializes a DataFrame into CSV and returns the serialized output seperated by lines. @@ -31,13 +34,15 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) -> Whether or not to include the header, by default False. strip_newline : bool, optional Whether or not to strip the newline characters from each string, by default False. + include_index_col: bool, optional + Write out the index as a column, by default True. Returns ------- typing.List[str] List of strings for each line """ - results = df.to_csv(header=include_header) + results = df.to_csv(header=include_header, index=include_index_col) if strip_newline: results = results.split("\n") else: @@ -46,7 +51,7 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) -> return results -def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]: +def df_to_json(df: cudf.DataFrame, strip_newlines=False, include_index_col=True) -> typing.List[str]: """ Serializes a DataFrame into JSON and returns the serialized output seperated by lines. @@ -56,7 +61,10 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]: Input DataFrame to serialize. strip_newline : bool, optional Whether or not to strip the newline characters from each string, by default False. - + include_index_col: bool, optional + Write out the index as a column, by default True. + Note: This value is currently being ignored due to a known issue in Pandas: + https://github.com/pandas-dev/pandas/issues/37600 Returns ------- typing.List[str] @@ -65,7 +73,7 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]: str_buf = StringIO() # Convert to list of json string objects - df.to_json(str_buf, orient="records", lines=True) + df.to_json(str_buf, orient="records", lines=True, index=include_index_col) # Start from beginning str_buf.seek(0) diff --git a/morpheus/stages/output/write_to_file_stage.py b/morpheus/stages/output/write_to_file_stage.py index e3ea37ad30..c55a9fa03f 100644 --- a/morpheus/stages/output/write_to_file_stage.py +++ b/morpheus/stages/output/write_to_file_stage.py @@ -49,7 +49,12 @@ class WriteToFileStage(SinglePortStage): """ - def __init__(self, c: Config, filename: str, overwrite: bool, file_type: FileTypes = FileTypes.Auto): + def __init__(self, + c: Config, + filename: str, + overwrite: bool, + file_type: FileTypes = FileTypes.Auto, + include_index_col: bool = True): super().__init__(c) @@ -69,6 +74,7 @@ def __init__(self, c: Config, filename: str, overwrite: bool, file_type: FileTyp self._file_type = determine_file_type(self._output_file) self._is_first = True + self._include_index_col = include_index_col @property def name(self) -> str: @@ -91,9 +97,11 @@ def supports_cpp_node(self): def _convert_to_strings(self, df: typing.Union[pd.DataFrame, cudf.DataFrame]): if (self._file_type == FileTypes.JSON): - output_strs = serializers.df_to_json(df) + output_strs = serializers.df_to_json(df, include_index_col=self._include_index_col) elif (self._file_type == FileTypes.CSV): - output_strs = serializers.df_to_csv(df, include_header=self._is_first) + output_strs = serializers.df_to_csv(df, + include_header=self._is_first, + include_index_col=self._include_index_col) self._is_first = False else: raise NotImplementedError("Unknown file type: {}".format(self._file_type)) @@ -110,7 +118,12 @@ def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> Strea # Sink to file if (self._build_cpp_node()): - to_file = _stages.WriteToFileStage(builder, self.unique_name, self._output_file, "w", self._file_type) + to_file = _stages.WriteToFileStage(builder, + self.unique_name, + self._output_file, + "w", + self._file_type, + self._include_index_col) else: def node_fn(obs: srf.Observable, sub: srf.Subscriber):