Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Permalink
Numerous libp2p scalability fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Vagabond committed Aug 12, 2021
1 parent 2f00271 commit 117b0f6
Show file tree
Hide file tree
Showing 16 changed files with 375 additions and 169 deletions.
1 change: 1 addition & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
]}.

{deps, [
sidejob,
{lager, "3.6.5"},
{ranch, "1.5.0"},
{libp2p_crypto, "1.4.0"},
Expand Down
5 changes: 4 additions & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
{<<"multihash">>,{pkg,<<"multihash">>,<<"2.0.3">>},1},
{<<"nat">>,
{git,"https://github.com/benoitc/erlang-nat",
{ref,"6136102c176814dd26c11b93ca0ce852b66c4195"}},
{ref,"5f6b47aebc14d89a2ae4f4ab0f21bb39063e3c13"}},
0},
{<<"procket">>,{pkg,<<"procket">>,<<"0.9.4">>},1},
{<<"ranch">>,{pkg,<<"ranch">>,<<"1.5.0">>},0},
Expand All @@ -40,6 +40,7 @@
{git,"https://gitlab.com/vagabond1/erlang-rocksdb",
{ref,"e391336db73ca3644050b308d20071fb79667022"}},
1},
{<<"sidejob">>,{pkg,<<"sidejob">>,<<"2.1.0">>},0},
{<<"small_ints">>,{pkg,<<"small_ints">>,<<"0.1.0">>},2},
{<<"splicer">>,{pkg,<<"splicer">>,<<"0.5.4">>},0},
{<<"throttle">>,{pkg,<<"lambda_throttle">>,<<"0.2.0">>},0}]}.
Expand All @@ -62,6 +63,7 @@
{<<"procket">>, <<"1FC0D557ACC0145DD167C566C68F83B183E923016904DFE980534B3C9150E92B">>},
{<<"ranch">>, <<"F04166F456790FEE2AC1AA05A02745CC75783C2BFB26D39FAF6AEFC9A3D3A58A">>},
{<<"rand_compat">>, <<"011646BC1F0B0C432FE101B816F25B9BBB74A085713CEE1DAFD2D62E9415EAD3">>},
{<<"sidejob">>, <<"5D6A7C9C620778CB1908E46B552D767DF2ED4D77070BB7B5B8773D4FF18D1D37">>},
{<<"small_ints">>, <<"82A824C8794A2DDC73CB5CD00EAD11331DB296521AD16A619C13D668572B868A">>},
{<<"splicer">>, <<"B221E3D037CAA9993ABB5C11B709026A7371AF4A7164219241EBC3B68ED331F4">>},
{<<"throttle">>, <<"E881B46D9836AFB70F3E2FA3BE9B0140805BA324ED26AA734FF6C5C1568C6CA7">>}]},
Expand All @@ -83,6 +85,7 @@
{<<"procket">>, <<"647C2D1F5006370E457522ECC69509F1020D7D558F3D6E196825E1F4F3660218">>},
{<<"ranch">>, <<"86D40FC42AA47BCB6952DDF1DBFD3DA04B5BA69AFB65C322C99845913250B11F">>},
{<<"rand_compat">>, <<"CDF7BE2B17308EC245B912C45FE55741F93B6E4F1A24BA6074F7137B0CC09BF4">>},
{<<"sidejob">>, <<"6DC3DAC041C8C07C64401ECD22684730DA1497F5F14377B3CA9C5B2B9A135181">>},
{<<"small_ints">>, <<"00B3BFF6C446711F8EA4EA942056F375E0F13C7983CC3950C6EA1DE014C7C416">>},
{<<"splicer">>, <<"3FC111BCEC469F4E62B2E7069A2ECFEACB40844D97CAF181D1DD3674EF0919E9">>},
{<<"throttle">>, <<"3EACFAAC1C2EBD0F17D77D9E96B1029BF07DED4AC233BA38883D70CDF1FFF740">>}]}
Expand Down
10 changes: 8 additions & 2 deletions src/group/libp2p_group_gossip.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@ remove_handler(Pid, Key) ->
%% @doc Send the given data to all members of the group for the given
%% gossip key. The implementation of the group determines the strategy
%% used for delivery. Delivery is best effort.
-spec send(pid(), string(), iodata() | fun(() -> iodata())) -> ok.
-spec send(pid(), string(), iodata() | fun((connection_kind()) -> iodata())) -> ok.
send(Pid, Key, Data) when is_list(Key), is_binary(Data) orelse is_function(Data) ->
gen_server:cast(Pid, {send, Key, Data}).

-spec connected_addrs(pid(), connection_kind() | all) -> [MAddr::string()].
connected_addrs(Pid, WorkerKind) ->
gen_server:call(Pid, {connected_addrs, WorkerKind}, infinity).
[ mk_multiaddr(A) || A <- gen_server:call(Pid, {connected_addrs, WorkerKind}, infinity)].

%% @doc Return the pids the group process `Pid' reports as connected
%% for the given worker kind (or `all'). This is a synchronous
%% gen_server call with an `infinity' timeout, so it blocks until the
%% group process replies.
-spec connected_pids(Pid::pid(), connection_kind() | all) -> [pid()].
connected_pids(Pid, WorkerKind) ->
    Request = {connected_pids, WorkerKind},
    gen_server:call(Pid, Request, infinity).

%% Normalise an address entry: a binary is converted with
%% libp2p_crypto:pubkey_bin_to_p2p/1, any other term is returned
%% unchanged.
mk_multiaddr(Entry) ->
    case is_binary(Entry) of
        true -> libp2p_crypto:pubkey_bin_to_p2p(Entry);
        false -> Entry
    end.

406 changes: 284 additions & 122 deletions src/group/libp2p_group_gossip_server.erl

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/group/libp2p_group_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ connected(info, {assign_stream, StreamPid, Path}, Data0=#data{}) ->
{ok, NewData} -> {keep_state, NewData};
_ -> {keep_state, Data}
end;
connected(info, {'EXIT', StreamPid, _Reason}, Data=#data{stream_pid=StreamPid, kind=inbound}) ->
%% don't try to reconnect inbound streams, it never seems to work
libp2p_group_server:request_target(Data#data.server, Data#data.kind, self(), Data#data.ref),
{next_state, closing, Data};
connected(info, {'EXIT', StreamPid, Reason}, Data=#data{stream_pid=StreamPid, target={MAddr, _}}) ->
%% The stream we're using died. Let's go back to connecting, but
%% do not trigger a connect retry right away, (re-)start the
Expand Down
13 changes: 7 additions & 6 deletions src/identify/libp2p_stream_identify.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
-define(PATH, "identify/1.0.0").
-define(TIMEOUT, 5000).

-spec dial_spawn(Session::pid(), ets:tab(), Handler::pid()) -> pid().
-spec dial_spawn(Session::pid(), ets:tab(), Handler::pid()) -> {pid(), reference()}.
dial_spawn(Session, TID, Handler) ->
spawn(fun() ->
Challenge = crypto:strong_rand_bytes(20),
Path = lists:flatten([?PATH, "/", base58:binary_to_base58(Challenge)]),
libp2p_session:dial_framed_stream(Path, Session, ?MODULE, [TID, Handler])
end).
spawn_monitor(
fun() ->
Challenge = crypto:strong_rand_bytes(20),
Path = lists:flatten([?PATH, "/", base58:binary_to_base58(Challenge)]),
libp2p_session:dial_framed_stream(Path, Session, ?MODULE, [TID, Handler])
end).

client(Connection, Args=[_TID, _Handler]) ->
libp2p_framed_stream:client(?MODULE, Connection, Args).
Expand Down
3 changes: 2 additions & 1 deletion src/libp2p.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
relcast,
libp2p_crypto,
throttle,
erbloom
erbloom,
sidejob
]},
{env,[]},

Expand Down
3 changes: 2 additions & 1 deletion src/libp2p_stream_stungun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

-record(client_state, {
txn_id :: binary(),
handler :: pid()
handler :: pid(),
direction :: inbound | outbound
}).

%%
Expand Down
5 changes: 4 additions & 1 deletion src/libp2p_swarm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ start(Name) when is_atom(Name) ->
-spec start(atom(), swarm_opts()) -> {ok, pid()} | ignore | {error, term()}.
start(Name, Opts) ->
RegName = list_to_atom(atom_to_list(libp2p_swarm_sup) ++ "_" ++ atom_to_list(Name)),
SideJobRegName = list_to_atom(atom_to_list(libp2p_swarm_sidejob_sup) ++ "_" ++ atom_to_list(Name)),
{ok, _} = application:ensure_all_started(sidejob),
sidejob:new_resource(SideJobRegName, sidejob_supervisor, application:get_env(libp2p, sidejob_limit, 15000)),
case supervisor:start_link({local,RegName}, libp2p_swarm_sup, [Name, Opts]) of
{ok, Pid} ->
unlink(Pid),
Expand Down Expand Up @@ -178,7 +181,7 @@ store_peerbook(TID, Handle) ->
%% @doc Get the peerbook pid for a swarm.
-spec peerbook_pid(ets:tab() | pid()) -> pid().
peerbook_pid(Sup) when is_pid(Sup) ->
peerbook(tid(Sup));
peerbook_pid(tid(Sup));
peerbook_pid(TID) ->
libp2p_swarm_sup:peerbook(TID).

Expand Down
29 changes: 18 additions & 11 deletions src/libp2p_swarm_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
sig_fun :: libp2p_crypto:sig_fun(),
ecdh_fun :: libp2p_crypto:ecdh_fun(),
pid_gc_monitor = make_ref() :: reference(),
monitors=[] :: [{pid(), {reference(), atom()}}]
monitors=#{} :: #{pid() => {reference(), atom()}}
}).

-export([start_link/3, init/1, handle_call/3, handle_info/2, handle_cast/2, terminate/2]).
Expand Down Expand Up @@ -70,23 +70,30 @@ handle_info({handle_identify, Session, {ok, Identify}}, State=#state{tid=TID}) -
%%
%% Store the session in config and tell the peerbook about the
%% session change as well as the new identify record.
spawn(fun() ->
Addr = libp2p_crypto:pubkey_bin_to_p2p(libp2p_identify:pubkey_bin(Identify)),
lager:debug("received identity for peer ~p. Putting this peer", [Addr]),
libp2p_config:insert_session(TID,
Addr,
Session),
PeerBook = libp2p_swarm:peerbook(TID),
libp2p_peerbook:register_session(PeerBook, Session, Identify),
libp2p_peerbook:put(PeerBook, [libp2p_identify:peer(Identify)]),
libp2p_peerbook:put(PeerBook, [libp2p_identify:peer(Identify)])
end),
{noreply, State};
handle_info({'DOWN', MonitorRef, process, _Pid, _Reason}, State=#state{pid_gc_monitor=MonitorRef}) ->
erlang:send_after(timer:minutes(5), self(), gc_pids),
{noreply, State};
handle_info({'DOWN', MonitorRef, process, _Pid, _Reason}, State=#state{pid_gc_monitor=MonitorRef}) ->
erlang:send_after(timer:minutes(5), self(), gc_pids),
{noreply, State};
handle_info({'DOWN', MonitorRef, process, Pid, _}, State=#state{tid=TID}) ->
NewState = remove_monitor(MonitorRef, Pid, State),
spawn(fun() ->
libp2p_config:remove_pid(TID, Pid),
PeerBook = libp2p_swarm:peerbook(TID),
libp2p_peerbook:unregister_session(PeerBook, Pid),
libp2p_peerbook:unregister_session(PeerBook, Pid)
end),
{noreply, NewState};
handle_info({'EXIT', _From, Reason}, State=#state{}) ->
{stop, Reason, State};
Expand Down Expand Up @@ -129,17 +136,17 @@ terminate(Reason, #state{tid=TID}) ->

-spec add_monitor(atom(), pid(), #state{}) -> #state{}.
add_monitor(Kind, Pid, State=#state{monitors=Monitors}) ->
Value = case lists:keyfind(Pid, 1, Monitors) of
false -> {erlang:monitor(process, Pid), Kind};
{Pid, {MonitorRef, Kind}} -> {MonitorRef, Kind}
Value = case maps:find(Pid, Monitors) of
error -> {erlang:monitor(process, Pid), Kind};
{ok, {MonitorRef, Kind}} -> {MonitorRef, Kind}
end,
State#state{monitors=lists:keystore(Pid, 1, Monitors, {Pid, Value})}.
State#state{monitors=Monitors#{Pid => Value}}.

-spec remove_monitor(reference(), pid(), #state{}) -> #state{}.
remove_monitor(MonitorRef, Pid, State=#state{tid=TID, monitors=Monitors}) ->
case lists:keytake(Pid, 1, Monitors) of
false -> State;
{value, {Pid, {MonitorRef, _}}, NewMonitors} ->
case maps:find(Pid, Monitors) of
error -> State;
{ok, {MonitorRef, _}} ->
libp2p_config:remove_pid(TID, Pid),
State#state{monitors=NewMonitors}
State#state{monitors=maps:remove(Pid, Monitors)}
end.
29 changes: 11 additions & 18 deletions src/libp2p_transport_tcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ terminate(_Reason, #state{}) ->
listen_options(IP, TID) ->
OptionDefaults = [
{ip, IP},
{backlog, 1024},
{backlog, application:get_env(libp2p, listen_backlog, 1024)},
{nodelay, true},
{send_timeout, 30000},
{send_timeout_close, true}
Expand Down Expand Up @@ -1007,11 +1007,10 @@ record_observed_addr(PeerAddr, ObservedAddr, State=#state{tid=TID, observed_addr
true ->
lager:info("Saw 3 distinct observed addresses, assuming symmetric NAT"),
libp2p_peerbook:update_nat_type(libp2p_swarm:peerbook(TID), symmetric),
Ref = monitor_relay_server(State),
libp2p_relay:init(libp2p_swarm:swarm(TID)),
%% also check if we have a port forward from the same external port to our internal port
%% as this is a common configuration
attempt_port_forward_discovery(ObservedAddr, PeerAddr, State#state{observed_addrs=ObservedAddresses, relay_monitor=Ref, nat_type=symmetric});
attempt_port_forward_discovery(ObservedAddr, PeerAddr, State#state{observed_addrs=ObservedAddresses});
false ->
State#state{observed_addrs=ObservedAddresses}
end
Expand All @@ -1030,21 +1029,15 @@ attempt_port_forward_discovery(ObservedAddr, PeerAddr, State=#state{tid=TID, stu
{PeerPath, TxnID} = libp2p_stream_stungun:mk_stun_txn(list_to_integer(PortStr)),
[{"ip4", IP}, {"tcp", _}] = multiaddr:protocols(ObservedAddr),
ObservedAddr1 = "/ip4/"++IP++"/tcp/"++PortStr,
case lists:member(ObservedAddr1, State#state.resolved_addresses) of
true ->
%% we don't need to try this again
StateAcc;
false ->
case libp2p_stream_stungun:dial(TID, PeerAddr, PeerPath, TxnID, self()) of
{ok, StunPid} ->
lager:info("dialed stungun peer ~p looking for validation of ~p", [PeerAddr, ObservedAddr1]),
%% TODO: Remove this once dial stops using start_link
unlink(StunPid),
erlang:send_after(60000, self(), {stungun_timeout, TxnID}),
StateAcc#state{stun_txns=add_stun_txn(TxnID, ObservedAddr1, StunTxns)};
_ ->
StateAcc
end
case libp2p_stream_stungun:dial(TID, PeerAddr, PeerPath, TxnID, self()) of
{ok, StunPid} ->
lager:debug("dialed stungun peer ~p looking for validation of ~p", [PeerAddr, ObservedAddr1]),
%% TODO: Remove this once dial stops using start_link
unlink(StunPid),
erlang:send_after(60000, self(), {stungun_timeout, TxnID}),
StateAcc#state{stun_txns=add_stun_txn(TxnID, ObservedAddr1, StunTxns)};
_ ->
StateAcc
end;
_ ->
StateAcc
Expand Down
23 changes: 20 additions & 3 deletions src/libp2p_yamux_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
-record(ident,
{ identify=undefined :: libp2p_identify:identify() | undefined,
pid=undefined :: pid() | undefined,
ref :: reference() | undefined,
waiters=[] :: [term()]
}).

Expand Down Expand Up @@ -209,13 +210,14 @@ handle_info({identify, Handler, HandlerData}, State=#state{ident=#ident{identify
Handler ! {handle_identify, HandlerData, {ok, I}},
{noreply, State};
handle_info({identify, Handler, HandlerData}, State=#state{ident=Ident=#ident{pid=undefined}, tid=TID}) ->
Pid = libp2p_stream_identify:dial_spawn(self(), TID, self()),
NewIdent=Ident#ident{waiters=[{Handler, HandlerData} | Ident#ident.waiters], pid=Pid},
{Pid, Ref} = libp2p_stream_identify:dial_spawn(self(), TID, self()),
NewIdent=Ident#ident{waiters=[{Handler, HandlerData} | Ident#ident.waiters], pid=Pid, ref=Ref},
{noreply, State#state{ident=NewIdent}};
handle_info({identify, Handler, HandlerData}, State=#state{ident=Ident=#ident{}}) ->
NewIdent=Ident#ident{waiters=[{Handler, HandlerData} | Ident#ident.waiters]},
{noreply, State#state{ident=NewIdent}};
handle_info({handle_identify, _Session, Response}, State=#state{ident=Ident=#ident{}}) ->
handle_info({handle_identify, _Session, Response}, State=#state{ident=Ident=#ident{ref = Ref}}) ->
erlang:demonitor(Ref, [flush]),
lists:foreach(fun({Handler, HandlerData}) ->
Handler ! {handle_identify, HandlerData, Response}
end, Ident#ident.waiters),
Expand All @@ -224,6 +226,21 @@ handle_info({handle_identify, _Session, Response}, State=#state{ident=Ident=#ide
{error, _} -> undefined
end,
{noreply, State#state{ident=Ident#ident{pid=undefined, waiters=[], identify=NewIdentify}}};
handle_info({'DOWN', _Ref, process, _Pid, normal}, State) ->
%% down beat the reply somehow, ignore and it'll get cleaned up elsewhere
{noreply, State};
handle_info({'DOWN', Ref, process, Pid, Reason}, State=#state{ident=Ident=#ident{ref = IRef}}) ->
case IRef == Ref of
true ->
lager:warning("crash of known dial ~p ~p ~p", [Ref, Pid, Reason]),
lists:foreach(fun({Handler, HandlerData}) ->
Handler ! {handle_identify, HandlerData, {error, Reason}}
end, Ident#ident.waiters),
{noreply, State#state{ident=Ident#ident{pid=undefined, waiters=[], identify=undefined}}};
false ->
lager:warning("crash of unknown ref ~p ~p ~p", [Ref, Pid, Reason]),
{noreply, State}
end;

handle_info(Msg, State) ->
lager:warning("Unhandled message: ~p", [Msg]),
Expand Down
7 changes: 5 additions & 2 deletions src/peerbook/libp2p_peerbook.erl
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ lookup_association(Handle=#peerbook{}, AssocType, AssocAddress) ->
%%

-spec handle_gossip_data(pid(), binary(), peerbook()) -> noreply.
handle_gossip_data(_StreamPid, DecodedList, Handle) ->
handle_gossip_data(_StreamPid, {"gossip/1.0."++_, DecodedList}, Handle) ->
%% DecodedList = libp2p_peer:decode_list(Data),
?MODULE:put(Handle, DecodedList, true),
noreply.
Expand Down Expand Up @@ -693,7 +693,10 @@ notify_peers(State=#state{notify_peers=NotifyPeers, notify_group=NotifyGroup,
Opts = libp2p_swarm:opts(TID),
PeerCount = libp2p_config:get_opt(Opts, [?MODULE, notify_peer_gossip_limit], ?DEFAULT_NOTIFY_PEER_GOSSIP_LIMIT),
%% Gossip to any attached parties
SendFun = fun() ->
SendFun = fun(seed) ->
%% send everything to the seed nodes
libp2p_peer:encode_list(PeerList);
(_Type) ->
{_, RandomNPeers} = lists:unzip(lists:sublist(lists:keysort(1, [ {rand:uniform(), E} || E <- PeerList]), PeerCount)),
libp2p_peer:encode_list(RandomNPeers)
end,
Expand Down
1 change: 1 addition & 0 deletions src/relay/libp2p_relay_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ init_relay(#state{tid = TID, banlist = Banlist}) ->
{error, retry}
end.

-spec banlist(undefined | string(), #state{}) -> #state{}.
banlist(undefined, State) ->
%% relay failed before we found out the address
State;
Expand Down
3 changes: 2 additions & 1 deletion test/group_gossip_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,8 @@ seed_test(Config) ->
%% Ignores its argument and always returns `ok'.
init_gossip_data(_Ignored) -> ok.

handle_gossip_data(_StreamPid, Msg, Parent) ->
handle_gossip_data(_StreamPid, {"gossip/1.0"++_, Msg}, Parent) ->
ct:pal("handle gossip data ~p ~p ~p", [_StreamPid, Msg, Parent]),
Parent ! {handle_gossip_data, Msg},
noreply.

Expand Down
2 changes: 2 additions & 0 deletions test/group_relcast_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ unicast_test(Config) ->

test_util:connect_swarms(S1, S2),
test_util:connect_swarms(S1, S3),
test_util:connect_swarms(S2, S3),

test_util:await_gossip_groups(Swarms),
test_util:await_gossip_streams(Swarms),
Expand Down Expand Up @@ -111,6 +112,7 @@ multicast_test(Config) ->

test_util:connect_swarms(S1, S2),
test_util:connect_swarms(S1, S3),
test_util:connect_swarms(S2, S3),

test_util:await_gossip_groups(Swarms),
test_util:await_gossip_streams(Swarms),
Expand Down

0 comments on commit 117b0f6

Please sign in to comment.