Skip to content

Commit

Permalink
Merge pull request #28 from cabol/cabol.version_0.1
Browse files Browse the repository at this point in the history
Fixed ebus_handler to support handle_fun/1
  • Loading branch information
cabol committed Oct 15, 2015
2 parents 3f8a7bb + 5e5cb01 commit 47d035f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 27 deletions.
46 changes: 27 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
<img src="http://wallpaperssfree.com/wp-content/uploads/2013/05/world-fastest-train-hd-wallpapers.png" height="200" width="100%" />

ErlBus
======
# ErlBus

Message / Event Bus written in Erlang.

Expand Down Expand Up @@ -62,17 +61,25 @@ Once into the erlang console:
application:start(ebus).
ok

% Create anonymous function to act as handler
F = fun({Channel, Msg}, Ctx) ->
io:format("[Pid: ~p][Channel: ~p][Msg: ~p][Ctx: ~p]~n",
[self(), Channel, Msg, Ctx])
end.
#Fun<erl_eval.12.90072148>
% Create callback fun
F1 = fun({Channel, Msg}) ->
io:format("[Pid: ~p][Channel: ~p][Msg: ~p]~n", [self(), Channel, Msg])
end.
#Fun<erl_eval.6.90072148>

% Create anonymous handlers
MH1 = ebus_handler:new(F).
% Create handlers
MH1 = ebus_handler:new(F1).
<0.50.0>
MH2 = ebus_handler:new(F, {my_ctx, <<"MH2">>}).

% Create another callback fun, but receiving both, message and context
F2 = fun({Channel, Msg}, Ctx) ->
io:format("[Pid: ~p][Channel: ~p][Msg: ~p][Ctx: ~p]~n",
[self(), Channel, Msg, Ctx])
end.
#Fun<erl_eval.12.90072148>

% Create handlers
MH2 = ebus_handler:new(F2, {my_ctx, <<"MH2">>}).
<0.52.0>

% Subscribe them to channel ch1
Expand All @@ -82,21 +89,21 @@ ok

% Let's publish a message to 'ch1'
ebus:pub(ch1, "Hello!").
[Pid: <0.50.0>][Channel: ch1][Msg: "Hello!"][Ctx: undefined]
[Pid: <0.52.0>][Channel: ch1][Msg: "Hello!"][Ctx: {my_ctx,<<"MH2">>}]
[Pid: <0.51.0>][Channel: ch1][Msg: "Hello!"]
[Pid: <0.53.0>][Channel: ch1][Msg: "Hello!"][Ctx: {my_ctx,<<"MH2">>}]
ok

% Another handler
MH3 = ebus_handler:new(F, {my_ctx, <<"MH3">>}).
<0.54.0>
MH3 = ebus_handler:new(F2, {my_ctx, <<"MH3">>}).
<0.58.0>

% Subscribe the other handler 'MH3' to ch2
ebus:sub(ch2, MH3).
ok

% Publish to 'ch2'
ebus:pub(ch2, "Hello other!").
[Pid: <0.57.0>][Channel: ch2][Msg: "Hello other!"][Ctx: {my_ctx,<<"MH3">>}]
[Pid: <0.58.0>][Channel: ch2][Msg: "Hello other!"][Ctx: {my_ctx,<<"MH3">>}]
ok

% Unsubscribe 'MH2' from ch1
Expand All @@ -106,7 +113,7 @@ ok

% Publish again to 'ch1'
ebus:pub(ch1, "Hello again!").
[Pid: <0.50.0>][Channel: ch1][Msg: "Hello again!"][Ctx: undefined]
[Pid: <0.51.0>][Channel: ch1][Msg: "Hello again!"]
ok
```

Expand Down Expand Up @@ -193,7 +200,8 @@ This module provides all needed functions to create and manage message handlers.
to create a message handler:

- Passing an anonymous function as callback (as we saw previously). The callback fun must be compliant
with the spec: `fun(({Channel :: any(), Payload :: any()}, Context :: any()) -> any())`.
with the spec: `fun(({Channel :: any(), Payload :: any()}, Context :: any()) -> any())`, or
`fun(({Channel :: any(), Payload :: any()}) -> any())`.
- Passing an existing module that implements the `ebus_handler` behavior, which defines the
callback: `handle_msg({Channel :: any(), Payload :: any()}, Context :: any()) -> any()`.

Expand All @@ -202,7 +210,7 @@ Both cases receives the same arguments:

- `Channel` is the logical mechanism that allows communicate two or more endpoints each other
(either Pub/Sub or Point-to-Point) through messages.
- `Payload` is the message itself, the content af what you published or dispatched.
- `Payload` is the message itself, the content what you published or dispatched.
- `Context` is an optional parameter that you can pass in the moment of the handler creation,
and you want to be able to recovered at the moment of the `handle_msg` invocation.

Expand Down
12 changes: 9 additions & 3 deletions src/ebus_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@
-type context() :: any().
-type option() :: {monitors, [pid()]} | {pool, pid()}.
-type options() :: [option()].
-type handle_fun() :: fun((ebus:message(), context()) -> any()).
-type handle_fun() :: fun((ebus:message()) -> any()) |
fun((ebus:message(), context()) -> any()).
-type callback() :: module() | handle_fun().
-type status() :: exiting | garbage_collecting | waiting | running |
runnable | suspended.
Expand Down Expand Up @@ -166,8 +167,13 @@ handle_info({ebus, Event}, #state{pool = Pool} = State) when is_pid(Pool) ->
{noreply, State};
handle_info({ebus, Event}, #state{callback = CB, context = Ctx} = State) ->
case is_function(CB) of
true -> CB(Event, Ctx);
false -> CB:handle_msg(Event, Ctx)
true ->
case erlang:fun_info(CB, arity) of
{arity, 2} -> CB(Event, Ctx);
{arity, 1} -> CB(Event)
end;
false ->
CB:handle_msg(Event, Ctx)
end,
{noreply, State};
handle_info(_Info, State) ->
Expand Down
38 changes: 34 additions & 4 deletions test/ebus_local_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
end_per_testcase/2]).

%% Tests
-export([ebus_pub_sub/1, ebus_pool/1]).
-export([ebus_basic/1, ebus_pub_sub/1, ebus_pool/1]).

-define(TAB, ebus_test).
-define(HANDLER, my_test_handler).
Expand All @@ -45,7 +45,7 @@
%%%===================================================================

all() ->
[ebus_pub_sub, ebus_pool].
[ebus_basic, ebus_pub_sub, ebus_pool].

init_per_suite(Config) ->
application:start(ebus),
Expand All @@ -67,6 +67,36 @@ end_per_testcase(_, Config) ->
%%% Exported Tests Functions
%%%===================================================================

ebus_basic(_Config) ->
%% Debug
ct:print("\e[1;96m 'ebus_basic' testcase. \e[0m"),

%% Callback funs
F1 = fun({Ch, Msg}) ->
ct:print("\e[1;1m [Pid: ~p][Channel: ~p][Msg: ~p]~n \e[0m",
[self(), Ch, Msg])
end,
F2 = fun({Ch, Msg}, Ctx) ->
ct:print("\e[1;1m [Pid: ~p][Channel: ~p][Msg: ~p][Ctx: ~p]~n \e[0m",
[self(), Ch, Msg, Ctx])
end,

%% Handlers
MH1 = ebus_handler:new(F1),
MH2 = ebus_handler:new(F2, 2),
MH3 = ebus_handler:new(F2, 2),

%% Subscribe handlers
ok = ebus:sub(ch1, [MH1, MH2, MH3]),

%% Publish to 'ch1'
ok = ebus:pub(ch1, <<"Hi!">>),
timer:sleep(500),

%% End
cleanup([MH1, MH2, MH3]),
ok.

ebus_pub_sub(_Config) ->
%% Debug
ct:print("\e[1;96m 'ebus_pub_sub' testcase. \e[0m"),
Expand All @@ -79,7 +109,7 @@ ebus_pub_sub(_Config) ->
MH2 = ebus_handler:new(?HANDLER, <<"MH2">>),
MH3 = ebus_handler:new(?HANDLER, <<"MH3">>),

%% Create anonymous handler
%% Create handlers with anonymous fun
AH1 = ebus_handler:new(fun my_test_handler:handle_msg/2, <<"AH1">>),

%% Subscribe MH1 and MH2
Expand Down Expand Up @@ -186,7 +216,7 @@ ebus_pub_sub(_Config) ->
[] = ets:lookup(?TAB, ebus_util:build_name([<<"ID4">>, <<"MH1">>])),

%% End
cleanup([MH1, MH2, MH3]),
cleanup([MH1, MH2, MH3, AH1]),
ok.

ebus_pool(_Config) ->
Expand Down
2 changes: 1 addition & 1 deletion test/ebus_pg2_gproc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ t_pub_sub(Module) ->
MH2 = ebus_handler:new(?HANDLER, <<"MH2">>),
MH3 = ebus_handler:new(?HANDLER, <<"MH3">>),

%% Create anonymous handler
%% Create handlers with anonymous fun
AH1 = ebus_handler:new(fun my_test_handler:handle_msg/2, <<"AH1">>),

%% Subscribe MH1 and MH2
Expand Down

0 comments on commit 47d035f

Please sign in to comment.