diff --git a/src/eetcd.erl b/src/eetcd.erl index fa39314..952755c 100644 --- a/src/eetcd.erl +++ b/src/eetcd.erl @@ -38,6 +38,13 @@ open(Name, Hosts, Transport, TransportOpts) -> %% The pinned address is maintained until the client connection is closed. %% When the client receives an error, it randomly picks another normal endpoint. %% +%% `{auto_sync_interval_ms, Interval}' sets the default `Interval' in milliseconds of auto-sync. +%% Default is 0, which means no auto-sync. If enabled auto-sync, you can set `auto_sync_interval_ms' +%% in application env to change the interval. If disabled, the `auto_sync_interval_ms' in application +%% env will be ignored. With auto-sync enabled, eetcd will automatically sync the cluster member +%% list via the MemberList API of etcd, and will try to connect any new endpoints if in `connect_all' +%% mode. +%% %% `[{name, string()},{password, string()}]' generates an authentication token based on a given user name and password. %% %% You can use `eetcd:info/0' to see the internal connection status. diff --git a/src/eetcd_conn.erl b/src/eetcd_conn.erl index 7a3ccdb..5b7a911 100644 --- a/src/eetcd_conn.erl +++ b/src/eetcd_conn.erl @@ -7,7 +7,7 @@ %% API -export([open/1, close/1, round_robin_select/1, lookup/1, - check_health/1, flush_token/2]). + sync/1, check_health/1, flush_token/2]). -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0, format_status/2]). @@ -16,6 +16,7 @@ -define(reconnect, reconnect). -define(check_health, check_health_msg). +-define(auto_sync, auto_sync). -define(flush_token, flush_token_msg). %%% 200 400 800 1600 3200 6400 12800 25600 @@ -64,6 +65,12 @@ lookup(Name) when is_atom(Name) orelse is_reference(Name) -> [Pid | _] -> {ok, Pid} end. +sync(Name) -> + case lookup(Name) of + {ok, Pid} -> erlang:send(Pid, ?auto_sync); + Err -> Err + end. + check_health(Name) -> case lookup(Name) of {ok, Pid} -> erlang:send(Pid, ?check_health); @@ -90,18 +97,23 @@ init({Name, Hosts, Options, Transport, TransportOpts}) -> transport => Transport, transport_opts => TransportOpts }, + AutoSyncInterval = proplists:get_value(auto_sync_interval_ms, Options, 0), Data0 = #{ name => Name, + auto_sync_interval_ms => AutoSyncInterval, gun_opts => GunOpts, health_ref => undefined, reconn_ref => undefined, - active_conns => [] + active_conns => [], + freeze_conns => [] }, Data = put_in_authenticate(Data0, Options), case proplists:get_value(mode, Options, connect_all) of connect_all -> - connect_all(Hosts, Name, GunOpts, Data); + connect_all(Hosts, Name, Data); random -> + AutoSyncInterval > 0 andalso + ?LOG_WARNING("~s run under random mode, disabled auto_sync member list", [Name]), Length = erlang:length(Hosts), Data1 = Data#{endpoints => shuffle(Hosts), mode => random}, Index = rand:uniform(Length), @@ -125,6 +137,8 @@ handle_event(EventType, reconnecting, _StateName, Data) reconnect_conns(Data); handle_event(info, ?check_health, _StateName, Data) -> {keep_state, do_check_health(Data)}; +handle_event(info, ?auto_sync, _StateName, Data) -> + handle_do_sync(Data); handle_event(internal, ?ready, ?ready, #{name := Name}) -> ?LOG_INFO("ETCD(~p, ~p)'s connections are ready.", [Name, self()]), keep_state_and_data; @@ -149,13 +163,15 @@ code_change(_OldVsn, StateName, Data, _Extra) -> %%% Internal functions %%%=================================================================== -connect_all(Hosts, Name, GunOpts, Data) -> +connect_all(Hosts, Name, #{gun_opts := GunOpts} = Data) -> Auth = maps:get(authenticate, Data, undefined), case fold_connect(Hosts, Name, GunOpts, Auth, [], []) of {Ok, []} -> {ok, ?ready, Data#{ mode => connect_all, health_ref => next_check_health(), + sync_ref => next_sync(Data), + member_list => Hosts, active_conns => Ok, freeze_conns => []}}; {Ok, Failed} when length(Ok) > length(Failed) -> @@ -163,6 +179,8 @@ connect_all(Hosts, Name, GunOpts, Data) -> {ok, ?reconnect, Data#{ mode => connect_all, health_ref => next_check_health(), + sync_ref => next_sync(Data), + member_list => Hosts, active_conns => Ok, freeze_conns => Freezes}, {next_event, internal, reconnecting}}; @@ -184,6 +202,7 @@ connect_one(Index, Retry, Data, Len) -> {ok, ?ready, Data#{ index => Index, health_ref => next_check_health(), + sync_ref => next_sync(Data), active_conns => [{Host, Gun, AuthToken}]}}; {error, Reason} when Retry =< 0 -> {stop, {shutdown, Reason}}; @@ -348,6 +367,15 @@ next_check_health() -> Ms = application:get_env(eetcd, health_check_ms, 15000), erlang:send_after(Ms, self(), ?check_health). +next_sync(#{auto_sync_interval_ms := Interval}) -> + case application:get_env(eetcd, auto_sync_interval_ms, Interval) of + Ms when is_number(Ms), Ms > 0 -> + erlang:send_after(Ms, self(), ?auto_sync); + _ -> undefined + end; +next_sync(_Data) -> + undefined. + do_check_health(Data = #{mode := Mode, health_ref := HealthRef}) -> erlang:cancel_timer(HealthRef), NewData = do_check_health(Mode, Data), @@ -358,10 +386,12 @@ do_check_health(connect_all, Data) -> #{ name := Name, active_conns := Actives, - freeze_conns := Freezes + freeze_conns := Freezes, + member_list := Members } = Data, {NewActives, NewFreezes} = - lists:foldl(fun({Host, Gun, _Token} = Endpoint, {Health, Freeze}) -> + lists:foldl( + fun({Host, Gun, _Token} = Endpoint, {Health, Freeze}) -> case check_health_remote(Gun) of ok -> case check_leader_remote(Gun) of @@ -379,8 +409,9 @@ do_check_health(connect_all, Data) -> ?LOG_ERROR("~p check (~p) health failed by ~p ", [Name, Host, Reason1]), {Health, [{Host, ?MIN_RECONN} | Freeze]} end - end, {[], Freezes}, Actives), - Data#{active_conns => NewActives, freeze_conns => NewFreezes}; + end, {[], Freezes}, Actives), + NewFreezes1 = [{H1, P1} || {H1, P1} <- NewFreezes, {H2, P2} <- Members, H1 =:= H2, P1 =:= P2], + Data#{active_conns => NewActives, freeze_conns => NewFreezes1}; do_check_health(random, Data) -> #{name := Name, active_conns := ActiveConns} = Data, case ActiveConns of @@ -404,6 +435,106 @@ do_check_health(random, Data) -> [] -> Data end. +handle_do_sync(Data = #{sync_ref := SyncRef}) -> + is_reference(SyncRef) andalso erlang:cancel_timer(SyncRef), + #{freeze_conns := Freezes} = NewData = do_sync_memberlist(Data), + NewSyncRef = next_sync(NewData), + NewData1 = NewData#{sync_ref => NewSyncRef}, + case Freezes of + [] -> + {keep_state, NewData1}; + _ -> + reconnect_conns(NewData1) + end. + +do_sync_memberlist(#{mode := random} = Data) -> + Data; +do_sync_memberlist(#{active_conns := [], name := Name} = Data) -> + ?LOG_WARNING("~s has no active connections to etcd cluster, cannot get member list", [Name]), + Data; +do_sync_memberlist(#{name := Name, + gun_opts := #{transport := Transport}, + active_conns := ActiveConns, + freeze_conns := Freezes} = Data) -> + case eetcd_cluster:member_list(Name) of + {ok, #{members := []}} -> + %% TODO: remove all active connections? That's critical situation. + Data; + {ok, #{members := Members}} -> + %% FIXME: take all clientURLs per member or take one per member? + %% Here we take all available clientURLs for every not-a-learner member. + ClientUrls0 = lists:filtermap( + fun(#{isLearner := true, 'ID' := MemberId, name := MemberName}) -> + ?LOG_INFO(#{msg => "Member is a learner, skipped for now", + member_id => MemberId, + member_name => MemberName}), + false; + (#{clientURLs := [], 'ID' := MemberId, name := MemberName}) -> + ?LOG_INFO(#{msg => "Member not advertise client URLs", + member_id => MemberId, + member_name => MemberName}), + false; + (#{clientURLs := Urls}) -> + {true, Urls} + end, Members), + ClientUrls1 = lists:flatten(ClientUrls0), + + ClientUrls2 = lists:filtermap( + fun(Url) -> + case uri_string:parse(Url) of + #{host := Host, port := Port, scheme := Scheme} + when erlang:bit_size(Host) > 0 -> + case {Scheme, Transport} of + {<<"http">>, tcp} -> + {true, {erlang:binary_to_list(Host), Port}}; + {<<"https">>, tls} -> + {true, {erlang:binary_to_list(Host), Port}}; + {<<"https">>, ssl} -> + {true, {erlang:binary_to_list(Host), Port}}; + _ -> + %% Note: because of the design of eetcd_conn, we need + %% the member lists' URL use the same transport + %% options to the active connections. + ?LOG_WARNING("Not matched schemes from member list ~s", + [Url]), + false + end; + _I -> + ?LOG_WARNING("Url ~s from member list is not a valid etcd url", + [Url]), + false + end + end, ClientUrls1), + + ClientUrls = lists:usort(ClientUrls2), + + A = [HostPort || {HostPort, _GunPid, _Token} <- ActiveConns], + F = [HostPort || {HostPort, _Reconn} <- Freezes ], + RemovedEndpoints = (A ++ F) -- ClientUrls, + NewEndpoints = (ClientUrls -- A) -- F, + + %% Remove removed_endpoints from freeze_conns + NewFreezes = [ {H, ?MIN_RECONN} || H <- (ClientUrls -- A)], + + case RemovedEndpoints of + [] -> ok; + _ -> + ?LOG_NOTICE(#{msg => "Got removed endpoints", + removed_endpoints => RemovedEndpoints}) + end, + + case NewEndpoints of + [] -> + Data#{member_list => ClientUrls}; + _ -> + ?LOG_NOTICE(#{msg => "Got new endpoints", new_endpoints => NewEndpoints}), + Data#{freeze_conns => NewFreezes, member_list => ClientUrls} + end; + {error, Reason} -> + ?LOG_ERROR("~p get member_list failed by ~p ", [Name, Reason]), + Data + end. + %% UNKNOWN = 0; %% SERVING = 1; %% NOT_SERVING = 2; @@ -459,4 +590,4 @@ put_in_authenticate(Data, Options) -> shuffle(List) -> Disorders = [begin {rand:uniform(), K} end||K <-List], [begin K end||{_, K} <- lists:keysort(1, Disorders)]. - +