Merge pull request #41 from belltoy/feature/auto-sync-memberlist
Support auto-sync member list for an etcd conn instance
zhongwencool authored Feb 11, 2022
2 parents 02f77b8 + f0b5ec8 commit 527a931
Showing 2 changed files with 147 additions and 9 deletions.
7 changes: 7 additions & 0 deletions src/eetcd.erl
Expand Up @@ -38,6 +38,13 @@ open(Name, Hosts, Transport, TransportOpts) ->
%% The pinned address is maintained until the client connection is closed.
%% When the client receives an error, it randomly picks another normal endpoint.
%% `{auto_sync_interval_ms, Interval}' sets the default auto-sync `Interval' in milliseconds.
%% Default is 0, which means no auto-sync. If auto-sync is enabled, you can set `auto_sync_interval_ms'
%% in the application env to change the interval. If it is disabled, the `auto_sync_interval_ms' in the
%% application env is ignored. With auto-sync enabled, eetcd automatically syncs the cluster member
%% list via the MemberList API of etcd, and tries to connect to any new endpoints when in `connect_all'
%% mode.
%% `[{name, string()},{password, string()}]' generates an authentication token based on a given user name and password.
%% You can use `eetcd:info/0' to see the internal connection status.
149 changes: 140 additions & 9 deletions src/eetcd_conn.erl
Expand Up @@ -7,7 +7,7 @@
%% API
-export([open/1, close/1,
round_robin_select/1, lookup/1,
check_health/1, flush_token/2]).
sync/1, check_health/1, flush_token/2]).

-export([init/1, handle_event/4, terminate/3,
code_change/4, callback_mode/0, format_status/2]).
Expand All @@ -16,6 +16,7 @@
-define(reconnect, reconnect).

-define(check_health, check_health_msg).
-define(auto_sync, auto_sync).
-define(flush_token, flush_token_msg).

%%% 200 400 800 1600 3200 6400 12800 25600
Expand Down Expand Up @@ -64,6 +65,12 @@ lookup(Name) when is_atom(Name) orelse is_reference(Name) ->
[Pid | _] -> {ok, Pid}

sync(Name) ->
case lookup(Name) of
{ok, Pid} -> erlang:send(Pid, ?auto_sync);
Err -> Err

check_health(Name) ->
case lookup(Name) of
{ok, Pid} -> erlang:send(Pid, ?check_health);
Expand All @@ -90,18 +97,23 @@ init({Name, Hosts, Options, Transport, TransportOpts}) ->
transport => Transport,
transport_opts => TransportOpts
AutoSyncInterval = proplists:get_value(auto_sync_interval_ms, Options, 0),
Data0 = #{
name => Name,
auto_sync_interval_ms => AutoSyncInterval,
gun_opts => GunOpts,
health_ref => undefined,
reconn_ref => undefined,
active_conns => []
active_conns => [],
freeze_conns => []
Data = put_in_authenticate(Data0, Options),
case proplists:get_value(mode, Options, connect_all) of
connect_all ->
connect_all(Hosts, Name, GunOpts, Data);
connect_all(Hosts, Name, Data);
random ->
AutoSyncInterval > 0 andalso
?LOG_WARNING("~s run under random mode, disabled auto_sync member list", [Name]),
Length = erlang:length(Hosts),
Data1 = Data#{endpoints => shuffle(Hosts), mode => random},
Index = rand:uniform(Length),
Expand All @@ -125,6 +137,8 @@ handle_event(EventType, reconnecting, _StateName, Data)
handle_event(info, ?check_health, _StateName, Data) ->
{keep_state, do_check_health(Data)};
handle_event(info, ?auto_sync, _StateName, Data) ->
handle_event(internal, ?ready, ?ready, #{name := Name}) ->
?LOG_INFO("ETCD(~p, ~p)'s connections are ready.", [Name, self()]),
Expand All @@ -149,20 +163,24 @@ code_change(_OldVsn, StateName, Data, _Extra) ->
%%% Internal functions

connect_all(Hosts, Name, GunOpts, Data) ->
connect_all(Hosts, Name, #{gun_opts := GunOpts} = Data) ->
Auth = maps:get(authenticate, Data, undefined),
case fold_connect(Hosts, Name, GunOpts, Auth, [], []) of
{Ok, []} ->
{ok, ?ready, Data#{
mode => connect_all,
health_ref => next_check_health(),
sync_ref => next_sync(Data),
member_list => Hosts,
active_conns => Ok,
freeze_conns => []}};
{Ok, Failed} when length(Ok) > length(Failed) ->
Freezes = [{Host, ?MIN_RECONN} || {Host, _Reason} <- Failed],
{ok, ?reconnect, Data#{
mode => connect_all,
health_ref => next_check_health(),
sync_ref => next_sync(Data),
member_list => Hosts,
active_conns => Ok,
freeze_conns => Freezes},
{next_event, internal, reconnecting}};
Expand All @@ -184,6 +202,7 @@ connect_one(Index, Retry, Data, Len) ->
{ok, ?ready, Data#{
index => Index,
health_ref => next_check_health(),
sync_ref => next_sync(Data),
active_conns => [{Host, Gun, AuthToken}]}};
{error, Reason} when Retry =< 0 ->
{stop, {shutdown, Reason}};
Expand Down Expand Up @@ -348,6 +367,15 @@ next_check_health() ->
Ms = application:get_env(eetcd, health_check_ms, 15000),
erlang:send_after(Ms, self(), ?check_health).

next_sync(#{auto_sync_interval_ms := Interval}) ->
case application:get_env(eetcd, auto_sync_interval_ms, Interval) of
Ms when is_number(Ms), Ms > 0 ->
erlang:send_after(Ms, self(), ?auto_sync);
_ -> undefined
next_sync(_Data) ->

do_check_health(Data = #{mode := Mode, health_ref := HealthRef}) ->
NewData = do_check_health(Mode, Data),
Expand All @@ -358,10 +386,12 @@ do_check_health(connect_all, Data) ->
name := Name,
active_conns := Actives,
freeze_conns := Freezes
freeze_conns := Freezes,
member_list := Members
} = Data,
{NewActives, NewFreezes} =
lists:foldl(fun({Host, Gun, _Token} = Endpoint, {Health, Freeze}) ->
fun({Host, Gun, _Token} = Endpoint, {Health, Freeze}) ->
case check_health_remote(Gun) of
ok ->
case check_leader_remote(Gun) of
Expand All @@ -379,8 +409,9 @@ do_check_health(connect_all, Data) ->
?LOG_ERROR("~p check (~p) health failed by ~p ", [Name, Host, Reason1]),
{Health, [{Host, ?MIN_RECONN} | Freeze]}
end, {[], Freezes}, Actives),
Data#{active_conns => NewActives, freeze_conns => NewFreezes};
end, {[], Freezes}, Actives),
NewFreezes1 = [{H1, P1} || {H1, P1} <- NewFreezes, {H2, P2} <- Members, H1 =:= H2, P1 =:= P2],
Data#{active_conns => NewActives, freeze_conns => NewFreezes1};
do_check_health(random, Data) ->
#{name := Name, active_conns := ActiveConns} = Data,
case ActiveConns of
Expand All @@ -404,6 +435,106 @@ do_check_health(random, Data) ->
[] -> Data

handle_do_sync(Data = #{sync_ref := SyncRef}) ->
is_reference(SyncRef) andalso erlang:cancel_timer(SyncRef),
#{freeze_conns := Freezes} = NewData = do_sync_memberlist(Data),
NewSyncRef = next_sync(NewData),
NewData1 = NewData#{sync_ref => NewSyncRef},
case Freezes of
[] ->
{keep_state, NewData1};
_ ->

do_sync_memberlist(#{mode := random} = Data) ->
do_sync_memberlist(#{active_conns := [], name := Name} = Data) ->
?LOG_WARNING("~s has no active connections to etcd cluster, cannot get member list", [Name]),
do_sync_memberlist(#{name := Name,
gun_opts := #{transport := Transport},
active_conns := ActiveConns,
freeze_conns := Freezes} = Data) ->
case eetcd_cluster:member_list(Name) of
{ok, #{members := []}} ->
%% TODO: remove all active connections? That's a critical situation.
{ok, #{members := Members}} ->
%% FIXME: take all clientURLs per member or take one per member?
%% Here we take all available clientURLs for every not-a-learner member.
ClientUrls0 = lists:filtermap(
fun(#{isLearner := true, 'ID' := MemberId, name := MemberName}) ->
?LOG_INFO(#{msg => "Member is a learner, skipped for now",
member_id => MemberId,
member_name => MemberName}),
(#{clientURLs := [], 'ID' := MemberId, name := MemberName}) ->
?LOG_INFO(#{msg => "Member not advertise client URLs",
member_id => MemberId,
member_name => MemberName}),
(#{clientURLs := Urls}) ->
{true, Urls}
end, Members),
ClientUrls1 = lists:flatten(ClientUrls0),

ClientUrls2 = lists:filtermap(
fun(Url) ->
case uri_string:parse(Url) of
#{host := Host, port := Port, scheme := Scheme}
when erlang:bit_size(Host) > 0 ->
case {Scheme, Transport} of
{<<"http">>, tcp} ->
{true, {erlang:binary_to_list(Host), Port}};
{<<"https">>, tls} ->
{true, {erlang:binary_to_list(Host), Port}};
{<<"https">>, ssl} ->
{true, {erlang:binary_to_list(Host), Port}};
_ ->
%% Note: because of the design of eetcd_conn, the URLs in
%% the member list must use the same transport
%% options as the active connections.
?LOG_WARNING("Not matched schemes from member list ~s",
_I ->
?LOG_WARNING("Url ~s from member list is not a valid etcd url",
end, ClientUrls1),

ClientUrls = lists:usort(ClientUrls2),

A = [HostPort || {HostPort, _GunPid, _Token} <- ActiveConns],
F = [HostPort || {HostPort, _Reconn} <- Freezes ],
RemovedEndpoints = (A ++ F) -- ClientUrls,
NewEndpoints = (ClientUrls -- A) -- F,

%% Remove removed_endpoints from freeze_conns
NewFreezes = [ {H, ?MIN_RECONN} || H <- (ClientUrls -- A)],

case RemovedEndpoints of
[] -> ok;
_ ->
?LOG_NOTICE(#{msg => "Got removed endpoints",
removed_endpoints => RemovedEndpoints})

case NewEndpoints of
[] ->
Data#{member_list => ClientUrls};
_ ->
?LOG_NOTICE(#{msg => "Got new endpoints", new_endpoints => NewEndpoints}),
Data#{freeze_conns => NewFreezes, member_list => ClientUrls}
{error, Reason} ->
?LOG_ERROR("~p get member_list failed by ~p ", [Name, Reason]),

%% UNKNOWN = 0;
%% SERVING = 1;
Expand Down Expand Up @@ -459,4 +590,4 @@ put_in_authenticate(Data, Options) ->
%% Returns the elements of `List' in random order: every element is paired
%% with a random float key from rand:uniform/0, the pairs are sorted by that
%% key, and the keys are then dropped again.
shuffle(List) ->
    Keyed = [{rand:uniform(), Elem} || Elem <- List],
    Ordered = lists:keysort(1, Keyed),
    [Elem || {_Key, Elem} <- Ordered].

