Skip to content

Commit

Permalink
Merge pull request #2 from cabol/cabol.version_0.1
Browse files Browse the repository at this point in the history
Cabol.version 0.1
  • Loading branch information
cabol committed Jun 4, 2015
2 parents 892ca5b + ca6530f commit 0db994e
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
PROJECT = ebus

CONFIG ?= test/config/test.config
CONFIG ?= test/test.config

ifdef EBUS_DIST
DEPS = gproc poolboy riak_core
Expand Down
42 changes: 39 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ application:start(ebus).
ok
```

Then in `node1` create a handler with subscribe it to a topic:
Then in `node1` create a handler and subscription to a topic:

```erlang
% Anonymous handler function
Expand Down Expand Up @@ -172,6 +172,15 @@ in case of any demo o simple test. But the right way would be create your own me
the `ebus_handler` beahvior. Because in this way, your handler will be part of the supervision tree,
and you will be able to use other features too, that we'll cover later.

First, we have to create an Erlang module to implement the behavior `ebus_handler`, which defines a
callback to handling message logic: `handle_msg({Topic, Payload}, Context)`, where:

- `Topic` is the topic/channel where message comes from.
- `Payload` is the message itself, the content af what you published or dispatched.
- `Context` is an optional parameter that you can pass in the moment of the handler creation,
and you want to be able to recovered at the moment of the `handle_msg` invocation.


### my_handler.erl

```erlang
Expand All @@ -194,8 +203,10 @@ Once you have compiled your module(s) and started an Erlang console:
application:start(ebus).
ok

% Create a new handler
MH1 = ebus_handler:new(my_handler).
% Create a new handler, passing a context as argument
% In this the context is a simple binary string with the name of the handler,
% but it can be anything that you want (tuple, record, map, etc.)
MH1 = ebus_handler:new(my_handler, <<"MH1">>).
<0.49.0>

% From here, everything is the same as previous example
Expand All @@ -214,6 +225,31 @@ and now you have `ebus` running in distributed fashion, it's extremely easy, you
anything at all.


Task Executors (worker pool)
----------------------------

Suppose now that you have a handler that takes a while processing each message/event, so it will
be blocked until complete the task, and for some scenarios would be unthinkable. Therefore,
`ebus_handler` module gives you the option to create a pool of workers attached to your handler,
and is totally transparent to you.

```erlang
% Start ebus
application:start(ebus).
ok

% Create a handler with a worker pool (3 workers)
HandlerPool = ebus_handler:new_pool(my_pool_1, 3, my_handler).
<0.49.0>

% And that's it, now the load will be distributed among the workers
% From here everything is as previously
% Finally, let's subscribe this new handler with workers to some topic
ebus:sub(my_topic, HandlerPool).
ok
```


ErlBus with Riak Core and Gproc local
-------------------------------------

Expand Down
18 changes: 9 additions & 9 deletions src/ebus.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
%% ebus types
-type cmd() :: sub | unsub | pub | dispatch | get_subscribers | get_topics.
-type topic() :: any().
-type message() :: any().
-type event() :: {topic(), message()}.
-type payload() :: any().
-type message() :: {topic(), payload()}.
-type handler() :: pid().
-type callback() :: {Module :: module(), Function :: atom(), Args :: [any()]}.
-type reason() :: no_such_topic | no_handler | internal.
Expand All @@ -62,8 +62,8 @@
%% Exported types
-export_type([cmd/0,
topic/0,
payload/0,
message/0,
event/0,
handler/0,
callback/0,
ebus_ret/0]).
Expand Down Expand Up @@ -114,7 +114,7 @@
%% @doc Sends the `Message` to all subscribers of the `Topic`.
-callback pub(Topic, Message) -> Response when
Topic :: topic(),
Message :: message(),
Message :: payload(),
Response :: ebus_ret().

%% @doc Returns a list with all subscribers to the `Topic`.
Expand All @@ -129,7 +129,7 @@
%% @doc Sends the `Message` to `Handler`, which is subscribed to `Topic`.
-callback dispatch(Topic, Message, Handler) -> Response when
Topic :: topic(),
Message :: message(),
Message :: payload(),
Handler :: handler(),
Response :: ebus_ret().

Expand Down Expand Up @@ -183,11 +183,11 @@ unsub(Topic, Handler) ->
unsub(Name, Topic, Handler) ->
gen_server:call(Name, {unsub, Topic, Handler}).

-spec pub(topic(), message()) -> ebus_ret().
-spec pub(topic(), payload()) -> ebus_ret().
pub(Topic, Message) ->
gen_server:call(?SERVER, {pub, Topic, Message}).

-spec pub(name(), topic(), message()) -> ebus_ret().
-spec pub(name(), topic(), payload()) -> ebus_ret().
pub(Name, Topic, Message) ->
gen_server:call(Name, {pub, Topic, Message}).

Expand All @@ -207,11 +207,11 @@ get_topics() ->
get_topics(Name) ->
gen_server:call(Name, get_topics).

-spec dispatch(topic(), message(), handler()) -> ebus_ret().
-spec dispatch(topic(), payload(), handler()) -> ebus_ret().
dispatch(Topic, Message, Handler) ->
gen_server:call(?SERVER, {dispatch, Topic, Message, Handler}).

-spec dispatch(name(), topic(), message(), handler()) -> ebus_ret().
-spec dispatch(name(), topic(), payload(), handler()) -> ebus_ret().
dispatch(Name, Topic, Message, Handler) ->
gen_server:call(Name, {dispatch, Topic, Message, Handler}).

Expand Down
8 changes: 4 additions & 4 deletions src/ebus_dist.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ unsub(Topic, Handler, Opts) ->
Callback = {ebus_gproc, unsub, [Topic, Handler]},
do_write(Topic, Topic, unsub, Callback, Opts).

-spec pub(ebus:topic(), ebus:message()) -> ebus:ebus_ret().
-spec pub(ebus:topic(), ebus:payload()) -> ebus:ebus_ret().
pub(Topic, Message) ->
pub(Topic, Message, []).

-spec pub(ebus:topic(), ebus:message(), options()) -> ebus:ebus_ret().
-spec pub(ebus:topic(), ebus:payload(), options()) -> ebus:ebus_ret().
pub(Topic, Message, Opts) ->
Callback = {ebus_gproc, pub, [Topic, Message]},
do_write(Topic, Topic, pub, Callback, Opts).
Expand All @@ -116,13 +116,13 @@ get_topics(Opts) ->
do_write(undefined, undefined, get_topics, Callback, Opts).

-spec dispatch(
ebus:topic(), ebus:message(), ebus:handler()
ebus:topic(), ebus:payload(), ebus:handler()
) -> ebus:ebus_ret().
dispatch(Topic, Message, Handler) ->
dispatch(Topic, Message, Handler, []).

-spec dispatch(
ebus:topic(), ebus:message(), ebus:handler(), options()
ebus:topic(), ebus:payload(), ebus:handler(), options()
) -> ebus:ebus_ret().
dispatch(Topic, Message, Handler, Opts) ->
Callback = {ebus_gproc, dispatch, [Topic, Message, Handler]},
Expand Down
4 changes: 1 addition & 3 deletions src/ebus_dist_cmd_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ start_vnode(I) ->
%% <code>reg|unreg|send|sub|unsub|pub</code>
%% <br/>
%% <li>PrefList: Riak Core preflist.</li>
%% <li>ReqId: Request id so the caller can verify the response.</li>
%% <li>Ref: Unique reference to the GS that will be created.</li>
%% <li>Key: Key which the GS will be registered.</li>
%% <li>Identity: Any value to identify the command.</li>
%% <li>CallbackSpec: Callback specification. This will applied when
%% messages arrives. If `Mod' is `none', the callback will be treated
%% as a fun.</li>
Expand Down
4 changes: 2 additions & 2 deletions src/ebus_gproc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ unsub(Topic, Handler) ->
gproc_lib:remove_reg(Key, Handler, unreg),
ok.

-spec pub(ebus:topic(), ebus:message()) -> ok.
-spec pub(ebus:topic(), ebus:payload()) -> ok.
pub(Topic, Message) ->
gproc:send({p, l, {?MODULE, Topic}}, {ebus, {Topic, Message}}),
ok.
Expand All @@ -64,7 +64,7 @@ get_topics() ->
L = [N || [N] <- ets:match(gproc, Pattern)],
ebus_util:rem_dups_from_list(L).

-spec dispatch(ebus:topic(), ebus:message(), ebus:handler()) -> ok.
-spec dispatch(ebus:topic(), ebus:payload(), ebus:handler()) -> ok.
dispatch(Topic, Message, Handler) ->
Handler ! {ebus, {Topic, Message}},
ok.
8 changes: 4 additions & 4 deletions src/ebus_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@
-type pool_opt() :: {name, atom()} | {size, integer()}.
-type option() :: {monitors, [pid()]} | {pool, [pool_opt()]}.
-type options() :: [option()].
-type handle_fun() :: fun((ebus:topic(), ebus:message()) -> ok).
-type handle_fun() :: fun((ebus:topic(), ebus:payload()) -> ok).

%%%===================================================================
%%% Callback API
%%%===================================================================

%% @doc This function handles incoming events/messages.
-callback handle_msg(Event, Context) -> Response when
Event :: ebus:event(),
%% @doc This function handles incoming messages/events.
-callback handle_msg(Message, Context) -> Response when
Message :: ebus:message(),
Context :: context(),
Response :: any().

Expand Down
4 changes: 2 additions & 2 deletions src/ebus_pg2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ unsub(Topic, Handler) ->
ok
end.

-spec pub(ebus:topic(), ebus:message()) -> ok.
-spec pub(ebus:topic(), ebus:payload()) -> ok.
pub(Topic, Msg) ->
Pids = get_subscribers(Topic),
lists:foreach(fun(Pid) -> Pid ! {ebus, {Topic, Msg}} end, Pids).
Expand All @@ -77,7 +77,7 @@ get_subscribers(Topic) ->
get_topics() ->
pg2:which_groups().

-spec dispatch(ebus:topic(), ebus:message(), ebus:handler()) -> ok.
-spec dispatch(ebus:topic(), ebus:payload(), ebus:handler()) -> ok.
dispatch(Topic, Msg, Handler) ->
Handler ! {ebus, {Topic, Msg}},
ok.
11 changes: 0 additions & 11 deletions test/handlers/test_handler.erl

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
%% API
-export([handle_msg/2]).

handle_msg({Topic, {Id, Payload}}, Context) ->
%% Message Handler
handle_msg({Topic, {Id, Payload} = Msg}, Context) ->
ct:print("\e[1;1m[Pid: ~p][Topic: ~p][Msg: ~p][Ctx: ~p]~n\e[0m",
[self(), Topic, Payload, Context]),
[self(), Topic, Msg, Context]),
ets:insert(ebus_test, {ebus_util:build_name([Id, Context]), Payload}).
File renamed without changes.

0 comments on commit 0db994e

Please sign in to comment.