diff --git a/cpp/perspective/src/cpp/arrow_loader.cpp b/cpp/perspective/src/cpp/arrow_loader.cpp index bfe0faa111..44fa4b1097 100644 --- a/cpp/perspective/src/cpp/arrow_loader.cpp +++ b/cpp/perspective/src/cpp/arrow_loader.cpp @@ -138,6 +138,7 @@ using namespace perspective; ArrowLoader::ArrowLoader() = default; ArrowLoader::~ArrowLoader() = default; +ArrowLoader::ArrowLoader(ArrowLoader&&) noexcept = default; t_dtype convert_type(const std::string& src) { @@ -484,6 +485,11 @@ copy_array( t_vocab* vocab = dest->_get_vocab(); std::string elem; + vocab->reserve( + dict->value_data()->size() + dsize, // vocab len + null bytes + dsize + ); + for (std::uint64_t i = 0; i < dsize; ++i) { std::int32_t bidx = offsets[i]; std::size_t es = offsets[i + 1] - bidx; diff --git a/cpp/perspective/src/cpp/server.cpp b/cpp/perspective/src/cpp/server.cpp index 5d19e9fa39..eef950c80e 100644 --- a/cpp/perspective/src/cpp/server.cpp +++ b/cpp/perspective/src/cpp/server.cpp @@ -682,13 +682,16 @@ ProtoServer::handle_request( req_env.ParseFromString(data); std::vector> serialized_responses; std::vector responses; + + auto msg_id = req_env.msg_id(); + auto entity_id = req_env.entity_id(); try { - auto resp_msg = _handle_request(client_id, req_env); + auto resp_msg = _handle_request(client_id, std::move(req_env)); for (auto& resp : resp_msg) { ProtoServerResp str_resp; str_resp.data = resp.data.SerializeAsString(); str_resp.client_id = resp.client_id; - serialized_responses.emplace_back(str_resp); + serialized_responses.emplace_back(std::move(str_resp)); } } catch (const PerspectiveException& e) { proto::Response resp; @@ -718,13 +721,13 @@ ProtoServer::handle_request( // proto::Response resp_env; serialized_responses.reserve(responses.size()); for (auto& resp : responses) { - resp.set_msg_id(req_env.msg_id()); - resp.set_entity_id(req_env.entity_id()); + resp.set_msg_id(msg_id); + resp.set_entity_id(entity_id); ProtoServerResp str_resp; str_resp.data = resp.SerializeAsString(); str_resp.client_id = client_id; - serialized_responses.emplace_back(str_resp); + serialized_responses.emplace_back(std::move(str_resp)); } return serialized_responses; @@ -1148,7 +1151,7 @@ coerce_to(const t_dtype dtype, const A& val) { } std::vector> -ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) { +ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) { static bool is_init_expr = false; if (!is_init_expr) { t_computed_expression_parser::init(); @@ -1157,9 +1160,13 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) { std::vector> proto_resp; // proto::Response resp_env; + + auto msg_id = req.msg_id(); + auto entity_id = req.entity_id(); + auto push_resp = [&](Response&& resp) { - resp.set_msg_id(req.msg_id()); - resp.set_entity_id(req.entity_id()); + resp.set_msg_id(msg_id); + resp.set_entity_id(entity_id); ProtoServerResp resp2; resp2.data = std::move(resp); resp2.client_id = client_id; @@ -1280,32 +1287,42 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) { dims.end_col ); - table = Table::from_arrow(index, *arrow, limit); + table = Table::from_arrow(index, std::move(*arrow), limit); break; } case proto::MakeTableData::kFromArrow: { - table = - Table::from_arrow(index, r.data().from_arrow(), limit); + std::string data = r.data().from_arrow(); + { auto _ = std::move(req); } + + table = Table::from_arrow(index, std::move(data), limit); break; } case proto::MakeTableData::kFromCsv: { - table = Table::from_csv(index, r.data().from_csv(), limit); + std::string data = r.data().from_csv(); + { auto _ = std::move(req); } + + table = Table::from_csv(index, std::move(data), limit); break; } case proto::MakeTableData::kFromCols: { - table = - Table::from_cols(index, r.data().from_cols(), limit); + std::string data = r.data().from_cols(); + { auto _ = std::move(req); } + + table = Table::from_cols(index, std::move(data), limit); break; } case proto::MakeTableData::kFromRows: { - table = - Table::from_rows(index, r.data().from_rows(), limit); + std::string data = r.data().from_rows(); + { auto _ = std::move(req); } + + table = Table::from_rows(index, std::move(data), limit); break; } case proto::MakeTableData::kFromNdjson: { - table = Table::from_ndjson( - index, r.data().from_ndjson(), limit - ); + std::string data = r.data().from_ndjson(); + { auto _ = std::move(req); } + + table = Table::from_ndjson(index, std::move(data), limit); break; } case proto::MakeTableData::kFromSchema: { @@ -1327,7 +1344,7 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) { } } - m_resources.host_table(req.entity_id(), table); + m_resources.host_table(entity_id, table); proto::Response resp; resp.mutable_make_table_resp(); push_resp(std::move(resp)); diff --git a/cpp/perspective/src/cpp/table.cpp b/cpp/perspective/src/cpp/table.cpp index a8d340976c..591704748b 100644 --- a/cpp/perspective/src/cpp/table.cpp +++ b/cpp/perspective/src/cpp/table.cpp @@ -360,15 +360,13 @@ Table::update_csv(const std::string_view& data, std::uint32_t port_id) { std::shared_ptr Table::from_csv( - const std::string& index, const std::string_view& data, std::uint32_t limit + const std::string& index, std::string&& data, std::uint32_t limit ) { - auto pool = std::make_shared(); - pool->init(); auto map = std::unordered_map>(); apachearrow::ArrowLoader arrow_loader; - arrow_loader.init_csv(data.data(), false, map); + arrow_loader.init_csv(data, false, map); std::vector column_names; std::vector data_types; @@ -390,14 +388,22 @@ Table::from_csv( std::uint32_t row_count = 0; row_count = arrow_loader.row_count(); - t_data_table data_table(output_schema); - data_table.init(); - data_table.extend(row_count); - arrow_loader.fill_table(data_table, input_schema, index, 0, limit, false); + auto data_table = std::make_shared(output_schema); + data_table->init(); + + { + auto _ = std::move(data); + auto loader = std::move(arrow_loader); + data_table->extend(row_count); + loader.fill_table(*data_table, input_schema, index, 0, limit, false); + } + auto pool = std::make_shared(); + pool->init(); auto tbl = std::make_shared
(pool, column_names, data_types, limit, index); - tbl->init(data_table, row_count, t_op::OP_INSERT, 0); + tbl->init(*data_table, row_count, t_op::OP_INSERT, 0); + data_table.reset(); pool->_process(); return tbl; } @@ -986,11 +992,8 @@ Table::update_cols(const std::string_view& data, std::uint32_t port_id) { std::shared_ptr
Table::from_cols( - const std::string& index, const std::string_view& data, std::uint32_t limit + const std::string& index, std::string&& data, std::uint32_t limit ) { - auto pool = std::make_shared(); - pool->init(); - // 1.) Infer schema rapidjson::Document document; document.Parse(data.data()); @@ -1035,21 +1038,21 @@ Table::from_cols( t_schema schema(column_names, data_types); // 2.) Create table - t_data_table data_table(schema); - data_table.init(); - data_table.extend(nrows); + auto data_table = std::make_unique(schema); + data_table->init(); + data_table->extend(nrows); if (is_implicit) { // TODO should this be t_uindex? - data_table.add_column("psp_pkey", DTYPE_INT32, true); - data_table.add_column("psp_okey", DTYPE_INT32, true); + data_table->add_column("psp_pkey", DTYPE_INT32, true); + data_table->add_column("psp_okey", DTYPE_INT32, true); } else { - data_table.add_column("psp_pkey", schema.get_dtype(index), true); - data_table.add_column("psp_okey", schema.get_dtype(index), true); + data_table->add_column("psp_pkey", schema.get_dtype(index), true); + data_table->add_column("psp_okey", schema.get_dtype(index), true); } - const auto& psp_pkey_col = data_table.get_column("psp_pkey"); - const auto& psp_okey_col = data_table.get_column("psp_okey"); + const auto& psp_pkey_col = data_table->get_column("psp_pkey"); + const auto& psp_okey_col = data_table->get_column("psp_okey"); // 3.) Fill table for (const auto& col : document.GetObj()) { @@ -1061,7 +1064,7 @@ Table::from_cols( << dtype_to_str(data_table.get_column(col_name)->get_dtype()) ); for (const auto& cell : col.value.GetArray()) { - auto col = data_table.get_column(col_name); + auto col = data_table->get_column(col_name); auto promote = fill_column_json(col, ii, cell, false); if (promote) { LOG_DEBUG( @@ -1069,8 +1072,8 @@ Table::from_cols( << dtype_to_str(col->get_dtype()) << " to " << dtype_to_str(*promote) ); - data_table.promote_column(col_name, *promote, ii, true); - col = data_table.get_column(col_name); + data_table->promote_column(col_name, *promote, ii, true); + col = data_table->get_column(col_name); fill_column_json(col, ii, cell, false); } @@ -1090,11 +1093,17 @@ Table::from_cols( } } + { auto _ = std::move(document); } + { auto _ = std::move(data); } + + auto pool = std::make_shared(); + pool->init(); auto tbl = std::make_shared
( pool, schema.columns(), schema.types(), limit, index ); - tbl->init(data_table, nrows, t_op::OP_INSERT, 0); + tbl->init(*data_table, nrows, t_op::OP_INSERT, 0); + data_table.reset(); pool->_process(); return tbl; } @@ -1218,11 +1227,8 @@ Table::update_rows(const std::string_view& data, std::uint32_t port_id) { std::shared_ptr
Table::from_rows( - const std::string& index, const std::string_view& data, std::uint32_t limit + const std::string& index, std::string&& data, std::uint32_t limit ) { - auto pool = std::make_shared(); - pool->init(); - // 1.) Infer schema rapidjson::Document document; document.Parse(data.data()); @@ -1288,27 +1294,27 @@ Table::from_rows( t_schema schema(column_names, data_types); // 2.) Create table - t_data_table data_table(schema); - data_table.init(); - data_table.extend(document.Size()); + auto data_table = std::make_unique(schema); + data_table->init(); + data_table->extend(document.Size()); if (is_implicit) { - data_table.add_column("psp_pkey", DTYPE_INT32, true); - data_table.add_column("psp_okey", DTYPE_INT32, true); + data_table->add_column("psp_pkey", DTYPE_INT32, true); + data_table->add_column("psp_okey", DTYPE_INT32, true); } else { - data_table.add_column("psp_pkey", schema.get_dtype(index), true); - data_table.add_column("psp_okey", schema.get_dtype(index), true); + data_table->add_column("psp_pkey", schema.get_dtype(index), true); + data_table->add_column("psp_okey", schema.get_dtype(index), true); } std::int32_t ii = 0; - const auto& psp_pkey_col = data_table.get_column("psp_pkey"); - const auto& psp_okey_col = data_table.get_column("psp_okey"); + const auto& psp_pkey_col = data_table->get_column("psp_pkey"); + const auto& psp_okey_col = data_table->get_column("psp_okey"); // 3.) Fill table for (const auto& row : document.GetArray()) { for (const auto& it : row.GetObj()) { - auto col = data_table.get_column(it.name.GetString()); + auto col = data_table->get_column(it.name.GetString()); const auto* col_name = it.name.GetString(); const auto& cell = it.value; auto promote = fill_column_json(col, ii, cell, false); @@ -1318,8 +1324,8 @@ Table::from_rows( << dtype_to_str(col->get_dtype()) << " to " << dtype_to_str(*promote) ); - data_table.promote_column(col_name, *promote, ii, true); - col = data_table.get_column(col_name); + data_table->promote_column(col_name, *promote, ii, true); + col = data_table->get_column(col_name); fill_column_json(col, ii, cell, false); } @@ -1337,11 +1343,17 @@ Table::from_rows( ii++; } + { auto _ = std::move(document); } + { auto _ = std::move(data); } + + auto pool = std::make_shared(); + pool->init(); auto tbl = std::make_shared
( pool, schema.columns(), schema.types(), limit, index ); - tbl->init(data_table, document.Size(), t_op::OP_INSERT, 0); + tbl->init(*data_table, document.Size(), t_op::OP_INSERT, 0); + data_table.reset(); pool->_process(); return tbl; } @@ -1460,11 +1472,8 @@ Table::update_ndjson(const std::string_view& data, std::uint32_t port_id) { std::shared_ptr
Table::from_ndjson( - const std::string& index, const std::string_view& data, std::uint32_t limit + const std::string& index, std::string&& data, std::uint32_t limit ) { - auto pool = std::make_shared(); - pool->init(); - // 1.) Infer schema rapidjson::Document document; rapidjson::StringStream s(data.data()); @@ -1527,20 +1536,20 @@ Table::from_ndjson( t_schema schema(column_names, data_types); // 2.) Create table - t_data_table data_table(schema); - data_table.init(); + auto data_table = std::make_unique(schema); + data_table->init(); if (is_implicit) { - data_table.add_column("psp_pkey", DTYPE_INT32, true); - data_table.add_column("psp_okey", DTYPE_INT32, true); + data_table->add_column("psp_pkey", DTYPE_INT32, true); + data_table->add_column("psp_okey", DTYPE_INT32, true); } else { - data_table.add_column("psp_pkey", schema.get_dtype(index), true); - data_table.add_column("psp_okey", schema.get_dtype(index), true); + data_table->add_column("psp_pkey", schema.get_dtype(index), true); + data_table->add_column("psp_okey", schema.get_dtype(index), true); } std::int32_t ii = 0; - const auto& psp_pkey_col = data_table.get_column("psp_pkey"); - const auto& psp_okey_col = data_table.get_column("psp_okey"); + const auto& psp_pkey_col = data_table->get_column("psp_pkey"); + const auto& psp_okey_col = data_table->get_column("psp_okey"); // 2a.) Estimate row size to reduce malloc pressure. auto newlines = 0; @@ -1550,13 +1559,13 @@ Table::from_ndjson( } } - data_table.reserve(newlines + 1); + data_table->reserve(newlines + 1); // 3.) Fill table bool is_finished = false; while (!is_finished) { for (const auto& it : document.GetObj()) { - auto col = data_table.get_column(it.name.GetString()); + auto col = data_table->get_column(it.name.GetString()); const auto* col_name = it.name.GetString(); const auto& cell = it.value; auto promote = fill_column_json(col, ii, cell, false); @@ -1567,8 +1576,8 @@ Table::from_ndjson( << " to " << dtype_to_str(*promote) ); - data_table.promote_column(col_name, *promote, ii, true); - col = data_table.get_column(col_name); + data_table->promote_column(col_name, *promote, ii, true); + col = data_table->get_column(col_name); fill_column_json(col, ii, cell, false); } @@ -1590,12 +1599,19 @@ Table::from_ndjson( } } - data_table.extend(ii); + data_table->extend(ii); + + { auto _ = std::move(document); } + { auto _ = std::move(data); } + + auto pool = std::make_shared(); + pool->init(); auto tbl = std::make_shared
( pool, schema.columns(), schema.types(), limit, index ); - tbl->init(data_table, ii, t_op::OP_INSERT, 0); + tbl->init(*data_table, ii, t_op::OP_INSERT, 0); + data_table.reset(); pool->_process(); return tbl; } @@ -1671,7 +1687,7 @@ Table::update_arrow(const std::string_view& data, std::uint32_t port_id) { std::shared_ptr
Table::from_arrow( - const std::string& index, const std::string_view& data, std::uint32_t limit + const std::string& index, std::string&& data, std::uint32_t limit ) { apachearrow::ArrowLoader arrow_loader; @@ -1698,18 +1714,23 @@ Table::from_arrow( } t_schema output_schema{columns, types}; - t_data_table data_table{output_schema}; - data_table.init(); - - auto row_count = arrow_loader.row_count(); - data_table.extend(row_count); - arrow_loader.fill_table(data_table, input_schema, index, 0, limit, false); + auto data_table = std::make_unique(output_schema); + data_table->init(); + + { + auto _ = std::move(data); + auto loader = std::move(arrow_loader); + auto row_count = loader.row_count(); + data_table->extend(row_count); + loader.fill_table(*data_table, input_schema, index, 0, limit, false); + } // Make Table auto pool = std::make_shared(); pool->init(); auto table = std::make_shared
(pool, columns, types, limit, index); - table->init(data_table, data_table.num_rows(), t_op::OP_INSERT, 0); + table->init(*data_table, data_table->num_rows(), t_op::OP_INSERT, 0); + data_table.reset(); pool->_process(); return table; } diff --git a/cpp/perspective/src/include/perspective/arrow_loader.h b/cpp/perspective/src/include/perspective/arrow_loader.h index f6f48bd72d..f0d4c4d972 100644 --- a/cpp/perspective/src/include/perspective/arrow_loader.h +++ b/cpp/perspective/src/include/perspective/arrow_loader.h @@ -32,6 +32,7 @@ namespace apachearrow { public: ArrowLoader(); ~ArrowLoader(); + ArrowLoader(ArrowLoader&& other) noexcept; /** * @brief Initialize the arrow loader with a pointer to a binary. diff --git a/cpp/perspective/src/include/perspective/server.h b/cpp/perspective/src/include/perspective/server.h index 273717c847..9f2efb18ac 100644 --- a/cpp/perspective/src/include/perspective/server.h +++ b/cpp/perspective/src/include/perspective/server.h @@ -625,7 +625,7 @@ namespace server { ); std::vector> - _handle_request(std::uint32_t client_id, const Request& req); + _handle_request(std::uint32_t client_id, Request&& req); std::vector> _poll(); diff --git a/cpp/perspective/src/include/perspective/table.h b/cpp/perspective/src/include/perspective/table.h index 1ad153566c..e75985e1a1 100644 --- a/cpp/perspective/src/include/perspective/table.h +++ b/cpp/perspective/src/include/perspective/table.h @@ -203,25 +203,25 @@ class PERSPECTIVE_EXPORT Table { static std::shared_ptr
from_csv( const std::string& index, - const std::string_view& data, + std::string&& data, std::uint32_t limit = std::numeric_limits::max() ); static std::shared_ptr
from_cols( const std::string& index, - const std::string_view& data, + std::string&& data, std::uint32_t limit = std::numeric_limits::max() ); static std::shared_ptr
from_rows( const std::string& index, - const std::string_view& data, + std::string&& data, std::uint32_t limit = std::numeric_limits::max() ); static std::shared_ptr
from_ndjson( const std::string& index, - const std::string_view& data, + std::string&& data, std::uint32_t limit = std::numeric_limits::max() ); @@ -233,7 +233,7 @@ class PERSPECTIVE_EXPORT Table { static std::shared_ptr
from_arrow( const std::string& index, - const std::string_view& data, + std::string&& data, std::uint32_t limit = std::numeric_limits::max() );