From 816c00e2ae146497c63624a5bea7759b767917bb Mon Sep 17 00:00:00 2001 From: mamaheux <35638081+mamaheux@users.noreply.github.com> Date: Tue, 19 Dec 2023 09:26:59 -0500 Subject: [PATCH] Reliability tests (#134) * Add cpp-data-channel-client-reliability-tests * Update the readme and the room. * Fix cpp-data-channel-client-reliability-tests. * Add the return value for datachannel.send. * Reduce delays. * Release the GIL for data channel send methods. --- CMakeLists.txt | 1 + VERSION | 2 +- .../.gitignore | 4 + .../CMakeLists.txt | 28 +++ .../README.md | 31 +++ .../iceServers.json | 5 + .../main.cpp | 210 ++++++++++++++++++ .../start_server.bash | 7 + .../DataChannelClient.h | 26 +-- .../DataChannelPeerConnectionHandler.h | 2 +- .../python/src/DataChannelClientPython.cpp | 4 + .../src/DataChannelClient.cpp | 16 +- .../DataChannelPeerConnectionHandler.cpp | 8 +- 13 files changed, 321 insertions(+), 23 deletions(-) create mode 100644 examples/cpp-data-channel-client-reliability-tests/.gitignore create mode 100644 examples/cpp-data-channel-client-reliability-tests/CMakeLists.txt create mode 100644 examples/cpp-data-channel-client-reliability-tests/README.md create mode 100644 examples/cpp-data-channel-client-reliability-tests/iceServers.json create mode 100644 examples/cpp-data-channel-client-reliability-tests/main.cpp create mode 100755 examples/cpp-data-channel-client-reliability-tests/start_server.bash diff --git a/CMakeLists.txt b/CMakeLists.txt index 7f0cbc99..f6683699 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -138,5 +138,6 @@ add_subdirectory(signaling-server) if(OPENTERA_WEBRTC_ENABLE_EXAMPLES) add_subdirectory(examples/cpp-data-channel-client) + add_subdirectory(examples/cpp-data-channel-client-reliability-tests) add_subdirectory(examples/cpp-stream-client) endif() diff --git a/VERSION b/VERSION index 05e8a459..2228cad4 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.6.6 +0.6.7 diff --git a/examples/cpp-data-channel-client-reliability-tests/.gitignore b/examples/cpp-data-channel-client-reliability-tests/.gitignore new file mode 100644 index 00000000..3967845b --- /dev/null +++ b/examples/cpp-data-channel-client-reliability-tests/.gitignore @@ -0,0 +1,4 @@ +cmake-build-debug +cmake-build-release +build +.idea diff --git a/examples/cpp-data-channel-client-reliability-tests/CMakeLists.txt b/examples/cpp-data-channel-client-reliability-tests/CMakeLists.txt new file mode 100644 index 00000000..ad5e9258 --- /dev/null +++ b/examples/cpp-data-channel-client-reliability-tests/CMakeLists.txt @@ -0,0 +1,28 @@ +cmake_minimum_required(VERSION 3.14.0) + +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +project(CppDataChannelReliabilityTests) + +set(LIBRARY_OUTPUT_PATH bin/${CMAKE_BUILD_TYPE}) + +include_directories(${OpenCV_INCLUDE_DIRS}) +include_directories(BEFORE SYSTEM ${webrtc_native_INCLUDE}) +include_directories(../../opentera-webrtc-native-client/3rdParty/socket.io-client-cpp/src) +include_directories(../../opentera-webrtc-native-client/3rdParty/socket.io-client-cpp/lib/rapidjson/include) +include_directories(../../opentera-webrtc-native-client/3rdParty/cpp-httplib) +include_directories(../../opentera-webrtc-native-client/OpenteraWebrtcNativeClient/include) + +add_executable(CppDataChannelReliabilityTests main.cpp) + +target_link_libraries(CppDataChannelReliabilityTests + OpenteraWebrtcNativeClient +) + +if (NOT WIN32) + target_link_libraries(CppDataChannelReliabilityTests + pthread + ) +endif() + +set_property(TARGET CppDataChannelReliabilityTests PROPERTY CXX_STANDARD 17) diff --git a/examples/cpp-data-channel-client-reliability-tests/README.md b/examples/cpp-data-channel-client-reliability-tests/README.md new file mode 100644 index 00000000..da3be6a2 --- /dev/null +++ b/examples/cpp-data-channel-client-reliability-tests/README.md @@ -0,0 +1,31 @@ +# cpp-data-channel-client-reliability-tests + +This example tests the reliability of the data channel. + +## How to use + +1. Build the example. +```bash +cd ../.. +mkdir build +cd build +cmake .. +cmake --build . --config Release|Debug +``` + +2. Start the signaling server. +```bash +./start_server.bash +``` + +3. Start a master client. +```bash +cd ../../build/bin/Release +./CppDataChannelReliabilityTests http://localhost:8080 master abc true +``` + +4. Start a slave client. +```bash +cd ../../build/bin/Release +./CppDataChannelReliabilityTests http://localhost:8080 slave abc false +``` diff --git a/examples/cpp-data-channel-client-reliability-tests/iceServers.json b/examples/cpp-data-channel-client-reliability-tests/iceServers.json new file mode 100644 index 00000000..74575ab7 --- /dev/null +++ b/examples/cpp-data-channel-client-reliability-tests/iceServers.json @@ -0,0 +1,5 @@ +[ + { + "urls": "stun:stun.l.google.com:19302" + } +] diff --git a/examples/cpp-data-channel-client-reliability-tests/main.cpp b/examples/cpp-data-channel-client-reliability-tests/main.cpp new file mode 100644 index 00000000..987f0e2c --- /dev/null +++ b/examples/cpp-data-channel-client-reliability-tests/main.cpp @@ -0,0 +1,210 @@ +#include + +#include +#include +#include +#include +#include + +using namespace opentera; +using namespace std; + +constexpr chrono::milliseconds CLOSING_CONNECTION_DELAY(1000); + +constexpr int MESSAGE_COUNT = 100; + +atomic_bool isRunning = true; + +void sigintSigtermCallbackHandler(int signum) +{ + isRunning = false; +} + +template +bool waitFor(F f) +{ + constexpr chrono::milliseconds SLEEP_TIME(100); + constexpr chrono::milliseconds TIMEOUT(10000); + + auto start = chrono::steady_clock::now(); + while (!f()) + { + this_thread::sleep_for(SLEEP_TIME); + + if (std::chrono::duration_cast(chrono::steady_clock::now() - start) > TIMEOUT) + { + return false; + } + else if (!isRunning) + { + exit(-1); + } + } + + return true; +} + +int main(int argc, char* argv[]) +{ + if (argc != 5) + { + cout << "Usage: CppDataChannelReliabilityTests base_url name password master(true|false)" << endl; + return -1; + } + + string baseUrl = argv[1]; + string name = argv[2]; + string password = argv[3]; + bool isMaster = string(argv[4]) != "false"; + + signal(SIGINT, sigintSigtermCallbackHandler); + signal(SIGTERM, sigintSigtermCallbackHandler); + + vector iceServers; + if (!IceServer::fetchFromServer(baseUrl + "/iceservers", password, iceServers)) + { + cout << "IceServer::fetchFromServer failed" << endl; + iceServers.clear(); + } + + cout << "Ice servers=" << endl; + for (auto s : iceServers) + { + cout << "\turls=" << endl; + for (auto u : s.urls()) + { + cout << "\t\t" << u << endl; + } + cout << "\tusername=" << s.username() << endl; + cout << "\tcredential=" << s.credential() << endl; + } + cout << endl; + + auto signalingServerConfiguration = + SignalingServerConfiguration::create(baseUrl, name, "reliability", password); + auto webrtcConfiguration = WebrtcConfiguration::create(iceServers); + auto dataChannelConfiguration = DataChannelConfiguration::create(); + DataChannelClient client(signalingServerConfiguration, webrtcConfiguration, dataChannelConfiguration); + + atomic_bool hasAnotherClient = false; + atomic_bool isDataChannelOpened = false; + atomic_int currentMessageId = 0; + + int successfulConnectionCount = 0; + int failedConnectionCount = 0; + int successfulMessageGroupCount = 0; + int failedMessageGroupCount = 0; + + client.setOnSignalingConnectionError( + [](const string& error) + { + cout << "OnSignalingConnectionClosed:" << endl << "\t" << error; + }); + + client.setOnRoomClientsChanged( + [&](const vector& roomClients) + { + hasAnotherClient = roomClients.size() > 1; + }); + + client.setOnError( + [](const string& error) + { + cout << "error:" << endl; + cout << "\t" << error << endl; + }); + + client.setOnDataChannelOpened( + [&](const Client& client) + { + isDataChannelOpened = true; + }); + client.setOnDataChannelError( + [](const Client& client, const string& error) + { + cout << "OnDataChannelError:" << endl; + cout << "\tid=" << client.id() << ", name=" << client.name() << endl; + cout << "\t" << error << endl; + }); + client.setOnDataChannelMessageString( + [&](const Client& _, const string& message) + { + int receivedId = stoi(message); + currentMessageId = receivedId + 1; + if (!client.sendToAll(to_string(currentMessageId.load()))) + { + cout << "sendToAll failed" << endl; + } + + cout << "receivedId=" << receivedId << endl; + }); + + client.connect(); + + cout << "Connecting to the signaling server." << endl; + if (!waitFor([&](){ return client.isConnected(); })) + { + cout << "Signaling server connection failed." << endl; + return -1; + } + + cout << "Waiting for another client." << endl; + if (!waitFor([&](){ return hasAnotherClient.load(); })) + { + cout << "No other client." << endl; + return -1; + } + + while (isRunning) + { + isDataChannelOpened = false; + if (isMaster) + { + client.callAll(); + } + + if (waitFor([&](){ return isDataChannelOpened.load(); })) + { + successfulConnectionCount++; + currentMessageId = 0; + if (isMaster) + { + if (!client.sendToAll(to_string(currentMessageId.load()))) + { + cout << "sendToAll failed" << endl; + } + } + + if (waitFor([&](){ return currentMessageId.load() >= MESSAGE_COUNT; })) + { + successfulMessageGroupCount++; + } + else + { + failedMessageGroupCount++; + } + } + else + { + failedConnectionCount++; + } + + if (isMaster) + { + client.closeAllRoomPeerConnections(); + } + + cout << endl << "************ Stats ************" << endl; + cout << "\t successfulConnectionCount=" << successfulConnectionCount << endl; + cout << "\t failedConnectionCount=" << failedConnectionCount << endl; + cout << "\t successfulMessageGroupCount=" << successfulMessageGroupCount << endl; + cout << "\t failedMessageGroupCount=" << failedMessageGroupCount << endl; + cout << "************ Stats ************" << endl << endl; + + this_thread::sleep_for(CLOSING_CONNECTION_DELAY); + } + + client.closeSync(); + + return 0; +} diff --git a/examples/cpp-data-channel-client-reliability-tests/start_server.bash b/examples/cpp-data-channel-client-reliability-tests/start_server.bash new file mode 100755 index 00000000..e374c1d8 --- /dev/null +++ b/examples/cpp-data-channel-client-reliability-tests/start_server.bash @@ -0,0 +1,7 @@ +#!/bin/bash + +SCRIPT=`realpath $0` +SCRIPT_PATH=`dirname $SCRIPT` + +cd $SCRIPT_PATH/../../signaling-server +python3 opentera-signaling-server --port 8080 --password abc --ice_servers $SCRIPT_PATH/iceServers.json diff --git a/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/include/OpenteraWebrtcNativeClient/DataChannelClient.h b/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/include/OpenteraWebrtcNativeClient/DataChannelClient.h index 63c703ac..20578d7d 100644 --- a/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/include/OpenteraWebrtcNativeClient/DataChannelClient.h +++ b/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/include/OpenteraWebrtcNativeClient/DataChannelClient.h @@ -32,10 +32,10 @@ namespace opentera DECLARE_NOT_COPYABLE(DataChannelClient); DECLARE_NOT_MOVABLE(DataChannelClient); - void sendTo(const uint8_t* data, std::size_t size, const std::vector& ids); - void sendTo(const std::string& message, const std::vector& ids); - void sendToAll(const uint8_t* data, std::size_t size); - void sendToAll(const std::string& message); + bool sendTo(const uint8_t* data, std::size_t size, const std::vector& ids); + bool sendTo(const std::string& message, const std::vector& ids); + bool sendToAll(const uint8_t* data, std::size_t size); + bool sendToAll(const std::string& message); void setOnDataChannelOpened(const std::function& callback); void setOnDataChannelClosed(const std::function& callback); @@ -45,8 +45,8 @@ namespace opentera void setOnDataChannelMessageString(const std::function& callback); protected: - void sendTo(const webrtc::DataBuffer& buffer, const std::vector& ids); - void sendToAll(const webrtc::DataBuffer& buffer); + bool sendTo(const webrtc::DataBuffer& buffer, const std::vector& ids); + bool sendToAll(const webrtc::DataBuffer& buffer); std::unique_ptr createPeerConnectionHandler(const std::string& id, const Client& peerClient, bool isCaller) override; @@ -59,9 +59,9 @@ namespace opentera * @param size The binary data size * @param ids The client ids */ - inline void DataChannelClient::sendTo(const uint8_t* data, size_t size, const std::vector& ids) + inline bool DataChannelClient::sendTo(const uint8_t* data, size_t size, const std::vector& ids) { - sendTo(webrtc::DataBuffer(rtc::CopyOnWriteBuffer(data, size), true), ids); + return sendTo(webrtc::DataBuffer(rtc::CopyOnWriteBuffer(data, size), true), ids); } /** @@ -70,9 +70,9 @@ namespace opentera * @param message The string message * @param ids The client ids */ - inline void DataChannelClient::sendTo(const std::string& message, const std::vector& ids) + inline bool DataChannelClient::sendTo(const std::string& message, const std::vector& ids) { - sendTo(webrtc::DataBuffer(message), ids); + return sendTo(webrtc::DataBuffer(message), ids); } /** @@ -81,9 +81,9 @@ namespace opentera * @param data The binary data * @param size The binary data size */ - inline void DataChannelClient::sendToAll(const uint8_t* data, size_t size) + inline bool DataChannelClient::sendToAll(const uint8_t* data, size_t size) { - sendToAll(webrtc::DataBuffer(rtc::CopyOnWriteBuffer(data, size), true)); + return sendToAll(webrtc::DataBuffer(rtc::CopyOnWriteBuffer(data, size), true)); } /** @@ -91,7 +91,7 @@ namespace opentera * * @param message The string message */ - inline void DataChannelClient::sendToAll(const std::string& message) { sendToAll(webrtc::DataBuffer(message)); } + inline bool DataChannelClient::sendToAll(const std::string& message) { return sendToAll(webrtc::DataBuffer(message)); } /** * @brief Sets the callback that is called when a data channel opens. diff --git a/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/include/OpenteraWebrtcNativeClient/Handlers/DataChannelPeerConnectionHandler.h b/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/include/OpenteraWebrtcNativeClient/Handlers/DataChannelPeerConnectionHandler.h index 3934d218..04eee513 100644 --- a/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/include/OpenteraWebrtcNativeClient/Handlers/DataChannelPeerConnectionHandler.h +++ b/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/include/OpenteraWebrtcNativeClient/Handlers/DataChannelPeerConnectionHandler.h @@ -46,7 +46,7 @@ namespace opentera void setPeerConnection(const rtc::scoped_refptr& peerConnection) override; - void send(const webrtc::DataBuffer& buffer); + bool send(const webrtc::DataBuffer& buffer); // Observer methods void OnDataChannel(rtc::scoped_refptr dataChannel) override; diff --git a/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/python/src/DataChannelClientPython.cpp b/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/python/src/DataChannelClientPython.cpp index de690a79..716391e3 100644 --- a/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/python/src/DataChannelClientPython.cpp +++ b/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/python/src/DataChannelClientPython.cpp @@ -48,6 +48,7 @@ void opentera::initDataChannelClientPython(pybind11::module& m) auto data = bytes.cast(); self.sendTo(reinterpret_cast(data.data()), data.size(), ids); }, + py::call_guard(), "Sends binary data to the specified clients.\n" "\n" ":param bytes: The binary data\n" @@ -57,6 +58,7 @@ void opentera::initDataChannelClientPython(pybind11::module& m) .def( "send_to", py::overload_cast&>(&DataChannelClient::sendTo), + py::call_guard(), "Sends a string message to the specified clients.\n" "\n" ":param message: The string message\n" @@ -70,6 +72,7 @@ void opentera::initDataChannelClientPython(pybind11::module& m) auto data = bytes.cast(); self.sendToAll(reinterpret_cast(data.data()), data.size()); }, + py::call_guard(), "Sends binary data to all clients.\n" "\n" ":param bytes: The binary data (bytes)\n", @@ -77,6 +80,7 @@ void opentera::initDataChannelClientPython(pybind11::module& m) .def( "send_to_all", py::overload_cast(&DataChannelClient::sendToAll), + py::call_guard(), "Sends a string message to all clients.\n" "\n" ":param message: The string message", diff --git a/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/src/DataChannelClient.cpp b/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/src/DataChannelClient.cpp index f713982c..1ba2f55d 100644 --- a/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/src/DataChannelClient.cpp +++ b/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/src/DataChannelClient.cpp @@ -23,33 +23,37 @@ DataChannelClient::DataChannelClient( { } -void DataChannelClient::sendTo(const webrtc::DataBuffer& buffer, const vector& ids) +bool DataChannelClient::sendTo(const webrtc::DataBuffer& buffer, const vector& ids) { - callAsync( + return callSync( getInternalClientThread(), [this, buffer, ids]() { + bool ok = true; for (const auto& id : ids) { auto it = m_peerConnectionHandlersById.find(id); if (it != m_peerConnectionHandlersById.end()) { - dynamic_cast(it->second.get())->send(buffer); + ok = ok && dynamic_cast(it->second.get())->send(buffer); } } + return ok; }); } -void DataChannelClient::sendToAll(const webrtc::DataBuffer& buffer) +bool DataChannelClient::sendToAll(const webrtc::DataBuffer& buffer) { - callAsync( + return callSync( getInternalClientThread(), [this, buffer]() { + bool ok = true; for (auto& pair : m_peerConnectionHandlersById) { - dynamic_cast(pair.second.get())->send(buffer); + ok = ok && dynamic_cast(pair.second.get())->send(buffer); } + return ok; }); } diff --git a/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/src/Handlers/DataChannelPeerConnectionHandler.cpp b/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/src/Handlers/DataChannelPeerConnectionHandler.cpp index ff54b2ca..35c4ba80 100644 --- a/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/src/Handlers/DataChannelPeerConnectionHandler.cpp +++ b/opentera-webrtc-native-client/OpenteraWebrtcNativeClient/src/Handlers/DataChannelPeerConnectionHandler.cpp @@ -72,11 +72,15 @@ void DataChannelPeerConnectionHandler::setPeerConnection( } } -void DataChannelPeerConnectionHandler::send(const webrtc::DataBuffer& buffer) +bool DataChannelPeerConnectionHandler::send(const webrtc::DataBuffer& buffer) { if (m_dataChannel) { - m_dataChannel->Send(buffer); + return m_dataChannel->Send(buffer); + } + else + { + return false; } }