Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Macpie/ch4600/improve docs #205

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions src/group/libp2p_ack_stream.erl
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
%%%-------------------------------------------------------------------
%% @doc
%% == Libp2p Ack Stream ==
%% @see libp2p_framed_stream
%% @end
%%%-------------------------------------------------------------------
-module(libp2p_ack_stream).

-include("pb/libp2p_ack_stream_pb.hrl").
Expand All @@ -10,9 +16,14 @@
{ok, Ref::any()} | {error, term()}.
-callback handle_ack(State::any(), Ref::any(), Seq::[pos_integer()], Reset::boolean()) -> ok.

%% API
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([send_ack/3]).
%% libp2p_framed_stream

%% ------------------------------------------------------------------
%% libp2p_framed_stream Function Exports
%% ------------------------------------------------------------------
-export([server/4, client/2, init/3, handle_data/3, handle_send/5, handle_info/3]).


Expand All @@ -25,22 +36,33 @@

-define(ACK_STREAM_TIMEOUT, timer:minutes(5)).

%% API
%%
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

%%%-------------------------------------------------------------------
%% @doc
%% Send ack message
%% @end
%%%-------------------------------------------------------------------
-spec send_ack(pid(), pos_integer(), boolean()) -> ok.
send_ack(Pid, Seq, Reset) ->
Pid ! {send_ack, Seq, Reset},
ok.

%% libp2p_framed_stream
%%
%% ------------------------------------------------------------------
%% libp2p_framed_stream Function Definitions
%% ------------------------------------------------------------------

%% @hidden
client(Connection, Args) ->
libp2p_framed_stream:client(?MODULE, Connection, Args).

%% @hidden
server(Connection, Path, _TID, Args) ->
libp2p_framed_stream:server(?MODULE, Connection, [Path | Args]).

%% @hidden
init(server, Connection, [Path, AckModule, AckState | _]) ->
%% Catch errors from the handler module since the handling group
%% may have already stopped or crashed.
Expand All @@ -65,6 +87,7 @@ init(client, Connection, [AckRef, AckModule, AckState | _]) ->
{ok, #state{connection=Connection,
ack_ref=AckRef, ack_module=AckModule, ack_state=AckState}}.

%% @hidden
handle_data(_Kind, Data, State=#state{ack_ref=AckRef, ack_module=AckModule, ack_state=AckState}) ->
case libp2p_ack_stream_pb:decode_msg(Data, libp2p_ack_frame_pb) of
#libp2p_ack_frame_pb{messages=Bin, seqs=Seq} when Bin /= [] ->
Expand All @@ -81,6 +104,7 @@ handle_data(_Kind, Data, State=#state{ack_ref=AckRef, ack_module=AckModule, ack_
{noreply, State}
end.

%% @hidden
handle_send(_Kind, From, Msgs, Timeout, State=#state{}) when is_list(Msgs) ->
{Seqs, Data} = lists:unzip(Msgs),
Msg = #libp2p_ack_frame_pb{messages=Data, seqs=Seqs},
Expand All @@ -89,6 +113,7 @@ handle_send(_Kind, From, {Data, Seq}, Timeout, State=#state{}) ->
Msg = #libp2p_ack_frame_pb{messages=[Data], seqs=[Seq]},
{ok, {reply, From, pending}, libp2p_ack_stream_pb:encode_msg(Msg), Timeout, State#state{}}.

%% @hidden
handle_info(_Kind, {send_ack, Seq, Reset}, State=#state{}) ->
Msg = #libp2p_ack_frame_pb{seqs=Seq, reset=Reset},
{noreply, State, libp2p_ack_stream_pb:encode_msg(Msg)}.
31 changes: 25 additions & 6 deletions src/group/libp2p_gossip_stream.erl
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
%%%-------------------------------------------------------------------
%% @doc
%% == Libp2p Gossip Stream ==
%% @see libp2p_framed_stream
%% @end
%%%-------------------------------------------------------------------
-module(libp2p_gossip_stream).

-include("pb/libp2p_gossip_pb.hrl").
Expand All @@ -9,9 +15,14 @@
Session::pid(),
Stream::pid()) -> ok | {error, term()}.

%% API
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([encode/2]).
%% libp2p_framed_stream

%% ------------------------------------------------------------------
%% libp2p_framed_stream Function Exports
%% ------------------------------------------------------------------
-export([server/4, client/2, init/3, handle_data/3]).


Expand All @@ -21,20 +32,27 @@
handler_state :: any()
}).

%% API
%%
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

encode(Key, Data) ->
Msg = #libp2p_gossip_frame_pb{key=Key, data=Data},
libp2p_gossip_pb:encode_msg(Msg).

%% libp2p_framed_stream
%%
%% ------------------------------------------------------------------
%% libp2p_framed_stream Function Definitions
%% ------------------------------------------------------------------

%% @hidden
client(Connection, Args) ->
libp2p_framed_stream:client(?MODULE, Connection, Args).

%% @hidden
server(Connection, _Path, _TID, Args) ->
libp2p_framed_stream:server(?MODULE, Connection, Args).

%% @hidden
init(server, Connection, [HandlerModule, HandlerState]) ->
{ok, Session} = libp2p_connection:session(Connection),
%% Catch errors from the handler module in accepting a stream. The
Expand All @@ -58,6 +76,7 @@ init(client, Connection, [HandlerModule, HandlerState]) ->
handler_module=HandlerModule,
handler_state=HandlerState}}.

%% @hidden
handle_data(_, Data, State=#state{handler_module=HandlerModule,
handler_state=HandlerState}) ->
#libp2p_gossip_frame_pb{key=Key, data=Bin} =
Expand Down
61 changes: 55 additions & 6 deletions src/identify/libp2p_identify.erl
Original file line number Diff line number Diff line change
@@ -1,17 +1,33 @@
%%%-------------------------------------------------------------------
%% @doc
%% == Libp2p2 Identify ==
%% @end
%%%-------------------------------------------------------------------
-module(libp2p_identify).

-include("pb/libp2p_peer_pb.hrl").
-include("pb/libp2p_identify_pb.hrl").

-type identify() :: #libp2p_signed_identify_pb{}.
-export([from_map/2, encode/1, decode/1, verify/1,
pubkey_bin/1, peer/1, observed_maddr/1, observed_addr/1, nonce/1]).


-type signed_identify() :: #libp2p_signed_identify_pb{identify :: libp2p_identify:identify() | undefined,
signature :: iodata() | undefined}.
-type identify() :: #libp2p_identify_pb{peer :: libp2p_identify:signed_identify() | undefined,
observed_addr :: iodata() | undefined,
nonce :: iodata() | undefined}.
-type identify_map() :: #{ peer => libp2p_peer:peer(),
observed_addr => string(),
nonce => binary()
}.
-export_type([identify/0, identify_map/0]).
-export([from_map/2, encode/1, decode/1, verify/1,
pubkey_bin/1, peer/1, observed_maddr/1, observed_addr/1, nonce/1]).

%%--------------------------------------------------------------------
%% @doc
%% Create an identify from map
%% @end
%%--------------------------------------------------------------------
-spec from_map(identify_map(), libp2p_crypto:sig_fun()) -> identify().
from_map(Map, SigFun) ->
Identify = #libp2p_identify_pb{peer=maps:get(peer, Map),
Expand All @@ -21,31 +37,60 @@ from_map(Map, SigFun) ->
Signature = SigFun(libp2p_identify_pb:encode_msg(Identify)),
#libp2p_signed_identify_pb{identify=Identify, signature=Signature}.

%%--------------------------------------------------------------------
%% @doc
%% Get peer
%% @end
%%--------------------------------------------------------------------
-spec peer(identify()) -> libp2p_peer:peer().
peer(#libp2p_signed_identify_pb{identify=#libp2p_identify_pb{peer=Peer}}) ->
Peer.

%%--------------------------------------------------------------------
%% @doc
%% Get pubkey bin
%% @end
%%--------------------------------------------------------------------
-spec pubkey_bin(identify()) -> libp2p_crypto:pubkey_bin().
pubkey_bin(Identify=#libp2p_signed_identify_pb{}) ->
libp2p_peer:pubkey_bin(peer(Identify)).

%%--------------------------------------------------------------------
%% @doc
%% Get observed address
%% @end
%%--------------------------------------------------------------------
-spec observed_addr(identify()) -> string().
observed_addr(Identify=#libp2p_signed_identify_pb{}) ->
multiaddr:to_string(observed_maddr(Identify)).

-spec observed_maddr(identify()) -> string().
observed_maddr(#libp2p_signed_identify_pb{identify=#libp2p_identify_pb{observed_addr=ObservedAddr}}) ->
ObservedAddr.

%%--------------------------------------------------------------------
%% @doc
%% Get nonce
%% @end
%%--------------------------------------------------------------------
-spec nonce(identify()) -> string().
nonce(#libp2p_signed_identify_pb{identify=#libp2p_identify_pb{nonce=Nonce}}) ->
Nonce.

%% @doc Encodes the given identify into its binary form.
%%--------------------------------------------------------------------
%% @doc
%% Encodes the given identify into its binary form.
%% @end
%%--------------------------------------------------------------------
-spec encode(identify()) -> binary().
encode(Msg=#libp2p_signed_identify_pb{}) ->
libp2p_identify_pb:encode_msg(Msg).

%% @doc Decodes a given binary into an identify.
%%--------------------------------------------------------------------
%% @doc
%% Decodes a given binary into an identify.
%% @end
%%--------------------------------------------------------------------
-spec decode(binary()) -> {ok, identify()} | {error, term()}.
decode(Bin) ->
try
Expand All @@ -55,7 +100,11 @@ decode(Bin) ->
_:_ -> {error, invalid_binary}
end.

%% @doc Cryptographically verifies a given identify.
%%--------------------------------------------------------------------
%% @doc
%% Cryptographically verifies a given identify.
%% @end
%%--------------------------------------------------------------------
-spec verify(identify()) -> {ok, identify()} | {error, term()}.
verify(Msg=#libp2p_signed_identify_pb{identify=Ident=#libp2p_identify_pb{}, signature=Signature}) ->
EncodedIdentify = libp2p_identify_pb:encode_msg(Ident),
Expand Down
35 changes: 29 additions & 6 deletions src/identify/libp2p_stream_identify.erl
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
%%%-------------------------------------------------------------------
%% @doc
%% == Libp2p identify Stream ==
%% @see libp2p_framed_stream
%% @end
%%%-------------------------------------------------------------------
-module(libp2p_stream_identify).

-include("pb/libp2p_identify_pb.hrl").

-behavior(libp2p_framed_stream).

-export([dial_spawn/3]).
-export([client/2, server/4, init/3, handle_data/3, handle_info/3]).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([client/2, server/4, dial_spawn/3]).

%% ------------------------------------------------------------------
%% libp2p_framed_stream Function Exports
%% ------------------------------------------------------------------
-export([init/3, handle_data/3, handle_info/3]).

-include("pb/libp2p_identify_pb.hrl").

-record(state,
{ tid :: ets:tab(),
Expand All @@ -17,6 +30,10 @@
-define(PATH, "identify/1.0.0").
-define(TIMEOUT, 5000).

%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

-spec dial_spawn(Session::pid(), ets:tab(), Handler::pid()) -> pid().
dial_spawn(Session, TID, Handler) ->
spawn(fun() ->
Expand All @@ -25,12 +42,18 @@ dial_spawn(Session, TID, Handler) ->
libp2p_session:dial_framed_stream(Path, Session, ?MODULE, [TID, Handler])
end).

%% @hidden
client(Connection, Args=[_TID, _Handler]) ->
libp2p_framed_stream:client(?MODULE, Connection, Args).

%% @hidden
server(Connection, Path, TID, []) ->
libp2p_framed_stream:server(?MODULE, Connection, [Path, TID]).

%% ------------------------------------------------------------------
%% libp2p_framed_stream Function Definitions
%% ------------------------------------------------------------------
%% @hidden
init(client, Connection, [_TID, Handler]) ->
case libp2p_connection:session(Connection) of
{ok, Session} ->
Expand All @@ -52,13 +75,13 @@ init(server, Connection, [Path, TID]) ->
SigFun),
{stop, normal, libp2p_identify:encode(Identify)}.


%% @hidden
handle_data(client, Data, State=#state{}) ->
erlang:cancel_timer(State#state.timeout),
State#state.handler ! {handle_identify, State#state.session, libp2p_identify:decode(Data)},
{stop, normal, State}.


%% @hidden
handle_info(client, identify_timeout, State=#state{}) ->
State#state.handler ! {handle_identify, State#state.session, {error, timeout}},
lager:notice("Identify timed out"),
Expand Down
Loading