diff --git a/src/async_pools/mongoose_aggregator_worker.erl b/src/async_pools/mongoose_aggregator_worker.erl index 029fe6cae34..a6fa07437f0 100644 --- a/src/async_pools/mongoose_aggregator_worker.erl +++ b/src/async_pools/mongoose_aggregator_worker.erl @@ -53,7 +53,9 @@ verify_callback :: undefined | mongoose_async_pools:verify_callback(), flush_elems = #{} :: map() | censored, % see format_status/2 for censored flush_queue = queue:new() :: queue:queue(), - flush_extra = #{} :: map() + flush_extra = #{} :: map(), + total_retries = 3 :: non_neg_integer(), + retries_left = 3 :: non_neg_integer() }). -type state() :: #state{}. @@ -99,19 +101,47 @@ handle_info(Msg, #state{async_request = no_request_pending} = State) -> ?UNEXPECTED_INFO(Msg), {noreply, State}; handle_info(Msg, #state{async_request = {AsyncRequest, ReqTask}} = State) -> + case check_response(Msg, AsyncRequest, ReqTask, State) of + ignore -> + {noreply, State}; + next -> + {noreply, maybe_request_next(State)}; + retry -> + {noreply, maybe_request_retry(ReqTask, State)} + end. + +maybe_request_retry(ReqTask, State = #state{retries_left = 0}) -> + ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_dropped, txt => <<"Async request dropped, no more retries">>, task => ReqTask})), + cancel_request_retry(State); +maybe_request_retry(ReqTask, State = #state{retries_left = Left}) -> + case make_async_request(ReqTask, State#state{async_request = no_request_pending, retries_left = Left - 1}) of + #state{async_request = no_request_pending} = State2 -> + cancel_request_retry(State2); + State2 -> + State2 + end. + +cancel_request_retry(State) -> + maybe_request_next(State#state{async_request = no_request_pending}). + +check_response(Msg, AsyncRequest, ReqTask, State) -> case gen_server:check_response(Msg, AsyncRequest) of {error, {Reason, _Ref}} -> ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})), - {noreply, State#state{async_request = no_request_pending}}; + retry; {reply, {error, Reason}} -> ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})), - {noreply, State#state{async_request = no_request_pending}}; + retry; {reply, Reply} -> - maybe_verify_reply(Reply, ReqTask, State), - {noreply, maybe_request_next(State)}; + case maybe_verify_reply(Reply, ReqTask, State) of + ok -> + next; + {error, _Reason} -> + retry + end; no_reply -> ?UNEXPECTED_INFO(Msg), - {noreply, State} + ignore end. -spec terminate(term(), state()) -> term(). @@ -174,24 +204,28 @@ handle_broadcast(Task, #state{aggregate_callback = Aggregator, end, State#state{flush_elems = maps:map(Map, Acc)}. -maybe_request_next(#state{flush_elems = Acc, flush_queue = Queue} = State) -> +maybe_request_next(#state{flush_elems = Acc, flush_queue = Queue, total_retries = Total} = State) -> + %% Reset number of retries + State1 = State#state{retries_left = Total}, case queue:out(Queue) of {{value, Key}, NewQueue} -> {Value, NewAcc} = maps:take(Key, Acc), - NewState1 = State#state{flush_elems = NewAcc, flush_queue = NewQueue}, - case make_async_request(Value, NewState1) of - NewState2 = #state{async_request = no_request_pending} -> - maybe_request_next(NewState2); - NewState2 -> - NewState2 + State2 = State1#state{flush_elems = NewAcc, flush_queue = NewQueue}, + State3 = make_async_request(Value, State2), + case State3 of + #state{async_request = no_request_pending} -> + maybe_request_next(State3); + _ -> + State3 end; {empty, _} -> - State#state{async_request = no_request_pending} + State1#state{async_request = no_request_pending} end. make_async_request(Request, #state{host_type = HostType, pool_id = PoolId, request_callback = Requestor, flush_extra = Extra} = State) -> - case Requestor(Request, Extra) of + RetryNumber = State#state.total_retries - State#state.retries_left, + case Requestor(Request, Extra#{retry_number => RetryNumber}) of drop -> State; ReqId -> diff --git a/src/inbox/mod_inbox_rdbms_async.erl b/src/inbox/mod_inbox_rdbms_async.erl index 254edc31172..70162c7e22a 100644 --- a/src/inbox/mod_inbox_rdbms_async.erl +++ b/src/inbox/mod_inbox_rdbms_async.erl @@ -97,13 +97,14 @@ request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) -> aggregate(Current, NewTask, _Extra) -> {ok, aggregate(Current, NewTask)}. --spec verify(term(), task(), mongoose_async_pools:pool_extra()) -> ok. +-spec verify(term(), task(), mongoose_async_pools:pool_extra()) -> ok | {error, term()}. verify(Answer, InboxTask, _Extra) -> case mod_inbox_rdbms:check_result(Answer) of {error, Reason} -> {LU, LS, LRem} = element(2, InboxTask), ?LOG_WARNING(#{what => inbox_process_message_failed, reason => Reason, - from_jid => jid:to_binary({LU, LS}), to_jid => LRem}); + from_jid => jid:to_binary({LU, LS}), to_jid => LRem}), + {error, Reason}; _ -> ok end. diff --git a/test/batches_SUITE.erl b/test/batches_SUITE.erl index 2700e899774..1ecd56f89e8 100644 --- a/test/batches_SUITE.erl +++ b/test/batches_SUITE.erl @@ -33,7 +33,12 @@ groups() -> sync_aggregates_down_everything, aggregating_error_is_handled_and_can_continue, aggregation_might_produce_noop_requests, - async_request + async_request, + retry_request, + retry_request_cancelled, + retry_request_cancelled_in_verify_function, + ignore_msg_when_waiting_for_reply, + async_request_fails ]} ]. @@ -264,6 +269,87 @@ async_request(_) -> async_helper:wait_until( fun() -> gen_server:call(Server, get_acc) end, 500500). +retry_request(_) -> + Opts = (retry_aggregator_opts())#{pool_id => retry_request}, + {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), + gen_server:cast(Pid, {task, key, 1}), + receive_task_called(0), + receive_task_called(1), + gen_server:cast(Pid, {task, key, 1}), + receive_task_called(0), + receive_task_called(1), + ensure_no_tasks_to_receive(). + +retry_request_cancelled(_) -> + Opts = (retry_aggregator_opts())#{pool_id => retry_request_cancelled, + request_callback => fun do_cancel_request/2}, + {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), + gen_server:cast(Pid, {task, key, 1}), + receive_task_called(0), + %% 3 retries + receive_task_called(1), + receive_task_called(2), + receive_task_called(3), + ensure_no_tasks_to_receive(), + %% Second task gets started + gen_server:cast(Pid, {task, key, 2}), + receive_task_called(0). + +retry_request_cancelled_in_verify_function(_) -> + Opts = (retry_aggregator_opts())#{pool_id => retry_request_cancelled_in_verify_function, + request_callback => fun do_request/2, + verify_callback => fun validate_all_fails/3}, + {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), + gen_server:cast(Pid, {task, key, 1}), + receive_task_called(0), + %% 3 retries + receive_task_called(1), + receive_task_called(2), + receive_task_called(3), + ensure_no_tasks_to_receive(), + %% Second task gets started + gen_server:cast(Pid, {task, key, 2}), + receive_task_called(0). + +ignore_msg_when_waiting_for_reply(_) -> + Opts = (retry_aggregator_opts())#{pool_id => ignore_msg_when_waiting_for_reply, + request_callback => fun do_request_but_ignore_other_messages/2}, + {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), + gen_server:cast(Pid, {task, key, 1}), + receive_task_called(0), + %% Second task gets started + gen_server:cast(Pid, {task, key, 2}), + receive_task_called(0). + +async_request_fails(_) -> + %% Does request that crashes the gen_server, but not the aggregator + {ok, Server} = gen_server:start({local, async_req_fails_server}, ?MODULE, [], []), + Ref = monitor(process, Server), + Opts = (default_aggregator_opts(async_req_fails_server))#{pool_id => ?FUNCTION_NAME}, + {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), + gen_server:cast(Pid, {task, key, {ack_and_die, self()}}), + %% Acked and the server dies + receive + {'DOWN', R, process, _, _} when R =:= Ref -> ok + after 5000 -> error(down_receive_timeout) + end, + receive + {acked, S} when S =:= Server -> ok + after 5000 -> error(acked_receive_timeout) + end, + %% Eventually the task is cancelled + async_helper:wait_until(fun() -> element(4, sys:get_state(Pid)) end, no_request_pending), + %% Check that aggregator still processes new tasks + %% Start the task and wait for processing + {ok, Server2} = gen_server:start({local, async_req_fails_server}, ?MODULE, [], []), + gen_server:cast(Pid, {task, key, {ack, self()}}), + receive + {acked, S2} when S2 =:= Server2 -> ok + after 5000 -> error(acked_receive_timeout) + end, + %% Check state + 1 = gen_server:call(Server2, get_acc). + %% helpers host_type() -> <<"HostType">>. @@ -278,19 +364,75 @@ default_aggregator_opts(Server) -> verify_callback => fun validate_all_ok/3, flush_extra => #{host_type => host_type()}}. +retry_aggregator_opts() -> + #{host_type => host_type(), + request_callback => fun do_retry_request/2, + aggregate_callback => fun aggregate_sum/3, + verify_callback => fun validate_ok/3, + flush_extra => #{host_type => host_type(), origin_pid => self()}}. + validate_all_ok(ok, _, _) -> ok. +validate_ok(ok, _, _) -> + ok; +validate_ok({error, Reason}, _, _) -> + {error, Reason}. + +validate_all_fails(_, _, _) -> + {error, all_fails}. + aggregate_sum(T1, T2, _) -> {ok, T1 + T2}. requester(Server) -> fun(return_error, _) -> gen_server:send_request(Server, return_error); + ({ack_and_die, _} = Task, _) -> + gen_server:send_request(Server, Task); + ({ack, _} = Task, _) -> + gen_server:send_request(Server, Task); (Task, _) -> timer:sleep(1), gen_server:send_request(Server, Task) end. +%% Fails first task +do_retry_request(_Task, #{origin_pid := Pid, retry_number := Retry}) -> + Pid ! {task_called, Retry}, + Ref = make_ref(), + Reply = case Retry of 0 -> {error, simulate_error}; 1 -> ok end, + %% Simulate gen_server call reply + self() ! {[alias|Ref], Reply}, + Ref. + +do_request(_Task, #{origin_pid := Pid, retry_number := Retry}) -> + Pid ! {task_called, Retry}, + Ref = make_ref(), + Reply = ok, + %% Simulate gen_server call reply + self() ! {[alias|Ref], Reply}, + Ref. + +%% Fails all tries +do_cancel_request(_Task, #{origin_pid := Pid, retry_number := Retry}) -> + Pid ! {task_called, Retry}, + Ref = make_ref(), + Reply = {error, simulate_error}, + %% Simulate gen_server call reply + self() ! {[alias|Ref], Reply}, + Ref. + +%% Fails all tries +do_request_but_ignore_other_messages(_Task, #{origin_pid := Pid, retry_number := Retry}) -> + Pid ! {task_called, Retry}, + Ref = make_ref(), + Reply = ok, + %% Just send an unexpected messages which should be ignored + self() ! unexpected_msg_should_be_ignored, + %% Simulate gen_server call reply + self() ! {[alias|Ref], Reply}, + Ref. + init([]) -> {ok, 0}. @@ -298,8 +440,25 @@ handle_call(get_acc, _From, Acc) -> {reply, Acc, Acc}; handle_call(return_error, _From, Acc) -> {reply, {error, return_error}, Acc}; +handle_call({ack_and_die, Pid}, _From, Acc) -> + Pid ! {acked, self()}, + error(oops); +handle_call({ack, Pid}, _From, Acc) -> + Pid ! {acked, self()}, + {reply, ok, 1 + Acc}; handle_call(N, _From, Acc) -> {reply, ok, N + Acc}. handle_cast(_Msg, Acc) -> {noreply, Acc}. + +receive_task_called(ExpectedRetryNumber) -> + receive + {task_called, RetryNum} -> + ExpectedRetryNumber = RetryNum + after 5000 -> + error(timeout) + end. + +ensure_no_tasks_to_receive() -> + receive {task_called, _} -> error(unexpected_msg) after 0 -> ok end.