SO 5.7 InDepth Coops
This part of the InDepth series is dedicated to an important feature: cooperations. In particular:
- parent-child relationship;
- resource lifetime management;
- reg/dereg notificators.
If you don't understand what a cooperation is then please return and re-study the Basic part of SObjectizer's tutorials.
A cooperation (or coop for short) is a way of binding several tightly related agents into a single entity.
A coop is registered in the SObjectizer Environment in a transactional manner: either all agents from the cooperation are registered successfully or none of them is.
When a coop is deregistered, all its agents are deregistered and destroyed at the same time.
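The all-or-nothing rule can be illustrated with a small toy model. This is only a sketch in plain C++: `toy_agent`, `toy_registry` and the `can_register` flag are hypothetical names invented for the illustration and are not part of SObjectizer.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Toy model only: none of these names belong to SObjectizer.
struct toy_agent {
    std::string name;
    bool can_register;  // pretend outcome of preparing this agent
};

struct toy_registry {
    std::vector<std::string> registered;

    // Registers every agent of the coop or none of them.
    bool register_coop(const std::vector<toy_agent>& coop) {
        const std::size_t size_before = registered.size();
        for (const auto& agent : coop) {
            if (!agent.can_register) {
                // Roll back the agents already added for this coop.
                registered.resize(size_before);
                return false;
            }
            registered.push_back(agent.name);
        }
        return true;
    }
};
```

If one agent of a coop cannot be registered, the registry is left exactly as it was before the call, which is the behavior the transactional guarantee describes.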
There could be agents which require creation of additional coop(s) for performing their work.
Let's imagine an agent which is responsible for receiving and processing some requests. Payment requests, for example.
Processing of each payment request requires several operations:
- checking payments parameters,
- checking the operation risks,
- checking the availability of funds
- and so on...
The request receiver could do those actions for every payment by itself. But this will lead to very complicated logic of the request receiver.
There is a much simpler approach: delegation of processing of one request to a separate processor agent.
In that case the request receiver only receives new requests and creates a new coop with an actual request processor for each of them. Both the receiver and the processor agents get simpler logic, which is good. But a new question arises: who will control the lifetime of all the cooperations with request processors, and how?
Very simple view of that problem:
Someone could call deregister_coop() for the request receiver’s coop. As a result, all coops with request processors must be deregistered too.
But how could it be done?
This is where child coops come into play.
SObjectizer-5 allows marking a new coop as a child of any existing coop. In this case SObjectizer guarantees that all child coops will be deregistered and destroyed before their parent coop.
It means that if we have, for example, a parent coop with a child coop and make the call:
env.deregister_coop( parent_coop_id, so_5::dereg_reason::normal );
Then SObjectizer-5 will deregister and destroy the child coop and only then the parent coop will be deregistered and destroyed.
A child coop could have its own children coops too.
It means that a request_receiver coop could have any number of child coops like request_processor_1, request_processor_2 and so on.
All of them will be automatically destroyed when the top-level parent coop manager is deregistered.
The parent-child relationship between coops allows building coop hierarchies: a top-level coop creates child coops, those create their own child coops and so on.
If a top-level coop is deregistered for some reason then all its child coops (and children of children and so on) will be deregistered too. The programmer does not have to take care of this.
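The children-first order can be pictured as a post-order walk over the coop tree. The sketch below is a toy model in plain C++, not SObjectizer's implementation: `toy_coop` and `deregister_tree` are hypothetical names, and real deregistration is asynchronous rather than a single recursive call.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy model only: not SObjectizer's implementation or API.
struct toy_coop {
    std::string name;
    std::vector<std::unique_ptr<toy_coop>> children;

    explicit toy_coop(std::string n) : name(std::move(n)) {}

    // Creates a child coop owned by this coop and returns a reference to it.
    toy_coop& add_child(std::string child_name) {
        children.push_back(std::make_unique<toy_coop>(std::move(child_name)));
        return *children.back();
    }
};

// Destroys a whole subtree: all descendants first, then the coop itself.
// Names are appended to `log` in the order of destruction.
void deregister_tree(std::unique_ptr<toy_coop> coop,
                     std::vector<std::string>& log) {
    for (auto& child : coop->children)
        deregister_tree(std::move(child), log);
    coop->children.clear();
    log.push_back(coop->name);
    // `coop` goes out of scope here, after all of its descendants.
}
```

For a tree manager -> request_receiver -> request_processor_1, request_processor_2, the log ends with request_receiver followed by manager, so no parent disappears while one of its children is still alive.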
There are several ways to make a child coop...
The first way is to specify the ID of the parent coop during the creation of a coop instance:
// Now we can create a child coop.
auto child = env.make_coop(parent_id); // Passing parent_id marks this coop as a child of that parent.
... // Fill the child.
env.register_coop(std::move(child));
The second way is to use the so_5::create_child_coop free functions. For example:
class owner : public so_5::agent_t
{
public :
    ...
    void so_evt_start() override
    {
        auto child = so_5::create_child_coop( *this );
        child->make_agent< worker >();
        ...
        so_environment().register_coop( std::move( child ) );
    }
};
And the third way is to use the so_5::introduce_child_coop free functions. For example:
class owner : public so_5::agent_t
{
public :
    ...
    void so_evt_start() override
    {
        so_5::introduce_child_coop( *this, []( so_5::coop_t & coop ) {
            coop.make_agent< worker >();
        } );
    }
};
Sometimes it is necessary to manage the lifetime of some resources. For example all agents from your coop should have a reference to the same DB connection object.
If this connection object is allocated dynamically you can pass a shared_ptr to every agent's constructor. Something like:
class first_agent : public so_5::agent_t { ...
public :
    first_agent( context_t ctx, std::shared_ptr< db_connection > conn, ... );
    ...
private :
    std::shared_ptr< db_connection > connection_;
};
class second_agent : public so_5::agent_t { ...
public :
    second_agent( context_t ctx, std::shared_ptr< db_connection > conn, ... );
    ...
private :
    std::shared_ptr< db_connection > connection_;
};
env.introduce_coop( []( so_5::coop_t & coop ) {
    std::shared_ptr< db_connection > connection = connect_to_db(...);
    coop.make_agent< first_agent >( connection, ... );
    coop.make_agent< second_agent >( connection, ... );
    ...
} );
In such a case you make tight coupling between application domain logic of your agents and DB connection lifetime management.
What if this lifetime management needs to be changed in the future? What if the connection object is controlled by someone else and you have a simple reference to it (not shared_ptr)?
You will need to do some rewriting...
class first_agent : public so_5::agent_t { ...
public :
    first_agent( context_t ctx, db_connection & conn, ... ); // The constructor has to be changed.
    ...
private :
    db_connection & connection_; // Declaration of the member has to be changed.
};
class second_agent : public so_5::agent_t { ...
public :
    second_agent( context_t ctx, db_connection & conn, ... ); // The constructor has to be changed.
    ...
private :
    db_connection & connection_; // Declaration of the member has to be changed.
};
...
db_connection & conn = receive_connection_from_somewhere();
env.introduce_coop( [&conn]( so_5::coop_t & coop ) {
    coop.make_agent< first_agent >( conn, ... );
    coop.make_agent< second_agent >( conn, ... );
    ...
} );
SObjectizer-5 has a tool for decoupling resource lifetime management from an agent's domain-specific logic: coop_t::take_under_control().
The coop_t::take_under_control() method passes a dynamically allocated object under the control of the cooperation. The object placed under control will be deallocated only after all agents of the coop have been destroyed.
This behavior of take_under_control() allows us to use the reference to a controlled object even from the agent's destructor...
class request_processor : public so_5::agent_t
{
public :
    request_processor( context_t ctx, db_connection & connection );
    ~request_processor() {
        if( !committed() )
            // Assume that this reference is still valid.
            connection_.commit();
    }
    ...
private :
    db_connection & connection_;
};
...
env.introduce_coop( []( so_5::coop_t & coop )
{
    // Place DB connection under the control of the cooperation.
    auto & connection = *coop.take_under_control(create_connection(...));
    // Reference to DB connection will be valid even in request_processor's destructor.
    // Note: coop is a reference here, so members are accessed with '.'.
    coop.make_agent< request_processor >( connection );
    ...
} );
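The ordering guarantee described above (agents are destroyed first, controlled objects afterwards) can be reproduced in a toy model with ordinary C++ member destruction order. This is a sketch under stated assumptions: `toy_connection`, `toy_agent`, `toy_coop` and `run_demo` are hypothetical names, and the real coop_t is implemented differently.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy model only: names are hypothetical, not SObjectizer API.
struct toy_connection {
    std::vector<std::string>& log;
    explicit toy_connection(std::vector<std::string>& l) : log(l) {}
    ~toy_connection() { log.push_back("connection closed"); }
    void commit() { log.push_back("commit"); }
};

struct toy_agent {
    toy_connection& connection;
    explicit toy_agent(toy_connection& c) : connection(c) {}
    // The reference is still valid here because the coop destroys
    // its agents before the resources it controls.
    ~toy_agent() { connection.commit(); }
};

class toy_coop {
    // Members are destroyed in reverse order of declaration,
    // so agents_ is destroyed first and resources_ last.
    std::vector<std::shared_ptr<void>> resources_;
    std::vector<std::unique_ptr<toy_agent>> agents_;
public:
    // Takes ownership of the resource and returns a raw pointer to it.
    template <typename T>
    T* take_under_control(std::unique_ptr<T> resource) {
        T* raw = resource.get();
        resources_.push_back(std::shared_ptr<void>(std::move(resource)));
        return raw;
    }
    void make_agent(toy_connection& c) {
        agents_.push_back(std::make_unique<toy_agent>(c));
    }
};

// Builds a coop, destroys it and returns the recorded order of events.
std::vector<std::string> run_demo() {
    std::vector<std::string> log;
    {
        toy_coop coop;
        auto& connection =
            *coop.take_under_control(std::make_unique<toy_connection>(log));
        coop.make_agent(connection);
    }  // the coop is destroyed here
    return log;
}
```

In this model the agent's destructor commits through the connection before the connection itself is closed, which is why the agent code never has to know who owns the connection.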
Parent-child relationship could also be used for resource lifetime management...
class request_receiver : public so_5::agent_t
{
    std::unique_ptr< db_connection > connection_;
    ...
public :
    void so_evt_start() override {
        connection_ = make_db_connection(...);
        ...
    }
    ...
private :
    void evt_new_request( const request & evt ) {
        // New request handler must be created.
        so_5::introduce_child_coop( *this, [&]( so_5::coop_t & coop ) {
            // Reference to DB connection will be valid even in request_processor's destructor.
            // It is because agents from child cooperation will be destroyed before any of
            // agents from the parent cooperation.
            coop.make_agent< request_processor >( *connection_ );
            ...
        } );
    }
};
It is not easy to detect the precise moment when a coop is completely registered or completely deregistered.
The biggest problem is detecting complete coop deregistration.
This is because a call to environment_t::deregister_coop() just initiates the coop deregistration process, and the entire process could take a long time.
To simplify this, there are registration and deregistration notificators.
A notificator can be bound to a coop and will be called when the coop registration/deregistration process is finished.
The simplest reg/dereg notificators:
env.introduce_coop( []( so_5::coop_t & coop ) {
    coop.add_reg_notificator(
        []( so_5::environment_t &, const so_5::coop_handle_t & handle ) {
            std::cout << "registered: " << handle << std::endl;
        } );
    coop.add_dereg_notificator(
        []( so_5::environment_t &,
            const so_5::coop_handle_t & handle,
            const so_5::coop_dereg_reason_t & ) {
            std::cout << "deregistered: " << handle << std::endl;
        } );
    ...
} );
The ID of coop will be printed to stdout on coop registration and deregistration.
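Conceptually, a notificator is just a callback stored in the coop and invoked once the corresponding process has finished. The following toy model is a sketch of that idea in plain C++. The class `toy_coop`, the `registration_finished()` and `deregistration_finished()` methods and the `int` reason are assumptions made for the illustration, not the library's internals.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy model only: hypothetical names, not SObjectizer's implementation.
class toy_coop {
    std::string name_;
    std::vector<std::function<void(const std::string&)>> reg_notificators_;
    std::vector<std::function<void(const std::string&, int)>> dereg_notificators_;
public:
    explicit toy_coop(std::string name) : name_(std::move(name)) {}

    void add_reg_notificator(std::function<void(const std::string&)> n) {
        reg_notificators_.push_back(std::move(n));
    }
    void add_dereg_notificator(std::function<void(const std::string&, int)> n) {
        dereg_notificators_.push_back(std::move(n));
    }

    // Called by the toy environment when registration is complete.
    void registration_finished() const {
        for (const auto& n : reg_notificators_) n(name_);
    }
    // Called by the toy environment when deregistration is complete.
    void deregistration_finished(int reason) const {
        for (const auto& n : dereg_notificators_) n(name_, reason);
    }
};
```

Several notificators can be attached to the same coop in this model, and each of them is called with the coop's identity, so the code that wants to know about completion does not need to poll.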
Usually reg/dereg notificators are used for sending messages to some mboxes.
Because this scenario is widely used, there are two ready-to-use notificators in SObjectizer-5.
They are created by the make_coop_reg_notificator() and make_coop_dereg_notificator() functions:
auto notify_mbox = env.create_mbox();
env.introduce_coop( [&]( so_5::coop_t & coop ) {
    // An instance of so_5::msg_coop_registered will be sent to notify_mbox
    // when the cooperation is registered.
    coop.add_reg_notificator( so_5::make_coop_reg_notificator( notify_mbox ) );
    // An instance of so_5::msg_coop_deregistered will be sent to
    // notify_mbox when the cooperation is deregistered.
    coop.add_dereg_notificator( so_5::make_coop_dereg_notificator( notify_mbox ) );
    ...
} );
Coop dereg notificators can be used for implementation of Erlang-like supervisors.
For example, a request receiver could receive notification about deregistration of child coops and restart them if they fail:
class request_receiver : public so_5::agent_t
{
public :
    void so_define_agent() override {
        // msg_coop_deregistered must be handled.
        so_subscribe_self().event( &request_receiver::evt_child_finished );
        ...
    }
    ...
private :
    void evt_new_request( const request & req ) {
        // Create a child coop instance.
        auto child_coop = so_5::create_child_coop(*this);
        ... // Filling the coop with agents.
        // Dereg notificator is necessary to receive info about child disappearance.
        // Standard notificator will be used.
        child_coop->add_dereg_notificator(
            // We want a message to request_receiver direct_mbox.
            so_5::make_coop_dereg_notificator( so_direct_mbox() ) );
        // Register coop and store its handle.
        auto child_handle = so_environment().register_coop( std::move(child_coop) );
        store_request_info( req, child_handle );
    }
    void evt_child_finished( const so_5::msg_coop_deregistered & evt ) {
        // If child cooperation failed its dereg reason will differ
        // from the normal value.
        if( so_5::dereg_reason::normal != evt.m_reason.reason() )
            recreate_child_coop( evt.m_coop );
        else
            remove_request_info( evt.m_coop );
    }
    ...
};
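The bookkeeping behind such a supervisor (what store_request_info, recreate_child_coop and remove_request_info might boil down to) can be sketched without any agents at all. Everything in this toy model is an assumption made for illustration: `toy_supervisor`, `toy_dereg_reason` and the integer handles are hypothetical and do not correspond to SObjectizer types.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Toy model only: hypothetical names, not SObjectizer API.
enum class toy_dereg_reason { normal, failure };

class toy_supervisor {
    std::map<int, std::string> active_;  // child handle -> request it processes
    int next_handle_ = 1;
public:
    // Starts a child for the request and returns the child's handle.
    int start_child(const std::string& request) {
        const int handle = next_handle_++;
        active_[handle] = request;
        return handle;
    }

    // Reaction to a "child deregistered" notification.
    // Returns the handle of the replacement child,
    // or 0 when nothing was restarted.
    int on_child_finished(int handle, toy_dereg_reason reason) {
        const auto it = active_.find(handle);
        if (it == active_.end()) return 0;  // unknown child, nothing to do
        const std::string request = it->second;
        active_.erase(it);
        if (reason != toy_dereg_reason::normal)
            return start_child(request);  // restart after a failure
        return 0;                         // normal completion, just forget it
    }

    std::size_t active_count() const { return active_.size(); }
};
```

A normally finished child is simply forgotten, while a failed one is replaced by a new child that processes the same request under a new handle.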
Registration And Deregistration Of A Cooperation Are Asynchronous And Potentially Long Lasting Procedures
It's important to note that register_coop and deregister_coop are asynchronous operations that can take a long time.
During the registration of a new coop several actions have to be performed by SObjectizer Environment:
1. Some resources should be preallocated for agents from the new coop.
2. The so_define_agent method has to be called for all agents from the new coop.
3. Agents from the new coop have to be bound to the corresponding dispatchers.
4. A special internal message has to be sent to every agent from the new coop.
5. If there are coop_reg_notificators they have to be called.
6. Agents from the new coop should receive this internal message and call the so_evt_start method.
Steps from 1 to 5 are performed inside register_coop and the completion of step 4 means that coop is registered in SObjectizer. But the return from register_coop doesn't mean that agents have already started their work.
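The gap between the return from register_coop and the actual start of the agents can be shown with a toy event queue. This is a sketch only: `toy_environment`, its `queue` and `run_pending()` are hypothetical stand-ins for the environment and a dispatcher's worker thread, and the log strings merely mirror the steps listed above.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy model only: a single queue plays the role of a dispatcher.
struct toy_environment {
    std::deque<std::function<void()>> queue;  // pending internal messages
    std::vector<std::string> log;

    // Performs the synchronous part of registration and returns.
    void register_coop(const std::string& agent) {
        log.push_back(agent + ": so_define_agent");
        log.push_back(agent + ": bound to dispatcher");
        // The start message is only queued here, it is not handled yet.
        queue.push_back([this, agent] {
            log.push_back(agent + ": so_evt_start");
        });
        log.push_back(agent + ": reg notificators called");
    }

    // What a dispatcher's worker thread would do some time later.
    void run_pending() {
        while (!queue.empty()) {
            auto event = std::move(queue.front());
            queue.pop_front();
            event();
        }
    }
};
```

In this model the registration notificators fire before so_evt_start runs, so code that reacts to "coop registered" cannot assume that the agents have already started.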
Deregistration of a coop is a less deterministic operation. When deregister_coop is called, SObjectizer only does the following things:
- Marks the cooperation as being deregistered.
- Sends a special internal message to every agent from the coop and marks the agents in a special way (delivery of new messages to agents from a deregistering coop is prohibited).
And that's all.
Agents from the deregistering coop continue to work as long as they still have previously received messages to process.
Only when all agents from the coop have received the special internal message and called the so_evt_finish method will the coop be finally destroyed. The destruction is performed in several steps:
- Unbinding agents from their dispatchers.
- Destruction of agents (i.e. delete is called for every agent object).
- If there are coop_dereg_notificators they are called.
- The coop object is destroyed.
All of that means that the actual destruction of the coop can happen at any time after the call to deregister_coop: sometimes the coop is physically destroyed before the return from deregister_coop, sometimes it is destroyed long after the return from deregister_coop.
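The deferred nature of deregistration can be sketched with a toy agent queue: messages delivered earlier are still handled, new ones are refused, and the finish step only runs when the queue is drained. `toy_agent_queue` and the `"<finish>"` marker are hypothetical; the real environment uses its own internal message and dispatcher machinery.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Toy model only: hypothetical names, not SObjectizer API.
struct toy_agent_queue {
    std::deque<std::string> pending;  // messages already delivered to the agent
    std::vector<std::string> log;
    bool deregistering = false;
    bool destroyed = false;

    // Delivery is refused once deregistration has been initiated.
    bool send(const std::string& msg) {
        if (deregistering) return false;
        pending.push_back(msg);
        return true;
    }

    // Only marks the coop and queues the final internal message.
    void deregister_coop() {
        deregistering = true;
        pending.push_back("<finish>");
    }

    // What the agent's worker thread does some time later.
    void run_pending() {
        while (!pending.empty()) {
            const std::string msg = pending.front();
            pending.pop_front();
            if (msg == "<finish>") {
                log.push_back("so_evt_finish");
                destroyed = true;  // destruction happens only at this point
            } else {
                log.push_back("handled " + msg);
            }
        }
    }
};
```

Right after deregister_coop() returns in this model, the agent is not yet destroyed. Whether destruction happens sooner or later depends only on when the pending queue gets processed.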
Child coops make things even more complex. For example, if deregister_coop is called for a parent coop then SObjectizer first deregisters and destroys its children and only then is the parent coop destroyed.
If you need to know when a coop is destroyed you can use coop_dereg_notificators.