Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Gun2.0 #58

Merged
merged 6 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,29 @@ Gen proto and client file
```erlang
rebar3 etcd gen
```

Migration from eetcd 0.3.x to 0.4.x
-----

eetcd 0.4.x now dependents on Gun 2.0, which introduced some breaking changes,
and propagate to eetcd.

The prior transport options are split into `tcp_opts` and `tls_opts` and moved
inside the new `eetcd:opts()` parameter. As a result, the functions `eetcd:open/4,5`
have been replaced with `eetcd:open/2,3`.

Likewise, the transport options for `eetcd_maintenance` APIs are split into
`tcp_opts` and `tls_opts` as well.

- The function `eetcd:open/4,5` has been replaced with `eetcd:open/3`.
- The function `eetcd_maintenance:defragment/3` has been replaced with `eetcd_maintenance:defragment/2`.
- The function `eetcd_maintenance:status/3` has been replaced with `eetcd_maintenance:status/2`.
- The function `eetcd_maintenance:has_kv/4` has been replaced with `eetcd_maintenance:has_kv/3`.

New options `{domain_lookup_timeout, Interval}` and `{tls_handshake_timeout, Interval}`
have been added for `eetcd:open/3`. Alone with the prior `{connect_timeout, Interval}`,
it allows the underlining Gun library to get separate events when connecting,
the domain lookup, connection and TLS handshakes.
- `tls_opts` Passed to Gun.

Read more details of Gun options in the [Gun 2.0 manual](https://ninenines.eu/docs/en/gun/2.0/manual/gun/).
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{erl_opts, [{i, "./_build/default/plugins/gpb/include"}]}.
{deps, [
{gun, "1.3.3"}
{gun, "2.0.0"}
]}.

{gpb_opts, [
Expand Down
12 changes: 6 additions & 6 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{"1.2.0",
[{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.7.3">>},1},
{<<"gun">>,{pkg,<<"gun">>,<<"1.3.3">>},0}]}.
[{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.11.0">>},1},
{<<"gun">>,{pkg,<<"gun">>,<<"2.0.0">>},0}]}.
[
{pkg_hash,[
{<<"cowlib">>, <<"A7FFCD0917E6D50B4D5FB28E9E2085A0CEB3C97DEA310505F7460FF5ED764CE9">>},
{<<"gun">>, <<"CF8B51BEB36C22B9C8DF1921E3F2BC4D2B1F68B49AD4FBC64E91875AA14E16B4">>}]},
{<<"cowlib">>, <<"0B9FF9C346629256C42EBE1EEB769A83C6CB771A6EE5960BD110AB0B9B872063">>},
{<<"gun">>, <<"2326BC0FD6D9CF628419708270D6FE8B02B8D002CF992E4165A77D997B1DEFD0">>}]},
{pkg_hash_ext,[
{<<"cowlib">>, <<"1E1A3D176D52DAEBBECBBCDFD27C27726076567905C2A9D7398C54DA9D225761">>},
{<<"gun">>, <<"3106CE167F9C9723F849E4FB54EA4A4D814E3996AE243A1C828B256E749041E0">>}]}
{<<"cowlib">>, <<"2B3E9DA0B21C4565751A6D4901C20D1B4CC25CBB7FD50D91D2AB6DD287BC86A9">>},
{<<"gun">>, <<"6613CB7C62930DC8D58263C44DDA72F8556346BA88358FC929DCBC5F76D04569">>}]}
].
2 changes: 1 addition & 1 deletion src/eetcd.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{application, eetcd,
[
{description, "ETCD V3 client"},
{vsn, "0.3.6"},
{vsn, "0.4.0"},
{registered, [eetcd_sup, eetcd_conn_sup, eetcd_lease_sup]},
{mod, {eetcd_app, []}},
{applications, [kernel, stdlib, gun]},
Expand Down
63 changes: 34 additions & 29 deletions src/eetcd.erl
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@
-module(eetcd).
-include("eetcd.hrl").
%% API
-export([open/2, open/4, open/5, close/1]).
-export([open/2, open/3, close/1]).
-export([info/0]).
-export([new/1, with_timeout/2]).
-export([get_prefix_range_end/1]).
-export_type([opts/0]).

-type opts() :: [ {mode, connect_all | random} |
{transport, tcp | tls | ssl} |
{name, string()} |
{password, string()} |
{auto_sync_interval_ms, non_neg_integer()} |
{retry, non_neg_integer()} |
{retry_timeout, pos_integer()} |
{connect_timeout, timeout()} |
{domain_lookup_timeout, timeout()} |
{tls_handshake_timeout, timeout()} |
{tcp_opts, [gen_tcp:connect_option()]} |
{tls_opts, [ssl:tls_client_option()]}
].

%% @doc Connects to a etcd server on TCP port
%% Port on the host with IP address Address, such as:
%% `open(test,["127.0.0.1:2379","127.0.0.1:2479","127.0.0.1:2579"]).'
-spec open(name(), [string()]) -> {ok, pid()} | {error, any()}.
open(Name, Hosts) ->
open(Name, Hosts, [], tcp, []).

%% @doc Connects to a etcd server.
-spec open(name(),
[string()],
tcp | tls | ssl,
[gen_tcp:connect_option()] | [ssl:connect_option()]) ->
{ok, pid()} | {error, any()}.
open(Name, Hosts, Transport, TransportOpts) ->
open(Name, Hosts, [], Transport, TransportOpts).
open(Name, Hosts, [{transport, tcp}]).

%% @doc Connects to a etcd server.
%% ssl:connect_option() see all options in ssl_api.hrl
%% such as [{certfile, Certfile}, {keyfile, Keyfile}] or [{cert, Cert}, {key, Key}].
%%
%% Default mode is `connect_all', it creates multiple sub-connections (one sub-connection per each endpoint).
%% The balancing policy is round robin.
Expand All @@ -39,6 +43,8 @@ open(Name, Hosts, Transport, TransportOpts) ->
%% When the client receives an error, it randomly picks another normal endpoint.
%%
%% `{connect_timeout, Interval}' is the connection timeout. Defaults to one second (1000).
%% `{domain_lookup_timeout, Interval}' is the domain_lookup_timeout timeout. Defaults to one second (1000).
%% `{tls_handshake_timeout, Interval}' is the tls_handshake_timeout timeout. Defaults to three second (3000).
%% `{retry, Attempts}' is the number of times it will try to reconnect on failure before giving up. Defaults to zero (disabled).
%% `{retry_timeout, Interval}' is the time between retries in milliseconds.
%%
Expand All @@ -49,25 +55,24 @@ open(Name, Hosts, Transport, TransportOpts) ->
%% list via the MemberList API of etcd, and will try to connect any new endpoints if in `connect_all'
%% mode.
%%
%% `[{name, string()},{password, string()}]' generates an authentication token based on a given user name and password.
%% `[{name, string()}, {password, string()}]' generates an authentication token based on a given user name and password.
%%
%% `{tcp_opts, [gen_tcp:connect_option()]}' and `{tls_opts, [ssl:tls_client_option()]}' are the
%% options for gun:open/3 in Gun 2.0.
%%
%% See all TCP options in {@link gen_tcp} module.
%%
%% See all TLS client options in {@link ssl} module,
%% such as `[{certfile, Certfile}, {keyfile, Keyfile}] or [{cert, Cert}, {key, Key}]'.
%%
%% Read more details of gun options in the
%% [https://ninenines.eu/docs/en/gun/2.0/manual/gun/ Gun 2.0 manual].
%%
%% You can use `eetcd:info/0' to see the internal connection status.
-spec open(name(),
[string()],
[
{mode, connect_all | random}
| {name, string()}
| {password, string()}
| {retry, non_neg_integer()}
| {retry_timeout, pos_integer()}
| {connect_timeout, timeout()}
],
tcp | tls | ssl,
[gen_tcp:connect_option()] | [ssl:connect_option()]) ->
{ok, pid()} | {error, any()}.
open(Name, Hosts, Options, Transport, TransportOpts) ->
-spec open(name(), [string()], opts()) -> {ok, pid()} | {error, any()}.
open(Name, Hosts, Options) ->
Cluster = [begin [IP, Port] = string:tokens(Host, ":"), {IP, list_to_integer(Port)} end || Host <- Hosts],
eetcd_conn_sup:start_child([{Name, Cluster, Options, Transport, TransportOpts}]).
eetcd_conn_sup:start_child([{Name, Cluster, Options}]).

%% @doc close connections with etcd server.
-spec close(name()) -> ok | {error, eetcd_conn_unavailable}.
Expand Down
9 changes: 6 additions & 3 deletions src/eetcd_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,18 @@ flush_token(Gun, Headers) ->
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
init({Name, Hosts, Options, Transport, TransportOpts}) ->
init({Name, Hosts, Options}) ->
erlang:process_flag(trap_exit, true),
GunOpts = #{protocols => [http2],
connect_timeout => proplists:get_value(connect_timeout, Options, 1000),
domain_lookup_timeout => proplists:get_value(domain_lookup_timeout, Options, 1000),
tls_handshake_timeout => proplists:get_value(tls_handshake_timeout, Options, 3000),
http2_opts => #{keepalive => 45000},
retry => proplists:get_value(retry, Options, 0),
retry_timeout => proplists:get_value(retry_timeout, Options, 5000),
transport => Transport,
transport_opts => TransportOpts
transport => proplists:get_value(transport, Options, tcp),
tcp_opts => proplists:get_value(tcp_opts, Options, []),
tls_opts => proplists:get_value(tls_opts, Options, [])
},
AutoSyncInterval = proplists:get_value(auto_sync_interval_ms, Options, 0),
Data0 = #{
Expand Down
24 changes: 12 additions & 12 deletions src/eetcd_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
-include("eetcd.hrl").
%% API
-export([alarm_list/1, alarm_disarm/3, alarm_disarm_all/1]).
-export([defragment/3, status/3, hash_kv/4, move_leader/2]).
-export([defragment/2, status/2, hash_kv/3, move_leader/2]).

%%% @doc AlarmList gets all active alarms.
-spec alarm_list(name()|context()) ->
Expand Down Expand Up @@ -46,30 +46,30 @@ alarm_disarm_all(ConnName) ->
%%% Defragment is an expensive operation. User should avoid defragmenting multiple members at the same time.
%%% To defragment multiple members in the cluster, user need to call defragment multiple
%%% times with different endpoints.
-spec defragment(iodata(), tcp | tls | ssl, [gen_tcp:connect_option()] | [ssl:connect_option()]) ->
-spec defragment(iodata(), eetcd:opts()) ->
{ok,router_pb:'Etcd.DefragmentResponse'()}|{error,eetcd_error()}.
defragment(Endpoint, Transport, TransportOpts) ->
defragment(Endpoint, Options) ->
Fun = fun(Conn) -> eetcd_maintenance_gen:defragment(eetcd:new(Conn)) end,
dial(Endpoint, Transport, TransportOpts, Fun).
dial(Endpoint, Options, Fun).

%%% @doc Status gets the status of the endpoint.
-spec status(iodata(), tcp | tls | ssl, [gen_tcp:connect_option()] | [ssl:connect_option()]) ->
-spec status(iodata(), eetcd:opts()) ->
{ok,router_pb:'Etcd.StatusResponse'()}|{error,eetcd_error()}.
status(Endpoint, Transport, TransportOpts) ->
status(Endpoint, Options) ->
Fun = fun(Conn) -> eetcd_maintenance_gen:status(eetcd:new(Conn)) end,
dial(Endpoint, Transport, TransportOpts, Fun).
dial(Endpoint, Options, Fun).

%%% @doc HashKV returns a hash of the KV state at the time of the RPC.
%%% If revision is zero, the hash is computed on all keys. If the revision
%%% is non-zero, the hash is computed on all keys at or below the given revision.
-spec hash_kv(iodata(), tcp | tls | ssl, [gen_tcp:connect_option()] | [ssl:connect_option()], pos_integer()) ->
-spec hash_kv(iodata(), eetcd:opts(), pos_integer()) ->
{ok,router_pb:'Etcd.HashKVResponse'()}|{error,eetcd_error()}.
hash_kv(Endpoint, Transport, TransportOpts, Rev) ->
hash_kv(Endpoint, Options, Rev) ->
Fun = fun(Conn) ->
Context = maps:put(revision, Rev, eetcd:new(Conn)),
eetcd_maintenance_gen:hash_kv(Context)
end,
dial(Endpoint, Transport, TransportOpts, Fun).
dial(Endpoint, Options, Fun).

%%% Snapshot provides a reader for a point-in-time snapshot of etcd.
%%% If the context "ctx" is canceled or timed out, reading from returned
Expand All @@ -89,10 +89,10 @@ move_leader(Context, TargetID) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
dial(Endpoint, Transport, TransportOpts, Fun) ->
dial(Endpoint, Options, Fun) ->
Conn = make_ref(),
try
case eetcd:open(Conn, [Endpoint], Transport, TransportOpts) of
case eetcd:open(Conn, [Endpoint], Options) of
{ok, _Pid} -> Fun(Conn);
Err ->
Err
Expand Down
64 changes: 24 additions & 40 deletions src/eetcd_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
new(Name, Path) ->
case eetcd_conn:round_robin_select(Name) of
{ok, Pid, Headers} ->
Ref = gun:request(Pid, <<"POST">>, Path, Headers, <<>>),
Ref = gun:headers(Pid, <<"POST">>, Path, Headers),
{ok, Pid, Ref};
Err -> Err
end.
Expand Down Expand Up @@ -82,7 +82,7 @@ unary(Pid, Request, RequestName, Path, ResponseType, Headers) when is_pid(Pid) -
Res =
case await(Pid, StreamRef, Timeout, MRef) of
{response, nofin, 200, _Headers} ->
case await_body(Pid, StreamRef, Timeout, MRef, <<>>) of
case await_body(Pid, StreamRef, Timeout, MRef) of
{ok, ResBody, _Trailers} ->
{ok, Resp, <<>>} = eetcd_grpc:decode(identity, ResBody, ResponseType),
{ok, Resp};
Expand All @@ -96,7 +96,7 @@ unary(Pid, Request, RequestName, Path, ResponseType, Headers) when is_pid(Pid) -
StreamRef1 = gun:request(Pid, <<"POST">>, Path, NewHeaders, EncodeBody),
case await(Pid, StreamRef1, Timeout, MRef) of
{response, nofin, 200, _Headers} ->
case await_body(Pid, StreamRef1, Timeout, MRef, <<>>) of
case await_body(Pid, StreamRef1, Timeout, MRef) of
{ok, ResBody, _Trailers} ->
{ok, Resp, <<>>} = eetcd_grpc:decode(identity, ResBody, ResponseType),
{ok, Resp};
Expand All @@ -112,43 +112,27 @@ unary(Pid, Request, RequestName, Path, ResponseType, Headers) when is_pid(Pid) -
erlang:demonitor(MRef, [flush]),
Res.

%% `gun:await/2,3,4`, `gun:await_body/2,3,4` and `gun:await_up/1,2,3` don't distinguish the error types until v2.0.0.
%% They can be a timeout, a connection error, a stream error or a down error (when the Gun process exited while waiting).
%% so we copy some code from gun v2.0.0 to replace `gun:await/4`
%% TODO remove this when upgrade gun to v2.0.0
await(ServerPid, StreamRef, Timeout, MRef) ->
receive
{gun_response, ServerPid, StreamRef, IsFin, Status, Headers} ->
{response, IsFin, Status, Headers};
{gun_data, ServerPid, StreamRef, IsFin, Data} ->
{data, IsFin, Data};
{gun_error, ServerPid, StreamRef, Reason} ->
{error, {gun_stream_error, Reason}};
{gun_error, ServerPid, Reason} ->
{error, {gun_conn_error, Reason}};
{'DOWN', MRef, process, ServerPid, Reason} ->
{error, {gun_down, Reason}}
after Timeout ->
{error, timeout}
case gun:await(ServerPid, StreamRef, Timeout, MRef) of
{response, _, _, _}=Resp ->
Resp;
{data, _, _}=Resp ->
Resp;
{error, _} = Resp ->
transfer_error(Resp);
Other ->
?LOG_INFO("eetcd_await_resp_other ~p", [Other]),
await(ServerPid, StreamRef, Timeout, MRef)
end.

await_body(ServerPid, StreamRef, Timeout, MRef, Acc) ->
receive
{gun_data, ServerPid, StreamRef, nofin, Data} ->
await_body(ServerPid, StreamRef, Timeout, MRef,
<<Acc/binary, Data/binary>>);
{gun_data, ServerPid, StreamRef, fin, Data} ->
{ok, <<Acc/binary, Data/binary>>};
%% It's OK to return trailers here because the client specifically requested them
%% Trailers are grpc_status and grpc_message headers
{gun_trailers, ServerPid, StreamRef, Trailers} ->
{ok, Acc, Trailers};
{gun_error, ServerPid, StreamRef, Reason} ->
{error, {gun_stream_error, Reason}};
{gun_error, ServerPid, Reason} ->
{error, {gun_conn_error, Reason}};
{'DOWN', MRef, process, ServerPid, Reason} ->
{error, {gun_down, Reason}}
after Timeout ->
{error, timeout}
end.
await_body(ServerPid, StreamRef, Timeout, MRef) ->
transfer_error(gun:await_body(ServerPid, StreamRef, Timeout, MRef)).

transfer_error({error, {stream_error, Reason}}) ->
{error, {gun_stream_error, Reason}};
transfer_error({error, {connection_error, Reason}}) ->
{error, {gun_conn_error, Reason}};
transfer_error({error, {down, Reason}}) ->
{error, {gun_down, Reason}};
transfer_error(Other) ->
Other.
16 changes: 8 additions & 8 deletions test/eetcd_auth_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@ auth(_Config) ->
user(Name) ->
{ok, #{}} = eetcd_auth:auth_enable(Name),
%% list
{ok, AuthRoleListResponse} = eetcd_auth:user_list(Name),
{ok, _AuthRoleListResponse} = eetcd_auth:user_list(Name),
%% add
{ok, AuthRoleAddResponse} = eetcd_auth:user_add(Name, <<"name1">>, <<"12345">>),
{ok, _AuthRoleAddResponse} = eetcd_auth:user_add(Name, <<"name1">>, <<"12345">>),
%% get
{ok, AuthUserGetResponse} = eetcd_auth:user_get(Name, <<"name1">>),
{ok, _AuthUserGetResponse} = eetcd_auth:user_get(Name, <<"name1">>),
%% change password
{ok, AuthUserChangePasswordResponse} = eetcd_auth:user_change_password(Name, <<"name1">>,<<"54321">>),
{ok, _AuthUserChangePasswordResponse} = eetcd_auth:user_change_password(Name, <<"name1">>,<<"54321">>),
%% get
{ok, AuthUserGetResponse1} = eetcd_auth:user_get(Name, <<"name1">>),
{ok, _AuthUserGetResponse1} = eetcd_auth:user_get(Name, <<"name1">>),
%% delete
{ok, AuthUserDeleteResponse} = eetcd_auth:user_delete(Name, <<"name1">>),
{ok, _AuthUserDeleteResponse} = eetcd_auth:user_delete(Name, <<"name1">>),
%% list
{ok, AuthRoleListResponse2} = eetcd_auth:user_list(Name),
{ok, AuthDisableResponse2} = eetcd_auth:auth_disable(Name),
{ok, _AuthRoleListResponse2} = eetcd_auth:user_list(Name),
{ok, _AuthDisableResponse2} = eetcd_auth:auth_disable(Name),
ok.

2 changes: 1 addition & 1 deletion test/eetcd_lease_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ groups() ->
init_per_suite(Config) ->
application:ensure_all_started(eetcd),
{ok, _Pid} = eetcd:open(?Name, ["127.0.0.1:2379", "127.0.0.1:2479", "127.0.0.1:2579"],
[{mode, random}], tcp, []),
[{mode, random}, {transport, tcp}]),
Config.

init_per_testcase(_TestCase, Config) ->
Expand Down
2 changes: 1 addition & 1 deletion test/eetcd_lock_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ groups() ->
init_per_suite(Config) ->
application:ensure_all_started(eetcd),
{ok, _Pid} = eetcd:open(?Name, ["127.0.0.1:2379", "127.0.0.1:2479", "127.0.0.1:2579"],
[{mode, random}], tcp, []),
[{mode, random}, {transport, tcp}]),
Config.

init_per_testcase(_TestCase, Config) ->
Expand Down
Loading