Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

try making this more async #371

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
]}.

{deps, [
sidejob,
{lager, "3.6.5"},
{ranch, "1.5.0"},
{libp2p_crypto, "1.4.0"},
Expand Down
3 changes: 3 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
{git,"https://gitlab.com/vagabond1/erlang-rocksdb",
{ref,"e391336db73ca3644050b308d20071fb79667022"}},
1},
{<<"sidejob">>,{pkg,<<"sidejob">>,<<"2.1.0">>},0},
{<<"small_ints">>,{pkg,<<"small_ints">>,<<"0.1.0">>},2},
{<<"splicer">>,{pkg,<<"splicer">>,<<"0.5.4">>},0},
{<<"throttle">>,{pkg,<<"lambda_throttle">>,<<"0.2.0">>},0}]}.
Expand All @@ -62,6 +63,7 @@
{<<"procket">>, <<"1FC0D557ACC0145DD167C566C68F83B183E923016904DFE980534B3C9150E92B">>},
{<<"ranch">>, <<"F04166F456790FEE2AC1AA05A02745CC75783C2BFB26D39FAF6AEFC9A3D3A58A">>},
{<<"rand_compat">>, <<"011646BC1F0B0C432FE101B816F25B9BBB74A085713CEE1DAFD2D62E9415EAD3">>},
{<<"sidejob">>, <<"5D6A7C9C620778CB1908E46B552D767DF2ED4D77070BB7B5B8773D4FF18D1D37">>},
{<<"small_ints">>, <<"82A824C8794A2DDC73CB5CD00EAD11331DB296521AD16A619C13D668572B868A">>},
{<<"splicer">>, <<"B221E3D037CAA9993ABB5C11B709026A7371AF4A7164219241EBC3B68ED331F4">>},
{<<"throttle">>, <<"E881B46D9836AFB70F3E2FA3BE9B0140805BA324ED26AA734FF6C5C1568C6CA7">>}]},
Expand All @@ -83,6 +85,7 @@
{<<"procket">>, <<"647C2D1F5006370E457522ECC69509F1020D7D558F3D6E196825E1F4F3660218">>},
{<<"ranch">>, <<"86D40FC42AA47BCB6952DDF1DBFD3DA04B5BA69AFB65C322C99845913250B11F">>},
{<<"rand_compat">>, <<"CDF7BE2B17308EC245B912C45FE55741F93B6E4F1A24BA6074F7137B0CC09BF4">>},
{<<"sidejob">>, <<"6DC3DAC041C8C07C64401ECD22684730DA1497F5F14377B3CA9C5B2B9A135181">>},
{<<"small_ints">>, <<"00B3BFF6C446711F8EA4EA942056F375E0F13C7983CC3950C6EA1DE014C7C416">>},
{<<"splicer">>, <<"3FC111BCEC469F4E62B2E7069A2ECFEACB40844D97CAF181D1DD3674EF0919E9">>},
{<<"throttle">>, <<"3EACFAAC1C2EBD0F17D77D9E96B1029BF07DED4AC233BA38883D70CDF1FFF740">>}]}
Expand Down
10 changes: 8 additions & 2 deletions src/group/libp2p_group_gossip.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@ remove_handler(Pid, Key) ->
%% @doc Send the given data to all members of the group for the given
%% gossip key. The implementation of the group determines the strategy
%% used for delivery. Delivery is best effort.
-spec send(pid(), string(), iodata() | fun(() -> iodata())) -> ok.
-spec send(pid(), string(), iodata() | fun((connection_kind()) -> iodata())) -> ok.
send(Pid, Key, Data) when is_list(Key), is_binary(Data) orelse is_function(Data) ->
gen_server:cast(Pid, {send, Key, Data}).

-spec connected_addrs(pid(), connection_kind() | all) -> [MAddr::string()].
connected_addrs(Pid, WorkerKind) ->
gen_server:call(Pid, {connected_addrs, WorkerKind}, infinity).
[ mk_multiaddr(A) || A <- gen_server:call(Pid, {connected_addrs, WorkerKind}, infinity)].

-spec connected_pids(Pid::pid(), connection_kind() | all) -> [pid()].
connected_pids(Pid, WorkerKind) ->
gen_server:call(Pid, {connected_pids, WorkerKind}, infinity).

mk_multiaddr(Addr) when is_binary(Addr) ->
libp2p_crypto:pubkey_bin_to_p2p(Addr);
mk_multiaddr(Value) ->
Value.

406 changes: 284 additions & 122 deletions src/group/libp2p_group_gossip_server.erl

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/group/libp2p_group_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ connected(info, {assign_stream, StreamPid, Path}, Data0=#data{}) ->
{ok, NewData} -> {keep_state, NewData};
_ -> {keep_state, Data}
end;
connected(info, {'EXIT', StreamPid, _Reason}, Data=#data{stream_pid=StreamPid, kind=inbound}) ->
%% don't try to reconnect inbound streams, it never seems to work
libp2p_group_server:request_target(Data#data.server, Data#data.kind, self(), Data#data.ref),
{next_state, closing, Data};
connected(info, {'EXIT', StreamPid, Reason}, Data=#data{stream_pid=StreamPid, target={MAddr, _}}) ->
%% The stream we're using died. Let's go back to connecting, but
%% do not trigger a connect retry right away, (re-)start the
Expand Down
13 changes: 7 additions & 6 deletions src/identify/libp2p_stream_identify.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
-define(PATH, "identify/1.0.0").
-define(TIMEOUT, 5000).

-spec dial_spawn(Session::pid(), ets:tab(), Handler::pid()) -> pid().
-spec dial_spawn(Session::pid(), ets:tab(), Handler::pid()) -> {pid(), reference()}.
dial_spawn(Session, TID, Handler) ->
spawn(fun() ->
Challenge = crypto:strong_rand_bytes(20),
Path = lists:flatten([?PATH, "/", base58:binary_to_base58(Challenge)]),
libp2p_session:dial_framed_stream(Path, Session, ?MODULE, [TID, Handler])
end).
spawn_monitor(
fun() ->
Challenge = crypto:strong_rand_bytes(20),
Path = lists:flatten([?PATH, "/", base58:binary_to_base58(Challenge)]),
libp2p_session:dial_framed_stream(Path, Session, ?MODULE, [TID, Handler])
end).

client(Connection, Args=[_TID, _Handler]) ->
libp2p_framed_stream:client(?MODULE, Connection, Args).
Expand Down
3 changes: 2 additions & 1 deletion src/libp2p.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
relcast,
libp2p_crypto,
throttle,
erbloom
erbloom,
sidejob
]},
{env,[]},

Expand Down
3 changes: 2 additions & 1 deletion src/libp2p_stream_stungun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

-record(client_state, {
txn_id :: binary(),
handler :: pid()
handler :: pid(),
direction :: inbound | outbound
}).

%%
Expand Down
5 changes: 4 additions & 1 deletion src/libp2p_swarm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ start(Name) when is_atom(Name) ->
-spec start(atom(), swarm_opts()) -> {ok, pid()} | ignore | {error, term()}.
start(Name, Opts) ->
RegName = list_to_atom(atom_to_list(libp2p_swarm_sup) ++ "_" ++ atom_to_list(Name)),
SideJobRegName = list_to_atom(atom_to_list(libp2p_swarm_sidejob_sup) ++ "_" ++ atom_to_list(Name)),
{ok, _} = application:ensure_all_started(sidejob),
sidejob:new_resource(SideJobRegName, sidejob_supervisor, application:get_env(libp2p, sidejob_limit, 15000)),
case supervisor:start_link({local,RegName}, libp2p_swarm_sup, [Name, Opts]) of
{ok, Pid} ->
unlink(Pid),
Expand Down Expand Up @@ -178,7 +181,7 @@ store_peerbook(TID, Handle) ->
%% @doc Get the peerbook pid for a swarm.
-spec peerbook_pid(ets:tab() | pid()) -> pid().
peerbook_pid(Sup) when is_pid(Sup) ->
peerbook(tid(Sup));
peerbook_pid(tid(Sup));
peerbook_pid(TID) ->
libp2p_swarm_sup:peerbook(TID).

Expand Down
29 changes: 18 additions & 11 deletions src/libp2p_swarm_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
sig_fun :: libp2p_crypto:sig_fun(),
ecdh_fun :: libp2p_crypto:ecdh_fun(),
pid_gc_monitor = make_ref() :: reference(),
monitors=[] :: [{pid(), {reference(), atom()}}]
monitors=#{} :: #{pid() => {reference(), atom()}}
}).

-export([start_link/3, init/1, handle_call/3, handle_info/2, handle_cast/2, terminate/2]).
Expand Down Expand Up @@ -70,23 +70,30 @@ handle_info({handle_identify, Session, {ok, Identify}}, State=#state{tid=TID}) -
%%
%% Store the session in config and tell the peerbook about the
%% session change as well as the new identify record.
spawn(fun() ->
Addr = libp2p_crypto:pubkey_bin_to_p2p(libp2p_identify:pubkey_bin(Identify)),
lager:debug("received identity for peer ~p. Putting this peer", [Addr]),
libp2p_config:insert_session(TID,
Addr,
Session),
PeerBook = libp2p_swarm:peerbook(TID),
libp2p_peerbook:register_session(PeerBook, Session, Identify),
libp2p_peerbook:put(PeerBook, [libp2p_identify:peer(Identify)]),
libp2p_peerbook:put(PeerBook, [libp2p_identify:peer(Identify)])
end),
{noreply, State};
handle_info({'DOWN', MonitorRef, process, _Pid, _Reason}, State=#state{pid_gc_monitor=MonitorRef}) ->
erlang:send_after(timer:minutes(5), self(), gc_pids),
{noreply, State};
handle_info({'DOWN', MonitorRef, process, _Pid, _Reason}, State=#state{pid_gc_monitor=MonitorRef}) ->
erlang:send_after(timer:minutes(5), self(), gc_pids),
{noreply, State};
handle_info({'DOWN', MonitorRef, process, Pid, _}, State=#state{tid=TID}) ->
NewState = remove_monitor(MonitorRef, Pid, State),
spawn(fun() ->
libp2p_config:remove_pid(TID, Pid),
PeerBook = libp2p_swarm:peerbook(TID),
libp2p_peerbook:unregister_session(PeerBook, Pid),
libp2p_peerbook:unregister_session(PeerBook, Pid)
end),
{noreply, NewState};
handle_info({'EXIT', _From, Reason}, State=#state{}) ->
{stop, Reason, State};
Expand Down Expand Up @@ -129,17 +136,17 @@ terminate(Reason, #state{tid=TID}) ->

-spec add_monitor(atom(), pid(), #state{}) -> #state{}.
add_monitor(Kind, Pid, State=#state{monitors=Monitors}) ->
Value = case lists:keyfind(Pid, 1, Monitors) of
false -> {erlang:monitor(process, Pid), Kind};
{Pid, {MonitorRef, Kind}} -> {MonitorRef, Kind}
Value = case maps:find(Pid, Monitors) of
error -> {erlang:monitor(process, Pid), Kind};
{ok, {MonitorRef, Kind}} -> {MonitorRef, Kind}
end,
State#state{monitors=lists:keystore(Pid, 1, Monitors, {Pid, Value})}.
State#state{monitors=Monitors#{Pid => Value}}.

-spec remove_monitor(reference(), pid(), #state{}) -> #state{}.
remove_monitor(MonitorRef, Pid, State=#state{tid=TID, monitors=Monitors}) ->
case lists:keytake(Pid, 1, Monitors) of
false -> State;
{value, {Pid, {MonitorRef, _}}, NewMonitors} ->
case maps:find(Pid, Monitors) of
error -> State;
{ok, {MonitorRef, _}} ->
libp2p_config:remove_pid(TID, Pid),
State#state{monitors=NewMonitors}
State#state{monitors=maps:remove(Pid, Monitors)}
end.
34 changes: 15 additions & 19 deletions src/libp2p_transport_tcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
}).

-define(DEFAULT_MAX_TCP_CONNECTIONS, 1024).
-define(DEFAULT_MAX_TCP_ACCEPTORS, 10).

%% libp2p_transport
%%
Expand Down Expand Up @@ -685,7 +686,7 @@ terminate(_Reason, #state{}) ->
listen_options(IP, TID) ->
OptionDefaults = [
{ip, IP},
{backlog, 1024},
{backlog, application:get_env(libp2p, listen_backlog, 1024)},
{nodelay, true},
{send_timeout, 30000},
{send_timeout_close, true}
Expand Down Expand Up @@ -721,8 +722,10 @@ listen_on(Addr, TID) ->
ok = libp2p_cache:insert(Cache, {tcp_local_listen_addrs, Type}, ListenAddrs),

MaxTCPConnections = application:get_env(libp2p, max_tcp_connections, ?DEFAULT_MAX_TCP_CONNECTIONS),
MaxAcceptors = application:get_env(libp2p, num_tcp_acceptors, ?DEFAULT_MAX_TCP_ACCEPTORS),
ChildSpec = ranch:child_spec(ListenAddrs,
ranch_tcp, [{socket, Socket}, {max_connections, MaxTCPConnections}],
ranch_tcp, [{socket, Socket}, {max_connections, MaxTCPConnections},
{num_acceptors, MaxAcceptors}],
libp2p_transport_ranch_protocol, {?MODULE, TID}),
case supervisor:start_child(Sup, ChildSpec) of
{ok, Pid} ->
Expand Down Expand Up @@ -1007,11 +1010,10 @@ record_observed_addr(PeerAddr, ObservedAddr, State=#state{tid=TID, observed_addr
true ->
lager:info("Saw 3 distinct observed addresses, assuming symmetric NAT"),
libp2p_peerbook:update_nat_type(libp2p_swarm:peerbook(TID), symmetric),
Ref = monitor_relay_server(State),
libp2p_relay:init(libp2p_swarm:swarm(TID)),
%% also check if we have a port forward from the same external port to our internal port
%% as this is a common configuration
attempt_port_forward_discovery(ObservedAddr, PeerAddr, State#state{observed_addrs=ObservedAddresses, relay_monitor=Ref, nat_type=symmetric});
attempt_port_forward_discovery(ObservedAddr, PeerAddr, State#state{observed_addrs=ObservedAddresses});
false ->
State#state{observed_addrs=ObservedAddresses}
end
Expand All @@ -1030,21 +1032,15 @@ attempt_port_forward_discovery(ObservedAddr, PeerAddr, State=#state{tid=TID, stu
{PeerPath, TxnID} = libp2p_stream_stungun:mk_stun_txn(list_to_integer(PortStr)),
[{"ip4", IP}, {"tcp", _}] = multiaddr:protocols(ObservedAddr),
ObservedAddr1 = "/ip4/"++IP++"/tcp/"++PortStr,
case lists:member(ObservedAddr1, State#state.resolved_addresses) of
true ->
%% we don't need to try this again
StateAcc;
false ->
case libp2p_stream_stungun:dial(TID, PeerAddr, PeerPath, TxnID, self()) of
{ok, StunPid} ->
lager:info("dialed stungun peer ~p looking for validation of ~p", [PeerAddr, ObservedAddr1]),
%% TODO: Remove this once dial stops using start_link
unlink(StunPid),
erlang:send_after(60000, self(), {stungun_timeout, TxnID}),
StateAcc#state{stun_txns=add_stun_txn(TxnID, ObservedAddr1, StunTxns)};
_ ->
StateAcc
end
case libp2p_stream_stungun:dial(TID, PeerAddr, PeerPath, TxnID, self()) of
{ok, StunPid} ->
lager:debug("dialed stungun peer ~p looking for validation of ~p", [PeerAddr, ObservedAddr1]),
%% TODO: Remove this once dial stops using start_link
unlink(StunPid),
erlang:send_after(60000, self(), {stungun_timeout, TxnID}),
StateAcc#state{stun_txns=add_stun_txn(TxnID, ObservedAddr1, StunTxns)};
_ ->
StateAcc
end;
_ ->
StateAcc
Expand Down
23 changes: 20 additions & 3 deletions src/libp2p_yamux_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
-record(ident,
{ identify=undefined :: libp2p_identify:identify() | undefined,
pid=undefined :: pid() | undefined,
ref :: reference() | undefined,
waiters=[] :: [term()]
}).

Expand Down Expand Up @@ -209,13 +210,14 @@ handle_info({identify, Handler, HandlerData}, State=#state{ident=#ident{identify
Handler ! {handle_identify, HandlerData, {ok, I}},
{noreply, State};
handle_info({identify, Handler, HandlerData}, State=#state{ident=Ident=#ident{pid=undefined}, tid=TID}) ->
Pid = libp2p_stream_identify:dial_spawn(self(), TID, self()),
NewIdent=Ident#ident{waiters=[{Handler, HandlerData} | Ident#ident.waiters], pid=Pid},
{Pid, Ref} = libp2p_stream_identify:dial_spawn(self(), TID, self()),
NewIdent=Ident#ident{waiters=[{Handler, HandlerData} | Ident#ident.waiters], pid=Pid, ref=Ref},
{noreply, State#state{ident=NewIdent}};
handle_info({identify, Handler, HandlerData}, State=#state{ident=Ident=#ident{}}) ->
NewIdent=Ident#ident{waiters=[{Handler, HandlerData} | Ident#ident.waiters]},
{noreply, State#state{ident=NewIdent}};
handle_info({handle_identify, _Session, Response}, State=#state{ident=Ident=#ident{}}) ->
handle_info({handle_identify, _Session, Response}, State=#state{ident=Ident=#ident{ref = Ref}}) ->
erlang:demonitor(Ref, [flush]),
lists:foreach(fun({Handler, HandlerData}) ->
Handler ! {handle_identify, HandlerData, Response}
end, Ident#ident.waiters),
Expand All @@ -224,6 +226,21 @@ handle_info({handle_identify, _Session, Response}, State=#state{ident=Ident=#ide
{error, _} -> undefined
end,
{noreply, State#state{ident=Ident#ident{pid=undefined, waiters=[], identify=NewIdentify}}};
handle_info({'DOWN', _Ref, process, _Pid, normal}, State) ->
%% down beat the reply somehow, ignore and it'll get cleaned up elsewhere
{noreply, State};
handle_info({'DOWN', Ref, process, Pid, Reason}, State=#state{ident=Ident=#ident{ref = IRef}}) ->
case IRef == Ref of
true ->
lager:warning("crash of known dial ~p ~p ~p", [Ref, Pid, Reason]),
lists:foreach(fun({Handler, HandlerData}) ->
Handler ! {handle_identify, HandlerData, {error, Reason}}
end, Ident#ident.waiters),
{noreply, State#state{ident=Ident#ident{pid=undefined, waiters=[], identify=undefined}}};
false ->
lager:warning("crash of unknown ref ~p ~p ~p", [Ref, Pid, Reason]),
{noreply, State}
end;

handle_info(Msg, State) ->
lager:warning("Unhandled message: ~p", [Msg]),
Expand Down
Loading