Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DIOS-7656 AsyncSafe might be called when the object is in destruction or construction #390

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
16 changes: 10 additions & 6 deletions include/TimeService.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class TimeServiceWrapper : public std::enable_shared_from_this<T>
static std::shared_ptr<T> Create(ARGS&&... args)
{
auto obj = std::shared_ptr<T>(new T(std::forward<ARGS>(args)...));
obj->initialized = true;
obj->OnCreated();
return obj;
}
Expand All @@ -82,7 +83,7 @@ class TimeServiceWrapper : public std::enable_shared_from_this<T>
{
auto selfWeak = TimeServiceWrapper<T>::weak_from_this();
// If following assert failed, the function might be called in constructor. See OnCreated() description.
assert(!selfWeak.expired());
assert(initialized);

timeService.AsyncUnsafe([selfWeak, func = std::forward<Func>(func)] (std::chrono::milliseconds now) mutable {
auto self = selfWeak.lock();
Expand All @@ -97,7 +98,7 @@ class TimeServiceWrapper : public std::enable_shared_from_this<T>
{
auto selfWeak = TimeServiceWrapper<T>::weak_from_this();
// If following assert failed, the function might be called in constructor. See OnCreated() description.
assert(!selfWeak.expired());
assert(initialized);

timeService.AsyncUnsafe([selfWeak, func = std::forward<Func>(func)](std::chrono::milliseconds now) mutable {
auto self = selfWeak.lock();
Expand All @@ -112,7 +113,7 @@ class TimeServiceWrapper : public std::enable_shared_from_this<T>
{
auto selfWeak = TimeServiceWrapper<T>::weak_from_this();
// If following assert failed, the function might be called in constructor. See OnCreated() description.
assert(!selfWeak.expired());
assert(initialized);

return timeService.CreateTimerUnsafe([selfWeak, func = std::forward<Func>(func)] (std::chrono::milliseconds now) mutable {
auto self = selfWeak.lock();
Expand All @@ -127,7 +128,7 @@ class TimeServiceWrapper : public std::enable_shared_from_this<T>
{
auto selfWeak = TimeServiceWrapper<T>::weak_from_this();
// If following assert failed, the function might be called in constructor. See OnCreated() description.
assert(!selfWeak.expired());
assert(initialized);

return timeService.CreateTimerUnsafe(ms, [selfWeak, func = std::forward<Func>(func)] (std::chrono::milliseconds now) mutable {
auto self = selfWeak.lock();
Expand All @@ -142,7 +143,7 @@ class TimeServiceWrapper : public std::enable_shared_from_this<T>
{
auto selfWeak = TimeServiceWrapper<T>::weak_from_this();
// If following assert failed, the function might be called in constructor. See OnCreated() description.
assert(!selfWeak.expired());
assert(initialized);

return timeService.CreateTimerUnsafe(ms, repeat, [selfWeak, func = std::forward<Func>(func)] (std::chrono::milliseconds now) mutable {
auto self = selfWeak.lock();
Expand All @@ -157,7 +158,7 @@ class TimeServiceWrapper : public std::enable_shared_from_this<T>
{
auto selfWeak = TimeServiceWrapper<T>::weak_from_this();
// If following assert failed, the function might be called in constructor. See OnCreated() description.
assert(!selfWeak.expired());
assert(initialized);

return timeService.FutureUnsafe([selfWeak, func = std::forward<Func>(func)] (std::chrono::milliseconds now) mutable {
auto self = selfWeak.lock();
Expand All @@ -169,8 +170,11 @@ class TimeServiceWrapper : public std::enable_shared_from_this<T>

TimeService& GetTimeService() { return timeService; }

protected:
bool initialized = false;
private:
TimeService& timeService;

};


Expand Down
6 changes: 5 additions & 1 deletion include/rtp/RTPIncomingMediaStreamMultiplexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class RTPIncomingMediaStreamMultiplexer :
RTPIncomingMediaStreamMultiplexer(const RTPIncomingMediaStream::shared& incomingMediaStream, TimeService& timeService);

public:
virtual ~RTPIncomingMediaStreamMultiplexer() = default;
virtual ~RTPIncomingMediaStreamMultiplexer();

// RTPIncomingMediaStream interface;
virtual void AddListener(RTPIncomingMediaStream::Listener* listener) override;
Expand All @@ -35,11 +35,15 @@ class RTPIncomingMediaStreamMultiplexer :
virtual void onEnded(const RTPIncomingMediaStream* stream) override;

virtual TimeService& GetTimeService() override { return TimeServiceWrapper<RTPIncomingMediaStreamMultiplexer>::GetTimeService(); }

virtual void OnCreated() override;
void Stop();
private:
RTPIncomingMediaStream::shared incomingMediaStream;
std::set<RTPIncomingMediaStream::Listener*> listeners;
volatile bool muted = false;

bool stopped = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set initialized = false instead of a second bool? Will also assert/protect against using the object in async calls after we have stopped it.

The stop currently assumes that it is the last thing executed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand two bools seems duplicated. However, I am keen to separate the logic in the concrete class from the base TimeServiceWrapper class. The initialized was created for check whether it is constructed to avoid misuse of TimeServiceWrapper. I feel it would make it complicated to expose it to child classes.

I am current investigating a destruction issue related to this. I would do a few more tweaks around that. Will resend it for review once it's done.

};

#endif //RTPINCOMINGMEDIASTREAMMULTIPLEXER_H
14 changes: 14 additions & 0 deletions src/rtp/RTPIncomingMediaStreamMultiplexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,17 @@ RTPIncomingMediaStreamMultiplexer::RTPIncomingMediaStreamMultiplexer(const RTPIn
{

Debug("-RTPIncomingMediaStreamMultiplexer::RTPIncomingMediaStreamMultiplexer() [stream:%p,this:%p]\n", incomingMediaStream, this);
}

RTPIncomingMediaStreamMultiplexer::~RTPIncomingMediaStreamMultiplexer()
{
Debug("-RTPIncomingMediaStreamMultiplexer::~RTPIncomingMediaStreamMultiplexer() [stream:%p,this:%p]\n", incomingMediaStream, this);

Stop();
}

void RTPIncomingMediaStreamMultiplexer::OnCreated()
{
if (incomingMediaStream)
{
//Add us as listeners
Expand All @@ -20,6 +30,8 @@ RTPIncomingMediaStreamMultiplexer::RTPIncomingMediaStreamMultiplexer(const RTPIn
void RTPIncomingMediaStreamMultiplexer::Stop()
{
Debug("-RTPIncomingMediaStreamMultiplexer::Stop() [this:%p]\n", this);

if (stopped) return;

//Wait until all the previous async have finished as async calls are executed in order
Sync([=](auto now){
Expand All @@ -36,6 +48,8 @@ void RTPIncomingMediaStreamMultiplexer::Stop()
//Remove all listeners
listeners.clear();
});

stopped = true;
}

void RTPIncomingMediaStreamMultiplexer::AddListener(RTPIncomingMediaStream::Listener* listener)
Expand Down
Loading