Skip to content

Commit

Permalink
Add a flag to optionally include or exclude the cudf ID column from C…
Browse files Browse the repository at this point in the history
…SV output, due to a known issue in cudf & pandas (rapidsai/cudf#11317 & pandas-dev/pandas#37600) this option has no effect on JSON output
  • Loading branch information
dagardner-nv committed Jul 21, 2022
1 parent 81d4fd8 commit 8b4554c
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 33 deletions.
10 changes: 6 additions & 4 deletions morpheus/_lib/include/morpheus/io/serializers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@

namespace morpheus {

std::string df_to_csv(const TableInfo& tbl, bool include_header);
std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col = true);

void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header);
void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col = true);

std::string df_to_json(const TableInfo& tbl);
// Note the include_index_col is currently being ignored in both versions of `df_to_json` due to a known issue in
// Pandas: https://github.com/pandas-dev/pandas/issues/37600
std::string df_to_json(const TableInfo& tbl, bool include_index_col = true);

void df_to_json(const TableInfo& tbl, std::ostream& out_stream);
void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col = true);

} // namespace morpheus
7 changes: 5 additions & 2 deletions morpheus/_lib/include/morpheus/stages/write_to_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class WriteToFileStage : public srf::pysrf::PythonNode<std::shared_ptr<MessageMe
*/
WriteToFileStage(const std::string &filename,
std::ios::openmode mode = std::ios::out,
FileTypes file_type = FileTypes::Auto);
FileTypes file_type = FileTypes::Auto,
bool include_index_col = true);

private:
/**
Expand All @@ -64,6 +65,7 @@ class WriteToFileStage : public srf::pysrf::PythonNode<std::shared_ptr<MessageMe
subscribe_fn_t build_operator();

bool m_is_first;
bool m_include_index_col;
std::ofstream m_fstream;
std::function<void(sink_type_t &)> m_write_func;
};
Expand All @@ -81,7 +83,8 @@ struct WriteToFileStageInterfaceProxy
const std::string &name,
const std::string &filename,
const std::string &mode = "w",
FileTypes file_type = FileTypes::Auto);
FileTypes file_type = FileTypes::Auto,
bool include_index_col = true);
};

#pragma GCC visibility pop
Expand Down
35 changes: 25 additions & 10 deletions morpheus/_lib/src/io/serializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
* limitations under the License.
*/

#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <memory>
#include <morpheus/io/serializers.hpp>

#include <bits/c++config.h>
#include <pybind11/gil.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <cudf/io/csv.hpp>
#include <cudf/io/data_sink.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <numeric>
#include <ostream>
#include <sstream>

Expand Down Expand Up @@ -75,38 +80,48 @@ class OStreamSink : public cudf::io::data_sink
size_t m_bytest_written{0};
};

std::string df_to_csv(const TableInfo& tbl, bool include_header)
std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col)
{
// Create an ostringstream and use that with the overload accepting an ostream
std::ostringstream out_stream;

df_to_csv(tbl, out_stream, include_header);
df_to_csv(tbl, out_stream, include_header, include_index_col);

return out_stream.str();
}

void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header)
void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col)
{
auto column_names = tbl.get_column_names();
cudf::size_type start_col = 1;
if (include_index_col)
{
start_col = 0;
column_names.insert(column_names.begin(), ""s); // insert the id column
}

std::vector<cudf::size_type> col_idexes(column_names.size());
std::iota(col_idexes.begin(), col_idexes.end(), start_col);
auto tbl_view = tbl.get_view().select(col_idexes);

OStreamSink sink(out_stream);
auto destination = cudf::io::sink_info(&sink);
auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl.get_view())
auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl_view)
.include_header(include_header)
.true_value("True"s)
.false_value("False"s);

cudf::io::table_metadata metadata{};
if (include_header)
{
auto column_names = tbl.get_column_names();
column_names.insert(column_names.begin(), ""s); // insert the id column
metadata.column_names = column_names;
options_builder = options_builder.metadata(&metadata);
}

cudf::io::write_csv(options_builder.build(), rmm::mr::get_current_device_resource());
}

std::string df_to_json(const TableInfo& tbl)
std::string df_to_json(const TableInfo& tbl, bool include_index_col)
{
std::string results;
// no cpp impl for to_json, instead python module converts to pandas and calls to_json
Expand All @@ -116,7 +131,7 @@ std::string df_to_json(const TableInfo& tbl)

auto df = tbl.as_py_object();
auto buffer = StringIO();
py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true);
py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true, "index"_a = include_index_col);
df.attr("to_json")(buffer, **kwargs);
buffer.attr("seek")(0);

Expand All @@ -127,11 +142,11 @@ std::string df_to_json(const TableInfo& tbl)
return results;
}

void df_to_json(const TableInfo& tbl, std::ostream& out_stream)
void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col)
{
// Unlike df_to_csv, we use the ostream overload to call the string overload because there is no C++ implementation
// of to_json
std::string output = df_to_json(tbl);
std::string output = df_to_json(tbl, include_index_col);

// Now write the contents to the stream
out_stream.write(output.data(), output.size());
Expand Down
5 changes: 3 additions & 2 deletions morpheus/_lib/src/python_modules/stages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ PYBIND11_MODULE(stages, m)
py::arg("builder"),
py::arg("name"),
py::arg("filename"),
py::arg("mode") = "w",
py::arg("file_type") = 0); // Setting this to FileTypes::AUTO throws a conversion error at runtime
py::arg("mode") = "w",
py::arg("file_type") = 0, // Setting this to FileTypes::AUTO throws a conversion error at runtime
py::arg("include_index_col") = true);

#ifdef VERSION_INFO
m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
Expand Down
17 changes: 11 additions & 6 deletions morpheus/_lib/src/stages/write_to_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
namespace morpheus {
// Component public implementations
// ************ WriteToFileStage **************************** //
WriteToFileStage::WriteToFileStage(const std::string &filename, std::ios::openmode mode, FileTypes file_type) :
WriteToFileStage::WriteToFileStage(const std::string &filename,
std::ios::openmode mode,
FileTypes file_type,
bool include_index_col) :
PythonNode(base_t::op_factory_from_sub_fn(build_operator())),
m_is_first(true)
m_is_first(true),
m_include_index_col(include_index_col)
{
if (file_type == FileTypes::Auto)
{
Expand Down Expand Up @@ -59,13 +63,13 @@ WriteToFileStage::WriteToFileStage(const std::string &filename, std::ios::openmo
void WriteToFileStage::write_json(WriteToFileStage::sink_type_t &msg)
{
// Call df_to_json passing our fstream
df_to_json(msg->get_info(), m_fstream);
df_to_json(msg->get_info(), m_fstream, m_include_index_col);
}

void WriteToFileStage::write_csv(WriteToFileStage::sink_type_t &msg)
{
// Call df_to_csv passing our fstream
df_to_csv(msg->get_info(), m_fstream, m_is_first);
df_to_csv(msg->get_info(), m_fstream, m_is_first, m_include_index_col);
}

void WriteToFileStage::close()
Expand Down Expand Up @@ -102,7 +106,8 @@ std::shared_ptr<srf::segment::Object<WriteToFileStage>> WriteToFileStageInterfac
const std::string &name,
const std::string &filename,
const std::string &mode,
FileTypes file_type)
FileTypes file_type,
bool include_index_col)
{
std::ios::openmode fsmode = std::ios::out;

Expand Down Expand Up @@ -138,7 +143,7 @@ std::shared_ptr<srf::segment::Object<WriteToFileStage>> WriteToFileStageInterfac
throw std::runtime_error(std::string("Unsupported file mode. Must choose either 'w' or 'a'. Mode: ") + mode);
}

auto stage = builder.construct_object<WriteToFileStage>(name, filename, fsmode, file_type);
auto stage = builder.construct_object<WriteToFileStage>(name, filename, fsmode, file_type, include_index_col);

return stage;
}
Expand Down
6 changes: 6 additions & 0 deletions morpheus/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,12 @@ def validate(ctx: click.Context, **kwargs):
@click.command(short_help="Write all messages to a file", **command_kwargs)
@click.option('--filename', type=click.Path(writable=True), required=True, help="The file to write to")
@click.option('--overwrite', is_flag=True, help="Whether or not to overwrite the target file")
@click.option('--include-index-col',
'include_index_col',
default=True,
type=bool,
help=("Includes dataframe's index column in the output "
"Note: this currently only works for CSV file output"))
@prepare_command()
def to_file(ctx: click.Context, **kwargs):

Expand Down
18 changes: 13 additions & 5 deletions morpheus/io/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import cudf


def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) -> typing.List[str]:
def df_to_csv(df: cudf.DataFrame,
include_header=False,
strip_newline=False,
include_index_col=True) -> typing.List[str]:
"""
Serializes a DataFrame into CSV and returns the serialized output seperated by lines.
Expand All @@ -31,13 +34,15 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) ->
Whether or not to include the header, by default False.
strip_newline : bool, optional
Whether or not to strip the newline characters from each string, by default False.
include_index_col: bool, optional
Write out the index as a column, by default True.
Returns
-------
typing.List[str]
List of strings for each line
"""
results = df.to_csv(header=include_header)
results = df.to_csv(header=include_header, index=include_index_col)
if strip_newline:
results = results.split("\n")
else:
Expand All @@ -46,7 +51,7 @@ def df_to_csv(df: cudf.DataFrame, include_header=False, strip_newline=False) ->
return results


def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]:
def df_to_json(df: cudf.DataFrame, strip_newlines=False, include_index_col=True) -> typing.List[str]:
"""
Serializes a DataFrame into JSON and returns the serialized output seperated by lines.
Expand All @@ -56,7 +61,10 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]:
Input DataFrame to serialize.
strip_newline : bool, optional
Whether or not to strip the newline characters from each string, by default False.
include_index_col: bool, optional
Write out the index as a column, by default True.
Note: This value is currently being ignored due to a known issue in Pandas:
https://github.com/pandas-dev/pandas/issues/37600
Returns
-------
typing.List[str]
Expand All @@ -65,7 +73,7 @@ def df_to_json(df: cudf.DataFrame, strip_newlines=False) -> typing.List[str]:
str_buf = StringIO()

# Convert to list of json string objects
df.to_json(str_buf, orient="records", lines=True)
df.to_json(str_buf, orient="records", lines=True, index=include_index_col)

# Start from beginning
str_buf.seek(0)
Expand Down
21 changes: 17 additions & 4 deletions morpheus/stages/output/write_to_file_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ class WriteToFileStage(SinglePortStage):
"""

def __init__(self, c: Config, filename: str, overwrite: bool, file_type: FileTypes = FileTypes.Auto):
def __init__(self,
c: Config,
filename: str,
overwrite: bool,
file_type: FileTypes = FileTypes.Auto,
include_index_col: bool = True):

super().__init__(c)

Expand All @@ -69,6 +74,7 @@ def __init__(self, c: Config, filename: str, overwrite: bool, file_type: FileTyp
self._file_type = determine_file_type(self._output_file)

self._is_first = True
self._include_index_col = include_index_col

@property
def name(self) -> str:
Expand All @@ -91,9 +97,11 @@ def supports_cpp_node(self):

def _convert_to_strings(self, df: typing.Union[pd.DataFrame, cudf.DataFrame]):
if (self._file_type == FileTypes.JSON):
output_strs = serializers.df_to_json(df)
output_strs = serializers.df_to_json(df, include_index_col=self._include_index_col)
elif (self._file_type == FileTypes.CSV):
output_strs = serializers.df_to_csv(df, include_header=self._is_first)
output_strs = serializers.df_to_csv(df,
include_header=self._is_first,
include_index_col=self._include_index_col)
self._is_first = False
else:
raise NotImplementedError("Unknown file type: {}".format(self._file_type))
Expand All @@ -110,7 +118,12 @@ def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> Strea

# Sink to file
if (self._build_cpp_node()):
to_file = _stages.WriteToFileStage(builder, self.unique_name, self._output_file, "w", self._file_type)
to_file = _stages.WriteToFileStage(builder,
self.unique_name,
self._output_file,
"w",
self._file_type,
self._include_index_col)
else:

def node_fn(obs: srf.Observable, sub: srf.Subscriber):
Expand Down

0 comments on commit 8b4554c

Please sign in to comment.