diff --git a/rebar.config b/rebar.config index b150787a..fca18599 100644 --- a/rebar.config +++ b/rebar.config @@ -17,6 +17,7 @@ ]}. {deps, [ + sidejob, {lager, "3.6.5"}, {ranch, "1.5.0"}, {libp2p_crypto, "1.4.0"}, diff --git a/rebar.lock b/rebar.lock index abf4ad6a..ad92d181 100644 --- a/rebar.lock +++ b/rebar.lock @@ -27,7 +27,7 @@ {<<"multihash">>,{pkg,<<"multihash">>,<<"2.0.3">>},1}, {<<"nat">>, {git,"https://github.com/benoitc/erlang-nat", - {ref,"6136102c176814dd26c11b93ca0ce852b66c4195"}}, + {ref,"5f6b47aebc14d89a2ae4f4ab0f21bb39063e3c13"}}, 0}, {<<"procket">>,{pkg,<<"procket">>,<<"0.9.4">>},1}, {<<"ranch">>,{pkg,<<"ranch">>,<<"1.5.0">>},0}, @@ -40,6 +40,7 @@ {git,"https://gitlab.com/vagabond1/erlang-rocksdb", {ref,"e391336db73ca3644050b308d20071fb79667022"}}, 1}, + {<<"sidejob">>,{pkg,<<"sidejob">>,<<"2.1.0">>},0}, {<<"small_ints">>,{pkg,<<"small_ints">>,<<"0.1.0">>},2}, {<<"splicer">>,{pkg,<<"splicer">>,<<"0.5.4">>},0}, {<<"throttle">>,{pkg,<<"lambda_throttle">>,<<"0.2.0">>},0}]}. @@ -62,6 +63,7 @@ {<<"procket">>, <<"1FC0D557ACC0145DD167C566C68F83B183E923016904DFE980534B3C9150E92B">>}, {<<"ranch">>, <<"F04166F456790FEE2AC1AA05A02745CC75783C2BFB26D39FAF6AEFC9A3D3A58A">>}, {<<"rand_compat">>, <<"011646BC1F0B0C432FE101B816F25B9BBB74A085713CEE1DAFD2D62E9415EAD3">>}, + {<<"sidejob">>, <<"5D6A7C9C620778CB1908E46B552D767DF2ED4D77070BB7B5B8773D4FF18D1D37">>}, {<<"small_ints">>, <<"82A824C8794A2DDC73CB5CD00EAD11331DB296521AD16A619C13D668572B868A">>}, {<<"splicer">>, <<"B221E3D037CAA9993ABB5C11B709026A7371AF4A7164219241EBC3B68ED331F4">>}, {<<"throttle">>, <<"E881B46D9836AFB70F3E2FA3BE9B0140805BA324ED26AA734FF6C5C1568C6CA7">>}]}, @@ -83,6 +85,7 @@ {<<"procket">>, <<"647C2D1F5006370E457522ECC69509F1020D7D558F3D6E196825E1F4F3660218">>}, {<<"ranch">>, <<"86D40FC42AA47BCB6952DDF1DBFD3DA04B5BA69AFB65C322C99845913250B11F">>}, {<<"rand_compat">>, <<"CDF7BE2B17308EC245B912C45FE55741F93B6E4F1A24BA6074F7137B0CC09BF4">>}, + {<<"sidejob">>, <<"6DC3DAC041C8C07C64401ECD22684730DA1497F5F14377B3CA9C5B2B9A135181">>}, {<<"small_ints">>, <<"00B3BFF6C446711F8EA4EA942056F375E0F13C7983CC3950C6EA1DE014C7C416">>}, {<<"splicer">>, <<"3FC111BCEC469F4E62B2E7069A2ECFEACB40844D97CAF181D1DD3674EF0919E9">>}, {<<"throttle">>, <<"3EACFAAC1C2EBD0F17D77D9E96B1029BF07DED4AC233BA38883D70CDF1FFF740">>}]} diff --git a/src/group/libp2p_group_gossip.erl b/src/group/libp2p_group_gossip.erl index a526116d..2c405a40 100644 --- a/src/group/libp2p_group_gossip.erl +++ b/src/group/libp2p_group_gossip.erl @@ -25,14 +25,20 @@ remove_handler(Pid, Key) -> %% @doc Send the given data to all members of the group for the given %% gossip key. The implementation of the group determines the strategy %% used for delivery. Delivery is best effort. --spec send(pid(), string(), iodata() | fun(() -> iodata())) -> ok. +-spec send(pid(), string(), iodata() | fun((connection_kind()) -> iodata())) -> ok. send(Pid, Key, Data) when is_list(Key), is_binary(Data) orelse is_function(Data) -> gen_server:cast(Pid, {send, Key, Data}). -spec connected_addrs(pid(), connection_kind() | all) -> [MAddr::string()]. connected_addrs(Pid, WorkerKind) -> - gen_server:call(Pid, {connected_addrs, WorkerKind}, infinity). + [ mk_multiaddr(A) || A <- gen_server:call(Pid, {connected_addrs, WorkerKind}, infinity)]. -spec connected_pids(Pid::pid(), connection_kind() | all) -> [pid()]. connected_pids(Pid, WorkerKind) -> gen_server:call(Pid, {connected_pids, WorkerKind}, infinity). + +mk_multiaddr(Addr) when is_binary(Addr) -> + libp2p_crypto:pubkey_bin_to_p2p(Addr); +mk_multiaddr(Value) -> + Value. + diff --git a/src/group/libp2p_group_gossip_server.erl b/src/group/libp2p_group_gossip_server.erl index b5bdf710..9e7b5d1d 100644 --- a/src/group/libp2p_group_gossip_server.erl +++ b/src/group/libp2p_group_gossip_server.erl @@ -11,10 +11,10 @@ %% gen_server -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% libp2p_gossip_stream --export([accept_stream/4, handle_data/4]). +-export([accept_stream/4, handle_identify/4, handle_data/4]). -record(worker, - { target :: string() | undefined, + { target :: binary() | string() | undefined, kind :: libp2p_group_gossip:connection_kind(), pid :: pid() | self, ref :: reference() @@ -27,12 +27,14 @@ seednode_connections :: pos_integer(), max_inbound_connections :: non_neg_integer(), seed_nodes :: [string()], - workers=[] :: [#worker{}], + workers=#{} :: #{atom() => #{reference() => #worker{}}}, + targets=#{} :: #{string() => {atom(), reference()}}, handlers=#{} :: #{string() => libp2p_group_gossip:handler()}, drop_timeout :: pos_integer(), drop_timer :: reference(), supported_paths :: [string()], - bloom :: bloom_nif:bloom() + bloom :: bloom_nif:bloom(), + sidejob_sup :: atom() }). -define(DEFAULT_PEERBOOK_CONNECTIONS, 5). @@ -66,13 +68,38 @@ handle_data(Pid, StreamPid, Key, {Path, Bin}) -> _ -> {Path, Bin} end, - gen_server:cast(Pid, {handle_data, StreamPid, Key, ListOrData}). + %% check the cache, see the lookup_handler function for details + case lookup_handler(Pid, Key) of + error -> + ok; + {ok, M, S} -> + %% Catch the callback response. This avoids a crash in the + %% handler taking down the gossip worker itself. + try M:handle_gossip_data(StreamPid, ListOrData, S) of + {reply, Reply} -> + %% handler wants to reply + %% NOTE - This routes direct via libp2p_framed_stream:send/2 and not via the group worker + %% As such we need to encode at this point, and send raw..no encoding actions + case (catch libp2p_gossip_stream:encode(Key, Reply, Path)) of + {'EXIT', Error} -> + lager:warning("Error encoding gossip data ~p", [Error]); + ReplyMsg -> + libp2p_framed_stream:send(StreamPid, ReplyMsg) + end; + _ -> + ok + catch _:_ -> + ok + end + end. accept_stream(Pid, SessionPid, StreamPid, Path) -> Ref = erlang:monitor(process, Pid), gen_server:cast(Pid, {accept_stream, SessionPid, Ref, StreamPid, Path}), Ref. +handle_identify(Pid, StreamPid, Path, Identify) -> + gen_server:call(Pid, {handle_identify, StreamPid, Path, Identify}, 15000). %% gen_server %% @@ -81,6 +108,7 @@ init([Sup, TID]) -> erlang:process_flag(trap_exit, true), libp2p_swarm_sup:register_gossip_group(TID), Opts = libp2p_swarm:opts(TID), + SideJobRegName = list_to_atom(atom_to_list(libp2p_swarm_sidejob_sup) ++ "_" ++ atom_to_list(TID)), PeerBookCount = get_opt(Opts, peerbook_connections, ?DEFAULT_PEERBOOK_CONNECTIONS), SeedNodes = get_opt(Opts, seed_nodes, []), SeedNodeCount = @@ -97,7 +125,7 @@ init([Sup, TID]) -> {ok, Bloom} = bloom:new_forgetful_optimal(1000, 3, 800, 1.0e-3), - self() ! start_workers, + self() ! {start_workers, Sup}, {ok, update_metadata(#state{sup=Sup, tid=TID, seed_nodes=SeedNodes, max_inbound_connections=InboundCount, @@ -106,18 +134,55 @@ init([Sup, TID]) -> drop_timeout=DropTimeOut, drop_timer=schedule_drop_timer(DropTimeOut), bloom=Bloom, + sidejob_sup = SideJobRegName, supported_paths=SupportedPaths})}. handle_call({connected_addrs, Kind}, _From, State=#state{}) -> {Addrs, _Pids} = lists:unzip(connections(Kind, State)), {reply, Addrs, State}; -handle_call({connected_pids, Kind}, _From, State=#state{}) -> +handle_call({connected_pids, Kind}, _From, State) -> {_Addrs, Pids} = lists:unzip(connections(Kind, State)), {reply, Pids, State}; handle_call({remove_handler, Key}, _From, State=#state{handlers=Handlers}) -> {reply, ok, State#state{handlers=maps:remove(Key, Handlers)}}; +handle_call({handle_identify, StreamPid, Path, {ok, Identify}}, + _From, State) -> + Target = libp2p_identify:pubkey_bin(Identify), + %% Check if we already have a worker for this target + case lookup_worker_by_target(Target, State) of + %% If not, we we check if we can accept a random inbound + %% connection and start a worker for the inbound stream if ok + false -> + case count_workers(inbound, State) > State#state.max_inbound_connections of + true -> + lager:debug("Too many inbound workers: ~p", + [State#state.max_inbound_connections]), + {reply, {error, too_many}, State}; + false -> + case start_inbound_worker(Target, StreamPid, Path, State) of + {ok, Worker} -> {reply, ok, add_worker(Worker, State)}; + {error, overload} -> {reply, {error, too_many}, State} + end + end; + %% There's an existing worker for the given address, re-assign + %% the worker the new stream. + #worker{pid=Worker} -> + libp2p_group_worker:assign_stream(Worker, StreamPid, Path), + {reply, ok, State} + end; + +handle_call({handle_data, Key}, _From, State=#state{}) -> + %% Incoming message from a gossip stream for a given key + %% for performance reasons, and for backpressure, just return + %% the module and state + case maps:find(Key, State#state.handlers) of + error -> {reply, error, State}; + {ok, {M, S}} -> + {reply, {ok, M, S}, State} + end; + handle_call(Msg, _From, State) -> lager:warning("Unhandled call: ~p", [Msg]), {reply, ok, State}. @@ -130,45 +195,18 @@ handle_cast({accept_stream, Session, ReplyRef, StreamPid, Path}, State=#state{}) libp2p_session:identify(Session, self(), {ReplyRef, StreamPid, Path}), {noreply, State}; -handle_cast({handle_data, StreamPid, Key, {Path, ListOrData}}, State=#state{}) -> - %% Incoming message from a gossip stream for a given key - case maps:find(Key, State#state.handlers) of - error -> {noreply, State}; - {ok, {M, S}} -> - %% Catch the callback response. This avoids a crash in the - %% handler taking down the gossip_server itself. - try M:handle_gossip_data(StreamPid, ListOrData, S) of - {reply, Reply} -> - %% handler wants to reply - %% NOTE - This routes direct via libp2p_framed_stream:send/2 and not via the group worker - %% As such we need to encode at this point, and send raw..no encoding actions - case (catch libp2p_gossip_stream:encode(Key, Reply, Path)) of - {'EXIT', Error} -> - lager:warning("Error encoding gossip data ~p", [Error]); - ReplyMsg -> - spawn(fun() -> libp2p_framed_stream:send(StreamPid, ReplyMsg) end) - end; - _ -> - ok - catch _:_ -> - ok - end, - {noreply, State} - end; - handle_cast({add_handler, Key, Handler}, State=#state{handlers=Handlers}) -> {noreply, State#state{handlers=maps:put(Key, Handler, Handlers)}}; -handle_cast({request_target, inbound, WorkerPid, _Ref}, State=#state{}) -> - {noreply, stop_inbound_worker(WorkerPid, State)}; +handle_cast({request_target, inbound, WorkerPid, Ref}, State=#state{}) -> + {noreply, stop_inbound_worker(Ref, WorkerPid, State)}; handle_cast({request_target, peerbook, WorkerPid, Ref}, State=#state{tid=TID}) -> LocalAddr = libp2p_swarm:pubkey_bin(TID), PeerList = case libp2p_swarm:peerbook(TID) of false -> []; Peerbook -> - WorkerAddrs = [ libp2p_crypto:p2p_to_pubkey_bin(W#worker.target) - || W <- State#state.workers, W#worker.target /= undefined, - W#worker.kind /= seed ], + + WorkerAddrs = get_addrs(State#state.workers), try Pred = application:get_env(libp2p, random_peer_pred, fun(_) -> true end), Ct = application:get_env(libp2p, random_peer_tries, 100), @@ -191,7 +229,7 @@ handle_cast({request_target, peerbook, WorkerPid, Ref}, State=#state{tid=TID}) - end, {noreply, assign_target(WorkerPid, Ref, PeerList, State)}; handle_cast({request_target, seed, WorkerPid, Ref}, State=#state{tid=TID, seed_nodes=SeedAddrs}) -> - {CurrentAddrs, _} = lists:unzip(connections(all, State)), + {CurrentAddrs, _} = lists:unzip(connections(seed, State)), LocalAddr = libp2p_swarm:p2p_address(TID), %% Exclude the local swarm address from the available addresses ExcludedAddrs = CurrentAddrs ++ [LocalAddr], @@ -199,16 +237,19 @@ handle_cast({request_target, seed, WorkerPid, Ref}, State=#state{tid=TID, seed_n sets:from_list(ExcludedAddrs))), TargetAddrs = maybe_lookup_seed_in_dns(BaseAddrs), {noreply, assign_target(WorkerPid, Ref, TargetAddrs, State)}; -handle_cast({send, Key, Fun}, State=#state{}) when is_function(Fun, 0) -> +handle_cast({send, Key, Fun}, State) when is_function(Fun, 1) -> %% use a fun to generate the send data for each gossip peer %% this can be helpful to send a unique random subset of data to each peer - {_, Pids} = lists:unzip(connections(all, State)), - lists:foreach(fun(Pid) -> - Data = Fun(), - %% Catch errors encoding the given arguments to avoid a bad key or - %% value taking down the gossip server - libp2p_group_worker:send(Pid, Key, Data, true) - end, Pids), + + %% find out what kind of connection we are dealing with and pass that type to the fun + {_, SeedPids} = lists:unzip(connections(seed, State)), + {_, PeerbookPids} = lists:unzip(connections(peerbook, State)), + {_, InboundPids} = lists:unzip(connections(inbound, State)), + spawn(fun() -> + [ libp2p_group_worker:send(Pid, Key, Fun(seed), true) || Pid <- SeedPids ], + [ libp2p_group_worker:send(Pid, Key, Fun(peerbook), true) || Pid <- PeerbookPids ], + [ libp2p_group_worker:send(Pid, Key, Fun(inbound), true) || Pid <- InboundPids ] + end), {noreply, State}; handle_cast({send, Key, Data}, State=#state{bloom=Bloom}) -> @@ -217,14 +258,17 @@ handle_cast({send, Key, Data}, State=#state{bloom=Bloom}) -> ok; false -> bloom:set(Bloom, {out, Data}), + {_, Pids} = lists:unzip(connections(all, State)), lager:debug("sending data via connection pids: ~p",[Pids]), - lists:foreach(fun(Pid) -> - %% TODO we could check the connections's Address here for - %% if we received this data from that address and avoid - %% bouncing the gossip data back - libp2p_group_worker:send(Pid, Key, Data, true) - end, Pids) + spawn(fun() -> + lists:foreach(fun(Pid) -> + %% TODO we could check the connections's Address here for + %% if we received this data from that address and avoid + %% bouncing the gossip data back + libp2p_group_worker:send(Pid, Key, Data, true) + end, Pids) + end) end, {noreply, State}; handle_cast({send_ready, _Target, _Ref, false}, State=#state{}) -> @@ -233,7 +277,7 @@ handle_cast({send_ready, _Target, _Ref, false}, State=#state{}) -> %% gossip_data. {noreply, State}; handle_cast({send_ready, Target, _Ref, _Ready}, State=#state{}) -> - case lookup_worker(Target, #worker.target, State) of + case lookup_worker_by_target(Target, State) of #worker{pid=WorkerPid} -> NewState = maps:fold(fun(Key, {M, S}, Acc) -> case (catch M:init_gossip_data(S)) of @@ -257,23 +301,46 @@ handle_cast(Msg, State) -> lager:warning("Unhandled cast: ~p", [Msg]), {noreply, State}. -handle_info(start_workers, State=#state{tid=TID, seednode_connections=SeedCount, - peerbook_connections=PeerCount, - bloom=Bloom, - supported_paths = SupportedPaths}) -> - PeerBookWorkers = [start_worker(peerbook, State) || _ <- lists:seq(1, PeerCount)], - SeedWorkers = [start_worker(seed, State) || _ <- lists:seq(1, SeedCount)], +handle_info({start_workers, Sup}, + State0 = #state{tid=TID, seednode_connections=SeedCount, + peerbook_connections=PeerCount, + bloom=Bloom, + supported_paths = SupportedPaths}) -> + WorkerSup = libp2p_group_gossip_sup:workers(Sup), + State = State0#state{sup = WorkerSup}, + PeerBookWorkers = + lists:foldl( + fun(_, Acc) -> + case start_worker(peerbook, State) of + {ok, Worker} -> + Acc#{Worker#worker.ref => Worker}; + _ -> Acc + end + end, + #{}, + lists:seq(1, PeerCount)), + SeedWorkers = + lists:foldl( + fun(_, Acc) -> + case start_worker(seed, State) of + {ok, Worker} -> + Acc#{Worker#worker.ref => Worker}; + _ -> Acc + end + end, + #{}, + lists:seq(1, SeedCount)), GossipAddFun = fun(Path) -> - libp2p_swarm:add_stream_handler(TID, Path, + libp2p_swarm:add_stream_handler(TID, Path, {libp2p_gossip_stream, server, [Path, ?MODULE, self(), Bloom]}) end, lists:foreach(GossipAddFun, SupportedPaths), - {noreply, State#state{workers=SeedWorkers ++ PeerBookWorkers}}; + {noreply, State#state{workers=#{seed => SeedWorkers, peerbook => PeerBookWorkers}}}; handle_info(drop_timeout, State=#state{drop_timeout=DropTimeOut, drop_timer=DropTimer, workers=Workers}) -> erlang:cancel_timer(DropTimer), - case lists:filter(fun(#worker{target=MAddr}) -> MAddr =/= undefined end, Workers) of + case get_workers(Workers) of [] -> {noreply, State#state{drop_timer=schedule_drop_timer(DropTimeOut)}}; ConnectedWorkers -> Worker = lists:nth(rand:uniform(length(ConnectedWorkers)), ConnectedWorkers), @@ -285,9 +352,9 @@ handle_info({handle_identify, {ReplyRef, StreamPid, _Path}, {error, Error}}, Sta StreamPid ! {ReplyRef, {error, Error}}, {noreply, State}; handle_info({handle_identify, {ReplyRef, StreamPid, Path}, {ok, Identify}}, State=#state{}) -> - Target = libp2p_crypto:pubkey_bin_to_p2p(libp2p_identify:pubkey_bin(Identify)), + Target = libp2p_identify:pubkey_bin(Identify), %% Check if we already have a worker for this target - case lookup_worker(Target, #worker.target, State) of + case lookup_worker_by_target(Target, State) of %% If not, we we check if we can accept a random inbound %% connection and start a worker for the inbound stream if ok false -> @@ -299,9 +366,14 @@ handle_info({handle_identify, {ReplyRef, StreamPid, Path}, {ok, Identify}}, Stat StreamPid ! {ReplyRef, {error, too_many}}, {noreply, State}; false -> - NewWorkers = [start_inbound_worker(Target, StreamPid, Path, State) | State#state.workers], - StreamPid ! {ReplyRef, ok}, - {noreply, State#state{workers=NewWorkers}} + case start_inbound_worker(Target, StreamPid, Path, State) of + {ok, Worker} -> + StreamPid ! {ReplyRef, ok}, + {noreply, add_worker(Worker, State)}; + {error, Reason} -> + StreamPid ! {error, Reason}, + {noreply, State} + end end; %% There's an existing worker for the given address, re-assign %% the worker the new stream. @@ -333,22 +405,30 @@ schedule_drop_timer(DropTimeOut) -> -spec connections(libp2p_group_gossip:connection_kind() | all, #state{}) -> [{MAddr::string(), Pid::pid()}]. +connections(all, State = #state{workers=Workers}) -> + maps:fold( + fun(Kind, _, Acc) -> + Conns = connections(Kind, State), + lists:append(Conns, Acc) + end, + [], + Workers); connections(Kind, #state{workers=Workers}) -> - lists:foldl(fun(#worker{target=undefined}, Acc) -> - Acc; - (#worker{pid=Pid, target=MAddr}, Acc) when Kind == all -> - [{MAddr, Pid} | Acc]; - (#worker{kind=WorkerKind, pid=Pid, target=MAddr}, Acc) when WorkerKind == Kind -> - [{MAddr, Pid} | Acc]; - (_, Acc) -> - Acc - end, [], Workers). + KindMap = maps:get(Kind, Workers, #{}), + maps:fold( + fun(_Ref, #worker{target = undefined}, Acc) -> + Acc; + (_Ref, #worker{target = Target, pid = Pid}, Acc) -> + [{Target, Pid} | Acc] + end, + [], + KindMap). assign_target(WorkerPid, WorkerRef, TargetAddrs, State=#state{workers=Workers, supported_paths = SupportedPaths, bloom=Bloom}) -> case length(TargetAddrs) of 0 -> %% the ref is stable across restarts, so use that as the lookup key - case lookup_worker(WorkerRef, #worker.ref, State) of + case lookup_worker(WorkerRef, State) of Worker=#worker{kind=seed, target=SelectedAddr, pid=StoredWorkerPid} when SelectedAddr /= undefined -> %% don't give up on the seed nodes in case we're entirely offline %% we need at least one connection to bootstrap the swarm @@ -357,8 +437,8 @@ assign_target(WorkerPid, WorkerRef, TargetAddrs, State=#state{workers=Workers, s %% check if this worker got restarted case WorkerPid /= StoredWorkerPid of true -> - NewWorkers = lists:keyreplace(WorkerRef, #worker.ref, Workers, - Worker#worker{pid=WorkerPid}), + SeedMap = maps:get(seed, Workers), + NewWorkers = Workers#{seed => SeedMap#{WorkerRef => Worker#worker{pid=WorkerPid}}}, State#state{workers=NewWorkers}; false -> State @@ -371,13 +451,12 @@ assign_target(WorkerPid, WorkerRef, TargetAddrs, State=#state{workers=Workers, s ClientSpec = {SupportedPaths, {libp2p_gossip_stream, [?MODULE, self(), Bloom]}}, libp2p_group_worker:assign_target(WorkerPid, {SelectedAddr, ClientSpec}), %% the ref is stable across restarts, so use that as the lookup key - case lookup_worker(WorkerRef, #worker.ref, State) of + case lookup_worker(WorkerRef, State) of Worker=#worker{} -> %% since we have to update the worker here anyway, update the worker pid as well %% so we handle restarts smoothly - NewWorkers = lists:keyreplace(WorkerRef, #worker.ref, Workers, - Worker#worker{target=SelectedAddr, pid=WorkerPid}), - State#state{workers=NewWorkers}; + NewWorker = Worker#worker{target=SelectedAddr, pid=WorkerPid}, + add_worker(NewWorker, State); _ -> State end @@ -438,61 +517,126 @@ to_string(V) when is_integer(V) -> integer_to_list(V); to_string(V) when is_binary(V) -> binary_to_list(V); to_string(V) when is_list(V) -> V. -drop_target(Worker=#worker{pid=WorkerPid}, State=#state{workers=Workers}) -> +drop_target(Worker=#worker{pid=WorkerPid, ref = Ref, kind = Kind}, + State=#state{workers=Workers}) -> libp2p_group_worker:clear_target(WorkerPid), - NewWorkers = lists:keyreplace(WorkerPid, #worker.pid, Workers, - Worker#worker{target=undefined}), + lager:info("dropping target for ~p ~p", [Kind, WorkerPid]), + KindMap = maps:get(Kind, Workers, #{}), + NewWorkers = Workers#{Kind => KindMap#{Ref => Worker#worker{target = undefined}}}, State#state{workers=NewWorkers}. -lookup_worker(Key, KeyIndex, #state{workers=Workers}) -> - lists:keyfind(Key, KeyIndex, Workers). +add_worker(Worker = #worker{kind = Kind, ref = Ref, target = Target}, + State = #state{workers = Workers, targets = Targets}) -> + KindMap = maps:get(Kind, Workers, #{}), + Workers1 = Workers#{Kind => KindMap#{Ref => Worker}}, + State#state{workers = Workers1, targets = Targets#{Target => {Kind, Ref}}}. + +remove_worker(#worker{ref = Ref, kind = Kind, target = Target}, + State = #state{workers = Workers, targets = Targets}) -> + KindMap = maps:get(Kind, Workers, #{}), + Workers1 = Workers#{Kind => maps:remove(Ref, KindMap)}, + State#state{workers = Workers1, targets = maps:remove(Target, Targets)}. + +lookup_worker(Ref, #state{workers=Workers}) -> + maps:fold( + fun(_, V, A) -> + case maps:get(Ref, V, not_found) of + not_found -> + A; + Worker -> + Worker + end + end, + not_found, + Workers). + +lookup_worker(Kind, Ref, #state{workers=Workers}) -> + KindMap = maps:get(Kind, Workers, #{}), + maps:get(Ref, KindMap, not_found). + +lookup_worker_by_target(Target, #state{workers=Workers, targets = Targets}) -> + case maps:get(Target, Targets, not_found) of + not_found -> + false; + {Kind, Ref} -> + KindMap = maps:get(Kind, Workers, #{}), + maps:get(Ref, KindMap, false) + end. +get_addrs(Workers) -> + lists:append(do_get_addrs(maps:get(inbound, Workers, #{})), + do_get_addrs(maps:get(peerbook, Workers, #{}))). + +do_get_addrs(Map) -> + maps:fold( + fun(_Ref, #worker{target = undefined}, Acc) -> + Acc; + (_Ref, #worker{target = Target}, Acc) -> + TargetAddr = Target, + [TargetAddr | Acc] + end, + [], + Map). + +get_workers(Workers) -> + lists:append(do_get_workers(maps:get(inbound, Workers, #{})), + do_get_workers(maps:get(peerbook, Workers, #{}))). + +do_get_workers(Map) -> + maps:fold( + fun(_Ref, #worker{target = undefined}, Acc) -> + Acc; + (_Ref, W, Acc) -> + [W | Acc] + end, + [], + Map). -spec count_workers(libp2p_group_gossip:connection_kind(), #state{}) -> non_neg_integer(). count_workers(Kind, #state{workers=Workers}) -> - FilteredWorkers = lists:filter(fun(#worker{kind=WorkerKind}) -> - WorkerKind == Kind - end, Workers), - length(FilteredWorkers). + KindMap = maps:get(Kind, Workers, #{}), + maps:size(KindMap). -spec start_inbound_worker(string(), pid(), string(), #state{}) -> #worker{}. -start_inbound_worker(Target, StreamPid, Path, #state{tid=TID, sup=Sup}) -> - WorkerSup = libp2p_group_gossip_sup:workers(Sup), +start_inbound_worker(Target, StreamPid, Path, #state{tid=TID, sidejob_sup=WorkerSup}) -> Ref = make_ref(), - {ok, WorkerPid} = supervisor:start_child( - WorkerSup, - #{ id => Ref, - start => {libp2p_group_worker, start_link, - [Ref, inbound, StreamPid, self(), - ?GROUP_ID, [], TID, Path]}, - restart => temporary - }), - #worker{kind=inbound, pid=WorkerPid, target=Target, ref=Ref}. - --spec stop_inbound_worker(pid(), #state{}) -> #state{}. -stop_inbound_worker(StreamPid, State) -> - case lookup_worker(StreamPid, #worker.pid, State) of - #worker{ref=Ref} -> - WorkerSup = libp2p_group_gossip_sup:workers(State#state.sup), - supervisor:terminate_child(WorkerSup, Ref), - State#state{workers=lists:keydelete(StreamPid, #worker.pid, State#state.workers)}; + case sidejob_supervisor:start_child( + WorkerSup, + libp2p_group_worker, start_link, + [Ref, inbound, StreamPid, self(), ?GROUP_ID, [], TID, Path]) of + {ok, WorkerPid} -> + {ok, #worker{kind=inbound, pid=WorkerPid, target=Target, ref=Ref}}; + {error, overload} -> + {error, overload} + end. + +-spec stop_inbound_worker(reference(), pid(), #state{}) -> #state{}. +stop_inbound_worker(StreamRef, Pid, State) -> + spawn(fun() -> gen_statem:stop(Pid) end), + case lookup_worker(inbound, StreamRef, State) of + Worker = #worker{pid = Pid} -> + remove_worker(Worker, State); + Worker = #worker{pid = OtherPid} -> + lager:info("pid mixup got ~p ref ~p", [Pid, OtherPid]), + spawn(fun() -> gen_statem:stop(OtherPid) end), + remove_worker(Worker, State); _ -> + lager:info("trying to stop worker with unknown ref ~p pid ~p", [StreamRef, Pid]), State end. --spec start_worker(atom(), #state{}) -> #worker{}. -start_worker(Kind, #state{tid=TID, sup=Sup}) -> - WorkerSup = libp2p_group_gossip_sup:workers(Sup), +-spec start_worker(atom(), #state{}) -> {ok, #worker{}} | {error, overload}. +start_worker(Kind, #state{tid=TID, sidejob_sup = WorkerSup}) -> Ref = make_ref(), DialOptions = [], - {ok, WorkerPid} = supervisor:start_child( + case sidejob_supervisor:start_child( WorkerSup, - #{ id => Ref, - start => {libp2p_group_worker, start_link, - [Ref, Kind, self(), ?GROUP_ID, DialOptions, TID]}, - restart => transient - }), - #worker{kind=Kind, pid=WorkerPid, target=undefined, ref=Ref}. + libp2p_group_worker, start_link, + [Ref, Kind, self(), ?GROUP_ID, DialOptions, TID]) of + {ok, WorkerPid} -> + {ok, #worker{kind=Kind, pid=WorkerPid, target=undefined, ref=Ref}}; + Other -> Other + end. -spec get_opt(libp2p_config:opts(), atom(), any()) -> any(). get_opt(Opts, Key, Default) -> @@ -509,3 +653,21 @@ update_metadata(State=#state{}) -> {group_id, ?GROUP_ID} ]), State. + +lookup_handler(Pid, Key) -> + %% XXX ASSUMPTION: gossip handlers are not removed or changed once added + case get(Key) of + undefined -> + Res = gen_server:call(Pid, {handle_data, Key}, infinity), + put(Key, {erlang:timestamp(), Res}), + Res; + {Time, Val} -> + %% cache for 10 minutes + case timer:now_diff(erlang:timestamp(), Time) > 600000000 of + true -> + erase(Key), + lookup_handler(Pid, Key); + false -> + Val + end + end. diff --git a/src/group/libp2p_group_worker.erl b/src/group/libp2p_group_worker.erl index 1dba0fae..4597f554 100644 --- a/src/group/libp2p_group_worker.erl +++ b/src/group/libp2p_group_worker.erl @@ -305,6 +305,10 @@ connected(info, {assign_stream, StreamPid, Path}, Data0=#data{}) -> {ok, NewData} -> {keep_state, NewData}; _ -> {keep_state, Data} end; +connected(info, {'EXIT', StreamPid, _Reason}, Data=#data{stream_pid=StreamPid, kind=inbound}) -> + %% don't try to reconnect inbound streams, it never seems to work + libp2p_group_server:request_target(Data#data.server, Data#data.kind, self(), Data#data.ref), + {next_state, closing, Data}; connected(info, {'EXIT', StreamPid, Reason}, Data=#data{stream_pid=StreamPid, target={MAddr, _}}) -> %% The stream we're using died. Let's go back to connecting, but %% do not trigger a connect retry right away, (re-)start the diff --git a/src/identify/libp2p_stream_identify.erl b/src/identify/libp2p_stream_identify.erl index 876f2e95..229e0455 100644 --- a/src/identify/libp2p_stream_identify.erl +++ b/src/identify/libp2p_stream_identify.erl @@ -17,13 +17,14 @@ -define(PATH, "identify/1.0.0"). -define(TIMEOUT, 5000). --spec dial_spawn(Session::pid(), ets:tab(), Handler::pid()) -> pid(). +-spec dial_spawn(Session::pid(), ets:tab(), Handler::pid()) -> {pid(), reference()}. dial_spawn(Session, TID, Handler) -> - spawn(fun() -> - Challenge = crypto:strong_rand_bytes(20), - Path = lists:flatten([?PATH, "/", base58:binary_to_base58(Challenge)]), - libp2p_session:dial_framed_stream(Path, Session, ?MODULE, [TID, Handler]) - end). + spawn_monitor( + fun() -> + Challenge = crypto:strong_rand_bytes(20), + Path = lists:flatten([?PATH, "/", base58:binary_to_base58(Challenge)]), + libp2p_session:dial_framed_stream(Path, Session, ?MODULE, [TID, Handler]) + end). client(Connection, Args=[_TID, _Handler]) -> libp2p_framed_stream:client(?MODULE, Connection, Args). diff --git a/src/libp2p.app.src b/src/libp2p.app.src index 7e9946d1..79a11284 100644 --- a/src/libp2p.app.src +++ b/src/libp2p.app.src @@ -34,7 +34,8 @@ relcast, libp2p_crypto, throttle, - erbloom + erbloom, + sidejob ]}, {env,[]}, diff --git a/src/libp2p_stream_stungun.erl b/src/libp2p_stream_stungun.erl index 0abc0f42..49e3249b 100644 --- a/src/libp2p_stream_stungun.erl +++ b/src/libp2p_stream_stungun.erl @@ -18,7 +18,8 @@ -record(client_state, { txn_id :: binary(), - handler :: pid() + handler :: pid(), + direction :: inbound | outbound }). %% diff --git a/src/libp2p_swarm.erl b/src/libp2p_swarm.erl index 83d1303e..730827e3 100644 --- a/src/libp2p_swarm.erl +++ b/src/libp2p_swarm.erl @@ -48,6 +48,9 @@ start(Name) when is_atom(Name) -> -spec start(atom(), swarm_opts()) -> {ok, pid()} | ignore | {error, term()}. start(Name, Opts) -> RegName = list_to_atom(atom_to_list(libp2p_swarm_sup) ++ "_" ++ atom_to_list(Name)), + SideJobRegName = list_to_atom(atom_to_list(libp2p_swarm_sidejob_sup) ++ "_" ++ atom_to_list(Name)), + {ok, _} = application:ensure_all_started(sidejob), + sidejob:new_resource(SideJobRegName, sidejob_supervisor, application:get_env(libp2p, sidejob_limit, 15000)), case supervisor:start_link({local,RegName}, libp2p_swarm_sup, [Name, Opts]) of {ok, Pid} -> unlink(Pid), @@ -178,7 +181,7 @@ store_peerbook(TID, Handle) -> %% @doc Get the peerbook pid for a swarm. -spec peerbook_pid(ets:tab() | pid()) -> pid(). peerbook_pid(Sup) when is_pid(Sup) -> - peerbook(tid(Sup)); + peerbook_pid(tid(Sup)); peerbook_pid(TID) -> libp2p_swarm_sup:peerbook(TID). diff --git a/src/libp2p_swarm_server.erl b/src/libp2p_swarm_server.erl index 980f445c..c3196260 100644 --- a/src/libp2p_swarm_server.erl +++ b/src/libp2p_swarm_server.erl @@ -7,7 +7,7 @@ sig_fun :: libp2p_crypto:sig_fun(), ecdh_fun :: libp2p_crypto:ecdh_fun(), pid_gc_monitor = make_ref() :: reference(), - monitors=[] :: [{pid(), {reference(), atom()}}] + monitors=#{} :: #{pid() => {reference(), atom()}} }). -export([start_link/3, init/1, handle_call/3, handle_info/2, handle_cast/2, terminate/2]). @@ -70,6 +70,7 @@ handle_info({handle_identify, Session, {ok, Identify}}, State=#state{tid=TID}) - %% %% Store the session in config and tell the peerbook about the %% session change as well as the new identify record. + spawn(fun() -> Addr = libp2p_crypto:pubkey_bin_to_p2p(libp2p_identify:pubkey_bin(Identify)), lager:debug("received identity for peer ~p. Putting this peer", [Addr]), libp2p_config:insert_session(TID, @@ -77,16 +78,22 @@ handle_info({handle_identify, Session, {ok, Identify}}, State=#state{tid=TID}) - Session), PeerBook = libp2p_swarm:peerbook(TID), libp2p_peerbook:register_session(PeerBook, Session, Identify), - libp2p_peerbook:put(PeerBook, [libp2p_identify:peer(Identify)]), + libp2p_peerbook:put(PeerBook, [libp2p_identify:peer(Identify)]) + end), + {noreply, State}; +handle_info({'DOWN', MonitorRef, process, _Pid, _Reason}, State=#state{pid_gc_monitor=MonitorRef}) -> + erlang:send_after(timer:minutes(5), self(), gc_pids), {noreply, State}; handle_info({'DOWN', MonitorRef, process, _Pid, _Reason}, State=#state{pid_gc_monitor=MonitorRef}) -> erlang:send_after(timer:minutes(5), self(), gc_pids), {noreply, State}; handle_info({'DOWN', MonitorRef, process, Pid, _}, State=#state{tid=TID}) -> NewState = remove_monitor(MonitorRef, Pid, State), + spawn(fun() -> libp2p_config:remove_pid(TID, Pid), PeerBook = libp2p_swarm:peerbook(TID), - libp2p_peerbook:unregister_session(PeerBook, Pid), + libp2p_peerbook:unregister_session(PeerBook, Pid) + end), {noreply, NewState}; handle_info({'EXIT', _From, Reason}, State=#state{}) -> {stop, Reason, State}; @@ -129,17 +136,17 @@ terminate(Reason, #state{tid=TID}) -> -spec add_monitor(atom(), pid(), #state{}) -> #state{}. add_monitor(Kind, Pid, State=#state{monitors=Monitors}) -> - Value = case lists:keyfind(Pid, 1, Monitors) of - false -> {erlang:monitor(process, Pid), Kind}; - {Pid, {MonitorRef, Kind}} -> {MonitorRef, Kind} + Value = case maps:find(Pid, Monitors) of + error -> {erlang:monitor(process, Pid), Kind}; + {ok, {MonitorRef, Kind}} -> {MonitorRef, Kind} end, - State#state{monitors=lists:keystore(Pid, 1, Monitors, {Pid, Value})}. + State#state{monitors=Monitors#{Pid => Value}}. -spec remove_monitor(reference(), pid(), #state{}) -> #state{}. remove_monitor(MonitorRef, Pid, State=#state{tid=TID, monitors=Monitors}) -> - case lists:keytake(Pid, 1, Monitors) of - false -> State; - {value, {Pid, {MonitorRef, _}}, NewMonitors} -> + case maps:find(Pid, Monitors) of + error -> State; + {ok, {MonitorRef, _}} -> libp2p_config:remove_pid(TID, Pid), - State#state{monitors=NewMonitors} + State#state{monitors=maps:remove(Pid, Monitors)} end. diff --git a/src/libp2p_transport_tcp.erl b/src/libp2p_transport_tcp.erl index 1b12407b..618a3333 100644 --- a/src/libp2p_transport_tcp.erl +++ b/src/libp2p_transport_tcp.erl @@ -685,7 +685,7 @@ terminate(_Reason, #state{}) -> listen_options(IP, TID) -> OptionDefaults = [ {ip, IP}, - {backlog, 1024}, + {backlog, application:get_env(libp2p, listen_backlog, 1024)}, {nodelay, true}, {send_timeout, 30000}, {send_timeout_close, true} @@ -1007,11 +1007,10 @@ record_observed_addr(PeerAddr, ObservedAddr, State=#state{tid=TID, observed_addr true -> lager:info("Saw 3 distinct observed addresses, assuming symmetric NAT"), libp2p_peerbook:update_nat_type(libp2p_swarm:peerbook(TID), symmetric), - Ref = monitor_relay_server(State), libp2p_relay:init(libp2p_swarm:swarm(TID)), %% also check if we have a port forward from the same external port to our internal port %% as this is a common configuration - attempt_port_forward_discovery(ObservedAddr, PeerAddr, State#state{observed_addrs=ObservedAddresses, relay_monitor=Ref, nat_type=symmetric}); + attempt_port_forward_discovery(ObservedAddr, PeerAddr, State#state{observed_addrs=ObservedAddresses}); false -> State#state{observed_addrs=ObservedAddresses} end @@ -1030,21 +1029,15 @@ attempt_port_forward_discovery(ObservedAddr, PeerAddr, State=#state{tid=TID, stu {PeerPath, TxnID} = libp2p_stream_stungun:mk_stun_txn(list_to_integer(PortStr)), [{"ip4", IP}, {"tcp", _}] = multiaddr:protocols(ObservedAddr), ObservedAddr1 = "/ip4/"++IP++"/tcp/"++PortStr, - case lists:member(ObservedAddr1, State#state.resolved_addresses) of - true -> - %% we don't need to try this again - StateAcc; - false -> - case libp2p_stream_stungun:dial(TID, PeerAddr, PeerPath, TxnID, self()) of - {ok, StunPid} -> - lager:info("dialed stungun peer ~p looking for validation of ~p", [PeerAddr, ObservedAddr1]), - %% TODO: Remove this once dial stops using start_link - unlink(StunPid), - erlang:send_after(60000, self(), {stungun_timeout, TxnID}), - StateAcc#state{stun_txns=add_stun_txn(TxnID, ObservedAddr1, StunTxns)}; - _ -> - StateAcc - end + case libp2p_stream_stungun:dial(TID, PeerAddr, PeerPath, TxnID, self()) of + {ok, StunPid} -> + lager:debug("dialed stungun peer ~p looking for validation of ~p", [PeerAddr, ObservedAddr1]), + %% TODO: Remove this once dial stops using start_link + unlink(StunPid), + erlang:send_after(60000, self(), {stungun_timeout, TxnID}), + StateAcc#state{stun_txns=add_stun_txn(TxnID, ObservedAddr1, StunTxns)}; + _ -> + StateAcc end; _ -> StateAcc diff --git a/src/libp2p_yamux_session.erl b/src/libp2p_yamux_session.erl index fdac5153..fd7d5cb6 100644 --- a/src/libp2p_yamux_session.erl +++ b/src/libp2p_yamux_session.erl @@ -25,6 +25,7 @@ -record(ident, { identify=undefined :: libp2p_identify:identify() | undefined, pid=undefined :: pid() | undefined, + ref :: reference() | undefined, waiters=[] :: [term()] }). @@ -209,13 +210,14 @@ handle_info({identify, Handler, HandlerData}, State=#state{ident=#ident{identify Handler ! {handle_identify, HandlerData, {ok, I}}, {noreply, State}; handle_info({identify, Handler, HandlerData}, State=#state{ident=Ident=#ident{pid=undefined}, tid=TID}) -> - Pid = libp2p_stream_identify:dial_spawn(self(), TID, self()), - NewIdent=Ident#ident{waiters=[{Handler, HandlerData} | Ident#ident.waiters], pid=Pid}, + {Pid, Ref} = libp2p_stream_identify:dial_spawn(self(), TID, self()), + NewIdent=Ident#ident{waiters=[{Handler, HandlerData} | Ident#ident.waiters], pid=Pid, ref=Ref}, {noreply, State#state{ident=NewIdent}}; handle_info({identify, Handler, HandlerData}, State=#state{ident=Ident=#ident{}}) -> NewIdent=Ident#ident{waiters=[{Handler, HandlerData} | Ident#ident.waiters]}, {noreply, State#state{ident=NewIdent}}; -handle_info({handle_identify, _Session, Response}, State=#state{ident=Ident=#ident{}}) -> +handle_info({handle_identify, _Session, Response}, State=#state{ident=Ident=#ident{ref = Ref}}) -> + erlang:demonitor(Ref, [flush]), lists:foreach(fun({Handler, HandlerData}) -> Handler ! {handle_identify, HandlerData, Response} end, Ident#ident.waiters), @@ -224,6 +226,21 @@ handle_info({handle_identify, _Session, Response}, State=#state{ident=Ident=#ide {error, _} -> undefined end, {noreply, State#state{ident=Ident#ident{pid=undefined, waiters=[], identify=NewIdentify}}}; +handle_info({'DOWN', _Ref, process, _Pid, normal}, State) -> + %% down beat the reply somehow, ignore and it'll get cleaned up elsewhere + {noreply, State}; +handle_info({'DOWN', Ref, process, Pid, Reason}, State=#state{ident=Ident=#ident{ref = IRef}}) -> + case IRef == Ref of + true -> + lager:warning("crash of known dial ~p ~p ~p", [Ref, Pid, Reason]), + lists:foreach(fun({Handler, HandlerData}) -> + Handler ! {handle_identify, HandlerData, {error, Reason}} + end, Ident#ident.waiters), + {noreply, State#state{ident=Ident#ident{pid=undefined, waiters=[], identify=undefined}}}; + false -> + lager:warning("crash of unknown ref ~p ~p ~p", [Ref, Pid, Reason]), + {noreply, State} + end; handle_info(Msg, State) -> lager:warning("Unhandled message: ~p", [Msg]), diff --git a/src/peerbook/libp2p_peerbook.erl b/src/peerbook/libp2p_peerbook.erl index 5baa78e1..a7200caa 100644 --- a/src/peerbook/libp2p_peerbook.erl +++ b/src/peerbook/libp2p_peerbook.erl @@ -358,7 +358,7 @@ lookup_association(Handle=#peerbook{}, AssocType, AssocAddress) -> %% -spec handle_gossip_data(pid(), binary(), peerbook()) -> noreply. -handle_gossip_data(_StreamPid, DecodedList, Handle) -> +handle_gossip_data(_StreamPid, {"gossip/1.0."++_, DecodedList}, Handle) -> %% DecodedList = libp2p_peer:decode_list(Data), ?MODULE:put(Handle, DecodedList, true), noreply. @@ -693,7 +693,10 @@ notify_peers(State=#state{notify_peers=NotifyPeers, notify_group=NotifyGroup, Opts = libp2p_swarm:opts(TID), PeerCount = libp2p_config:get_opt(Opts, [?MODULE, notify_peer_gossip_limit], ?DEFAULT_NOTIFY_PEER_GOSSIP_LIMIT), %% Gossip to any attached parties - SendFun = fun() -> + SendFun = fun(seed) -> + %% send everything to the seed nodes + libp2p_peer:encode_list(PeerList); + (_Type) -> {_, RandomNPeers} = lists:unzip(lists:sublist(lists:keysort(1, [ {rand:uniform(), E} || E <- PeerList]), PeerCount)), libp2p_peer:encode_list(RandomNPeers) end, diff --git a/src/relay/libp2p_relay_server.erl b/src/relay/libp2p_relay_server.erl index a763560f..f18a301f 100644 --- a/src/relay/libp2p_relay_server.erl +++ b/src/relay/libp2p_relay_server.erl @@ -250,6 +250,7 @@ init_relay(#state{tid = TID, banlist = Banlist}) -> {error, retry} end. +-spec banlist(undefined | string(), #state{}) -> #state{}. banlist(undefined, State) -> %% relay failed before we found out the address State; diff --git a/test/group_gossip_SUITE.erl b/test/group_gossip_SUITE.erl index 825a2f0c..06070fcd 100644 --- a/test/group_gossip_SUITE.erl +++ b/test/group_gossip_SUITE.erl @@ -378,7 +378,8 @@ seed_test(Config) -> init_gossip_data(_) -> ok. -handle_gossip_data(_StreamPid, Msg, Parent) -> +handle_gossip_data(_StreamPid, {"gossip/1.0"++_, Msg}, Parent) -> + ct:pal("handle gossip data ~p ~p ~p", [_StreamPid, Msg, Parent]), Parent ! {handle_gossip_data, Msg}, noreply. diff --git a/test/group_relcast_SUITE.erl b/test/group_relcast_SUITE.erl index c0558f63..767d5a9a 100644 --- a/test/group_relcast_SUITE.erl +++ b/test/group_relcast_SUITE.erl @@ -54,6 +54,7 @@ unicast_test(Config) -> test_util:connect_swarms(S1, S2), test_util:connect_swarms(S1, S3), + test_util:connect_swarms(S2, S3), test_util:await_gossip_groups(Swarms), test_util:await_gossip_streams(Swarms), @@ -111,6 +112,7 @@ multicast_test(Config) -> test_util:connect_swarms(S1, S2), test_util:connect_swarms(S1, S3), + test_util:connect_swarms(S2, S3), test_util:await_gossip_groups(Swarms), test_util:await_gossip_streams(Swarms),