Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Permalink
group stteam docs
Browse files Browse the repository at this point in the history
  • Loading branch information
macpie committed Aug 29, 2019
1 parent 8e38eb7 commit b171cb8
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 12 deletions.
37 changes: 31 additions & 6 deletions src/group/libp2p_ack_stream.erl
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
%%%-------------------------------------------------------------------
%% @doc
%% == Libp2p Ack Stream ==
%% @see libp2p_framed_stream
%% @end
%%%-------------------------------------------------------------------
-module(libp2p_ack_stream).

-include("pb/libp2p_ack_stream_pb.hrl").
Expand All @@ -10,9 +16,14 @@
{ok, Ref::any()} | {error, term()}.
-callback handle_ack(State::any(), Ref::any(), Seq::[pos_integer()], Reset::boolean()) -> ok.

%% API
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([send_ack/3]).
%% libp2p_framed_stream

%% ------------------------------------------------------------------
%% libp2p_framed_stream Function Exports
%% ------------------------------------------------------------------
-export([server/4, client/2, init/3, handle_data/3, handle_send/5, handle_info/3]).


Expand All @@ -25,22 +36,33 @@

-define(ACK_STREAM_TIMEOUT, timer:minutes(5)).

%% API
%%
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

%%%-------------------------------------------------------------------
%% @doc
%% Send ack message
%% @end
%%%-------------------------------------------------------------------
-spec send_ack(pid(), pos_integer(), boolean()) -> ok.
send_ack(Pid, Seq, Reset) ->
Pid ! {send_ack, Seq, Reset},
ok.

%% libp2p_framed_stream
%%
%% ------------------------------------------------------------------
%% libp2p_framed_stream Function Definitions
%% ------------------------------------------------------------------

%% @hidden
client(Connection, Args) ->
libp2p_framed_stream:client(?MODULE, Connection, Args).

%% @hidden
server(Connection, Path, _TID, Args) ->
libp2p_framed_stream:server(?MODULE, Connection, [Path | Args]).

%% @hidden
init(server, Connection, [Path, AckModule, AckState | _]) ->
case AckModule:accept_stream(AckState, self(), Path) of
{ok, AckRef} ->
Expand All @@ -55,6 +77,7 @@ init(client, Connection, [AckRef, AckModule, AckState | _]) ->
{ok, #state{connection=Connection,
ack_ref=AckRef, ack_module=AckModule, ack_state=AckState}}.

%% @hidden
handle_data(_Kind, Data, State=#state{ack_ref=AckRef, ack_module=AckModule, ack_state=AckState}) ->
case libp2p_ack_stream_pb:decode_msg(Data, libp2p_ack_frame_pb) of
#libp2p_ack_frame_pb{messages=Bin, seqs=Seq} when Bin /= [] ->
Expand All @@ -71,6 +94,7 @@ handle_data(_Kind, Data, State=#state{ack_ref=AckRef, ack_module=AckModule, ack_
{noreply, State}
end.

%% @hidden
handle_send(_Kind, From, Msgs, Timeout, State=#state{}) when is_list(Msgs) ->
{Seqs, Data} = lists:unzip(Msgs),
Msg = #libp2p_ack_frame_pb{messages=Data, seqs=Seqs},
Expand All @@ -79,6 +103,7 @@ handle_send(_Kind, From, {Data, Seq}, Timeout, State=#state{}) ->
Msg = #libp2p_ack_frame_pb{messages=[Data], seqs=[Seq]},
{ok, {reply, From, pending}, libp2p_ack_stream_pb:encode_msg(Msg), Timeout, State#state{}}.

%% @hidden
handle_info(_Kind, {send_ack, Seq, Reset}, State=#state{}) ->
Msg = #libp2p_ack_frame_pb{seqs=Seq, reset=Reset},
{noreply, State, libp2p_ack_stream_pb:encode_msg(Msg)}.
31 changes: 25 additions & 6 deletions src/group/libp2p_gossip_stream.erl
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
%%%-------------------------------------------------------------------
%% @doc
%% == Libp2p Gossip Stream ==
%% @see libp2p_framed_stream
%% @end
%%%-------------------------------------------------------------------
-module(libp2p_gossip_stream).

-include("pb/libp2p_gossip_pb.hrl").
Expand All @@ -9,9 +15,14 @@
Session::pid(),
Stream::pid()) -> ok | {error, term()}.

%% API
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([encode/2]).
%% libp2p_framed_stream

%% ------------------------------------------------------------------
%% libp2p_framed_stream Function Exports
%% ------------------------------------------------------------------
-export([server/4, client/2, init/3, handle_data/3]).


Expand All @@ -21,20 +32,27 @@
handler_state :: any()
}).

%% API
%%
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

encode(Key, Data) ->
Msg = #libp2p_gossip_frame_pb{key=Key, data=Data},
libp2p_gossip_pb:encode_msg(Msg).

%% libp2p_framed_stream
%%
%% ------------------------------------------------------------------
%% libp2p_framed_stream Function Definitions
%% ------------------------------------------------------------------

%% @hidden
client(Connection, Args) ->
libp2p_framed_stream:client(?MODULE, Connection, Args).

%% @hidden
server(Connection, _Path, _TID, Args) ->
libp2p_framed_stream:server(?MODULE, Connection, Args).

%% @hidden
init(server, Connection, [HandlerModule, HandlerState]) ->
{ok, Session} = libp2p_connection:session(Connection),
State = #state{connection=Connection,
Expand All @@ -59,6 +77,7 @@ init(client, Connection, [HandlerModule, HandlerState]) ->
handler_module=HandlerModule,
handler_state=HandlerState}}.

%% @hidden
handle_data(_, Data, State=#state{handler_module=HandlerModule,
handler_state=HandlerState}) ->
#libp2p_gossip_frame_pb{key=Key, data=Bin} =
Expand Down

0 comments on commit b171cb8

Please sign in to comment.