From 3b97aaf21644d953b591175cfc3fa19b4b496969 Mon Sep 17 00:00:00 2001 From: Cristian Ferretti <37232625+jcferretti@users.noreply.github.com> Date: Wed, 15 May 2024 00:34:50 -0400 Subject: [PATCH] Add a C++ client concurrent test/example (#5490) * Replicate failing python unit test in C++. * Removed unused code from server.cc * undo change. * Tweak * shared_ptr * Review comments. * Followup review comments. --- .../deephaven/dhclient/src/server/server.cc | 35 ------- cpp-client/deephaven/examples/CMakeLists.txt | 1 + .../examples/concurrent_client/CMakeLists.txt | 7 ++ .../examples/concurrent_client/main.cc | 93 +++++++++++++++++++ 4 files changed, 101 insertions(+), 35 deletions(-) create mode 100644 cpp-client/deephaven/examples/concurrent_client/CMakeLists.txt create mode 100644 cpp-client/deephaven/examples/concurrent_client/main.cc diff --git a/cpp-client/deephaven/dhclient/src/server/server.cc b/cpp-client/deephaven/dhclient/src/server/server.cc index ccd39bf6eac..31315bfda70 100644 --- a/cpp-client/deephaven/dhclient/src/server/server.cc +++ b/cpp-client/deephaven/dhclient/src/server/server.cc @@ -10,7 +10,6 @@ #include #include #include -#include #include #include #include @@ -20,46 +19,12 @@ #include "deephaven/third_party/fmt/format.h" using arrow::flight::FlightClient; -using deephaven::client::impl::MoveVectorData; -using deephaven::dhcore::utility::Bit_cast; using deephaven::dhcore::utility::GetWhat; -using io::deephaven::proto::backplane::grpc::AddTableRequest; -using io::deephaven::proto::backplane::grpc::AddTableResponse; -using io::deephaven::proto::backplane::grpc::AjRajTablesRequest; -using io::deephaven::proto::backplane::grpc::AuthenticationConstantsRequest; using io::deephaven::proto::backplane::grpc::ConfigurationConstantsRequest; using io::deephaven::proto::backplane::grpc::ConfigurationConstantsResponse; -using io::deephaven::proto::backplane::grpc::ConfigService; -using io::deephaven::proto::backplane::grpc::CreateInputTableRequest; -using io::deephaven::proto::backplane::grpc::CrossJoinTablesRequest; -using io::deephaven::proto::backplane::grpc::DeleteTableRequest; -using io::deephaven::proto::backplane::grpc::DeleteTableResponse; -using io::deephaven::proto::backplane::grpc::DropColumnsRequest; -using io::deephaven::proto::backplane::grpc::EmptyTableRequest; -using io::deephaven::proto::backplane::grpc::ExactJoinTablesRequest; -using io::deephaven::proto::backplane::grpc::ExportedTableCreationResponse; -using io::deephaven::proto::backplane::grpc::FetchTableRequest; -using io::deephaven::proto::backplane::grpc::HandshakeRequest; -using io::deephaven::proto::backplane::grpc::HeadOrTailRequest; -using io::deephaven::proto::backplane::grpc::HeadOrTailByRequest; -using io::deephaven::proto::backplane::grpc::LeftJoinTablesRequest; -using io::deephaven::proto::backplane::grpc::MergeTablesRequest; -using io::deephaven::proto::backplane::grpc::NaturalJoinTablesRequest; using io::deephaven::proto::backplane::grpc::ReleaseRequest; using io::deephaven::proto::backplane::grpc::ReleaseResponse; -using io::deephaven::proto::backplane::grpc::SelectDistinctRequest; -using io::deephaven::proto::backplane::grpc::SelectOrUpdateRequest; -using io::deephaven::proto::backplane::grpc::SortTableRequest; using io::deephaven::proto::backplane::grpc::Ticket; -using io::deephaven::proto::backplane::grpc::TimeTableRequest; -using io::deephaven::proto::backplane::grpc::WhereInRequest; -using io::deephaven::proto::backplane::grpc::UpdateByRequest; -using io::deephaven::proto::backplane::grpc::UnstructuredFilterTableRequest; -using io::deephaven::proto::backplane::grpc::UngroupRequest; -using io::deephaven::proto::backplane::script::grpc::BindTableToVariableRequest; -using io::deephaven::proto::backplane::script::grpc::ExecuteCommandRequest; -using io::deephaven::proto::backplane::script::grpc::ExecuteCommandResponse; -using io::deephaven::proto::backplane::script::grpc::StartConsoleRequest; using UpdateByOperation = io::deephaven::proto::backplane::grpc::UpdateByRequest::UpdateByOperation; diff --git a/cpp-client/deephaven/examples/CMakeLists.txt b/cpp-client/deephaven/examples/CMakeLists.txt index 90ad3ea1e33..a64df824cd2 100644 --- a/cpp-client/deephaven/examples/CMakeLists.txt +++ b/cpp-client/deephaven/examples/CMakeLists.txt @@ -1,4 +1,5 @@ add_subdirectory(table_cleanup) +add_subdirectory(concurrent_client) add_subdirectory(create_table_with_arrow_flight) add_subdirectory(create_table_with_table_maker) add_subdirectory(demos) diff --git a/cpp-client/deephaven/examples/concurrent_client/CMakeLists.txt b/cpp-client/deephaven/examples/concurrent_client/CMakeLists.txt new file mode 100644 index 00000000000..81358a32d65 --- /dev/null +++ b/cpp-client/deephaven/examples/concurrent_client/CMakeLists.txt @@ -0,0 +1,7 @@ +project(hello_world) + +set(CMAKE_CXX_STANDARD 17) + +add_executable(concurrent_session main.cc) + +target_link_libraries(concurrent_session deephaven::client) diff --git a/cpp-client/deephaven/examples/concurrent_client/main.cc b/cpp-client/deephaven/examples/concurrent_client/main.cc new file mode 100644 index 00000000000..1308c790f14 --- /dev/null +++ b/cpp-client/deephaven/examples/concurrent_client/main.cc @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending + */ +#include +#include +#include +#include +#include +#include +#include "deephaven/client/client.h" +#include "deephaven/third_party/fmt/format.h" +#include "deephaven/third_party/fmt/ostream.h" + +using deephaven::client::Client; +using deephaven::client::TableHandleManager; + +void Usage(const char *const argv0, const int exit_status) { + std::cerr << "Usage: " << argv0 << "[numthreads [iterations [host:port]]]" << '\n'; + std::exit(exit_status); +} + +void ThreadFun(std::shared_ptr clientp, const std::size_t ti) { + auto manager = clientp->GetManager(); + using namespace std::chrono_literals; + std::cout << "THREAD START " << ti << std::endl << std::flush; + const std::string t1_name = std::string("import deephaven; t1_") + std::to_string(ti); + while (true) { + manager.RunScript( + t1_name + "= deephaven.time_table(\"PT1S\")"); + std::this_thread::sleep_for(2000ms); + auto t1 = manager.FetchTable(std::string("t1_") + std::to_string(ti)); + } + std::cout << "THREAD END " << ti << std::endl << std::flush; +} + +void Run(std::shared_ptr clientp, const std::size_t nthreads) { + std::vector threads; + for (std::size_t i = 0; i < nthreads; ++i) { + threads.emplace_back(&ThreadFun, clientp, i); + } + + for (std::size_t i = 0; i < nthreads; ++i) { + threads[i].join(); + } +} + +std::size_t get_size_arg( + const char *const description, + const std::size_t defaultval, + const int argc, + const char *const argv[], + int &c) { + if (argc <= c) { + return defaultval; + } + + ++c; + try { + const std::size_t val = static_cast(std::stoi(argv[c])); + if (val <= 0) { + throw std::invalid_argument("<= 0"); + } + return val; + } catch (const std::invalid_argument &e) { + fmt::println(std::cerr, "{}: can't convert {} argument #{} '{}' to a positive intenger [{}], aborting.", + argv[0], description, c, argv[c], e.what()); + std::exit(1); + } +} + +int main(int argc, char *argv[]) { + int c = 1; + if (argc > 1 && std::strcmp("-h", argv[1]) == 0) { + Usage(argv[0], 0); + } + std::size_t nthreads = get_size_arg("number of threads", 200, argc, argv, c); + std::size_t iterations = get_size_arg("number of iterations", 600, argc, argv, c); + const char *endpoint = "localhost:10000"; + if (argc > c) { + endpoint = argv[c]; + } + ++c; + if (argc > c) { + Usage(argv[0], 1); + } + try { + auto client = Client::Connect(endpoint); + Run(std::make_shared(std::move(client)), nthreads); + } catch (const std::exception &e) { + std::cerr << "Caught exception: " << e.what() << '\n'; + } + return 0; +}