Skip to content

Commit

Permalink
Merge pull request #13 from cabol/cabol.version_0.1
Browse files Browse the repository at this point in the history
Refactoring functions: get_subscribers -> subscribers and get_channel…
  • Loading branch information
cabol committed Jul 23, 2015
2 parents 61c62ad + 8f4ede0 commit 74c6497
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 90 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ Is as simple as this:
ebus:dispatch(ch1, "Hi!", MyHandler).
```

Instead of call `ebus:pub(Channel, Message)`, you call `ebus:dispatch(Channel, Message, Handler)`, and
the only difference is that you have to provide the `Handler` which will receive the message.
Instead of call `ebus:pub(Channel, Message)`, you call `ebus:dispatch(Channel, Message, Handler)`,
and the only difference is that you have to provide the `Handler` which will receive the message.
The reason of this is that you're free to implement your scheduling/dispatching strategy. Also,
you can use `ebus_util:get_best_pid(ListOfHandlers)` to find an available handler. For example:

Expand All @@ -265,7 +265,7 @@ ebus:sub(my_channel, [MH1, MH2, MH3]).
ok

%% Get the subscribed handlers
Handlers = ebus:get_subscribers(my_channel).
Handlers = ebus:subscribers(my_channel).
[<0.47.0>, <0.48.0>, <0.49.0>]

%% Find an available handler
Expand Down
80 changes: 40 additions & 40 deletions src/ebus.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
-export([new/1, new/2, new/3]).
-export([set_options/1, set_options/2]).
-export([sub/2, sub/3, unsub/2, unsub/3, pub/2, pub/3]).
-export([get_subscribers/1, get_subscribers/2]).
-export([get_channels/0, get_channels/1]).
-export([subscribers/1, subscribers/2]).
-export([channels/0, channels/1]).
-export([dispatch/3, dispatch/4]).

%% Hidden
Expand All @@ -52,7 +52,7 @@

%% ebus types
-type cmd() :: sub | unsub | pub | dispatch |
get_subscribers | get_channels.
subscribers | channels.
-type channel() :: any().
-type payload() :: any().
-type message() :: {channel(), payload()}.
Expand All @@ -76,15 +76,15 @@
-type options() :: [option()].

%% State
-record(state, {module :: module(),
call_timeout :: non_neg_integer(),
n :: non_neg_integer(),
sub :: non_neg_integer(),
unsub :: non_neg_integer(),
pub :: non_neg_integer(),
dispatch :: non_neg_integer(),
get_subscribers :: non_neg_integer(),
get_channels :: non_neg_integer()}).
-record(state, {module :: module(),
call_timeout :: non_neg_integer(),
n :: pos_integer(),
sub :: pos_integer(),
unsub :: pos_integer(),
pub :: pos_integer(),
dispatch :: pos_integer(),
subscribers :: pos_integer(),
channels :: pos_integer()}).

%% Modules
-define(MODULES, [ebus_pg2, ebus_gproc, ebus_dist]).
Expand Down Expand Up @@ -120,12 +120,12 @@
Response :: ebus_ret().

%% @doc Returns a list with all subscribers to the `Channel`.
-callback get_subscribers(Channel) -> Response when
-callback subscribers(Channel) -> Response when
Channel :: channel(),
Response :: [handler()].

%% @doc Returns a list with all registered channels.
-callback get_channels() -> Response when
-callback channels() -> Response when
Response :: [channel()].

%% @doc Sends the `Message` to `Handler`, which is subscribed to `Channel`.
Expand Down Expand Up @@ -193,21 +193,21 @@ pub(Channel, Message) ->
pub(Name, Channel, Message) ->
gen_server:call(Name, {pub, Channel, Message}).

-spec get_subscribers(channel()) -> [handler()].
get_subscribers(Channel) ->
gen_server:call(?SERVER, {get_subscribers, Channel}).
-spec subscribers(channel()) -> [handler()].
subscribers(Channel) ->
subscribers(?SERVER, Channel).

-spec get_subscribers(name(), channel()) -> [handler()].
get_subscribers(Name, Channel) ->
gen_server:call(Name, {get_subscribers, Channel}).
-spec subscribers(name(), channel()) -> [handler()].
subscribers(Name, Channel) ->
gen_server:call(Name, {subscribers, Channel}).

-spec get_channels() -> [channel()].
get_channels() ->
gen_server:call(?SERVER, get_channels).
-spec channels() -> [channel()].
channels() ->
gen_server:call(?SERVER, channels).

-spec get_channels(name()) -> [channel()].
get_channels(Name) ->
gen_server:call(Name, get_channels).
-spec channels(name()) -> [channel()].
channels(Name) ->
gen_server:call(Name, channels).

-spec dispatch(channel(), payload(), handler()) -> ebus_ret().
dispatch(Channel, Message, Handler) ->
Expand Down Expand Up @@ -254,21 +254,21 @@ handle_call({pub, Ch, M},
handle_call({pub, Ch, M}, _From, #state{module = Mod} = S0) ->
Reply = Mod:pub(Ch, M),
{reply, Reply, S0};
handle_call({get_subscribers, Ch},
handle_call({subscribers, Ch},
_From,
#state{module = ebus_dist, n = N, get_subscribers = Q} = S0) ->
Reply = ebus_dist:get_subscribers(Ch, dist_opts(N, Q)),
#state{module = ebus_dist, n = N, subscribers = Q} = S0) ->
Reply = ebus_dist:subscribers(Ch, dist_opts(N, Q)),
{reply, Reply, S0};
handle_call({get_subscribers, Ch}, _From, #state{module = Mod} = S0) ->
Reply = Mod:get_subscribers(Ch),
handle_call({subscribers, Ch}, _From, #state{module = Mod} = S0) ->
Reply = Mod:subscribers(Ch),
{reply, Reply, S0};
handle_call(get_channels,
handle_call(channels,
_From,
#state{module = ebus_dist, n = N, get_channels = Q} = S0) ->
Reply = ebus_dist:get_channels(dist_opts(N, Q)),
#state{module = ebus_dist, n = N, channels = Q} = S0) ->
Reply = ebus_dist:channels(dist_opts(N, Q)),
{reply, Reply, S0};
handle_call(get_channels, _From, #state{module = Mod} = S0) ->
Reply = Mod:get_channels(),
handle_call(channels, _From, #state{module = Mod} = S0) ->
Reply = Mod:channels(),
{reply, Reply, S0};
handle_call({dispatch, Ch, M, H},
_From,
Expand Down Expand Up @@ -314,10 +314,10 @@ parse_options([{pub, Q} | Opts], State) when is_integer(Q) ->
parse_options(Opts, State#state{pub = Q});
parse_options([{dispatch, Q} | Opts], State) when is_integer(Q) ->
parse_options(Opts, State#state{dispatch = Q});
parse_options([{get_subscribers, Q} | Opts], State) when is_integer(Q) ->
parse_options(Opts, State#state{get_subscribers = Q});
parse_options([{get_channels, Q} | Opts], State) when is_integer(Q) ->
parse_options(Opts, State#state{get_channels = Q});
parse_options([{subscribers, Q} | Opts], State) when is_integer(Q) ->
parse_options(Opts, State#state{subscribers = Q});
parse_options([{channels, Q} | Opts], State) when is_integer(Q) ->
parse_options(Opts, State#state{channels = Q});
parse_options([_Opt | Opts], State) ->
parse_options(Opts, State).

Expand Down
36 changes: 18 additions & 18 deletions src/ebus_dist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@

%% API
-export([sub/2, unsub/2, pub/2]).
-export([get_subscribers/1, get_channels/0]).
-export([subscribers/1, channels/0]).
-export([dispatch/3]).

%% Extended API
-export([sub/3, unsub/3, pub/3]).
-export([get_subscribers/2, get_channels/1]).
-export([subscribers/2, channels/1]).
-export([dispatch/4]).

%% Debug
Expand All @@ -46,9 +46,9 @@

%% Distribution parameters
%% n: number of replicas.
%% sub | unsub | pub | dispatch | get_subscribers | get_channels: quorums.
%% sub | unsub | pub | dispatch | subscribers | channels: quorums.
-type parameter() :: n | sub | unsub | pub | dispatch |
get_subscribers | get_channels.
subscribers | channels.

%% ebus_dist options
-type option() :: {parameter(), pos_integer()}.
Expand Down Expand Up @@ -97,23 +97,23 @@ pub(Channel, Message, Opts) ->
Callback = {ebus_gproc, pub, [Channel, Message]},
do_write(Channel, Channel, pub, Callback, Opts).

-spec get_subscribers(ebus:channel()) -> [ebus:handler()].
get_subscribers(Channel) ->
get_subscribers(Channel, []).
-spec subscribers(ebus:channel()) -> [ebus:handler()].
subscribers(Channel) ->
subscribers(Channel, []).

-spec get_subscribers(ebus:channel(), options()) -> [ebus:handler()].
get_subscribers(Channel, Opts) ->
Callback = {ebus_gproc, get_subscribers, [Channel]},
do_write(Channel, Channel, get_subscribers, Callback, Opts).
-spec subscribers(ebus:channel(), options()) -> [ebus:handler()].
subscribers(Channel, Opts) ->
Callback = {ebus_gproc, subscribers, [Channel]},
do_write(Channel, Channel, subscribers, Callback, Opts).

-spec get_channels() -> [ebus:channel()].
get_channels() ->
get_subscribers([]).
-spec channels() -> [ebus:channel()].
channels() ->
channels([]).

-spec get_channels(options()) -> [ebus:channel()].
get_channels(Opts) ->
Callback = {ebus_gproc, get_channels, []},
do_write(undefined, undefined, get_channels, Callback, Opts).
-spec channels(options()) -> [ebus:channel()].
channels(Opts) ->
Callback = {ebus_gproc, channels, []},
do_write(undefined, undefined, channels, Callback, Opts).

-spec dispatch(
ebus:channel(), ebus:payload(), ebus:handler()
Expand Down
4 changes: 2 additions & 2 deletions src/ebus_dist_cmd_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,9 @@ mk_reqid() -> erlang:phash2(os:timestamp()).
different(A) -> fun(B) -> A =/= B end.

%% @private
repair(get_subscribers, Replies, _State) ->
repair(subscribers, Replies, _State) ->
max_size_list(Replies);
repair(get_channels, Replies, _State) ->
repair(channels, Replies, _State) ->
MergedList = merge_lists(Replies),
ebus_util:rem_dups_from_list(MergedList);
repair(_Op, Replies, #state{q = Q}) ->
Expand Down
10 changes: 5 additions & 5 deletions src/ebus_gproc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

%% API
-export([sub/2, unsub/2, pub/2]).
-export([get_subscribers/1, get_channels/0]).
-export([subscribers/1, channels/0]).
-export([dispatch/3]).

%%%===================================================================
Expand Down Expand Up @@ -57,13 +57,13 @@ pub(Channel, Message) ->
gproc:send({p, l, {?MODULE, Channel}}, {ebus, {Channel, Message}}),
ok.

-spec get_subscribers(ebus:channel()) -> [ebus:handler()].
get_subscribers(Channel) ->
-spec subscribers(ebus:channel()) -> [ebus:handler()].
subscribers(Channel) ->
Key = {p, l, {?MODULE, Channel}},
gproc:lookup_pids(Key).

-spec get_channels() -> [ebus:channel()].
get_channels() ->
-spec channels() -> [ebus:channel()].
channels() ->
Pattern = {{{p, l, {?MODULE, '$1'}}, '_'}, '_', '_'},
L = [N || [N] <- ets:match(gproc, Pattern)],
ebus_util:rem_dups_from_list(L).
Expand Down
12 changes: 6 additions & 6 deletions src/ebus_pg2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

%% API
-export([sub/2, unsub/2, pub/2]).
-export([get_subscribers/1, get_channels/0]).
-export([subscribers/1, channels/0]).
-export([dispatch/3]).

%%%===================================================================
Expand Down Expand Up @@ -64,11 +64,11 @@ unsub(Channel, Handler) ->

-spec pub(ebus:channel(), ebus:payload()) -> ok.
pub(Channel, Msg) ->
Pids = get_subscribers(Channel),
Pids = subscribers(Channel),
lists:foreach(fun(Pid) -> Pid ! {ebus, {Channel, Msg}} end, Pids).

-spec get_subscribers(ebus:channel()) -> [ebus:handler()].
get_subscribers(Channel) ->
-spec subscribers(ebus:channel()) -> [ebus:handler()].
subscribers(Channel) ->
case pg2:get_members(Channel) of
{error, {no_such_group, _}} ->
ok = pg2:create(Channel),
Expand All @@ -77,8 +77,8 @@ get_subscribers(Channel) ->
Members
end.

-spec get_channels() -> [ebus:channel()].
get_channels() ->
-spec channels() -> [ebus:channel()].
channels() ->
pg2:which_groups().

-spec dispatch(ebus:channel(), ebus:payload(), ebus:handler()) -> ok.
Expand Down
10 changes: 5 additions & 5 deletions test/ebus_dist_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ basic_n1_q1(_Config) ->
ok = ebus:sub(Name, ch1, [MH1, MH2]),

%% Check subscribers
2 = length(ebus:get_subscribers(Name, ch1)),
2 = length(ebus:subscribers(Name, ch1)),

%% Check channels
1 = length(ebus:get_channels(Name)),
1 = length(ebus:channels(Name)),

%% Publish to 'ch1'
ok = ebus:pub(Name, ch1, {<<"ID1">>, <<"Hi!">>}),
Expand All @@ -107,7 +107,7 @@ basic_n1_q1(_Config) ->
ok = ebus:sub(Name, ch1, MH3),

%% Check subscribers
3 = length(ebus:get_subscribers(Name, ch1)),
3 = length(ebus:subscribers(Name, ch1)),

%% Publish to 'ch1'
ok = ebus:pub(Name, ch1, {<<"ID2">>, <<"Hi!">>}),
Expand All @@ -134,7 +134,7 @@ basic_n1_q1(_Config) ->
ok = ebus:unsub(Name, ch1, [MH1, MH2]),

%% Check subscribers
1 = length(ebus:get_subscribers(Name, ch1)),
1 = length(ebus:subscribers(Name, ch1)),

%% Publish to 'ch1'
ok = ebus:pub(Name, ch1, {<<"ID3">>, <<"Hi!">>}),
Expand All @@ -150,7 +150,7 @@ basic_n1_q1(_Config) ->
ok = ebus:unsub(Name, ch1, MH3),

%% Check subscribers
0 = length(ebus:get_subscribers(Name, ch1)),
0 = length(ebus:subscribers(Name, ch1)),

%% Publish to 'ch1'
ok = ebus:pub(Name, ch1, {<<"ID4">>, <<"Hi!">>}),
Expand Down
12 changes: 6 additions & 6 deletions test/ebus_local_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ ebus_pub_sub(_Config) ->
ok = ebus:sub(ch3, MH2),

%% Check subscribers
2 = length(ebus:get_subscribers(ch1)),
2 = length(ebus:subscribers(ch1)),

%% Check channels
3 = length(ebus:get_channels()),
3 = length(ebus:channels()),

%% Publish to 'ch1'
ok = ebus:pub(ch1, {<<"ID1">>, <<"Hi!">>}),
Expand Down Expand Up @@ -123,7 +123,7 @@ ebus_pub_sub(_Config) ->
ok = ebus:sub(ch1, MH3),

%% Check subscribers
3 = length(ebus:get_subscribers(ch1)),
3 = length(ebus:subscribers(ch1)),

%% Publish to 'ch1'
ok = ebus:pub(ch1, {<<"ID2">>, <<"Hi!">>}),
Expand All @@ -150,7 +150,7 @@ ebus_pub_sub(_Config) ->
ok = ebus:unsub(ch1, [MH1, MH2]),

%% Check subscribers
1 = length(ebus:get_subscribers(ch1)),
1 = length(ebus:subscribers(ch1)),

%% Publish to 'ch1'
ok = ebus:pub(ch1, {<<"ID3">>, <<"Hi!">>}),
Expand All @@ -166,7 +166,7 @@ ebus_pub_sub(_Config) ->
ok = ebus:unsub(ch2, MH1),

%% Check subscribers
0 = length(ebus:get_subscribers(ch2)),
0 = length(ebus:subscribers(ch2)),

%% Publish to 'ch2'
ok = ebus:pub(ch2, {<<"ID4">>, <<"Hi!">>}),
Expand Down Expand Up @@ -199,7 +199,7 @@ ebus_pool(_Config) ->
ok = ebus:sub(Name, ch1, MH2),

%% Check subscribers
2 = length(ebus:get_subscribers(Name, ch1)),
2 = length(ebus:subscribers(Name, ch1)),

%% Publish to 'ch1'
ok = ebus:pub(Name, ch1, {<<"ID1">>, <<"Hi!">>}),
Expand Down
Loading

0 comments on commit 74c6497

Please sign in to comment.