Add retry logic into mongooseim task aggregator #3969

Merged (8 commits) on Feb 28, 2023
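
With this change the aggregator worker retries a failed asynchronous request with the same aggregated task instead of silently dropping it: the request callback now receives the current attempt number as retry_number in its Extra map, and a verify callback may return {error, Reason} to force a retry (up to the worker's retry budget, 3 in this change). Below is a minimal sketch of callbacks written against that contract; the module name my_backend and the my_db_writer server are hypothetical and not part of this PR.

-module(my_backend).
-export([request/2, verify/3]).

-include_lib("kernel/include/logger.hrl").

%% Request callback: start the asynchronous write and hand back the request id.
%% retry_number (new in this PR) is 0 on the first attempt and increases by one
%% for every retry of the same aggregated task.
request(AggregatedTask, #{retry_number := RetryNumber}) ->
    case RetryNumber of
        0 -> ok;
        N -> ?LOG_WARNING(#{what => my_backend_retrying_request, attempt => N})
    end,
    gen_server:send_request(my_db_writer, {insert, AggregatedTask}).

%% Verify callback: with this PR, returning {error, Reason} makes the worker
%% retry the task; returning ok lets it move on to the next queued flush.
verify(ok, _AggregatedTask, _Extra) ->
    ok;
verify({error, Reason}, _AggregatedTask, _Extra) ->
    {error, Reason}.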
64 changes: 49 additions & 15 deletions src/async_pools/mongoose_aggregator_worker.erl
@@ -53,7 +53,9 @@
          verify_callback :: undefined | mongoose_async_pools:verify_callback(),
          flush_elems = #{} :: map() | censored, % see format_status/2 for censored
          flush_queue = queue:new() :: queue:queue(),
-         flush_extra = #{} :: map()
+         flush_extra = #{} :: map(),
+         total_retries = 3 :: non_neg_integer(),
+         retries_left = 3 :: non_neg_integer()
         }).
-type state() :: #state{}.

@@ -99,19 +101,47 @@ handle_info(Msg, #state{async_request = no_request_pending} = State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State};
handle_info(Msg, #state{async_request = {AsyncRequest, ReqTask}} = State) ->
    case check_response(Msg, AsyncRequest, ReqTask, State) of
        ignore ->
            {noreply, State};
        next ->
            {noreply, maybe_request_next(State)};
        retry ->
            {noreply, maybe_request_retry(ReqTask, State)}
    end.

[Review comment from a collaborator on the lines above: "Indentation is off here"]
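
%% The retry flow added by this PR, in short: check_response/4 classifies each
%% incoming message as `next` (reply received and verified), `retry` (send
%% error, error reply, or failed verification) or `ignore` (unrelated message).
%% maybe_request_retry/2 re-issues the same task while retries_left > 0; once
%% the budget is spent the task is dropped and the worker moves on to the next
%% queued flush. maybe_request_next/1 resets retries_left to total_retries for
%% every new task.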

maybe_request_retry(ReqTask, State = #state{retries_left = 0}) ->
    ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_dropped, txt => <<"Async request dropped, no more retries">>, task => ReqTask})),
    cancel_request_retry(State);
maybe_request_retry(ReqTask, State = #state{retries_left = Left}) ->
    case make_async_request(ReqTask, State#state{async_request = no_request_pending, retries_left = Left - 1}) of
        #state{async_request = no_request_pending} = State2 ->
            cancel_request_retry(State2);
        State2 ->
            State2
    end.

cancel_request_retry(State) ->
    maybe_request_next(State#state{async_request = no_request_pending}).

check_response(Msg, AsyncRequest, ReqTask, State) ->
    case gen_server:check_response(Msg, AsyncRequest) of
        {error, {Reason, _Ref}} ->
            ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})),
-           {noreply, State#state{async_request = no_request_pending}};
+           retry;
        {reply, {error, Reason}} ->
            ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})),
-           {noreply, State#state{async_request = no_request_pending}};
+           retry;
        {reply, Reply} ->
-           maybe_verify_reply(Reply, ReqTask, State),
-           {noreply, maybe_request_next(State)};
+           case maybe_verify_reply(Reply, ReqTask, State) of
+               ok ->
+                   next;
+               {error, _Reason} ->
+                   retry
+           end;
        no_reply ->
            ?UNEXPECTED_INFO(Msg),
-           {noreply, State}
+           ignore
    end.

-spec terminate(term(), state()) -> term().
@@ -174,24 +204,28 @@ handle_broadcast(Task, #state{aggregate_callback = Aggregator,
end,
State#state{flush_elems = maps:map(Map, Acc)}.

- maybe_request_next(#state{flush_elems = Acc, flush_queue = Queue} = State) ->
+ maybe_request_next(#state{flush_elems = Acc, flush_queue = Queue, total_retries = Total} = State) ->
+   %% Reset number of retries
+   State1 = State#state{retries_left = Total},
    case queue:out(Queue) of
        {{value, Key}, NewQueue} ->
            {Value, NewAcc} = maps:take(Key, Acc),
-           NewState1 = State#state{flush_elems = NewAcc, flush_queue = NewQueue},
-           case make_async_request(Value, NewState1) of
-               NewState2 = #state{async_request = no_request_pending} ->
-                   maybe_request_next(NewState2);
-               NewState2 ->
-                   NewState2
+           State2 = State1#state{flush_elems = NewAcc, flush_queue = NewQueue},
+           State3 = make_async_request(Value, State2),
+           case State3 of
+               #state{async_request = no_request_pending} ->
+                   maybe_request_next(State3);
+               _ ->
+                   State3
            end;
        {empty, _} ->
-           State#state{async_request = no_request_pending}
+           State1#state{async_request = no_request_pending}
    end.

make_async_request(Request, #state{host_type = HostType, pool_id = PoolId,
                                   request_callback = Requestor, flush_extra = Extra} = State) ->
-   case Requestor(Request, Extra) of
+   RetryNumber = State#state.total_retries - State#state.retries_left,
+   case Requestor(Request, Extra#{retry_number => RetryNumber}) of
        drop ->
            State;
        ReqId ->
5 changes: 3 additions & 2 deletions src/inbox/mod_inbox_rdbms_async.erl
@@ -97,13 +97,14 @@ request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) ->
aggregate(Current, NewTask, _Extra) ->
{ok, aggregate(Current, NewTask)}.

- -spec verify(term(), task(), mongoose_async_pools:pool_extra()) -> ok.
+ -spec verify(term(), task(), mongoose_async_pools:pool_extra()) -> ok | {error, term()}.
verify(Answer, InboxTask, _Extra) ->
    case mod_inbox_rdbms:check_result(Answer) of
        {error, Reason} ->
            {LU, LS, LRem} = element(2, InboxTask),
            ?LOG_WARNING(#{what => inbox_process_message_failed, reason => Reason,
-                          from_jid => jid:to_binary({LU, LS}), to_jid => LRem});
+                          from_jid => jid:to_binary({LU, LS}), to_jid => LRem}),
+           {error, Reason};
        _ -> ok
    end.
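
%% With the retry support in mongoose_aggregator_worker, returning {error, Reason}
%% here makes the worker retry the failed inbox write instead of only logging it.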

161 changes: 160 additions & 1 deletion test/batches_SUITE.erl
@@ -33,7 +33,12 @@ groups() ->
sync_aggregates_down_everything,
aggregating_error_is_handled_and_can_continue,
aggregation_might_produce_noop_requests,
- async_request
+ async_request,
+ retry_request,
+ retry_request_cancelled,
+ retry_request_cancelled_in_verify_function,
+ ignore_msg_when_waiting_for_reply,
+ async_request_fails
]}
].

@@ -264,6 +269,87 @@ async_request(_) ->
async_helper:wait_until(
fun() -> gen_server:call(Server, get_acc) end, 500500).

retry_request(_) ->
Opts = (retry_aggregator_opts())#{pool_id => retry_request},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
gen_server:cast(Pid, {task, key, 1}),
receive_task_called(0),
receive_task_called(1),
gen_server:cast(Pid, {task, key, 1}),
receive_task_called(0),
receive_task_called(1),
ensure_no_tasks_to_receive().

retry_request_cancelled(_) ->
Opts = (retry_aggregator_opts())#{pool_id => retry_request_cancelled,
request_callback => fun do_cancel_request/2},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
gen_server:cast(Pid, {task, key, 1}),
receive_task_called(0),
%% 3 retries
receive_task_called(1),
receive_task_called(2),
receive_task_called(3),
ensure_no_tasks_to_receive(),
%% Second task gets started
gen_server:cast(Pid, {task, key, 2}),
receive_task_called(0).

retry_request_cancelled_in_verify_function(_) ->
Opts = (retry_aggregator_opts())#{pool_id => retry_request_cancelled_in_verify_function,
request_callback => fun do_request/2,
verify_callback => fun validate_all_fails/3},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
gen_server:cast(Pid, {task, key, 1}),
receive_task_called(0),
%% 3 retries
receive_task_called(1),
receive_task_called(2),
receive_task_called(3),
ensure_no_tasks_to_receive(),
%% Second task gets started
gen_server:cast(Pid, {task, key, 2}),
receive_task_called(0).

ignore_msg_when_waiting_for_reply(_) ->
Opts = (retry_aggregator_opts())#{pool_id => ignore_msg_when_waiting_for_reply,
request_callback => fun do_request_but_ignore_other_messages/2},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
gen_server:cast(Pid, {task, key, 1}),
receive_task_called(0),
%% Second task gets started
gen_server:cast(Pid, {task, key, 2}),
receive_task_called(0).

async_request_fails(_) ->
%% Makes a request that crashes the helper gen_server, but not the aggregator worker
{ok, Server} = gen_server:start({local, async_req_fails_server}, ?MODULE, [], []),
Ref = monitor(process, Server),
Opts = (default_aggregator_opts(async_req_fails_server))#{pool_id => ?FUNCTION_NAME},
{ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
gen_server:cast(Pid, {task, key, {ack_and_die, self()}}),
%% Acked and the server dies
receive
{'DOWN', R, process, _, _} when R =:= Ref -> ok
after 5000 -> error(down_receive_timeout)
end,
receive
{acked, S} when S =:= Server -> ok
after 5000 -> error(acked_receive_timeout)
end,
%% Eventually the task is cancelled
async_helper:wait_until(fun() -> element(4, sys:get_state(Pid)) end, no_request_pending),
%% Check that aggregator still processes new tasks
%% Start the task and wait for processing
{ok, Server2} = gen_server:start({local, async_req_fails_server}, ?MODULE, [], []),
gen_server:cast(Pid, {task, key, {ack, self()}}),
receive
{acked, S2} when S2 =:= Server2 -> ok
after 5000 -> error(acked_receive_timeout)
end,
%% Check state
1 = gen_server:call(Server2, get_acc).

%% helpers
host_type() ->
<<"HostType">>.
@@ -278,28 +364,101 @@ default_aggregator_opts(Server) ->
verify_callback => fun validate_all_ok/3,
flush_extra => #{host_type => host_type()}}.

retry_aggregator_opts() ->
#{host_type => host_type(),
request_callback => fun do_retry_request/2,
aggregate_callback => fun aggregate_sum/3,
verify_callback => fun validate_ok/3,
flush_extra => #{host_type => host_type(), origin_pid => self()}}.

validate_all_ok(ok, _, _) ->
ok.

validate_ok(ok, _, _) ->
ok;
validate_ok({error, Reason}, _, _) ->
{error, Reason}.

validate_all_fails(_, _, _) ->
{error, all_fails}.

aggregate_sum(T1, T2, _) ->
{ok, T1 + T2}.

requester(Server) ->
fun(return_error, _) ->
gen_server:send_request(Server, return_error);
({ack_and_die, _} = Task, _) ->
gen_server:send_request(Server, Task);
({ack, _} = Task, _) ->
gen_server:send_request(Server, Task);
(Task, _) ->
timer:sleep(1), gen_server:send_request(Server, Task)
end.
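
%% The do_*_request helpers below fake an asynchronous backend: they return a
%% plain reference as the request id and post {[alias|Ref], Reply} to
%% themselves, which the worker's gen_server:check_response/2 call then picks
%% up as the reply to that request.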

%% Fails the first attempt, succeeds on the retry
do_retry_request(_Task, #{origin_pid := Pid, retry_number := Retry}) ->
Pid ! {task_called, Retry},
Ref = make_ref(),
Reply = case Retry of 0 -> {error, simulate_error}; 1 -> ok end,
%% Simulate gen_server call reply
self() ! {[alias|Ref], Reply},
Ref.

do_request(_Task, #{origin_pid := Pid, retry_number := Retry}) ->
Pid ! {task_called, Retry},
Ref = make_ref(),
Reply = ok,
%% Simulate gen_server call reply
self() ! {[alias|Ref], Reply},
Ref.

%% Fails all tries
do_cancel_request(_Task, #{origin_pid := Pid, retry_number := Retry}) ->
Pid ! {task_called, Retry},
Ref = make_ref(),
Reply = {error, simulate_error},
%% Simulate gen_server call reply
self() ! {[alias|Ref], Reply},
Ref.

%% Succeeds, but first sends an unrelated message that the worker should ignore
do_request_but_ignore_other_messages(_Task, #{origin_pid := Pid, retry_number := Retry}) ->
Pid ! {task_called, Retry},
Ref = make_ref(),
Reply = ok,
%% Send an unexpected message first, which should be ignored
self() ! unexpected_msg_should_be_ignored,
%% Simulate gen_server call reply
self() ! {[alias|Ref], Reply},
Ref.

init([]) ->
{ok, 0}.

handle_call(get_acc, _From, Acc) ->
{reply, Acc, Acc};
handle_call(return_error, _From, Acc) ->
{reply, {error, return_error}, Acc};
handle_call({ack_and_die, Pid}, _From, Acc) ->
Pid ! {acked, self()},
error(oops);
handle_call({ack, Pid}, _From, Acc) ->
Pid ! {acked, self()},
{reply, ok, 1 + Acc};
handle_call(N, _From, Acc) ->
{reply, ok, N + Acc}.

handle_cast(_Msg, Acc) ->
{noreply, Acc}.

receive_task_called(ExpectedRetryNumber) ->
receive
{task_called, RetryNum} ->
ExpectedRetryNumber = RetryNum
after 5000 ->
error(timeout)
end.

ensure_no_tasks_to_receive() ->
receive {task_called, _} -> error(unexpected_msg) after 0 -> ok end.