diff --git a/src/riak_repl2_pg_block_provider.erl b/src/riak_repl2_pg_block_provider.erl index 4db6419d..d366d9cc 100644 --- a/src/riak_repl2_pg_block_provider.erl +++ b/src/riak_repl2_pg_block_provider.erl @@ -16,7 +16,7 @@ terminate/2, code_change/3, status/1, status/2]). %% send a message every KEEPALIVE milliseconds to make sure the service is running on the sink --define(KEEPALIVE, 1000). +-define(KEEPALIVE, 60000). -record(state, { @@ -27,7 +27,6 @@ connection_ref, worker, client, - keepalive_timer, proxy_gets_provided = 0 }). @@ -75,16 +74,16 @@ handle_call({connected, Socket, Transport, _Endpoint, _Proto, Props}, _From, lager:debug("Keeping stats for " ++ SocketTag), riak_core_tcp_mon:monitor(Socket, {?TCP_MON_PROXYGET_APP, source, SocketTag}, Transport), - TRef = keepalive_timer(), Transport:setopts(Socket, [{active, once}]), {ok, Client} = riak:local_client(), - {reply, ok, State#state{ + State2 = State#state{ transport=Transport, socket=Socket, other_cluster=Cluster, - client=Client, - keepalive_timer=TRef - }}; + client=Client + }, + _ = keepalive_timer(), + {reply, ok, State2}; handle_call(status, _From, State=#state{socket=Socket, proxy_gets_provided=PGCount}) -> SocketStats = riak_core_tcp_mon:socket_status(Socket), @@ -109,6 +108,7 @@ handle_cast(_Msg, State) -> handle_info(keepalive, State=#state{socket=Socket, transport=Transport}) -> Data = term_to_binary(stay_awake), Transport:send(Socket, Data), + _ = keepalive_timer(), {noreply, State}; handle_info({tcp_closed, Socket}, State=#state{socket=Socket}) -> lager:info("Connection for proxy_get ~p closed", [State#state.other_cluster]), @@ -125,14 +125,11 @@ handle_info({ssl_error, _Socket, Reason}, State) -> [State#state.other_cluster, Reason]), {stop, socket_closed, State}; handle_info({Proto, Socket, Data}, - State0=#state{socket=Socket,transport=Transport, keepalive_timer=TRef}) + State0=#state{socket=Socket,transport=Transport}) when Proto==tcp; Proto==ssl -> Transport:setopts(Socket, [{active, once}]), - _ = timer:cancel(TRef), Msg = binary_to_term(Data), - %% restart the timer after each message has been processed - State = State0#state{keepalive_timer=keepalive_timer()}, - handle_msg(Msg, State); + handle_msg(Msg, State0); handle_info(_Msg, State) -> {noreply, State}. @@ -161,5 +158,4 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. keepalive_timer() -> - timer:send_interval(?KEEPALIVE, keepalive). - + erlang:send_after(?KEEPALIVE, self(), keepalive).