Skip to content

Commit

Permalink
This fixes a couple of places where we were over-allocating memory.
Browse files Browse the repository at this point in the history
1. Reserve vocab capacity while loading Arrow data into Perspective to prevent the vocab from over-growing.
2. Free data tables and protobuf messages before processing the table, to minimize the number of copies of the data held in memory at the same time.

Signed-off-by: Timothy Bess <tim@prospective.dev>
  • Loading branch information
timbess committed Jan 31, 2025
1 parent c25388c commit 47f1672
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 95 deletions.
6 changes: 6 additions & 0 deletions cpp/perspective/src/cpp/arrow_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ using namespace perspective;

// Special member functions of ArrowLoader, defined out-of-line and
// compiler-generated via `= default`.

// Default constructor.
ArrowLoader::ArrowLoader() = default;
// Destructor.
ArrowLoader::~ArrowLoader() = default;
// Move constructor; declared noexcept.
ArrowLoader::ArrowLoader(ArrowLoader&&) noexcept = default;

t_dtype
convert_type(const std::string& src) {
Expand Down Expand Up @@ -484,6 +485,11 @@ copy_array(
t_vocab* vocab = dest->_get_vocab();
std::string elem;

vocab->reserve(
dict->value_data()->size() + dsize, // vocab len + null bytes
dsize
);

for (std::uint64_t i = 0; i < dsize; ++i) {
std::int32_t bidx = offsets[i];
std::size_t es = offsets[i + 1] - bidx;
Expand Down
57 changes: 37 additions & 20 deletions cpp/perspective/src/cpp/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,13 +682,16 @@ ProtoServer::handle_request(
req_env.ParseFromString(data);
std::vector<ProtoServerResp<std::string>> serialized_responses;
std::vector<proto::Response> responses;

auto msg_id = req_env.msg_id();
auto entity_id = req_env.entity_id();
try {
auto resp_msg = _handle_request(client_id, req_env);
auto resp_msg = _handle_request(client_id, std::move(req_env));
for (auto& resp : resp_msg) {
ProtoServerResp<std::string> str_resp;
str_resp.data = resp.data.SerializeAsString();
str_resp.client_id = resp.client_id;
serialized_responses.emplace_back(str_resp);
serialized_responses.emplace_back(std::move(str_resp));
}
} catch (const PerspectiveException& e) {
proto::Response resp;
Expand Down Expand Up @@ -718,13 +721,13 @@ ProtoServer::handle_request(
// proto::Response resp_env;
serialized_responses.reserve(responses.size());
for (auto& resp : responses) {
resp.set_msg_id(req_env.msg_id());
resp.set_entity_id(req_env.entity_id());
resp.set_msg_id(msg_id);
resp.set_entity_id(entity_id);

ProtoServerResp<std::string> str_resp;
str_resp.data = resp.SerializeAsString();
str_resp.client_id = client_id;
serialized_responses.emplace_back(str_resp);
serialized_responses.emplace_back(std::move(str_resp));
}

return serialized_responses;
Expand Down Expand Up @@ -1148,7 +1151,7 @@ coerce_to(const t_dtype dtype, const A& val) {
}

std::vector<ProtoServerResp<ProtoServer::Response>>
ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {
ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) {
static bool is_init_expr = false;
if (!is_init_expr) {
t_computed_expression_parser::init();
Expand All @@ -1157,9 +1160,13 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {

std::vector<ProtoServerResp<ProtoServer::Response>> proto_resp;
// proto::Response resp_env;

auto msg_id = req.msg_id();
auto entity_id = req.entity_id();

auto push_resp = [&](Response&& resp) {
resp.set_msg_id(req.msg_id());
resp.set_entity_id(req.entity_id());
resp.set_msg_id(msg_id);
resp.set_entity_id(entity_id);
ProtoServerResp<ProtoServer::Response> resp2;
resp2.data = std::move(resp);
resp2.client_id = client_id;
Expand Down Expand Up @@ -1280,32 +1287,42 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {
dims.end_col
);

table = Table::from_arrow(index, *arrow, limit);
table = Table::from_arrow(index, std::move(*arrow), limit);
break;
}
case proto::MakeTableData::kFromArrow: {
table =
Table::from_arrow(index, r.data().from_arrow(), limit);
std::string data = r.data().from_arrow();
{ auto _ = std::move(req); }

table = Table::from_arrow(index, std::move(data), limit);
break;
}
case proto::MakeTableData::kFromCsv: {
table = Table::from_csv(index, r.data().from_csv(), limit);
std::string data = r.data().from_csv();
{ auto _ = std::move(req); }

table = Table::from_csv(index, std::move(data), limit);
break;
}
case proto::MakeTableData::kFromCols: {
table =
Table::from_cols(index, r.data().from_cols(), limit);
std::string data = r.data().from_cols();
{ auto _ = std::move(req); }

table = Table::from_cols(index, std::move(data), limit);
break;
}
case proto::MakeTableData::kFromRows: {
table =
Table::from_rows(index, r.data().from_rows(), limit);
std::string data = r.data().from_rows();
{ auto _ = std::move(req); }

table = Table::from_rows(index, std::move(data), limit);
break;
}
case proto::MakeTableData::kFromNdjson: {
table = Table::from_ndjson(
index, r.data().from_ndjson(), limit
);
std::string data = r.data().from_ndjson();
{ auto _ = std::move(req); }

table = Table::from_ndjson(index, std::move(data), limit);
break;
}
case proto::MakeTableData::kFromSchema: {
Expand All @@ -1327,7 +1344,7 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {
}
}

m_resources.host_table(req.entity_id(), table);
m_resources.host_table(entity_id, table);
proto::Response resp;
resp.mutable_make_table_resp();
push_resp(std::move(resp));
Expand Down
Loading

0 comments on commit 47f1672

Please sign in to comment.