Skip to content

Commit

Permalink
Merge pull request #48 from emqx/register-clients-in-ets-tab
Browse files Browse the repository at this point in the history
perf: register client pids in an ETS tab
  • Loading branch information
terry-xiaoyu authored Jun 21, 2024
2 parents 51448c1 + b72b802 commit e9b7a7f
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 21 deletions.
8 changes: 5 additions & 3 deletions src/ecpool.appup.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
%% -*-: erlang -*-
{"0.5.8",
{"0.5.9",
[
{<<"0\\.5\\.[3-7]">>, [
{<<"0\\.5\\.[3-8]">>, [
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []}
]},
Expand All @@ -20,7 +21,8 @@
]}
],
[
{<<"0\\.5\\.[3-7]">>, [
{<<"0\\.5\\.[3-8]">>, [
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []}
]},
Expand Down
47 changes: 31 additions & 16 deletions src/ecpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
, get_client/2
, pick_and_do/3
, name/1
, ets_name/1
, workers/1
]).

Expand Down Expand Up @@ -66,6 +67,7 @@
| {on_reconnect, conn_callback()}
| {on_disconnect, conn_callback()}
| tuple().
-type get_client_ret() :: pid() | false | no_such_pool.

-define(IS_ACTION(ACTION), ((is_tuple(ACTION) andalso tuple_size(ACTION) == 3) orelse is_function(ACTION, 1))).

Expand All @@ -91,14 +93,20 @@ stop_sup_pool(Pool) ->
ecpool_sup:stop_pool(Pool).

%% @doc Get client/connection
-spec(get_client(pool_name()) -> pid() | false).
-spec(get_client(pool_name()) -> get_client_ret()).
get_client(Pool) ->
gproc_pool:pick_worker(name(Pool)).
try gproc_pool:pick_worker(name(Pool))
catch
error:badarg -> no_such_pool
end.

%% @doc Get client/connection with hash key.
-spec(get_client(pool_name(), any()) -> pid() | false).
-spec(get_client(pool_name(), any()) -> get_client_ret()).
get_client(Pool, Key) ->
gproc_pool:pick_worker(name(Pool), Key).
try gproc_pool:pick_worker(name(Pool), Key)
catch
error:badarg -> no_such_pool
end.

-spec(set_reconnect_callback(pool_name(), conn_callback()) -> ok).
set_reconnect_callback(Pool, Callback) ->
Expand All @@ -125,37 +133,39 @@ remove_reconnect_callback(Pool, Callback) ->
-spec with_client(pool_name(), action(Result)) ->
Result | {error, disconnected | ecpool_empty}.
with_client(Pool, Fun) when ?IS_ACTION(Fun) ->
with_worker(get_client(Pool), Fun, no_handover).
with_worker(Pool, get_client(Pool), Fun, no_handover).

%% @doc Call the fun with client/connection
-spec with_client(pool_name(), any(), action(Result)) ->
Result | {error, disconnected | ecpool_empty}.
with_client(Pool, Key, Fun) when ?IS_ACTION(Fun) ->
with_worker(get_client(Pool, Key), Fun, no_handover).
with_worker(Pool, get_client(Pool, Key), Fun, no_handover).

-spec pick_and_do({pool_name(), term()} | pool_name(), action(Result), apply_mode()) ->
Result | {error, disconnected | ecpool_empty}.
pick_and_do({Pool, KeyOrNum}, Action, ApplyMode) when ?IS_ACTION(Action) ->
with_worker(get_client(Pool, KeyOrNum), Action, ApplyMode);
with_worker(Pool, get_client(Pool, KeyOrNum), Action, ApplyMode);
pick_and_do(Pool, Action, ApplyMode) when ?IS_ACTION(Action) ->
with_worker(get_client(Pool), Action, ApplyMode).
with_worker(Pool, get_client(Pool), Action, ApplyMode).

-spec with_worker(pid() | false, action(Result), apply_mode()) ->
-spec with_worker(pool_name(), get_client_ret(), action(Result), apply_mode()) ->
Result | {error, disconnected | ecpool_empty}.
with_worker(false, Action, _Mode) when ?IS_ACTION(Action) ->
with_worker(_, no_such_pool, Action, _Mode) when ?IS_ACTION(Action) ->
{error, no_such_pool};
with_worker(_, false, Action, _Mode) when ?IS_ACTION(Action) ->
{error, ecpool_empty};
with_worker(Worker, Action, no_handover) when ?IS_ACTION(Action) ->
case ecpool_worker:client(Worker) of
with_worker(Pool, Worker, Action, no_handover) when ?IS_ACTION(Action) ->
case ecpool_pool:get_client_global(Pool, Worker) of
{ok, Client} -> exec(Action, Client);
{error, Reason} -> {error, Reason}
end;
with_worker(Worker, Action, handover) when ?IS_ACTION(Action) ->
with_worker(_, Worker, Action, handover) when ?IS_ACTION(Action) ->
ecpool_worker:exec(Worker, Action, infinity);
with_worker(Worker, Action, {handover, Timeout}) when is_integer(Timeout) andalso ?IS_ACTION(Action) ->
with_worker(_, Worker, Action, {handover, Timeout}) when is_integer(Timeout) andalso ?IS_ACTION(Action) ->
ecpool_worker:exec(Worker, Action, Timeout);
with_worker(Worker, Action, handover_async) when ?IS_ACTION(Action) ->
with_worker(_, Worker, Action, handover_async) when ?IS_ACTION(Action) ->
ecpool_worker:exec_async(Worker, Action);
with_worker(Worker, Action, {handover_async, CallbackFun = {_,_,_}}) when ?IS_ACTION(Action) ->
with_worker(_, Worker, Action, {handover_async, CallbackFun = {_,_,_}}) when ?IS_ACTION(Action) ->
ecpool_worker:exec_async(Worker, Action, CallbackFun).

%% @doc Pool workers
Expand All @@ -165,6 +175,11 @@ workers(Pool) ->
%% @doc ecpool name
name(Pool) -> {?MODULE, Pool}.

ets_name(Pool) -> list_to_atom("ecpool_" ++ str(Pool)).

str(Pool) when is_atom(Pool) -> atom_to_list(Pool);
str(Pool) when is_binary(Pool) -> binary_to_list(Pool).

exec({M, F, A}, Client) ->
erlang:apply(M, F, [Client]++A);
exec(Action, Client) when is_function(Action) ->
Expand Down
20 changes: 18 additions & 2 deletions src/ecpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
%% API Function Exports
-export([start_link/2]).

-export([info/1]).
-export([info/1, reg_client_global/2, get_client_global/2]).

%% gen_server Function Exports
-export([ init/1
Expand Down Expand Up @@ -59,12 +59,28 @@ init([Pool, Opts]) ->
PoolSize = get_value(pool_size, Opts, Schedulers),
PoolType = get_value(pool_type, Opts, random),
ok = ensure_pool(ecpool:name(Pool), PoolType, [{size, PoolSize}]),
EtsOpts = [named_table, set, public, {read_concurrency, true}],
_ = ets:new(ecpool:ets_name(Pool), EtsOpts),
ok = lists:foreach(
fun(I) ->
ensure_pool_worker(ecpool:name(Pool), {Pool, I}, I)
ensure_pool_worker(ecpool:name(Pool), {Pool, I}, I)
end, lists:seq(1, PoolSize)),
{ok, #state{name = Pool, size = PoolSize, type = PoolType}}.

%% register client globally to speed up client lookup
reg_client_global(Pool, Client) ->
ets:insert(ecpool:ets_name(Pool), {self(), Client}),
ok.

get_client_global(Pool, Worker) ->
try ets:lookup(ecpool:ets_name(Pool), Worker) of
[] -> {error, disconnected};
[{_, undefined}] -> {error, disconnected};
[{_, Client}] -> {ok, Client}
catch
error:badarg -> {error, disconnected}
end.

ensure_pool(Pool, Type, Opts) ->
try gproc_pool:new(Pool, Type, Opts)
catch
Expand Down
4 changes: 4 additions & 0 deletions src/ecpool_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ reconnect(Secs, State = #state{client = Client, on_disconnect = Disconnect, supe
[erlang:unlink(P) || P <- SubPids, is_pid(P)],
handle_disconnect(Client, Disconnect),
erlang:send_after(timer:seconds(Secs), self(), reconnect),
ecpool_pool:reg_client_global(State#state.pool, undefined),
{noreply, State#state{client = undefined}}.

handle_reconnect(undefined, _) ->
Expand All @@ -265,9 +266,11 @@ connect_internal(State) ->
try connect(State) of
{ok, Client} when is_pid(Client) ->
erlang:link(Client),
ecpool_pool:reg_client_global(State#state.pool, Client),
{ok, State#state{client = Client, supervisees = [Client]}};
{ok, Client, #{supervisees := SupPids} = _SupOpts} when is_list(SupPids) ->
[erlang:link(P) || P <- SupPids],
ecpool_pool:reg_client_global(State#state.pool, Client),
{ok, State#state{client = Client, supervisees = SupPids}};
{error, Error} ->
{error, Error}
Expand Down Expand Up @@ -304,6 +307,7 @@ remove_conn_callback({Mod, Fn}, Callbacks) when is_list(Callbacks) ->
lists:filter(fun({Mod0, Fn0, _Args}) -> {Mod0, Fn0} =/= {Mod, Fn} end, Callbacks).

erase_client(Pid, State = #state{client = Pid, supervisees = SupPids}) ->
ecpool_pool:reg_client_global(State#state.pool, undefined),
State#state{client = undefined, supervisees = SupPids -- [Pid]};
erase_client(Pid, State = #state{supervisees = SupPids}) ->
State#state{supervisees = SupPids -- [Pid]}.
Expand Down

0 comments on commit e9b7a7f

Please sign in to comment.