Skip to content

Commit

Permalink
Merge pull request #3 from cabol/cabol.version_0.1
Browse files Browse the repository at this point in the history
Fixed anonymous handler in ebus_handler. Fixed documentation. Added C…
  • Loading branch information
cabol committed Jun 5, 2015
2 parents 0db994e + 6ebb3a2 commit 1d462e9
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 18 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Change Log

## [Unreleased](https://github.com/cabol/erlbus/tree/HEAD)

**Merged pull requests:**

- Cabol.version 0.1 [\#2](https://github.com/cabol/erlbus/pull/2) ([cabol](https://github.com/cabol))

- Fixed README [\#1](https://github.com/cabol/erlbus/pull/1) ([cabol](https://github.com/cabol))



\* *This Change Log was automatically generated by [github_changelog_generator](https://github.com/skywinder/Github-Changelog-Generator)*
84 changes: 81 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,60 @@ and now you have `ebus` running in distributed fashion, it's extremely easy, you
anything at all.


Point-To-Point Example
----------------------

The great thing here is that you don't need something special to implement a point-to-point behavior.
Is as simple as this:

```erlang
ebus:dispatch(t1, "Hi!", MyHandler).
```

Instead of call `ebus:pub(Topic, Message)`, you call `ebus:dispatch(Topic, Message, Handler)`, and
the only difference is that you have to provide the `Handler` which will receive the message.
The reason of this is that you're free to implement your scheduling/dispatching strategy. Also,
you can use `ebus_util:get_best_pid(ListOfHandlers)` to find an available handler. For example:

```erlang
%% Start ebus
application:start(ebus).
ok

%% Create some handlers
MH1 = ebus_handler:new(my_handler, <<"MH1">>).
<0.47.0>
MH2 = ebus_handler:new(my_handler, <<"MH2">>).
<0.48.0>
MH3 = ebus_handler:new(my_handler, <<"MH3">>).
<0.49.0>

%% Subscribe created handlers
ebus:sub(my_topic, MH1).
ok
ebus:sub(my_topic, MH2).
ok
ebus:sub(my_topic, MH3).
ok

%% Get the subscribed handlers
Handlers = ebus:get_subscribers(my_topic).
[<0.47.0>, <0.48.0>, <0.49.0>]

%% Find an available handler
Handler = ebus_util:get_best_pid(Handlers).
<0.47.0>

ebus:dispatch(my_topic, "Hi!", Handler).
Topic: t1 - Msg: "Hi!"
ok
```

> **Note:**
> - The example above, assumes that you're working with the previous compiled handler `my_hanlder.erl`.

Task Executors (worker pool)
----------------------------

Expand Down Expand Up @@ -273,6 +327,30 @@ Start `ebus` on each one:
```erlang
% Start ebus
application:start(ebus).
12:05:30.633 [info] Application lager started on node 'node1@127.0.0.1'
12:05:30.743 [info] Application sasl started on node 'node1@127.0.0.1'
12:05:30.768 [info] Application crypto started on node 'node1@127.0.0.1'
12:05:30.801 [info] Application riak_sysmon started on node 'node1@127.0.0.1'
12:05:30.884 [info] Application os_mon started on node 'node1@127.0.0.1'
12:05:30.892 [info] alarm_handler: {set,{system_memory_high_watermark,[]}}
12:05:30.918 [info] Application basho_stats started on node 'node1@127.0.0.1'
12:05:30.938 [info] Application eleveldb started on node 'node1@127.0.0.1'
12:05:30.960 [info] Application pbkdf2 started on node 'node1@127.0.0.1'
12:05:30.981 [info] Application poolboy started on node 'node1@127.0.0.1'
12:05:31.086 [info] Starting reporters with []
12:05:31.086 [info] Application exometer_core started on node 'node1@127.0.0.1'
12:05:31.154 [info] Application clique started on node 'node1@127.0.0.1'
12:05:31.374 [warning] No ring file available.
12:05:31.607 [info] monitor long_schedule <0.160.0> [{timeout,62},{in,{code_server,call,2}},{out,{code_server,'-handle_on_load/4-fun-0-',1}}]
12:05:32.306 [info] New capability: {riak_core,vnode_routing} = proxy
12:05:32.626 [info] New capability: {riak_core,staged_joins} = true
12:05:32.772 [info] New capability: {riak_core,resizable_ring} = true
12:05:32.895 [info] New capability: {riak_core,fold_req_version} = v2
12:05:33.039 [info] New capability: {riak_core,security} = true
12:05:33.196 [info] New capability: {riak_core,bucket_types} = true
12:05:33.330 [info] New capability: {riak_core,net_ticktime} = true
12:05:33.779 [info] Application riak_core started on node 'node1@127.0.0.1'
12:05:33.950 [info] Application ebus started on node 'node1@127.0.0.1'
ok
```

Expand All @@ -294,7 +372,7 @@ Running Tests
$ make test


Building Edoc
-------------
Change Log
----------

No supported yet.
All notable changes to this project will be documented in the [CHANGELOG.md](CHANGELOG.md).
30 changes: 15 additions & 15 deletions src/ebus_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
-export([start_link/3]).
-export([new/1, new/2, new/3, delete/1]).
-export([new_pool/3, new_pool/4, new_pool/5]).
-export([new_anonymous/1, anonymous_handler/1]).
-export([get_module/1, get_context/1]).
-export([new_anonymous/1]).

%% gen_server callbacks
-export([init/1,
Expand Down Expand Up @@ -113,20 +113,6 @@ new_pool(Name, Size, Module, Context, Opts) ->
{ok, Pool} = poolboy:start_link(PoolArgs, [Module, Context]),
new(Module, Context, [{pool, Pool} | Opts]).

-spec new_anonymous(handle_fun()) -> ebus:handler().
new_anonymous(Fun) ->
spawn_link(?MODULE, anonymous_handler, [Fun]).

-spec anonymous_handler(handle_fun()) -> ebus:handler().
anonymous_handler(Fun) ->
receive
{ebus, {Topic, Msg}} ->
Fun(Topic, Msg),
anonymous_handler(Fun);
exit ->
ok
end.

-spec get_module(ebus:handler()) -> any().
get_module(Handler) ->
gen_server:call(Handler, get_mod).
Expand All @@ -135,6 +121,10 @@ get_module(Handler) ->
get_context(Handler) ->
gen_server:call(Handler, get_ctx).

-spec new_anonymous(handle_fun()) -> ebus:handler().
new_anonymous(Fun) ->
spawn_link(fun() -> anonymous_handler(Fun) end).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand Down Expand Up @@ -196,3 +186,13 @@ parse_options([{pool, Pool} | Opts], State) ->
parse_options(Opts, State#state{pool = Pool});
parse_options([{_, _} | Opts], State) ->
parse_options(Opts, State).

%% @private
anonymous_handler(Fun) ->
receive
{ebus, {Topic, Msg}} ->
Fun(Topic, Msg),
anonymous_handler(Fun);
exit ->
ok
end.

0 comments on commit 1d462e9

Please sign in to comment.