Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread and memory safety fixes #562

Merged
merged 5 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions worker/include/Channel/ChannelSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ namespace Channel
virtual ~ChannelSocket();

public:
void Close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this? We have a destructor.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was necessary to close the socket, but not destroy it (see Worker::Close() method changes). Otherwise libuv's callbacks were leading to access of consumer/producer sockets that were already freed, thus causing use-after-free I mentioned.

So I had to add explicit Close() method that closes producer and consumer sockets using their Close() methods, while keeping everything in memory, so things like IsClosed() in consumer callback still access valid memory location and returns true instead of undefined behavior.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because we call Worker::Close() explicitly in a few places, while libuv is fully running and can still trigger various callbacks on its own.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is the diff in Worker::Close() in this PR:

  // Close the Channel.
- delete this->channel;
+ this->channel->Close();

and ChannelSocket::Close() looks as follows:

void ChannelSocket::Close()
{
  MS_TRACE_STD();

  this->consumerSocket.Close();
  this->producerSocket.Close();
}

So what I mean is: why don't we just delete this->channel in the Worker destructor and make ChannelSocket do this?

void ChannelSocket::Close()
{
  MS_TRACE_STD();

  std::free(this->WriteBuffer);

  this->consumerSocket.Close();
  this->producerSocket.Close();
}

I don't like having a Close() method if we can live with just the destructor. We avoided Close() methods in the past for a reason: they are error prune: you call Close() on something and then what? can you reuse it? should you also call delete instance?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand we need to close consumer and producer sockets explicitly or else libuv will not stop event loop. Hence we do need to have explicit closing procedure for worker, namely Worker::Close(). It is just that freeing memory for things that could still be accessed is wrong.

I do not know if delete this->channel is still necessary in worker's destructor with these changes, I though it will be freed automatically.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the worker crashed in Node even when terminating the Node process, I'd have my ~/Library/Logs/DiagnosticReports/ folder full of mediasoup-worker core dumps :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the shutdown order is somehow slightly different between Rust and Node versions. Node uses SIGTERM and Rust uses worker.close command, which are slightly different code paths.

Anyway, what I did seems to work, once you have time to take a closer look, let me know what needs to be changed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the shutdown order is somehow slightly different between Rust and Node versions. Node uses SIGTERM and Rust uses worker.close command, which are slightly different code paths.

This is what I wanted to read!

Ok, true, it makes sense. I just need to check the code very carefully (it's been long since all that logic was done).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance you may have time to look at this in coming days?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, HYPER busy this week. I'll do tomorrow Friday at some random hour. Promised.

void SetListener(Listener* listener);
void Send(json& jsonMessage);
void SendLog(char* message, size_t messageLen);
Expand Down
2 changes: 1 addition & 1 deletion worker/include/Logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class Logger
static const int64_t pid;
thread_local static Channel::ChannelSocket* channel;
static const size_t bufferSize {50000};
static char buffer[];
thread_local static char buffer[];
};

/* Logging macros. */
Expand Down
32 changes: 12 additions & 20 deletions worker/include/MediaSoupErrors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ class MediaSoupError : public std::runtime_error
explicit MediaSoupError(const char* description) : std::runtime_error(description)
{
}

public:
static const size_t bufferSize{ 2000 };
thread_local static char buffer[];
};

class MediaSoupTypeError : public MediaSoupError
Expand All @@ -26,44 +30,32 @@ class MediaSoupTypeError : public MediaSoupError
do \
{ \
MS_ERROR("throwing MediaSoupError: " desc, ##__VA_ARGS__); \
\
static char buffer[2000]; \
\
std::snprintf(buffer, 2000, desc, ##__VA_ARGS__); \
throw MediaSoupError(buffer); \
std::snprintf(MediaSoupError::buffer, MediaSoupError::bufferSize, desc, ##__VA_ARGS__); \
throw MediaSoupError(MediaSoupError::buffer); \
} while (false)

#define MS_THROW_ERROR_STD(desc, ...) \
do \
{ \
MS_ERROR_STD("throwing MediaSoupError: " desc, ##__VA_ARGS__); \
\
static char buffer[2000]; \
\
std::snprintf(buffer, 2000, desc, ##__VA_ARGS__); \
throw MediaSoupError(buffer); \
std::snprintf(MediaSoupError::buffer, MediaSoupError::bufferSize, desc, ##__VA_ARGS__); \
throw MediaSoupError(MediaSoupError::buffer); \
} while (false)

#define MS_THROW_TYPE_ERROR(desc, ...) \
do \
{ \
MS_ERROR("throwing MediaSoupTypeError: " desc, ##__VA_ARGS__); \
\
static char buffer[2000]; \
\
std::snprintf(buffer, 2000, desc, ##__VA_ARGS__); \
throw MediaSoupTypeError(buffer); \
std::snprintf(MediaSoupError::buffer, MediaSoupError::bufferSize, desc, ##__VA_ARGS__); \
throw MediaSoupTypeError(MediaSoupError::buffer); \
} while (false)

#define MS_THROW_TYPE_ERROR_STD(desc, ...) \
do \
{ \
MS_ERROR_STD("throwing MediaSoupTypeError: " desc, ##__VA_ARGS__); \
\
static char buffer[2000]; \
\
std::snprintf(buffer, 2000, desc, ##__VA_ARGS__); \
throw MediaSoupTypeError(buffer); \
std::snprintf(MediaSoupError::buffer, MediaSoupError::bufferSize, desc, ##__VA_ARGS__); \
throw MediaSoupTypeError(MediaSoupError::buffer); \
} while (false)
// clang-format on

Expand Down
1 change: 1 addition & 0 deletions worker/include/PayloadChannel/PayloadChannelSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ namespace PayloadChannel
virtual ~PayloadChannelSocket();

public:
void Close();
void SetListener(Listener* listener);
void Send(json& jsonMessage, const uint8_t* payload, size_t payloadLen);
void Send(json& jsonMessage);
Expand Down
4 changes: 2 additions & 2 deletions worker/include/RTC/PortManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ namespace RTC
static std::vector<bool>& GetPorts(Transport transport, const std::string& ip);

private:
static std::unordered_map<std::string, std::vector<bool>> mapUdpIpPorts;
static std::unordered_map<std::string, std::vector<bool>> mapTcpIpPorts;
thread_local static std::unordered_map<std::string, std::vector<bool>> mapUdpIpPorts;
thread_local static std::unordered_map<std::string, std::vector<bool>> mapTcpIpPorts;
};
} // namespace RTC

Expand Down
2 changes: 1 addition & 1 deletion worker/include/Utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ namespace Utils

static const std::string GetRandomString(size_t len)
{
static char buffer[64];
char buffer[64];
static const char chars[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b',
'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z' };
Expand Down
1 change: 1 addition & 0 deletions worker/mediasoup-worker.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
'src/DepOpenSSL.cpp',
'src/DepUsrSCTP.cpp',
'src/Logger.cpp',
'src/MediaSoupErrors.cpp',
'src/Settings.cpp',
'src/Worker.cpp',
'src/Utils/Crypto.cpp',
Expand Down
8 changes: 8 additions & 0 deletions worker/src/Channel/ChannelSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ namespace Channel
std::free(this->WriteBuffer);
}

void ChannelSocket::Close()
{
MS_TRACE_STD();

this->consumerSocket.Close();
this->producerSocket.Close();
}

void ChannelSocket::SetListener(Listener* listener)
{
MS_TRACE_STD();
Expand Down
2 changes: 1 addition & 1 deletion worker/src/Logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

const int64_t Logger::pid{ static_cast<int64_t>(uv_os_getpid()) };
thread_local Channel::ChannelSocket* Logger::channel{ nullptr };
char Logger::buffer[Logger::bufferSize];
thread_local char Logger::buffer[Logger::bufferSize];

/* Class methods. */

Expand Down
5 changes: 5 additions & 0 deletions worker/src/MediaSoupErrors.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#define MS_CLASS "MediaSoupError"

#include "MediaSoupErrors.hpp"

thread_local char MediaSoupError::buffer[MediaSoupError::bufferSize];
8 changes: 8 additions & 0 deletions worker/src/PayloadChannel/PayloadChannelSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ namespace PayloadChannel
delete this->ongoingNotification;
}

void PayloadChannelSocket::Close()
{
MS_TRACE_STD();

this->consumerSocket.Close();
this->producerSocket.Close();
}

void PayloadChannelSocket::SetListener(Listener* listener)
{
MS_TRACE();
Expand Down
4 changes: 2 additions & 2 deletions worker/src/RTC/PortManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ namespace RTC
{
/* Class variables. */

std::unordered_map<std::string, std::vector<bool>> PortManager::mapUdpIpPorts;
std::unordered_map<std::string, std::vector<bool>> PortManager::mapTcpIpPorts;
thread_local std::unordered_map<std::string, std::vector<bool>> PortManager::mapUdpIpPorts;
thread_local std::unordered_map<std::string, std::vector<bool>> PortManager::mapTcpIpPorts;

/* Class methods. */

Expand Down
4 changes: 2 additions & 2 deletions worker/src/RTC/Producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1206,8 +1206,8 @@ namespace RTC

// Mangle RTP header extensions.
{
static uint8_t buffer[4096];
static std::vector<RTC::RtpPacket::GenericExtension> extensions;
thread_local static uint8_t buffer[4096];
thread_local static std::vector<RTC::RtpPacket::GenericExtension> extensions;

// This happens just once.
if (extensions.capacity() != 24)
Expand Down
2 changes: 1 addition & 1 deletion worker/src/RTC/RtpProbationGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ namespace RTC
this->probationPacket->SetTimestamp(Utils::Crypto::GetRandomUInt(0, 4294967295));

// Add BWE related RTP header extensions.
static uint8_t buffer[4096];
thread_local static uint8_t buffer[4096];

std::vector<RTC::RtpPacket::GenericExtension> extensions;
uint8_t extenLen;
Expand Down
8 changes: 4 additions & 4 deletions worker/src/RTC/SctpAssociation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ namespace RTC
if (notification->sn_header.sn_length > 0)
{
static const size_t BufferSize{ 1024 };
static char buffer[BufferSize];
thread_local static char buffer[BufferSize];

uint32_t len = notification->sn_header.sn_length;

Expand Down Expand Up @@ -838,7 +838,7 @@ namespace RTC
if (notification->sn_header.sn_length > 0)
{
static const size_t BufferSize{ 1024 };
static char buffer[BufferSize];
thread_local static char buffer[BufferSize];

uint32_t len = notification->sn_header.sn_length;

Expand Down Expand Up @@ -879,7 +879,7 @@ namespace RTC
case SCTP_REMOTE_ERROR:
{
static const size_t BufferSize{ 1024 };
static char buffer[BufferSize];
thread_local static char buffer[BufferSize];

uint32_t len = notification->sn_remote_error.sre_length - sizeof(struct sctp_remote_error);

Expand Down Expand Up @@ -915,7 +915,7 @@ namespace RTC
case SCTP_SEND_FAILED_EVENT:
{
static const size_t BufferSize{ 1024 };
static char buffer[BufferSize];
thread_local static char buffer[BufferSize];

uint32_t len =
notification->sn_send_failed_event.ssfe_length - sizeof(struct sctp_send_failed_event);
Expand Down
4 changes: 2 additions & 2 deletions worker/src/RTC/StunPacket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ namespace RTC
}
MS_DUMP(" size: %zu bytes", this->size);

static char transactionId[25];
char transactionId[25];

for (int i{ 0 }; i < 12; ++i)
{
Expand Down Expand Up @@ -385,7 +385,7 @@ namespace RTC
}
if (this->messageIntegrity != nullptr)
{
static char messageIntegrity[41];
char messageIntegrity[41];

for (int i{ 0 }; i < 20; ++i)
{
Expand Down
2 changes: 1 addition & 1 deletion worker/src/Utils/IP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ namespace Utils
{
MS_TRACE();

static sockaddr_storage addrStorage;
sockaddr_storage addrStorage;
char ipBuffer[INET6_ADDRSTRLEN] = { 0 };
int err;

Expand Down
2 changes: 1 addition & 1 deletion worker/src/Utils/String.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/* Static. */

static constexpr size_t BufferOutSize{ 65536 };
static uint8_t BufferOut[BufferOutSize];
thread_local static uint8_t BufferOut[BufferOutSize];
static const uint8_t Base64Table[65] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

Expand Down
10 changes: 8 additions & 2 deletions worker/src/Worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ Worker::~Worker()
{
MS_TRACE();

// Delete the Channel.
delete this->channel;

// Delete the PayloadChannel.
delete this->payloadChannel;

if (!this->closed)
Close();
}
Expand All @@ -70,10 +76,10 @@ void Worker::Close()
this->mapRouters.clear();

// Close the Channel.
delete this->channel;
this->channel->Close();

// Close the PayloadChannel.
delete this->payloadChannel;
this->payloadChannel->Close();
}

void Worker::FillJson(json& jsonObject) const
Expand Down
4 changes: 2 additions & 2 deletions worker/src/handles/UnixStreamSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ UnixStreamSocket::~UnixStreamSocket()
{
MS_TRACE_STD();

delete[] this->buffer;

if (!this->closed)
Close();

delete[] this->buffer;
}

void UnixStreamSocket::Close()
Expand Down
4 changes: 2 additions & 2 deletions worker/src/lib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ extern "C" int run_worker(
Worker worker(channel, payloadChannel);

// Free static stuff.
DepLibUV::ClassDestroy();
DepLibSRTP::ClassDestroy();
Utils::Crypto::ClassDestroy();
DepLibWebRTC::ClassDestroy();
RTC::DtlsTransport::ClassDestroy();
DepUsrSCTP::ClassDestroy();
DepLibUV::ClassDestroy();

// Wait a bit so peding messages to stdout/Channel arrive to the Node
// Wait a bit so pending messages to stdout/Channel arrive to the Node
// process.
uv_sleep(200);

Expand Down