diff --git a/src/eetcd_watch.erl b/src/eetcd_watch.erl index 4b28f3d..d52dce0 100644 --- a/src/eetcd_watch.erl +++ b/src/eetcd_watch.erl @@ -155,15 +155,17 @@ watch(_Name, CreateReq, #{http2_pid := Gun, when is_pid(Gun), is_reference(StreamRef), is_reference(MRef) -> watch_reuse_(CreateReq, WatchConn, Timeout); watch(Name, CreateReq, undefined, Timeout) -> - watch_new_(Name, CreateReq, Timeout). + case eetcd_watch_gen:watch(Name) of + {ok, Gun, StreamRef} -> watch_new_(CreateReq, Gun, StreamRef, Timeout); + {error, _Reason} = E -> E + end. %% Do watch request with a new watch stream. --spec watch_new_(name(), context(), pos_integer()) -> +-spec watch_new_(context(), pid(), reference(), pos_integer()) -> {ok, watch_conn(), WatchId :: pos_integer()} | {error, {stream_error | conn_error | http2_down, term()} | timeout}. -watch_new_(Name, CreateReq, Timeout) -> +watch_new_(CreateReq, Gun, StreamRef, Timeout) -> Request = #{request_union => {create_request, CreateReq}}, - {ok, Gun, StreamRef} = eetcd_watch_gen:watch(Name), MRef = erlang:monitor(process, Gun), eetcd_stream:data(Gun, StreamRef, Request, 'Etcd.WatchRequest', nofin), case eetcd_stream:await(Gun, StreamRef, Timeout, MRef) of diff --git a/test/eetcd_watch_example.erl b/test/eetcd_watch_example.erl index c4a4a7a..1e70efe 100644 --- a/test/eetcd_watch_example.erl +++ b/test/eetcd_watch_example.erl @@ -8,6 +8,7 @@ -define(KEY1, <<"heartbeat:">>). -define(KEY2, <<"heartbeat2:">>). +-define(CHECK_RETRY_MS, 3000). start_link() -> gen_server:start({local, ?MODULE}, ?MODULE, [], []). @@ -28,7 +29,10 @@ init([]) -> {ok, Conn2, WatchId2} = watch_services_event_(?KEY2, Rev2 + 1, Conn1), Mapping = #{WatchId1 => {?KEY1, Rev1}, WatchId2 => {?KEY2, Rev2}}, - {ok, {Conn2, Mapping}}. + {ok, ensure_retry(#{conn => Conn2, + mapping => Mapping, + retries => [], + retry_ref => undefined})}. get_exist_services(KeyPrefix) -> Ctx = eetcd_kv:new(?NAME), @@ -53,22 +57,25 @@ watch_services_event_(Key, Revision, Conn) -> Req = eetcd_watch:with_start_revision(ReqPrefix, Revision), eetcd_watch:watch(?NAME, Req, Conn). -handle_info(Msg, {#{watch_ids := _WatchIds} = Conn, Mapping} = State) -> +handle_info(retry, #{retries := []} = State) -> + {noreply, ensure_retry(State#{retry_ref => undefined})}; +handle_info(retry, State) -> + NewState = retry(State), + {noreply, ensure_retry(NewState#{retry_ref => undefined})}; + +handle_info(Msg, #{conn := Conn, mapping := Mapping} = State) -> case eetcd_watch:watch_stream(Conn, Msg) of {ok, NewConn, WatchEvent} -> io:format("Received changes: ~p~n", [WatchEvent]), update_services(WatchEvent), io:format("ets: ~p~n", [ets:tab2list(?MODULE)]), - {noreply, {NewConn, update_revision(Mapping, WatchEvent)}}; + {noreply, State#{conn => NewConn, mapping => update_revision(Mapping, WatchEvent)}}; {more, NewConn} -> - {noreply, {NewConn, Mapping}}; - {error, _Reason} -> - {NewConn, Watches} = - maps:fold(fun(_WatchId, {Key, Rev}, {Conn0, Acc}) -> - {ok, NewConn0, NewWatchId} = watch_services_event_(Key, Rev, Conn0), - {NewConn0, [{NewWatchId, {Key, Rev}}|Acc]} - end, {undefined, []}, Mapping), - {noreply, {NewConn, maps:from_list(Watches)}}; + {noreply, State#{conn => NewConn}}; + {error, Reason} -> + %% TODO handle error like stream error and/or connection error + io:format("watch_stream error for ~p, state: ~p~n", [Reason, State]), + {noreply, State#{conn => undefined, retries => maps:values(Mapping)}}; unknown -> {noreply, State} end. @@ -99,3 +106,25 @@ update_services(#{events := Events}) -> update_revision(Mapping, #{header := #{revision := Rev}, watch_id := WatchId}) -> maps:update_with(WatchId, fun({Key, _}) -> {Key, Rev} end, Mapping). + +ensure_retry(#{retry_ref := undefined} = State) -> + Ref = erlang:send_after(?CHECK_RETRY_MS, self(), retry), + State#{retry_ref := Ref}; +ensure_retry(State) -> + State. + +retry(#{mapping := _Mapping, retries := [], conn := _Conn} = State) -> State; +retry(#{mapping := Mapping, retries := Retries, conn := Conn} = State) -> + {NewConn, Watches, NewRetries} = + lists:foldl(fun({Key, Rev}, {Conn0, Acc, Retries0}) -> + case watch_services_event_(Key, Rev + 1, Conn0) of + {ok, NewConn0, NewWatchId} -> + {NewConn0, [{NewWatchId, {Key, Rev}}|Acc], Retries0}; + {error, Reason} -> + io:format("Watch key ~p error: ~p~n", [Key, Reason]), + {Conn0, Acc, [{Key, Rev}|Retries0]} + end + end, {Conn, [], []}, Retries), + State#{conn => NewConn, + mapping => maps:merge(Mapping, maps:from_list(Watches)), + retries => NewRetries}.