SO 5.7 InDepth Message Limits
Message limits are a tool for protecting an agent from an overly intensive message flow. Message limits are not a full-fledged overload control mechanism. They solve exactly one problem: limiting the number of messages of a particular type in the agent's event queue. No more. No less.
Constructing an appropriate overload control mechanism for a particular problem is not an easy task. It requires taking the specific features of the problem into account. For example: what should happen to messages that do not fit into a size-limited event queue? Should they be silently discarded? Should there be some logging? Or must they be stored in some persistent queue and replayed after some time?
So the right overload control tool will almost always be very problem-specific, and it will not work for a different problem with different conditions. Because of that, SObjectizer has no ready-to-use overload control tool out of the box yet. And it is an open question whether it will have one in the future.
A user can use the collector-performer idiom and implement their own overload control mechanism, adapted to the user's problem. But that requires some programming and is not always appropriate. There are very simple cases where extra messages can be ignored or the application must simply be aborted. In such cases, message limits can be used as a very simple but ready-to-use tool for protecting agents.
First of all, message limits are optional. It is not required to define message limits for every agent in an application. Message limits can be defined only for the agents that require such protection. But if message limits are defined for some agent, then SObjectizer will count all messages that are sent to that agent.
A message limit is specified for a particular message type. For example, if an agent receives messages of type M and of type N, then message limits are specified separately for M and N. There is no such thing as a common limit for the size of the agent's queue. It is impossible to limit the queue's size to 100 items regardless of the type of messages. It is necessary to define the limit for messages of type M and, separately, the limit for messages of type N.
Since v.5.7.1 there is also a way to specify a default (or generic) message limit that will automatically be applied to all other messages. For example, a user can specify a particular message limit for type M, another limit for type N, and a default message limit that will be applied to messages of type O, P, R, S, T, and so on.
When a message limit for some type of message is defined, SObjectizer counts the instances of that message waiting to be processed by this agent. An attempt to send a message over that limit is detected, and SObjectizer performs some action as a reaction to the overflow.
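The per-type counting described above can be pictured with a small standalone model. This is only an illustrative sketch in plain C++17 and not SObjectizer's implementation: the class `per_type_limits`, its methods, and the message types `msg_m` and `msg_n` are hypothetical names invented for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>

// Hypothetical message types used only in this sketch.
struct msg_m {};
struct msg_n {};

// Toy model of per-type message limits (not SObjectizer code).
class per_type_limits
{
public:
    // Defines a separate limit for messages of type Msg.
    template< typename Msg >
    void define_limit( std::size_t max_waiting )
    {
        assert( max_waiting > 0 );
        m_limits[ std::type_index( typeid( Msg ) ) ] = entry{ max_waiting, 0 };
    }

    // Models a send: returns true if the message can be counted and queued,
    // false if the limit for this type is already reached.
    // Throws std::out_of_range if no limit was defined for Msg.
    template< typename Msg >
    bool try_count()
    {
        entry & e = m_limits.at( std::type_index( typeid( Msg ) ) );
        if( e.waiting >= e.limit )
            return false;
        ++e.waiting;
        return true;
    }

    // Models extraction of one waiting message of type Msg from the queue.
    template< typename Msg >
    void on_extracted()
    {
        entry & e = m_limits.at( std::type_index( typeid( Msg ) ) );
        assert( e.waiting > 0 );
        --e.waiting;
    }

private:
    struct entry
    {
        std::size_t limit;
        std::size_t waiting;
    };

    std::unordered_map< std::type_index, entry > m_limits;
};
```

In this model a limit of 2 for `msg_m` and a limit of 1 for `msg_n` are tracked independently: filling up the counter for one type does not affect the other, and there is no shared counter for the queue as a whole.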
SObjectizer supports four types of reaction to exceeding the limit on the number of messages:
- dropping the message without any kind of processing. The message will be ignored silently;
- abortion of the whole application by calling std::abort() (it is also possible to supply a pre-abort action that performs some application-specific work, but std::abort() will be called anyway right after the pre-abort action returns);
- redirection of the message instance to another message box;
- transformation of the message instance to a new message instance and sending the new instance to some message box (maybe the same mbox, maybe not).
Message limits are defined via agent tuning options. This means that message limits must be prepared and passed as an argument to the constructor of so_5::agent_t. The limits defined will be stored inside the agent_t object and cannot be changed, removed, or added later.
An important note: if an agent defines a limit for one message type, it must also define limits for all other message types it receives. An attempt to subscribe to a message without a predefined limit will lead to an error at runtime (the subscription method will throw an exception). But note: this rule applies only to agents that use message limits.
The base class so_5::agent_t provides several helper methods; each of them defines a limit and a particular reaction to exceeding the limit.
Method limit_then_drop defines a limit and tells SObjectizer to silently drop excess messages. For example:
// For ordinary agents:
class request_processor : public so_5::agent_t
{
public :
    request_processor( context_t ctx )
        : so_5::agent_t( ctx
            // No more than 20 messages with dropping of extra messages.
            + limit_then_drop< request >( 20 )
            // No more than 1 message with dropping of extra messages.
            + limit_then_drop< get_status >( 1 ) )
    ...
};
Method limit_then_abort defines a limit and tells SObjectizer to call std::abort() and terminate the application if the limit is exceeded:
// For ordinary agents:
class hardware_interface : public so_5::agent_t
{
public :
    hardware_interface( context_t ctx )
        : so_5::agent_t( ctx
            // Usually there must be no more than 10 waiting commands.
            // But if something went wrong and hardware part doesn't respond
            // it is better to terminate the whole application.
            + limit_then_abort< outgoing_apdu_command >( 500 ) )
    ...
};
Sometimes, for debugging purposes, it is necessary to perform some action just before std::abort() is called because of an overlimit. This can be done via another form of the limit_then_abort method:
class hardware_interface : public so_5::agent_t
{
public :
    hardware_interface( context_t ctx )
        : so_5::agent_t( ctx
            // Usually there must be no more than 10 waiting commands.
            // But if something went wrong and hardware part doesn't respond
            // it is better to terminate the whole application.
            + limit_then_abort< outgoing_apdu_command >( 500,
                []( const so_5::agent_t & a, const outgoing_apdu_command & msg ) {
                    std::clog << "too many outgoing_apdu, device: "
                        << dynamic_cast< const hardware_interface & >(a).device_name()
                        << ", APDU: " << msg << std::endl;
                } ) )
    ...
};
Method limit_then_redirect defines a limit and tells SObjectizer to redirect the message instance to another mbox.
Since v.5.7.1 there are two versions of the limit_then_redirect method. The first one accepts a mbox as the second argument:
// Agent for 'normal' processing of requests.
class normal_request_processor : public so_5::agent_t
{
public :
    normal_request_processor(
        context_t ctx,
        // Message box of agent for handling requests in overload mode.
        // In this mode requests are not processed but negative response
        // with the appropriate result code is sent back
        // (and this fact is stored in the application log).
        so_5::mbox_t overload_mode_processor )
        : so_5::agent_t( ctx
            // We can hold no more than 10 requests in queue.
            // All extra messages must be redirected to overload-mode processor
            // for generation of negative replies.
            + limit_then_redirect< request >( 10, overload_mode_processor ) )
    ...
};
The second version of limit_then_redirect accepts a lambda/functional object which returns a mbox for message redirection. This version can be used in cases when the target mbox is not yet known in the constructor of an agent, because the mbox is defined only some time later. For example:
// Coop for request processing.
auto coop = env.make_coop();
// Agent for normal processing.
auto normal_processor = coop->make_agent< normal_request_processor >(...);
// Agent for overload-mode processing.
auto overload_processor = coop->make_agent< overload_mode_request_processor >(
    // This agent must know mbox of normal processor.
    normal_processor->so_direct_mbox() );
// Bind normal processor with overload-mode processor.
normal_processor->set_overload_mode_processor( overload_processor->so_direct_mbox() );
This is not a big problem, because limit_then_redirect can accept a lambda/functional object instead of the exact mbox. That allows writing:
// Agent for 'normal' processing of requests.
class normal_request_processor : public so_5::agent_t
{
public :
    normal_request_processor(
        context_t ctx )
        : so_5::agent_t( ctx
            // We can hold no more than 10 requests in queue.
            // All extra messages must be redirected to overload-mode processor
            // for generation of negative replies.
            + limit_then_redirect< request >( 10,
                [this] { return this->overload_mode_processor; } ) )
    {}
    ...
    // Message box of agent for handling requests in overload mode.
    // In this mode requests are not processed but negative response
    // with the appropriate result code is sent back
    // (and this fact is stored in the application log).
    void set_overload_mode_processor(
        const so_5::mbox_t & overload_mode_processor )
    {
        this->overload_mode_processor = overload_mode_processor;
    }
private:
    so_5::mbox_t overload_mode_processor;
    ...
};
But there is a very important rule: the target mbox for message redirection must be known at the moment of agent registration in the SObjectizer Environment. If the lambda/functional object passed to limit_then_redirect returns a null mbox at runtime, the application will crash (because of an attempt to dereference a null pointer).
Method limit_then_transform defines a limit and tells SObjectizer to transform the message instance to another message/signal and send the new message/signal to some mbox. The second argument for limit_then_transform is a lambda/functional object which returns a pair of values: a target mbox for the new message and the new message itself:
class request_processor : public so_5::agent_t
{
public :
    request_processor(
        context_t ctx )
        : so_5::agent_t( ctx
            // We can hold no more than 10 requests in queue.
            // For all extra messages negative replies must be generated.
            + limit_then_transform( 10,
                [](const request & evt) {
                    return make_transformed< negative_reply >(
                        // Mbox for sending reply is got from original request.
                        evt.reply_to(),
                        // All other arguments are passed to negative_reply constructor.
                        error_code::processor_is_busy );
                } ) )
    ...
    {}
    ...
};
There are two forms of limit_then_transform. One is shown above. It is intended for use with messages. The type of the message is deduced from the argument of the transformation lambda, as in the previous example.
There is another form of limit_then_transform for signals. It receives the type of the signal as a template parameter, and the transformation lambda must be a lambda without arguments:
class long_operation_performer : public so_5::agent_t
{
public :
    long_operation_performer(
        context_t ctx,
        so_5::mbox_t mbox_for_notifications )
        : so_5::agent_t( ctx
            // If we cannot process previous get_status signal
            // then we are busy and can tell about this.
            + limit_then_transform< get_status >( 1,
                [this] {
                    // The result of the transformation is another signal
                    // and because of that there is only one argument for
                    // make_transformed (the target mbox).
                    return make_transformed< status_busy >( this->notification_mbox );
                } ) )
        , notification_mbox( std::move( mbox_for_notifications ) )
    {}
    ...
private :
    const so_5::mbox_t notification_mbox;
    ...
};
As with limit_then_redirect, the same important rule applies to limit_then_transform: the target mbox for the transformed message must be known at the moment of agent registration in the SObjectizer Environment. If the lambda/functional object passed to limit_then_transform returns a null mbox at runtime, the application will crash (because of an attempt to dereference a null pointer).
Before v.5.7.1 a user had to manually specify a message limit for every message an agent subscribes to, even if most of the limits were the same. For example:
class handler_of_different_messages final : public so_5::agent_t
{
public:
    handler_of_different_messages(context_t ctx)
        : so_5::agent_t{ctx
            + limit_then_abort<msg_A>(20u)
            + limit_then_abort<msg_B>(30u)
            // All other limits are the same.
            + limit_then_abort<msg_C>(10u)
            + limit_then_abort<msg_D>(10u)
            + limit_then_abort<msg_E>(10u)
            + limit_then_abort<msg_F>(10u)
            + limit_then_abort<msg_G>(10u)
            + limit_then_abort<msg_I>(10u)
            + limit_then_abort<msg_J>(10u)
        }
    {}
    ...
};
This was a boring and error-prone task. So since v.5.7.1 there is a way to specify a default (or generic) message limit:
class handler_of_different_messages final : public so_5::agent_t
{
public:
    handler_of_different_messages(context_t ctx)
        : so_5::agent_t{ctx
            + limit_then_abort<msg_A>(20u)
            + limit_then_abort<msg_B>(30u)
            // The trick is in `any_unspecified_message`.
            + limit_then_abort<any_unspecified_message>(10u)
        }
    {}
    ...
};
A new special type so_5::any_unspecified_message was introduced in v.5.7.1. This type can be used as a template parameter for the methods limit_then_drop, limit_then_abort and limit_then_redirect. If any_unspecified_message is used with one of those methods, then that message limit will be used as the default (generic) message limit. The default message limit applies to all messages for which an individual limit is not set.
Note that when any_unspecified_message is used, a separate message limit object is created for every subscribed message at the moment of subscription. For example:
class handler_of_different_messages final : public so_5::agent_t
{
public:
    handler_of_different_messages(context_t ctx)
        : so_5::agent_t{ctx
            + limit_then_abort<msg_A>(20u) // Limit object for msg_A is created.
            + limit_then_abort<msg_B>(30u) // Limit object for msg_B is created.
            // The trick is in `any_unspecified_message`.
            + limit_then_abort<any_unspecified_message>(10u)
        }
    {}
    void so_define_agent() override
    {
        so_subscribe_self()
            .event(
                // Limit object for msg_A already exists.
                [](mhood_t<msg_A> cmd) {...})
            .event(
                // A new limit object for msg_C will be created here.
                [](mhood_t<msg_C> cmd) {...})
            .event(
                // A new limit object for msg_D will be created here.
                [](mhood_t<msg_D> cmd) {...})
            ;
        so_subscribe(some_mbox)
            .event(
                // Limit object for msg_B already exists.
                [](mhood_t<msg_B> cmd) {...})
            .event(
                // Limit object for msg_C was created earlier.
                [](mhood_t<msg_C> cmd) {...})
            .event(
                // A new limit object for msg_E will be created here.
                [](mhood_t<msg_E> cmd) {...})
            ;
    }
    ...
};
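The way a default limit produces a separate limit object for each subscribed type can be pictured with a standalone model. This is a sketch in plain C++17 under stated assumptions, not SObjectizer's implementation: `limits_with_default`, its methods, and the types `msg_a`, `msg_c` and `msg_d` are hypothetical names used only here. The model also throws when a type has neither an individual nor a default limit, which loosely mirrors the rule that subscribing without a predefined limit is a runtime error.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>

// Hypothetical message types used only in this sketch.
struct msg_a {};
struct msg_c {};
struct msg_d {};

// Toy model of individual limits plus one default limit (not SObjectizer code).
class limits_with_default
{
public:
    // Defines an individual limit for messages of type Msg.
    template< typename Msg >
    void define_limit( std::size_t max_waiting )
    {
        assert( max_waiting > 0 );
        m_limits[ key< Msg >() ] = entry{ max_waiting, 0 };
    }

    // Defines the default limit for all types without an individual limit.
    void define_default_limit( std::size_t max_waiting )
    {
        assert( max_waiting > 0 );
        m_default = max_waiting;
    }

    // Models a subscription: creates a limit object for Msg from the
    // default limit if no limit object exists for Msg yet.
    template< typename Msg >
    void on_subscribe()
    {
        if( m_limits.count( key< Msg >() ) != 0 )
            return; // A limit object for Msg already exists.
        if( !m_default )
            throw std::logic_error( "no limit defined for this message type" );
        m_limits.emplace( key< Msg >(), entry{ *m_default, 0 } );
    }

    // Models a send: true if the message is counted, false on overlimit.
    template< typename Msg >
    bool try_count()
    {
        entry & e = m_limits.at( key< Msg >() );
        if( e.waiting >= e.limit )
            return false;
        ++e.waiting;
        return true;
    }

    // Number of limit objects created so far.
    std::size_t limit_objects() const { return m_limits.size(); }

private:
    struct entry
    {
        std::size_t limit;
        std::size_t waiting;
    };

    template< typename Msg >
    static std::type_index key() { return std::type_index( typeid( Msg ) ); }

    std::unordered_map< std::type_index, entry > m_limits;
    std::optional< std::size_t > m_default;
};
```

In this model, subscribing to `msg_c` twice creates only one limit object, and `msg_c` and `msg_d` each get their own counter initialized from the default value, so exhausting the limit for one of them does not affect the other.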
Please note that limit_then_transform can't be used for setting a default (generic) message limit. Only limit_then_drop, limit_then_abort and limit_then_redirect can be used with any_unspecified_message.
SObjectizer counts messages only if there is a limit for that message type. SObjectizer increments the message counter during dispatching of the message: it walks through the subscribers of the message and checks whether there is a predefined limit for the next subscriber. If there is a limit, SObjectizer tries to increment the message counter of that type for that agent. If that is impossible, the reaction to exceeding the limit is performed. If the message counter is successfully incremented, the message is stored in the appropriate event queue.
The counter for the message is decremented when the message is taken from the queue and scheduled for processing by the agent. It means that the message's counter is decremented before the invocation of the event handler for that message instance.
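The order of these steps (count on dispatch, decrement on extraction, then call the handler) can be illustrated with a single-threaded toy queue. This is a hedged sketch and not SObjectizer code: `limited_queue` and its methods are hypothetical, the payload is a plain `int`, and the only reaction modeled is dropping.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>

// Toy single-type queue with a message limit (not SObjectizer code).
class limited_queue
{
public:
    explicit limited_queue( std::size_t limit ) : m_limit( limit ) {}

    // Models dispatching: the counter is checked first and the message is
    // stored only if the counter could be incremented.
    bool push( int payload )
    {
        if( m_waiting >= m_limit )
        {
            ++m_dropped; // The "drop" reaction in this model.
            return false;
        }
        ++m_waiting;
        m_queue.push_back( payload );
        return true;
    }

    // Models processing of one message: the counter is decremented when the
    // message is taken from the queue, before the handler is invoked.
    bool process_one( const std::function< void( int ) > & handler )
    {
        if( m_queue.empty() )
            return false;
        const int payload = m_queue.front();
        m_queue.pop_front();
        assert( m_waiting > 0 );
        --m_waiting;
        handler( payload );
        return true;
    }

    std::size_t waiting() const { return m_waiting; }
    std::size_t dropped() const { return m_dropped; }

private:
    const std::size_t m_limit;
    std::size_t m_waiting{ 0 };
    std::size_t m_dropped{ 0 };
    std::deque< int > m_queue;
};
```

One consequence visible in this model: with a limit of 1, a handler can push a new message of the same type into the same queue, because the slot was released before the handler started.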
Please note that no counters are incremented/decremented when there is no message limit defined for an agent. It means that some performance penalty is paid only when an agent uses message limits.
There could be a situation when reactions to an extra message form a very long chain. For example, an extra message M is redirected to agent A2, but the queue for A2 is full and the message is redirected to A3, but the queue for A3 is full and the message is redirected to A4... Also, there could be user mistakes. For example, if an agent redirects an extra message to itself, there will be an infinite redirection loop.
There is a hardcoded limit on the depth of reaction recursion in SObjectizer v.5.6. When this limit is reached, the problematic message will be logged and discarded.
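A depth limit of this kind can be sketched with a standalone model of a redirection chain. Everything here is an assumption made for illustration: `limited_agent`, `deliver`, `delivery_result`, and the value of `max_redirection_depth` are hypothetical and do not reflect the actual constant or code inside SObjectizer.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Toy model of one agent's limit for a single message type.
struct limited_agent
{
    std::size_t limit;
    std::size_t waiting;
    // Index of the agent that receives extra messages, if any.
    std::optional< std::size_t > redirect_to;
};

enum class delivery_result { stored, dropped, depth_exceeded };

// Arbitrary value chosen for this sketch only.
constexpr unsigned max_redirection_depth = 32;

// Tries to deliver one message to agents[target], following redirections
// until the message is stored, dropped, or the depth limit is exceeded.
delivery_result deliver( std::vector< limited_agent > & agents, std::size_t target )
{
    for( unsigned depth = 0; depth <= max_redirection_depth; ++depth )
    {
        assert( target < agents.size() );
        limited_agent & a = agents[ target ];
        if( a.waiting < a.limit )
        {
            ++a.waiting;
            return delivery_result::stored;
        }
        if( !a.redirect_to )
            return delivery_result::dropped;
        target = *a.redirect_to;
    }
    // Too many redirections: give up instead of looping forever.
    return delivery_result::depth_exceeded;
}
```

In this model an agent that redirects to itself while its queue is full ends with `depth_exceeded` rather than an endless loop, while a chain that eventually reaches an agent with free capacity ends with `stored`.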
A lambda/functional object that is specified as part of a reaction in limit_then_redirect and limit_then_transform is called inside send functions in the context of the message sender. This means that the lambda can be called concurrently from different working contexts.
A developer must understand that and write redirection and transformation lambdas in a thread-safe manner.
The easiest way to do that is to write such lambdas as stateless, side-effect-free functions. They should access only constant, immutable data. If this is impossible and some kind of data synchronization is required, it should be done with additional care.
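As a rough illustration of why a stateless transformation is safe, the following standalone sketch calls one pure function from several threads at once, the way several senders might. It does not use SObjectizer: `request`, `negative_reply`, `to_negative_reply` and `transform_concurrently` are hypothetical names, and the threads only stand in for independent message senders.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

// Hypothetical message types used only in this sketch.
struct request { int id; };
struct negative_reply { int request_id; std::string reason; };

// Stateless and side-effect free: the result depends only on the argument
// and on constants, so concurrent calls cannot interfere with each other.
negative_reply to_negative_reply( const request & r )
{
    return negative_reply{ r.id, "processor_is_busy" };
}

// Calls the transformation from several threads at once.
// Each thread writes only to its own slot of the result vector.
std::vector< negative_reply > transform_concurrently( std::size_t senders )
{
    std::vector< negative_reply > results( senders );
    std::vector< std::thread > threads;
    threads.reserve( senders );
    for( std::size_t i = 0; i != senders; ++i )
        threads.emplace_back( [&results, i] {
            results[ i ] = to_negative_reply( request{ static_cast< int >( i ) } );
        } );
    for( auto & t : threads )
    {
        assert( t.joinable() );
        t.join();
    }
    return results;
}
```

No mutex is needed here because the function touches no shared mutable state. A lambda that, for example, incremented a shared counter would need an atomic or a lock, which is exactly the extra care mentioned above.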
Yet another note for a developer: it is better to make redirection/transformation lambdas as small and as fast as possible. Because they are called inside the message dispatching procedure, they could affect SObjectizer performance significantly.
It is strongly recommended to write redirection/transformation lambdas/functional objects as stateless, side-effect-free functions. It is not recommended to perform any actions that can change the state of the SObjectizer Environment (like registration of new cooperations or deregistration of existing cooperations). This is because those lambdas are called under some locks (the mbox's lock in particular), and an attempt to change the SObjectizer Environment state can lead to a deadlock on those locks.