diff --git a/.credo.exs b/.credo.exs index 0760bd1..93b8e76 100644 --- a/.credo.exs +++ b/.credo.exs @@ -122,14 +122,14 @@ # {Credo.Check.Refactor.Apply, []}, {Credo.Check.Refactor.CondStatements, []}, - {Credo.Check.Refactor.CyclomaticComplexity, []}, + {Credo.Check.Refactor.CyclomaticComplexity, [max_complexity: 10]}, {Credo.Check.Refactor.FunctionArity, []}, {Credo.Check.Refactor.LongQuoteBlocks, []}, {Credo.Check.Refactor.MatchInCondition, []}, {Credo.Check.Refactor.MapJoin, []}, {Credo.Check.Refactor.NegatedConditionsInUnless, []}, {Credo.Check.Refactor.NegatedConditionsWithElse, []}, - {Credo.Check.Refactor.Nesting, []}, + {Credo.Check.Refactor.Nesting, [max_nesting: 3]}, {Credo.Check.Refactor.UnlessWithElse, []}, {Credo.Check.Refactor.WithClauses, []}, {Credo.Check.Refactor.FilterFilter, []}, @@ -199,7 +199,7 @@ {Credo.Check.Warning.MapGetUnsafePass, []}, {Credo.Check.Warning.MixEnv, []}, {Credo.Check.Warning.UnsafeToAtom, []}, - + # {Credo.Check.Refactor.MapInto, []}, diff --git a/example/peer.exs b/example/peer.exs index 421277c..ca80bc6 100644 --- a/example/peer.exs +++ b/example/peer.exs @@ -100,7 +100,7 @@ defmodule Peer do {_, _, _, _} -> true {_, _, _, _, _, _, _, _} -> false end, - stun_servers: ["stun:stun.l.google.com:19302"] + ice_servers: [%ICEServer{url: "stun:stun.l.google.com:19302"}] ) {:ok, ufrag, passwd} = ICEAgent.get_local_credentials(pid) diff --git a/lib/ex_ice/candidate_pair.ex b/lib/ex_ice/candidate_pair.ex index 883fe90..8c453ea 100644 --- a/lib/ex_ice/candidate_pair.ex +++ b/lib/ex_ice/candidate_pair.ex @@ -3,21 +3,19 @@ defmodule ExICE.CandidatePair do ICE candidate pair representation. """ - alias ExICE.Candidate - @type state() :: :waiting | :in_progress | :succeeded | :failed | :frozen @type t() :: %__MODULE__{ id: integer(), - local_cand: Candidate.t(), + local_cand_id: integer(), nominated?: boolean(), priority: non_neg_integer(), - remote_cand: Candidate.t(), + remote_cand_id: integer(), state: state(), valid?: boolean() } - @enforce_keys [:id, :local_cand, :remote_cand, :priority] + @enforce_keys [:id, :local_cand_id, :remote_cand_id, :priority] defstruct @enforce_keys ++ [ nominated?: false, diff --git a/lib/ex_ice/ice_agent.ex b/lib/ex_ice/ice_agent.ex index b5ac91d..f545d07 100644 --- a/lib/ex_ice/ice_agent.ex +++ b/lib/ex_ice/ice_agent.ex @@ -48,7 +48,10 @@ defmodule ExICE.ICEAgent do This behavior can be overwritten using the following options. * `ip_filter` - filter applied when gathering local candidates - * `stun_servers` - list of STUN servers + * `ice_servers` - list of STUN/TURN servers + * `ice_transport_policy` - candidate types to be used. + * `all` - all ICE candidates will be considered (default). + * `relay` - only relay candidates will be considered. * `on_gathering_state_change` - where to send gathering state change notifications. Defaults to a process that spawns `ExICE`. * `on_connection_state_change` - where to send connection state change notifications. Defaults to a process that spawns `ExICE`. * `on_data` - where to send data. Defaults to a process that spawns `ExICE`. @@ -59,7 +62,14 @@ defmodule ExICE.ICEAgent do """ @type opts() :: [ ip_filter: (:inet.ip_address() -> boolean), - stun_servers: [String.t()], + ice_servers: [ + %{ + :url => String.t(), + optional(:username) => String.t(), + optional(:credential) => String.t() + } + ], + ice_transport_policy: :all | :relay, on_gathering_state_change: pid() | nil, on_connection_state_change: pid() | nil, on_data: pid() | nil, @@ -347,6 +357,12 @@ defmodule ExICE.ICEAgent do {:noreply, %{state | ice_agent: ice_agent}} end + @impl true + def handle_info({:ex_turn, ref, msg}, state) do + ice_agent = ExICE.Priv.ICEAgent.handle_ex_turn_msg(state.ice_agent, ref, msg) + {:noreply, %{state | ice_agent: ice_agent}} + end + @impl true def handle_info(msg, state) do Logger.warning("Got unexpected msg: #{inspect(msg)}") diff --git a/lib/ex_ice/priv/candidate.ex b/lib/ex_ice/priv/candidate.ex index 995093d..6d15458 100644 --- a/lib/ex_ice/priv/candidate.ex +++ b/lib/ex_ice/priv/candidate.ex @@ -27,9 +27,6 @@ defmodule ExICE.Priv.Candidate do @callback send_data(t(), :inet.ip_address(), :inet.port_number(), binary()) :: {:ok, t()} | {:error, term(), t()} - @callback receive_data(t(), :inet.ip_address(), :inet.port_number(), binary()) :: - {:ok, t()} | {:ok, binary(), t()} | {:error, term(), t()} - @spec priority(type()) :: integer() def priority(type) do type_preference = diff --git a/lib/ex_ice/priv/candidate/host.ex b/lib/ex_ice/priv/candidate/host.ex index 5966dd0..606af10 100644 --- a/lib/ex_ice/priv/candidate/host.ex +++ b/lib/ex_ice/priv/candidate/host.ex @@ -30,9 +30,4 @@ defmodule ExICE.Priv.Candidate.Host do {:error, reason} -> {:error, reason, cand} end end - - @impl true - def receive_data(cand, _src_ip, _src_port, data) do - {:ok, data, cand} - end end diff --git a/lib/ex_ice/priv/candidate/prflx.ex b/lib/ex_ice/priv/candidate/prflx.ex index 385a010..34db343 100644 --- a/lib/ex_ice/priv/candidate/prflx.ex +++ b/lib/ex_ice/priv/candidate/prflx.ex @@ -30,9 +30,4 @@ defmodule ExICE.Priv.Candidate.Prflx do {:error, reason} -> {:error, reason, cand} end end - - @impl true - def receive_data(cand, _src_ip, _src_port, data) do - {:ok, data, cand} - end end diff --git a/lib/ex_ice/priv/candidate/relay.ex b/lib/ex_ice/priv/candidate/relay.ex index a35b5b9..e0fe6d7 100644 --- a/lib/ex_ice/priv/candidate/relay.ex +++ b/lib/ex_ice/priv/candidate/relay.ex @@ -6,12 +6,12 @@ defmodule ExICE.Priv.Candidate.Relay do @type t() :: %__MODULE__{base: CandidateBase.t()} - @enforce_keys [:base] - defstruct @enforce_keys + @enforce_keys [:base, :client] + defstruct @enforce_keys ++ [buffered_packets: []] @impl true def new(config) do - %__MODULE__{base: CandidateBase.new(:relay, config)} + %__MODULE__{base: CandidateBase.new(:relay, config), client: Keyword.fetch!(config, :client)} end @impl true @@ -25,14 +25,74 @@ defmodule ExICE.Priv.Candidate.Relay do @impl true def send_data(cand, dst_ip, dst_port, data) do - case cand.base.transport_module.send(cand.base.socket, {dst_ip, dst_port}, data) do - :ok -> {:ok, cand} - {:error, reason} -> {:error, reason, cand} + if MapSet.member?(cand.client.permissions, dst_ip) do + {:send, turn_addr, data, client} = ExTURN.Client.send(cand.client, {dst_ip, dst_port}, data) + cand = %{cand | client: client} + do_send(cand, turn_addr, data) + else + {:send, turn_addr, turn_data, client} = ExTURN.Client.create_permission(cand.client, dst_ip) + + cand = %{ + cand + | client: client, + buffered_packets: [{dst_ip, dst_port, data} | cand.buffered_packets] + } + + do_send(cand, turn_addr, turn_data) end end - @impl true - def receive_data(cand, _src_ip, _src_port, data) do - {:ok, data, cand} + @spec receive_data(t(), :inet.ip_address(), :inet.port_number(), binary()) :: + {:ok, t()} + | {:ok, :inet.ip_address(), :inet.port_number(), t()} + | {:error, term(), t()} + def receive_data(cand, src_ip, src_port, data) do + case ExTURN.Client.handle_message(cand.client, {:socket_data, src_ip, src_port, data}) do + {:permission_created, permission_ip, client} -> + cand = %{cand | client: client} + send_buffered_packets(cand, permission_ip) + + {:channel_created, _addr, client} -> + cand = %{cand | client: client} + {:ok, cand} + + {:data, {src_ip, src_port}, data, client} -> + cand = %{cand | client: client} + {:ok, src_ip, src_port, data, cand} + + {:error, reason, client} -> + cand = %{cand | client: client} + {:error, reason, cand} + end + end + + defp send_buffered_packets(cand, permission_ip) do + {packets_to_send, rest} = + Enum.split_with(cand.buffered_packets, fn {dst_ip, _dst_port, _data} -> + dst_ip == permission_ip + end) + + cand = %{cand | buffered_packets: rest} + do_send_buffered_packets(cand, Enum.reverse(packets_to_send)) + end + + defp do_send_buffered_packets(cand, []), do: {:ok, cand} + + defp do_send_buffered_packets(cand, [{dst_ip, dst_port, packet} | packets]) do + {:send, turn_addr, data, client} = ExTURN.Client.send(cand.client, {dst_ip, dst_port}, packet) + + cand = %{cand | client: client} + + case do_send(cand, turn_addr, data) do + {:ok, cand} -> do_send_buffered_packets(cand, packets) + {:error, _reason, _cand} = error -> error + end + end + + defp do_send(cand, dst_addr, data) do + case cand.base.transport_module.send(cand.base.socket, dst_addr, data) do + :ok -> {:ok, cand} + {:error, reason} -> {:error, reason, cand} + end end end diff --git a/lib/ex_ice/priv/candidate/srflx.ex b/lib/ex_ice/priv/candidate/srflx.ex index a51e845..f32f402 100644 --- a/lib/ex_ice/priv/candidate/srflx.ex +++ b/lib/ex_ice/priv/candidate/srflx.ex @@ -30,9 +30,4 @@ defmodule ExICE.Priv.Candidate.Srflx do {:error, reason} -> {:error, reason, cand} end end - - @impl true - def receive_data(cand, _src_ip, _src_port, data) do - {:ok, data, cand} - end end diff --git a/lib/ex_ice/priv/candidate_pair.ex b/lib/ex_ice/priv/candidate_pair.ex index 34a66b3..65c04b8 100644 --- a/lib/ex_ice/priv/candidate_pair.ex +++ b/lib/ex_ice/priv/candidate_pair.ex @@ -11,11 +11,11 @@ defmodule ExICE.Priv.CandidatePair do @type t() :: %__MODULE__{ id: integer(), - local_cand: Candidate.t(), + local_cand_id: integer(), nominate?: boolean(), nominated?: boolean(), priority: non_neg_integer(), - remote_cand: Candidate.t(), + remote_cand_id: integer(), state: state(), valid?: boolean, succeeded_pair_id: integer() | nil, @@ -23,7 +23,7 @@ defmodule ExICE.Priv.CandidatePair do keepalive_timer: reference() | nil } - @enforce_keys [:id, :local_cand, :remote_cand, :priority] + @enforce_keys [:id, :local_cand_id, :remote_cand_id, :priority] defstruct @enforce_keys ++ [ nominate?: false, @@ -39,12 +39,12 @@ defmodule ExICE.Priv.CandidatePair do @spec new(Candidate.t(), Candidate.t(), ExICE.ICEAgent.role(), state(), valid?: boolean()) :: t() def new(local_cand, remote_cand, agent_role, state, opts \\ []) do - priority = priority(agent_role, local_cand, remote_cand) + priority = priority(agent_role, local_cand.base.priority, remote_cand.priority) %__MODULE__{ id: Utils.id(), - local_cand: local_cand, - remote_cand: remote_cand, + local_cand_id: local_cand.base.id, + remote_cand_id: remote_cand.id, priority: priority, state: state, valid?: opts[:valid?] || false @@ -66,19 +66,19 @@ defmodule ExICE.Priv.CandidatePair do end @doc false - @spec recompute_priority(t(), ExICE.ICEAgent.role()) :: t() - def recompute_priority(pair, role) do - %__MODULE__{pair | priority: priority(role, pair.local_cand, pair.remote_cand)} + @spec recompute_priority(t(), integer(), integer(), ExICE.ICEAgent.role()) :: t() + def recompute_priority(pair, local_cand_prio, remote_cand_prio, role) do + %__MODULE__{pair | priority: priority(role, local_cand_prio, remote_cand_prio)} end @doc false - @spec priority(ExICE.ICEAgent.role(), Candidate.t(), ExICE.Candidate.t()) :: non_neg_integer() - def priority(:controlling, local_cand, remote_cand) do - do_priority(local_cand.base.priority, remote_cand.priority) + @spec priority(ExICE.ICEAgent.role(), integer(), integer()) :: non_neg_integer() + def priority(:controlling, local_cand_prio, remote_cand_prio) do + do_priority(local_cand_prio, remote_cand_prio) end - def priority(:controlled, local_cand, remote_cand) do - do_priority(remote_cand.priority, local_cand.base.priority) + def priority(:controlled, local_cand_prio, remote_cand_prio) do + do_priority(remote_cand_prio, local_cand_prio) end defp do_priority(g, d) do @@ -89,15 +89,12 @@ defmodule ExICE.Priv.CandidatePair do @doc false @spec to_candidate_pair(t()) :: ExICE.CandidatePair.t() def to_candidate_pair(pair) do - %cand_mod{} = cand = pair.local_cand - local_cand = cand_mod.to_candidate(cand) - %ExICE.CandidatePair{ id: pair.id, - local_cand: local_cand, + local_cand_id: pair.local_cand_id, nominated?: pair.nominated?, priority: pair.priority, - remote_cand: pair.remote_cand, + remote_cand_id: pair.remote_cand_id, state: pair.state, valid?: pair.valid? } diff --git a/lib/ex_ice/priv/checklist.ex b/lib/ex_ice/priv/checklist.ex index 896be5c..fd18aa5 100644 --- a/lib/ex_ice/priv/checklist.ex +++ b/lib/ex_ice/priv/checklist.ex @@ -31,20 +31,14 @@ defmodule ExICE.Priv.Checklist do @spec find_pair(t(), CandidatePair.t()) :: CandidatePair.t() | nil def find_pair(checklist, pair) do - find_pair(checklist, pair.local_cand, pair.remote_cand) + find_pair(checklist, pair.local_cand_id, pair.remote_cand_id) end - @spec find_pair(t(), Candidate.t(), Candidate.t()) :: CandidatePair.t() | nil - def find_pair(checklist, local_cand, remote_cand) do - # TODO which pairs are actually the same? + @spec find_pair(t(), integer(), integer()) :: CandidatePair.t() | nil + def find_pair(checklist, local_cand_id, remote_cand_id) do checklist |> Enum.find({nil, nil}, fn {_id, p} -> - p.local_cand.base.base_address == local_cand.base.base_address and - p.local_cand.base.base_port == local_cand.base.base_port and - p.local_cand.base.address == local_cand.base.address and - p.local_cand.base.port == local_cand.base.port and - p.remote_cand.address == remote_cand.address and - p.remote_cand.port == remote_cand.port + p.local_cand_id == local_cand_id and p.remote_cand_id == remote_cand_id end) |> elem(1) end @@ -64,13 +58,6 @@ defmodule ExICE.Priv.Checklist do not (waiting?(checklist) or in_progress?(checklist)) end - @spec get_foundations(t()) :: [{integer(), integer()}] - def get_foundations(checklist) do - for {_id, pair} <- checklist do - {pair.local_cand.base.foundation, pair.remote_cand.foundation} - end - end - @spec prune(t()) :: t() def prune(checklist) do # This is done according to RFC 8838 sec. 10 @@ -80,9 +67,12 @@ defmodule ExICE.Priv.Checklist do waiting = waiting |> Enum.sort_by(fn {_id, p} -> p.priority end, :desc) - |> Enum.uniq_by(fn {_id, p} -> - {p.local_cand.base.base_address, p.local_cand.base.base_port, p.remote_cand} - end) + # RFC 8445, sec. 6.1.2.4. states that two candidate pairs + # are redundant if their local candidates have the same base + # and their remote candidates are identical. + # But, because we replace reflexive candidates with their bases, + # checking againts local_cand_id should work fine. + |> Enum.uniq_by(fn {_id, p} -> {p.local_cand_id, p.remote_cand_id} end) Map.new(waiting ++ in_flight_or_done) end @@ -90,7 +80,7 @@ defmodule ExICE.Priv.Checklist do @spec prune(t(), Candidate.t()) :: t() def prune(checklist, local_cand) do checklist - |> Enum.reject(fn {_pair_id, pair} -> pair.local_cand.base.id == local_cand.base.id end) + |> Enum.reject(fn {_pair_id, pair} -> pair.local_cand_id == local_cand.base.id end) |> Map.new() end @@ -104,11 +94,4 @@ defmodule ExICE.Priv.Checklist do end end end - - @spec recompute_pair_prios(t(), ExICE.ICEAgent.role()) :: t() - def recompute_pair_prios(checklist, role) do - Map.new(checklist, fn {pair_id, pair} -> - {pair_id, CandidatePair.recompute_priority(pair, role)} - end) - end end diff --git a/lib/ex_ice/priv/gatherer.ex b/lib/ex_ice/priv/gatherer.ex index 27f1a96..9339211 100644 --- a/lib/ex_ice/priv/gatherer.ex +++ b/lib/ex_ice/priv/gatherer.ex @@ -1,7 +1,7 @@ defmodule ExICE.Priv.Gatherer do @moduledoc false - alias ExICE.Priv.{Candidate, Utils} + alias ExICE.Priv.{Candidate, Transport, Utils} alias ExSTUN.Message alias ExSTUN.Message.Type @@ -27,8 +27,8 @@ defmodule ExICE.Priv.Gatherer do } end - @spec gather_host_candidates(t()) :: {:ok, [Candidate.t()]} | {:error, term()} - def gather_host_candidates(gatherer) do + @spec open_sockets(t()) :: {:ok, [term()]} + def open_sockets(gatherer) do with {:ok, ints} <- gatherer.if_discovery_module.getifaddrs() do ips = ints @@ -39,15 +39,44 @@ defmodule ExICE.Priv.Gatherer do |> Enum.to_list() ips - |> Enum.map(&create_new_host_candidate(gatherer, &1)) + |> Enum.map(&open_socket(gatherer, &1)) |> Enum.reject(&(&1 == nil)) |> then(&{:ok, &1}) end end - @spec gather_srflx_candidate(t(), integer(), Candidate.t(), ExSTUN.URI.t()) :: + defp open_socket(gatherer, ip) do + inet = + case ip do + {_, _, _, _} -> :inet + {_, _, _, _, _, _, _, _} -> :inet6 + end + + case gatherer.transport_module.open(0, [ + {:inet_backend, :socket}, + {:ip, ip}, + {:active, true}, + :binary, + inet + ]) do + {:ok, socket} -> + socket + + {:error, reason} -> + Logger.debug("Couldn't open socket for ip: #{inspect(ip)}. Reason: #{inspect(reason)}.") + + nil + end + end + + @spec gather_host_candidates(t(), [Transport.socket()]) :: [Candidate.t()] + def gather_host_candidates(gatherer, sockets) do + Enum.map(sockets, &create_new_host_candidate(gatherer, &1)) + end + + @spec gather_srflx_candidate(t(), integer(), Transport.socket(), ExSTUN.URI.t()) :: :ok | {:error, term()} - def gather_srflx_candidate(gatherer, t_id, host_candidate, stun_server) do + def gather_srflx_candidate(gatherer, t_id, socket, stun_server) do binding_request = Message.new(t_id, %Type{class: :request, method: :binding}, []) |> Message.encode() @@ -62,17 +91,19 @@ defmodule ExICE.Priv.Gatherer do ip = List.first(ips) port = stun_server.port - cand_family = Utils.family(host_candidate.base.base_address) + {:ok, {sock_ip, _sock_port}} = gatherer.transport_module.sockname(socket) + + cand_family = Utils.family(sock_ip) stun_family = Utils.family(ip) if cand_family == stun_family do - gatherer.transport_module.send(host_candidate.base.socket, {ip, port}, binding_request) + gatherer.transport_module.send(socket, {ip, port}, binding_request) else Logger.debug(""" Not gathering srflx candidate becasue of incompatible ip address families. - Candidate family: #{inspect(cand_family)} + Socket family: #{inspect(cand_family)} STUN server family: #{inspect(stun_family)} - Candidate: #{inspect(host_candidate)} + Socket: #{inspect(sock_ip)} STUN server: #{inspect(stun_server)} """) end @@ -107,42 +138,21 @@ defmodule ExICE.Priv.Gatherer do Keyword.get_values(int, :addr) end - defp create_new_host_candidate(gatherer, ip) do - inet = - case ip do - {_, _, _, _} -> :inet - {_, _, _, _, _, _, _, _} -> :inet6 - end + defp create_new_host_candidate(gatherer, socket) do + {:ok, {sock_ip, sock_port}} = gatherer.transport_module.sockname(socket) - with {:ok, socket} <- - gatherer.transport_module.open(0, [ - {:inet_backend, :socket}, - {:ip, ip}, - {:active, true}, - :binary, - inet - ]), - {:ok, {_ip, port}} <- gatherer.transport_module.sockname(socket) do - c = - Candidate.Host.new( - address: ip, - port: port, - base_address: ip, - base_port: port, - transport_module: gatherer.transport_module, - socket: socket - ) - - Logger.debug("New candidate: #{inspect(c)}") - - c - else - {:error, reason} -> - Logger.debug( - "Couldn't create candidate for ip: #{inspect(ip)}. Reason: #{inspect(reason)}." - ) + cand = + Candidate.Host.new( + address: sock_ip, + port: sock_port, + base_address: sock_ip, + base_port: sock_port, + transport_module: gatherer.transport_module, + socket: socket + ) - nil - end + Logger.debug("New candidate: #{inspect(cand)}") + + cand end end diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index 56b2327..3ed3974 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -3,8 +3,17 @@ defmodule ExICE.Priv.ICEAgent do require Logger - alias ExICE.Priv.IfDiscovery - alias ExICE.Priv.{Candidate, CandidatePair, Checklist, ConnCheckHandler, Gatherer, Transport} + alias ExICE.Priv.{ + Candidate, + CandidatePair, + Checklist, + ConnCheckHandler, + Gatherer, + IfDiscovery, + Transport, + Utils + } + alias ExICE.Priv.Attribute.{ICEControlling, ICEControlled, Priority, UseCandidate} alias ExSTUN.Message @@ -24,12 +33,7 @@ defmodule ExICE.Priv.ICEAgent do } defguardp are_pairs_equal(p1, p2) - when p1.local_cand.base.base_address == p2.local_cand.base.base_address and - p1.local_cand.base.base_port == p2.local_cand.base.base_port and - p1.local_cand.base.address == p2.local_cand.base.address and - p1.local_cand.base.port == p2.local_cand.base.port and - p1.remote_cand.address == p2.remote_cand.address and - p1.remote_cand.port == p2.remote_cand.port + when p1.local_cand_id == p2.local_cand_id and p1.remote_cand_id == p2.remote_cand_id defguardp is_response(class) when class in [:success_response, :error_response] @@ -44,6 +48,7 @@ defmodule ExICE.Priv.ICEAgent do :if_discovery_module, :transport_module, :gatherer, + :ice_transport_policy, :ta_timer, :role, :tiebreaker, @@ -60,10 +65,12 @@ defmodule ExICE.Priv.ICEAgent do eoc: false, # {did we nominate pair, pair id} nominating?: {false, nil}, + sockets: MapSet.new(), local_cands: %{}, - remote_cands: [], + remote_cands: %{}, stun_servers: [], turn_servers: [], + resolved_turn_servers: [], # stats bytes_sent: 0, bytes_received: 0, @@ -73,7 +80,7 @@ defmodule ExICE.Priv.ICEAgent do @spec new(Keyword.t()) :: t() def new(opts) do - stun_servers = parse_stun_servers(opts[:stun_servers] || []) + {stun_servers, turn_servers} = parse_ice_servers(opts[:ice_servers] || []) {local_ufrag, local_pwd} = generate_credentials() @@ -92,11 +99,13 @@ defmodule ExICE.Priv.ICEAgent do if_discovery_module: if_discovery_module, transport_module: transport_module, gatherer: Gatherer.new(if_discovery_module, transport_module, ip_filter), + ice_transport_policy: opts[:ice_transport_policy] || :all, role: Keyword.fetch!(opts, :role), tiebreaker: generate_tiebreaker(), local_ufrag: local_ufrag, local_pwd: local_pwd, - stun_servers: stun_servers + stun_servers: stun_servers, + turn_servers: turn_servers } end @@ -127,14 +136,14 @@ defmodule ExICE.Priv.ICEAgent do @spec get_local_candidates(t()) :: [binary()] def get_local_candidates(ice_agent) do - Enum.map(ice_agent.local_cands, fn {_, %cand_mod{} = cand} -> + Enum.map(ice_agent.local_cands, fn {_id, %cand_mod{} = cand} -> cand_mod.marshal(cand) end) end @spec get_remote_candidates(t()) :: [binary()] def get_remote_candidates(ice_agent) do - Enum.map(ice_agent.remote_cands, &ExICE.Candidate.marshal/1) + Enum.map(ice_agent.remote_cands, fn {_id, cand} -> ExICE.Candidate.marshal(cand) end) end @spec get_stats(t()) :: map() @@ -144,6 +153,8 @@ defmodule ExICE.Priv.ICEAgent do |> Map.values() |> Enum.map(fn %cand_mod{} = cand -> cand_mod.to_candidate(cand) end) + remote_cands = Map.values(ice_agent.remote_cands) + candidate_pairs = ice_agent.checklist |> Map.values() @@ -158,7 +169,7 @@ defmodule ExICE.Priv.ICEAgent do role: ice_agent.role, local_ufrag: ice_agent.local_ufrag, local_candidates: local_cands, - remote_candidates: ice_agent.remote_cands, + remote_candidates: remote_cands, candidate_pairs: candidate_pairs } end @@ -199,35 +210,55 @@ defmodule ExICE.Priv.ICEAgent do ice_agent end - def gather_candidates(%__MODULE__{gathering_state: :new} = ice_agent) do + def gather_candidates( + %__MODULE__{gathering_state: :new, ice_transport_policy: :all} = ice_agent + ) do Logger.debug("Gathering state change: #{ice_agent.gathering_state} -> gathering") notify(ice_agent.on_gathering_state_change, {:gathering_state_change, :gathering}) ice_agent = %{ice_agent | gathering_state: :gathering} - {:ok, host_candidates} = Gatherer.gather_host_candidates(ice_agent.gatherer) + {:ok, sockets} = Gatherer.open_sockets(ice_agent.gatherer) + host_cands = Gatherer.gather_host_candidates(ice_agent.gatherer, sockets) - for %cand_mod{} = cand <- host_candidates do + for %cand_mod{} = cand <- host_cands do notify(ice_agent.on_new_candidate, {:new_candidate, cand_mod.marshal(cand)}) end - gathering_transactions = - for stun_server <- ice_agent.stun_servers, host_cand <- host_candidates, into: %{} do - <> = :crypto.strong_rand_bytes(12) + srflx_gathering_transactions = + create_srflx_gathering_transactions(ice_agent.stun_servers, sockets) - t = %{ - t_id: t_id, - host_cand: host_cand, - stun_server: stun_server, - send_time: nil, - state: :waiting - } + relay_gathering_transactions = + create_relay_gathering_transactions(ice_agent, ice_agent.turn_servers, sockets) - {t_id, t} - end + gathering_transactions = Map.merge(srflx_gathering_transactions, relay_gathering_transactions) + + local_cands = Map.new(host_cands, fn cand -> {cand.base.id, cand} end) + + %{ + ice_agent + | sockets: sockets, + local_cands: local_cands, + gathering_transactions: gathering_transactions + } + |> update_gathering_state() + |> update_ta_timer() + end + + def gather_candidates( + %__MODULE__{gathering_state: :new, ice_transport_policy: :relay} = ice_agent + ) do + change_gathering_state(ice_agent, :gathering) - local_cands = Map.new(host_candidates, fn cand -> {cand.base.id, cand} end) + {:ok, sockets} = Gatherer.open_sockets(ice_agent.gatherer) - %{ice_agent | gathering_transactions: gathering_transactions, local_cands: local_cands} + relay_gathering_transactions = + create_relay_gathering_transactions(ice_agent, ice_agent.turn_servers, sockets) + + %{ + ice_agent + | sockets: sockets, + gathering_transactions: relay_gathering_transactions + } |> update_gathering_state() |> update_ta_timer() end @@ -300,9 +331,12 @@ defmodule ExICE.Priv.ICEAgent do %CandidatePair{} = pair = ice_agent.selected_pair || Checklist.get_valid_pair(ice_agent.checklist) - dst = {pair.remote_cand.address, pair.remote_cand.port} + local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id) + remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id) - case do_send(ice_agent, pair.local_cand, dst, data) do + dst = {remote_cand.address, remote_cand.port} + + case do_send(ice_agent, local_cand, dst, data) do {:ok, ice_agent} -> %{ ice_agent @@ -373,9 +407,9 @@ defmodule ExICE.Priv.ICEAgent do nil -> # credo:disable-for-lines:3 Credo.Check.Refactor.Nesting - case get_next_gathering_transaction(ice_agent.gathering_transactions) do + case get_next_gathering_transaction(ice_agent) do {_t_id, transaction} -> - case handle_gathering_transaction(ice_agent, transaction) do + case execute_gathering_transaction(ice_agent, transaction) do {:ok, ice_agent} -> {true, ice_agent} {:error, ice_agent} -> {false, ice_agent} end @@ -439,101 +473,120 @@ defmodule ExICE.Priv.ICEAgent do binary() ) :: t() def handle_udp(ice_agent, socket, src_ip, src_port, packet) do - local_cands = Map.values(ice_agent.local_cands) + turn_tr_id = {socket, {src_ip, src_port}} + turn_tr = Map.get(ice_agent.gathering_transactions, turn_tr_id) - case find_host_cand(local_cands, socket) do - nil -> - Logger.debug( - "No local candidate corresponding to message received from #{inspect(src_ip)}:#{src_port}. Ignoring." - ) + cond do + # if we are still in a process of creating a relay candidate + # and we received a message from a turn server + turn_tr != nil and turn_tr.state == :in_progress -> + handle_turn_gathering_transaction_response(ice_agent, turn_tr_id, turn_tr, packet) + + from_turn?(ice_agent, src_ip, src_port) -> + handle_turn_message(ice_agent, socket, src_ip, src_port, packet) + + ExSTUN.stun?(packet) -> + handle_stun_message(ice_agent, socket, src_ip, src_port, packet) + + true -> + handle_data_message(ice_agent, packet) + end + end + @spec handle_ex_turn_msg(t(), reference(), ExTURN.Client.notification_message()) :: t() + def handle_ex_turn_msg(ice_agent, client_ref, msg) do + tr_id_tr = find_gathering_transaction(ice_agent.gathering_transactions, client_ref) + + cand = find_relay_cand_by_client(Map.values(ice_agent.local_cands), client_ref) + + case {tr_id_tr, cand} do + {nil, nil} -> ice_agent - %cand_mod{} = local_cand -> - case cand_mod.receive_data(local_cand, src_ip, src_port, packet) do - {:ok, local_cand} -> - %{ - ice_agent - | local_cands: Map.put(ice_agent.local_cands, local_cand.base.id, local_cand) - } + {{tr_id, tr}, nil} -> + case ExTURN.Client.handle_message(tr.client, msg) do + {:ok, client} -> + tr = %{tr | client: client} + put_in(ice_agent.gathering_transactions[tr_id], tr) + + {:send, dst, data, client} -> + tr = %{tr | client: client} + :ok = ice_agent.transport_module.send(tr.socket, dst, data) + put_in(ice_agent.gathering_transactions[tr_id], tr) + + {:error, _reason, client} -> + tr = %{tr | client: client, state: :failed} - {:ok, packet, local_cand} -> - local_cands = Map.put(ice_agent.local_cands, local_cand.base.id, local_cand) - ice_agent = %{ice_agent | local_cands: local_cands} + put_in(ice_agent.gathering_transactions[tr_id], tr) + |> update_gathering_state() + end + + {nil, cand} -> + case ExTURN.Client.handle_message(cand.client, msg) do + {:ok, client} -> + cand = %{cand | client: client} + put_in(ice_agent.local_cands[cand.base.id], cand) - do_handle_udp(ice_agent, local_cand, src_ip, src_port, packet) + {:send, dst, data, client} -> + cand = %{cand | client: client} + ice_agent = put_in(ice_agent.local_cands[cand.base.id], cand) + do_send(ice_agent, cand, dst, data) - {:error, reason, local_cand} -> - Logger.warning(""" - Error receiving data on candidate, reason: #{reason}. - Candidate: #{inspect(local_cand)}. - Closing candidate. - """) + {:error, _reason, client} -> + cand = %{cand | client: client} + ice_agent = put_in(ice_agent.local_cands[cand.base.id], cand) - close_candidate(ice_agent, local_cand) + close_candidate(ice_agent, cand) end end end - defp do_handle_udp(ice_agent, local_cand, src_ip, src_port, packet) do - if ExSTUN.stun?(packet) do - case ExSTUN.Message.decode(packet) do - {:ok, msg} -> - handle_stun_msg(ice_agent, local_cand, src_ip, src_port, msg) + ## PRIV API - {:error, reason} -> - Logger.warning("Couldn't decode stun message: #{inspect(reason)}") - ice_agent - end - else - notify(ice_agent.on_data, {:data, packet}) + defp create_srflx_gathering_transactions(stun_servers, sockets) do + for stun_server <- stun_servers, socket <- sockets, into: %{} do + <> = :crypto.strong_rand_bytes(12) - %{ - ice_agent - | bytes_received: ice_agent.bytes_received + byte_size(packet), - packets_received: ice_agent.packets_received + 1 + t = %{ + t_id: t_id, + socket: socket, + stun_server: stun_server, + send_time: nil, + state: :waiting } + + {t_id, t} end end - defp close_candidate(ice_agent, local_cand) do - local_cands = Map.delete(ice_agent.local_cands, local_cand.base.id) - - selected_pair = - if ice_agent.selected_pair != nil and - ice_agent.selected_pair.local_cand.base.id == local_cand.base.id, - do: nil, - else: ice_agent.selected_pair - - nominating? = - case ice_agent.nominating? do - {true, pair_id} -> - pair = Map.fetch!(ice_agent.checklist, pair_id) + defp create_relay_gathering_transactions(ice_agent, turn_servers, sockets) do + # TODO revisit this + for turn_server <- turn_servers, socket <- sockets do + with {:ok, client} <- + ExTURN.Client.new(turn_server.url, turn_server.username, turn_server.credential), + {:ok, {sock_ip, _sock_port}} <- ice_agent.transport_module.sockname(socket), + true <- Utils.family(client.turn_ip) == Utils.family(sock_ip) do + t_id = {socket, {client.turn_ip, client.turn_port}} - if pair.local_cand.base.id == local_cand.base.id do - {false, nil} - else - ice_agent.nominating? - end + t = %{ + t_id: t_id, + socket: socket, + client: client, + send_time: nil, + state: :waiting + } - other -> - other + {t_id, t} end - - %{ - ice_agent - | local_cands: local_cands, - selected_pair: selected_pair, - checklist: Checklist.prune(ice_agent.checklist, local_cand), - nominating?: nominating? - } - |> update_connection_state() + end + |> Enum.reject(fn {tr_id, tr} -> tr_id == nil and tr == nil end) + |> Map.new() end defp do_add_remote_candidate(ice_agent, remote_cand) do local_cands = get_matching_candidates_remote(Map.values(ice_agent.local_cands), remote_cand) - checklist_foundations = Checklist.get_foundations(ice_agent.checklist) + checklist_foundations = get_foundations(ice_agent) new_pairs = for local_cand <- local_cands, into: %{} do @@ -563,29 +616,28 @@ defmodule ExICE.Priv.ICEAgent do %__MODULE__{ ice_agent | checklist: checklist, - remote_cands: [remote_cand | ice_agent.remote_cands] + remote_cands: Map.put(ice_agent.remote_cands, remote_cand.id, remote_cand) } end - defp get_next_gathering_transaction(gathering_transactions) do - Enum.find(gathering_transactions, fn {_t_id, t} -> t.state == :waiting end) + defp get_next_gathering_transaction(ice_agent) do + Enum.find(ice_agent.gathering_transactions, fn {_t_id, t} -> t.state == :waiting end) end - defp handle_gathering_transaction( - ice_agent, - %{t_id: t_id, host_cand: host_cand, stun_server: stun_server} = t - ) do + defp execute_gathering_transaction(ice_agent, %{stun_server: stun_server} = tr) do + {:ok, {sock_ip, sock_port}} = ice_agent.transport_module.sockname(tr.socket) + Logger.debug(""" Sending binding request to gather srflx candidate for: - host_cand: #{inspect(host_cand)}, + socket: #{inspect(sock_ip)}:#{sock_port}, stun_server: #{inspect(stun_server)} """) - case Gatherer.gather_srflx_candidate(ice_agent.gatherer, t_id, host_cand, stun_server) do + case Gatherer.gather_srflx_candidate(ice_agent.gatherer, tr.t_id, tr.socket, stun_server.url) do :ok -> now = System.monotonic_time(:millisecond) - t = %{t | state: :in_progress, send_time: now} - gathering_transactions = Map.put(ice_agent.gathering_transactions, t_id, t) + tr = %{tr | state: :in_progress, send_time: now} + gathering_transactions = Map.put(ice_agent.gathering_transactions, tr.t_id, tr) ice_agent = %__MODULE__{ice_agent | gathering_transactions: gathering_transactions} {:ok, ice_agent} @@ -593,7 +645,7 @@ defmodule ExICE.Priv.ICEAgent do Logger.debug("Couldn't send binding request, reason: #{reason}") gathering_transactions = - put_in(ice_agent.gathering_transactions, [t.t_id, :state], :failed) + put_in(ice_agent.gathering_transactions, [tr.t_id, :state], :failed) ice_agent = %__MODULE__{ice_agent | gathering_transactions: gathering_transactions} ice_agent = update_gathering_state(ice_agent) @@ -602,6 +654,34 @@ defmodule ExICE.Priv.ICEAgent do end end + defp execute_gathering_transaction(ice_agent, %{client: client} = tr) do + {:ok, {sock_ip, sock_port}} = ice_agent.transport_module.sockname(tr.socket) + + Logger.debug(""" + Starting the process of gathering relay candidate for: + socket: #{inspect(sock_ip)}:#{sock_port}, + turn_server: #{inspect(client.turn_ip)}:#{client.turn_port} + """) + + {:send, turn_addr, data, client} = ExTURN.Client.allocate(client) + tr = Map.put(tr, :client, client) + + case ice_agent.transport_module.send(tr.socket, turn_addr, data) do + :ok -> + now = System.monotonic_time(:millisecond) + tr = %{tr | state: :in_progress, send_time: now} + ice_agent = put_in(ice_agent.gathering_transactions[tr.t_id], tr) + ice_agent = update_gathering_state(ice_agent) + {:ok, ice_agent} + + {:error, reason} -> + Logger.debug("Couldn't send allocate request, reason: #{reason}") + ice_agent = put_in(ice_agent.gathering_transactions[tr.t_id][:state], :failed) + ice_agent = update_gathering_state(ice_agent) + {:error, ice_agent} + end + end + defp timeout_pending_transactions(ice_agent) do now = System.monotonic_time(:millisecond) ice_agent = timeout_gathering_transactions(ice_agent, now) @@ -631,12 +711,8 @@ defmodule ExICE.Priv.ICEAgent do defp timeout_gathering_transactions(ice_agent, now) do {stale_gath_trans, gath_trans} = - Enum.split_with(ice_agent.gathering_transactions, fn {_id, - %{ - state: t_state, - send_time: send_time - }} -> - t_state == :in_progress and now - send_time >= @hto + Enum.split_with(ice_agent.gathering_transactions, fn {_id, tr} -> + tr.state == :in_progress and now - tr.send_time >= @hto end) gath_trans = Map.new(gath_trans) @@ -649,7 +725,140 @@ defmodule ExICE.Priv.ICEAgent do %__MODULE__{ice_agent | gathering_transactions: gath_trans} end - defp handle_stun_msg(ice_agent, local_cand, src_ip, src_port, %Message{} = msg) do + defp from_turn?(ice_agent, src_ip, src_port), + do: {src_ip, src_port} in ice_agent.resolved_turn_servers + + defp handle_turn_gathering_transaction_response(ice_agent, tr_id, tr, packet) do + {_socket, {src_ip, src_port}} = tr_id + + case ExTURN.Client.handle_message(tr.client, {:socket_data, src_ip, src_port, packet}) do + {:allocation_created, {alloc_ip, alloc_port}, client} -> + tr = %{tr | client: client, state: :complete} + + resolved_turn_servers = [ + {client.turn_ip, client.turn_port} | ice_agent.resolved_turn_servers + ] + + ice_agent = %{ice_agent | resolved_turn_servers: resolved_turn_servers} + ice_agent = put_in(ice_agent.gathering_transactions[tr_id], tr) + + relay_cand = + Candidate.Relay.new( + address: alloc_ip, + port: alloc_port, + base_address: alloc_ip, + base_port: alloc_port, + transport_module: ice_agent.transport_module, + socket: tr.socket, + client: tr.client + ) + + Logger.debug("New relay candidate: #{inspect(relay_cand)}") + + notify( + ice_agent.on_new_candidate, + {:new_candidate, Candidate.Relay.marshal(relay_cand)} + ) + + add_relay_cand(ice_agent, relay_cand) + + {:send, turn_addr, data, client} -> + tr = %{tr | client: client} + :ok = ice_agent.transport_module.send(tr.socket, turn_addr, data) + put_in(ice_agent.gathering_transactions[tr_id], tr) + + {:error, _reason, client} -> + Logger.debug("Failed to create TURN allocation.") + tr = %{tr | client: client, state: :failed} + put_in(ice_agent.gathering_transactions[tr_id], tr) + end + end + + defp handle_turn_message(ice_agent, socket, src_ip, src_port, packet) do + %cand_mod{} = cand = find_relay_cand_by_socket(Map.values(ice_agent.local_cands), socket) + + case cand_mod.receive_data(cand, src_ip, src_port, packet) do + {:ok, cand} -> + put_in(ice_agent.local_cands[cand.base.id], cand) + + {:ok, src_ip, src_port, packet, cand} -> + ice_agent = put_in(ice_agent.local_cands[cand.base.id], cand) + + if ExSTUN.stun?(packet) do + case ExSTUN.Message.decode(packet) do + {:ok, msg} -> + do_handle_stun_message(ice_agent, cand, src_ip, src_port, msg) + + {:error, reason} -> + Logger.warning("Couldn't decode stun message: #{inspect(reason)}") + ice_agent + end + else + handle_data_message(ice_agent, packet) + end + + {:error, _reason, cand} -> + close_candidate(ice_agent, cand) + end + end + + defp handle_stun_message(ice_agent, socket, src_ip, src_port, packet) do + case ExSTUN.Message.decode(packet) do + {:ok, msg} -> + local_cands = Map.values(ice_agent.local_cands) + + case find_host_cand(local_cands, socket) do + nil -> + ice_agent + + local_cand -> + do_handle_stun_message(ice_agent, local_cand, src_ip, src_port, msg) + end + + {:error, reason} -> + Logger.warning("Couldn't decode stun message: #{inspect(reason)}") + ice_agent + end + end + + defp handle_data_message(ice_agent, packet) do + notify(ice_agent.on_data, {:data, packet}) + + %{ + ice_agent + | bytes_received: ice_agent.bytes_received + byte_size(packet), + packets_received: ice_agent.packets_received + 1 + } + end + + defp add_relay_cand(ice_agent, relay_cand) do + ice_agent = put_in(ice_agent.local_cands[relay_cand.base.id], relay_cand) + + remote_cands = get_matching_candidates_local(Map.values(ice_agent.remote_cands), relay_cand) + + checklist_foundations = get_foundations(ice_agent) + + new_pairs = + for remote_cand <- remote_cands, into: %{} do + pair_state = get_pair_state(relay_cand, remote_cand, checklist_foundations) + pair = CandidatePair.new(relay_cand, remote_cand, ice_agent.role, pair_state) + {pair.id, pair} + end + + checklist = Checklist.prune(Map.merge(ice_agent.checklist, new_pairs)) + + added_pairs = Map.drop(checklist, Map.keys(ice_agent.checklist)) + + if added_pairs == %{} do + Logger.debug("Not adding any new pairs as they were redundant") + else + Logger.debug("New candidate pairs: #{inspect(added_pairs)}") + end + + %__MODULE__{ice_agent | checklist: checklist} + end + + defp do_handle_stun_message(ice_agent, local_cand, src_ip, src_port, %Message{} = msg) do # TODO revisit 7.3.1.4 case msg.type do @@ -674,7 +883,7 @@ defmodule ExICE.Priv.ICEAgent do Received gathering transaction response from: #{inspect({src_ip, src_port})}, on: #{inspect(local_cand.base.base_address)} \ """) - handle_gathering_transaction_response(ice_agent, msg) + handle_stun_gathering_transaction_response(ice_agent, msg) %Type{class: class, method: :binding} when is_response(class) -> Logger.warning(""" @@ -810,8 +1019,11 @@ defmodule ExICE.Priv.ICEAgent do Peer's tiebreaker: #{tiebreaker}\ """) - checklist = Checklist.recompute_pair_prios(ice_agent.checklist, :controlled) - {:ok, %__MODULE__{ice_agent | role: :controlled, checklist: checklist}} + ice_agent = %__MODULE__{ice_agent | role: :controlled} + + checklist = recompute_pair_prios(ice_agent) + + {:ok, %__MODULE__{ice_agent | checklist: checklist}} end defp check_req_role_conflict( @@ -825,8 +1037,11 @@ defmodule ExICE.Priv.ICEAgent do Peer's tiebreaker: #{tiebreaker}\ """) - checklist = Checklist.recompute_pair_prios(ice_agent.checklist, :controlling) - {:ok, %__MODULE__{ice_agent | role: :controlling, checklist: checklist}} + ice_agent = %__MODULE__{ice_agent | role: :controlling} + + checklist = recompute_pair_prios(ice_agent) + + {:ok, %__MODULE__{ice_agent | checklist: checklist}} end defp check_req_role_conflict(%__MODULE__{role: :controlled}, %ICEControlled{ @@ -862,16 +1077,19 @@ defmodule ExICE.Priv.ICEAgent do # check that the source and destination transport # addresses are symmetric - see sec. 7.2.5.2.1 - if symmetric?(local_cand.base.socket, {src_ip, src_port}, conn_check_pair) do + if symmetric?(ice_agent, local_cand.base.socket, {src_ip, src_port}, conn_check_pair) do case msg.type.class do :success_response -> handle_conn_check_success_response(ice_agent, conn_check_pair, msg) :error_response -> handle_conn_check_error_response(ice_agent, conn_check_pair, msg) end else + cc_local_cand = Map.fetch!(ice_agent.local_cands, conn_check_pair.local_cand_id) + cc_remote_cand = Map.fetch!(ice_agent.remote_cands, conn_check_pair.remote_cand_id) + Logger.warning(""" Ignoring conn check response, non-symmetric src and dst addresses. - Sent from: #{inspect({conn_check_pair.local_cand.base.base_address, conn_check_pair.local_cand.base.base_port})}, \ - to: #{inspect({conn_check_pair.remote_cand.address, conn_check_pair.remote_cand.port})} + Sent from: #{inspect({cc_local_cand.base.base_address, cc_local_cand.base.base_port})}, \ + to: #{inspect({cc_remote_cand.address, cc_remote_cand.port})} Recv from: #{inspect({src_ip, src_port})}, on: #{inspect({local_cand.base.base_address, local_cand.base.base_port})} Pair failed: #{conn_check_pair.id} """) @@ -887,7 +1105,7 @@ defmodule ExICE.Priv.ICEAgent do with :ok <- authenticate_msg(msg, ice_agent.remote_pwd), {:ok, xor_addr} <- Message.get_attribute(msg, XORMappedAddress) do {local_cand, ice_agent} = get_or_create_local_cand(ice_agent, xor_addr, conn_check_pair) - remote_cand = conn_check_pair.remote_cand + remote_cand = Map.fetch!(ice_agent.remote_cands, conn_check_pair.remote_cand_id) valid_pair = CandidatePair.new(local_cand, remote_cand, ice_agent.role, :succeeded, valid?: true) @@ -965,18 +1183,11 @@ defmodule ExICE.Priv.ICEAgent do end end - defp handle_gathering_transaction_response(ice_agent, msg) do - case msg.type.class do - :success_response -> - handle_gathering_transaction_success_response(ice_agent, msg) - - :error_response -> - handle_gathering_transaction_error_response(ice_agent, msg) - end - end - - defp handle_gathering_transaction_success_response(ice_agent, msg) do - t = Map.fetch!(ice_agent.gathering_transactions, msg.transaction_id) + defp handle_stun_gathering_transaction_response( + ice_agent, + %Message{type: %Type{class: :success_response}} = msg + ) do + tr = Map.fetch!(ice_agent.gathering_transactions, msg.transaction_id) {:ok, %XORMappedAddress{address: xor_addr, port: xor_port}} = Message.get_attribute(msg, XORMappedAddress) @@ -984,19 +1195,22 @@ defmodule ExICE.Priv.ICEAgent do ice_agent = case find_local_cand(Map.values(ice_agent.local_cands), xor_addr, xor_port) do nil -> + {:ok, {base_addr, base_port}} = ice_agent.transport_module.sockname(tr.socket) + c = Candidate.Srflx.new( address: xor_addr, port: xor_port, - base_address: t.host_cand.base.address, - base_port: t.host_cand.base.port, + base_address: base_addr, + base_port: base_port, transport_module: ice_agent.transport_module, - socket: t.host_cand.base.socket + socket: tr.socket ) Logger.debug("New srflx candidate: #{inspect(c)}") notify(ice_agent.on_new_candidate, {:new_candidate, Candidate.Srflx.marshal(c)}) - add_srflx_cand(ice_agent, c) + # don't pair reflexive candidate, it should be pruned anyway - see sec. 6.1.2.4 + put_in(ice_agent.local_cands[c.base.id], c) cand -> Logger.debug(""" @@ -1008,12 +1222,15 @@ defmodule ExICE.Priv.ICEAgent do end gathering_transactions = - Map.update!(ice_agent.gathering_transactions, t.t_id, fn t -> %{t | state: :complete} end) + Map.update!(ice_agent.gathering_transactions, tr.t_id, fn tr -> %{tr | state: :complete} end) %__MODULE__{ice_agent | gathering_transactions: gathering_transactions} end - defp handle_gathering_transaction_error_response(ice_agent, msg) do + defp handle_stun_gathering_transaction_response( + ice_agent, + %Message{type: %Type{class: :error_response}} = msg + ) do t = Map.fetch!(ice_agent.gathering_transactions, msg.transaction_id) error_code = @@ -1032,39 +1249,6 @@ defmodule ExICE.Priv.ICEAgent do %__MODULE__{ice_agent | gathering_transactions: gathering_transactions} end - defp add_srflx_cand(ice_agent, c) do - # replace address and port with candidate base - # and prune the checklist - see sec. 6.1.2.4 - local_cand = %{c | base: %{c.base | address: c.base.base_address, port: c.base.base_port}} - - remote_cands = get_matching_candidates_local(ice_agent.remote_cands, local_cand) - - checklist_foundations = Checklist.get_foundations(ice_agent.checklist) - - new_pairs = - for remote_cand <- remote_cands, into: %{} do - pair_state = get_pair_state(local_cand, remote_cand, checklist_foundations) - pair = CandidatePair.new(local_cand, remote_cand, ice_agent.role, pair_state) - {pair.id, pair} - end - - checklist = Checklist.prune(Map.merge(ice_agent.checklist, new_pairs)) - - added_pairs = Map.drop(checklist, Map.keys(ice_agent.checklist)) - - if added_pairs == %{} do - Logger.debug("Not adding any new pairs as they were redundant") - else - Logger.debug("New candidate pairs: #{inspect(added_pairs)}") - end - - %__MODULE__{ - ice_agent - | checklist: checklist, - local_cands: Map.put(ice_agent.local_cands, c.base.id, c) - } - end - # Adds valid pair according to sec 7.2.5.3.2 # TODO sec. 7.2.5.3.3 # The agent MUST set the states for all other Frozen candidate pairs in @@ -1160,7 +1344,7 @@ defmodule ExICE.Priv.ICEAgent do {checklist_pair.id, ice_agent} end - defp add_valid_pair(ice_agent, valid_pair, conn_check_pair, _) do + defp add_valid_pair(ice_agent, valid_pair, conn_check_pair, nil) do # TODO compute priority according to sec 7.2.5.3.2 Logger.debug(""" Adding new candidate pair resulted from conn check \ @@ -1194,8 +1378,11 @@ defmodule ExICE.Priv.ICEAgent do @doc false @spec send_binding_success_response(t(), CandidatePair.t(), Message.t()) :: t() def send_binding_success_response(ice_agent, pair, req) do - src_ip = pair.remote_cand.address - src_port = pair.remote_cand.port + local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id) + remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id) + + src_ip = remote_cand.address + src_port = remote_cand.port type = %Type{class: :success_response, method: :binding} @@ -1205,7 +1392,7 @@ defmodule ExICE.Priv.ICEAgent do |> Message.with_fingerprint() |> Message.encode() - {_result, ice_agent} = do_send(ice_agent, pair.local_cand, {src_ip, src_port}, resp) + {_result, ice_agent} = do_send(ice_agent, local_cand, {src_ip, src_port}, resp) ice_agent end @@ -1265,9 +1452,12 @@ defmodule ExICE.Priv.ICEAgent do end) end - defp symmetric?(socket, response_src, conn_check_pair) do - request_dst = {conn_check_pair.remote_cand.address, conn_check_pair.remote_cand.port} - response_src == request_dst and socket == conn_check_pair.local_cand.base.socket + defp symmetric?(ice_agent, socket, response_src, conn_check_pair) do + local_cand = Map.fetch!(ice_agent.local_cands, conn_check_pair.local_cand_id) + remote_cand = Map.fetch!(ice_agent.remote_cands, conn_check_pair.remote_cand_id) + + request_dst = {remote_cand.address, remote_cand.port} + response_src == request_dst and socket == local_cand.base.socket end defp get_pair_state(local_cand, remote_cand, checklist_foundations) do @@ -1284,14 +1474,16 @@ defmodule ExICE.Priv.ICEAgent do else # prflx candidate sec 7.2.5.3.1 # TODO calculate correct prio and foundation + local_cand = Map.fetch!(ice_agent.local_cands, conn_check_pair.local_cand_id) + cand = Candidate.Prflx.new( address: xor_addr.address, port: xor_addr.port, - base_address: conn_check_pair.local_cand.base.base_address, - base_port: conn_check_pair.local_cand.base.base_port, + base_address: local_cand.base.base_address, + base_port: local_cand.base.base_port, transport_module: ice_agent.transport_module, - socket: conn_check_pair.local_cand.base.socket + socket: local_cand.base.socket ) Logger.debug("Adding new local prflx candidate: #{inspect(cand)}") @@ -1306,12 +1498,12 @@ defmodule ExICE.Priv.ICEAgent do end defp get_or_create_remote_cand(ice_agent, src_ip, src_port, _prio_attr) do - case find_remote_cand(ice_agent.remote_cands, src_ip, src_port) do + case find_remote_cand(Map.values(ice_agent.remote_cands), src_ip, src_port) do nil -> # TODO calculate correct prio using prio_attr cand = ExICE.Candidate.new(:prflx, address: src_ip, port: src_port) Logger.debug("Adding new remote prflx candidate: #{inspect(cand)}") - ice_agent = %__MODULE__{ice_agent | remote_cands: [cand | ice_agent.remote_cands]} + ice_agent = put_in(ice_agent.remote_cands[cand.id], cand) {cand, ice_agent} %_cand_mod{} = cand -> @@ -1319,6 +1511,40 @@ defmodule ExICE.Priv.ICEAgent do end end + defp close_candidate(ice_agent, local_cand) do + local_cands = Map.delete(ice_agent.local_cands, local_cand.base.id) + + selected_pair = + if ice_agent.selected_pair != nil and + ice_agent.selected_pair.local_cand.base.id == local_cand.base.id, + do: nil, + else: ice_agent.selected_pair + + nominating? = + case ice_agent.nominating? do + {true, pair_id} -> + pair = Map.fetch!(ice_agent.checklist, pair_id) + + if pair.local_cand.base.id == local_cand.base.id do + {false, nil} + else + ice_agent.nominating? + end + + other -> + other + end + + %{ + ice_agent + | local_cands: local_cands, + selected_pair: selected_pair, + checklist: Checklist.prune(ice_agent.checklist, local_cand), + nominating?: nominating? + } + |> update_connection_state() + end + defp maybe_nominate(ice_agent) do if time_to_nominate?(ice_agent) do Logger.debug("Time to nominate a pair! Looking for a best valid pair...") @@ -1341,9 +1567,7 @@ defmodule ExICE.Priv.ICEAgent do ice_agent.role == :controlling end - @doc false - @spec try_nominate(map()) :: map() - def try_nominate(ice_agent) do + defp try_nominate(ice_agent) do case Checklist.get_pair_for_nomination(ice_agent.checklist) do %CandidatePair{} = pair -> Logger.debug("Trying to nominate pair: #{inspect(pair.id)}") @@ -1365,30 +1589,6 @@ defmodule ExICE.Priv.ICEAgent do end end - defp update_gathering_state(%{gathering_state: :complete} = ice_agent), do: ice_agent - - defp update_gathering_state(ice_agent) do - transaction_in_progress? = - Enum.any?(ice_agent.gathering_transactions, fn {_id, %{state: t_state}} -> - t_state in [:waiting, :in_progress] - end) - - cond do - ice_agent.gathering_state == :new and transaction_in_progress? -> - Logger.debug("Gathering state change: new -> gathering") - notify(ice_agent.on_gathering_state_change, {:gathering_state_change, :gathering}) - %__MODULE__{ice_agent | gathering_state: :gathering} - - ice_agent.gathering_state == :gathering and not transaction_in_progress? -> - Logger.debug("Gathering state change: gathering -> complete") - notify(ice_agent.on_gathering_state_change, {:gathering_state_change, :complete}) - %__MODULE__{ice_agent | gathering_state: :complete} - - true -> - ice_agent - end - end - defp do_restart(ice_agent) do ice_agent.local_cands |> Enum.uniq_by(fn {_id, cand} -> cand.base.socket end) @@ -1411,19 +1611,16 @@ defmodule ExICE.Priv.ICEAgent do ice_agent end - Logger.debug("Gathering state change: #{ice_agent.gathering_state} -> new") - notify(ice_agent.on_gathering_state_change, {:gathering_state_change, :new}) + ice_agent = change_gathering_state(ice_agent, :new) %__MODULE__{ ice_agent - | state: new_ice_state, - gathering_state: :new, - gathering_transactions: %{}, + | gathering_transactions: %{}, selected_pair: nil, conn_checks: %{}, checklist: %{}, local_cands: %{}, - remote_cands: [], + remote_cands: %{}, local_ufrag: ufrag, local_pwd: pwd, remote_ufrag: nil, @@ -1434,6 +1631,31 @@ defmodule ExICE.Priv.ICEAgent do |> update_ta_timer() end + defp get_foundations(ice_agent) do + for {_id, pair} <- ice_agent.checklist do + local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id) + remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id) + {local_cand.base.foundation, remote_cand.foundation} + end + end + + defp recompute_pair_prios(ice_agent) do + Map.new(ice_agent.checklist, fn {pair_id, pair} -> + local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id) + remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id) + + priority = + CandidatePair.recompute_priority( + pair, + local_cand.base.priority, + remote_cand.priority, + ice_agent.role + ) + + {pair_id, priority} + end) + end + defp find_local_cand(cands, ip, port) do Enum.find(cands, fn cand -> cand.base.address == ip and cand.base.port == port end) end @@ -1447,23 +1669,35 @@ defmodule ExICE.Priv.ICEAgent do Enum.find(cands, fn cand -> cand.base.socket == socket and cand.base.type == :host end) end - defp parse_stun_servers(stun_servers) do - stun_servers - |> Enum.map(fn stun_server -> - case ExSTUN.URI.parse(stun_server) do - {:ok, stun_server} -> - stun_server + defp find_relay_cand_by_socket(cands, socket) do + Enum.find(cands, fn cand -> cand.base.type == :relay and cand.base.socket == socket end) + end - :error -> - Logger.warning(""" - Couldn't parse STUN server URI: #{inspect(stun_server)}. \ - Ignoring.\ - """) + defp find_relay_cand_by_client(cands, client_ref) do + Enum.find(cands, fn cand -> cand.base.type == :relay and cand.client.ref == client_ref end) + end + defp find_gathering_transaction(gathering_transactions, client_ref) do + Enum.find(gathering_transactions, fn + {_tr_id, %{client: %{ref: ^client_ref}}} -> true + _ -> false + end) + end + + defp parse_ice_servers(ice_servers) do + ice_servers + |> Enum.map(fn ice_server -> + case ExSTUN.URI.parse(ice_server.url) do + {:ok, url} -> + %{ice_server | url: url} + + :error -> + Logger.warning("Couldn't parse URL: #{inspect(ice_server.url)}. Ignoring.") nil end end) |> Enum.reject(&(&1 == nil)) + |> Enum.split_with(fn ice_server -> ice_server.url.scheme in [:stun, :stuns] end) end defp generate_tiebreaker() do @@ -1487,6 +1721,32 @@ defmodule ExICE.Priv.ICEAgent do end end + defp change_gathering_state(ice_agent, new_gathering_state) do + Logger.debug("Gathering state change: #{ice_agent.gathering_state} -> #{new_gathering_state}") + notify(ice_agent.on_gathering_state_change, {:gathering_state_change, new_gathering_state}) + %__MODULE__{ice_agent | gathering_state: new_gathering_state} + end + + defp update_gathering_state(%{gathering_state: :complete} = ice_agent), do: ice_agent + + defp update_gathering_state(ice_agent) do + transaction_in_progress? = + Enum.any?(ice_agent.gathering_transactions, fn {_id, %{state: t_state}} -> + t_state in [:waiting, :in_progress] + end) + + cond do + ice_agent.gathering_state == :new and transaction_in_progress? -> + change_gathering_state(ice_agent, :gathering) + + ice_agent.gathering_state == :gathering and not transaction_in_progress? -> + change_gathering_state(ice_agent, :complete) + + true -> + ice_agent + end + end + @doc false @spec change_connection_state(t(), atom()) :: t() def change_connection_state(ice_agent, new_conn_state) do @@ -1671,6 +1931,9 @@ defmodule ExICE.Priv.ICEAgent do end defp send_conn_check(ice_agent, pair) do + local_cand = Map.fetch!(ice_agent.local_cands, pair.local_cand_id) + remote_cand = Map.fetch!(ice_agent.remote_cands, pair.remote_cand_id) + type = %Type{class: :request, method: :binding} role_attr = @@ -1705,9 +1968,9 @@ defmodule ExICE.Priv.ICEAgent do |> Message.with_integrity(ice_agent.remote_pwd) |> Message.with_fingerprint() - dst = {pair.remote_cand.address, pair.remote_cand.port} + dst = {remote_cand.address, remote_cand.port} - case do_send(ice_agent, pair.local_cand, dst, Message.encode(req)) do + case do_send(ice_agent, local_cand, dst, Message.encode(req)) do {:ok, ice_agent} -> pair = %CandidatePair{pair | state: :in_progress} diff --git a/mix.exs b/mix.exs index 2dccee5..22d31da 100644 --- a/mix.exs +++ b/mix.exs @@ -52,6 +52,7 @@ defmodule ExICE.MixProject do [ # {:ex_stun, "~> 0.1.0"}, {:ex_stun, github: "elixir-webrtc/ex_stun"}, + {:ex_turn, github: "elixir-webrtc/ex_turn"}, {:excoveralls, "~> 0.15", only: :test, runtime: false}, {:ex_doc, "~> 0.27", only: :dev, runtime: false}, {:credo, "~> 1.6", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index fae7c99..a979fdf 100644 --- a/mix.lock +++ b/mix.lock @@ -6,7 +6,8 @@ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.31.2", "8b06d0a5ac69e1a54df35519c951f1f44a7b7ca9a5bb7a260cd8a174d6322ece", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "317346c14febaba9ca40fd97b5b5919f7751fb85d399cc8e7e8872049f37e0af"}, "ex_stun": {:git, "https://github.com/elixir-webrtc/ex_stun.git", "5d1243a6c3268d0cb402c6272ae6e0df1615779a", []}, - "excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"}, + "ex_turn": {:git, "https://github.com/elixir-webrtc/ex_turn.git", "14df4a546f2e19a85731eef70258c490a71e856d", []}, + "excoveralls": {:hex, :excoveralls, "0.18.1", "a6f547570c6b24ec13f122a5634833a063aec49218f6fff27de9df693a15588c", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "d65f79db146bb20399f23046015974de0079668b9abb2f5aac074d078da60b8d"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, diff --git a/test/integration/p2p_test.exs b/test/integration/p2p_test.exs index a676f2e..3fc9449 100644 --- a/test/integration/p2p_test.exs +++ b/test/integration/p2p_test.exs @@ -8,8 +8,7 @@ defmodule ExICE.Integration.P2PTest do @tag :p2p @tag :tmp_dir test "P2P connection", %{tmp_dir: tmp_dir} do - stun_servers = ["stun:stun.l.google.com:19302"] - # stun_servers = [] + ice_servers = [%{url: "stun:stun.l.google.com:19302"}] ip_filter = fn {_, _, _, _, _, _, _, _} -> true @@ -18,10 +17,9 @@ defmodule ExICE.Integration.P2PTest do end {:ok, agent1} = - ICEAgent.start_link(:controlling, ip_filter: ip_filter, stun_servers: stun_servers) + ICEAgent.start_link(:controlling, ip_filter: ip_filter, ice_servers: ice_servers) - {:ok, agent2} = - ICEAgent.start_link(:controlled, ip_filter: ip_filter, stun_servers: stun_servers) + {:ok, agent2} = ICEAgent.start_link(:controlled, ip_filter: ip_filter, ice_servers: []) {:ok, a1_ufrag, a1_pwd} = ICEAgent.get_local_credentials(agent1) {:ok, a2_ufrag, a2_pwd} = ICEAgent.get_local_credentials(agent2) @@ -86,8 +84,58 @@ defmodule ExICE.Integration.P2PTest do @tag :tmp_dir @tag :role_conflict test "P2P connection with role conflict", %{tmp_dir: tmp_dir} do - stun_servers = ["stun:stun.l.google.com:19302"] - # stun_servers = [] + ice_servers = [%{url: "stun:stun.l.google.com:19302"}] + # ice_servers = [] + + ip_filter = fn + {_, _, _, _, _, _, _, _} -> true + {172, _, _, _} -> true + _other -> true + end + + {:ok, agent1} = + ICEAgent.start_link(:controlled, ip_filter: ip_filter, ice_servers: ice_servers) + + {:ok, agent2} = + ICEAgent.start_link(:controlled, ip_filter: ip_filter, ice_servers: ice_servers) + + {:ok, a1_ufrag, a1_pwd} = ICEAgent.get_local_credentials(agent1) + {:ok, a2_ufrag, a2_pwd} = ICEAgent.get_local_credentials(agent2) + + :ok = ICEAgent.set_remote_credentials(agent2, a1_ufrag, a1_pwd) + :ok = ICEAgent.set_remote_credentials(agent1, a2_ufrag, a2_pwd) + + :ok = ICEAgent.gather_candidates(agent1) + :ok = ICEAgent.gather_candidates(agent2) + + a1_fd = File.open!(Path.join([tmp_dir, "a1_recv_data"]), [:append]) + a2_fd = File.open!(Path.join([tmp_dir, "a2_recv_data"]), [:append]) + + a1_status = %{fd: a1_fd, completed: false, data_recv: false} + a2_status = %{fd: a2_fd, completed: false, data_recv: false} + + assert p2p(agent1, agent2, a1_status, a2_status) + + assert File.read!(Path.join([tmp_dir, "a1_recv_data"])) == + File.read!("./test/fixtures/lotr.txt") + + assert File.read!(Path.join([tmp_dir, "a2_recv_data"])) == + File.read!("./test/fixtures/lotr.txt") + end + + @tag :tmp_dir + @tag :relay + test "P2P connection via turn server", %{tmp_dir: tmp_dir} do + # This test is by default excluded from runinng. + # Before running, start coturn with: turnserver -a -u testusername:testpassword + + ice_servers = [ + %{ + url: "turn:127.0.0.1:3478?transport=udp", + username: "testusername", + credential: "testpassword" + } + ] ip_filter = fn {_, _, _, _, _, _, _, _} -> true @@ -96,10 +144,14 @@ defmodule ExICE.Integration.P2PTest do end {:ok, agent1} = - ICEAgent.start_link(:controlled, ip_filter: ip_filter, stun_servers: stun_servers) + ICEAgent.start_link(:controlling, + ip_filter: ip_filter, + ice_servers: ice_servers, + ice_transport_policy: :relay + ) {:ok, agent2} = - ICEAgent.start_link(:controlled, ip_filter: ip_filter, stun_servers: stun_servers) + ICEAgent.start_link(:controlled, ip_filter: ip_filter, ice_servers: []) {:ok, a1_ufrag, a1_pwd} = ICEAgent.get_local_credentials(agent1) {:ok, a2_ufrag, a2_pwd} = ICEAgent.get_local_credentials(agent2) diff --git a/test/priv/gatherer_test.exs b/test/priv/gatherer_test.exs index be22858..3d9e26a 100644 --- a/test/priv/gatherer_test.exs +++ b/test/priv/gatherer_test.exs @@ -34,8 +34,10 @@ defmodule ExICE.Priv.GathererTest do _ -> true end) + assert {:ok, sockets} = Gatherer.open_sockets(gatherer) + # there should only be one candidate - assert {:ok, [%Candidate.Host{} = c]} = Gatherer.gather_host_candidates(gatherer) + assert [%Candidate.Host{} = c] = Gatherer.gather_host_candidates(gatherer, sockets) assert c.base.address == {192, 168, 0, 1} assert c.base.base_address == {192, 168, 0, 1} assert c.base.port == c.base.base_port @@ -51,9 +53,11 @@ defmodule ExICE.Priv.GathererTest do _ -> true end) - {:ok, [%Candidate.Host{} = c]} = Gatherer.gather_host_candidates(gatherer) + {:ok, sockets} = Gatherer.open_sockets(gatherer) + + [%Candidate.Host{} = c] = Gatherer.gather_host_candidates(gatherer, sockets) - assert :ok = Gatherer.gather_srflx_candidate(gatherer, 1234, c, stun_server) + assert :ok = Gatherer.gather_srflx_candidate(gatherer, 1234, c.base.socket, stun_server) assert [{_socket, packet}] = :ets.lookup(:transport_mock, c.base.socket) assert {:ok, req} = ExSTUN.Message.decode(packet) assert req.attributes == [] diff --git a/test/priv/ice_agent_test.exs b/test/priv/ice_agent_test.exs index 44cd50a..342a730 100644 --- a/test/priv/ice_agent_test.exs +++ b/test/priv/ice_agent_test.exs @@ -48,11 +48,6 @@ defmodule ExICE.Priv.ICEAgentTest do def send_data(cand, _dst_ip, _dst_port, _data) do {:error, :invalid_data, cand} end - - @impl true - def receive_data(cand, _src_ip, _src_port, _data) do - {:error, :invalid_data, cand} - end end describe "add_remote_candidate/2" do @@ -72,7 +67,7 @@ defmodule ExICE.Priv.ICEAgentTest do remote_cand = ExICE.Candidate.new(:host, address: {192, 168, 0, 2}, port: 8445) ice_agent = ICEAgent.add_remote_candidate(ice_agent, ExICE.Candidate.marshal(remote_cand)) - assert [%ExICE.Candidate{} = r_cand] = ice_agent.remote_cands + assert [%ExICE.Candidate{} = r_cand] = Map.values(ice_agent.remote_cands) # override id for the purpose of comparision r_cand = %ExICE.Candidate{r_cand | id: remote_cand.id} assert r_cand == remote_cand @@ -80,7 +75,7 @@ defmodule ExICE.Priv.ICEAgentTest do test "with invalid remote candidate", %{ice_agent: ice_agent} do ice_agent = ICEAgent.add_remote_candidate(ice_agent, "some invalid candidate string") - assert [] == ice_agent.remote_cands + assert %{} == ice_agent.remote_cands end test "with invalid mdns remote candidate", %{ice_agent: ice_agent} do @@ -88,14 +83,14 @@ defmodule ExICE.Priv.ICEAgentTest do ExICE.Candidate.new(:host, address: "somehopefullyinvalidmdnscandidate.local", port: 8445) ice_agent = ICEAgent.add_remote_candidate(ice_agent, ExICE.Candidate.marshal(remote_cand)) - assert [] == ice_agent.remote_cands + assert %{} == ice_agent.remote_cands end test "after setting end-of-candidates", %{ice_agent: ice_agent} do remote_cand = ExICE.Candidate.new(:host, address: {192, 168, 0, 2}, port: 8445) ice_agent = ICEAgent.end_of_candidates(ice_agent) ice_agent = ICEAgent.add_remote_candidate(ice_agent, ExICE.Candidate.marshal(remote_cand)) - assert [] == ice_agent.remote_cands + assert %{} == ice_agent.remote_cands end end @@ -569,7 +564,7 @@ defmodule ExICE.Priv.ICEAgentTest do role: :controlling, transport_module: Transport.Mock, if_discovery_module: IfDiscovery.Mock, - stun_servers: ["stun:192.168.0.3:19302"] + ice_servers: [%{url: "stun:192.168.0.3:19302"}] ) |> ICEAgent.set_remote_credentials("someufrag", "somepwd") |> ICEAgent.gather_candidates() @@ -636,91 +631,56 @@ defmodule ExICE.Priv.ICEAgentTest do end end - describe "unstable candidate" do + test "candidate fails to send data when ice is connected" do # 1. make ice agent connected - # 2. replace candidate with the mock one that always fails to send and receive data - # 3. assert that after unsuccessful data sending/receiving, ice_agent moves to the failed state + # 2. replace candidate with the mock one that always fails to send data + # 3. assert that after unsuccessful data sending, ice_agent moves to the failed state # as there are no other pairs - setup do - remote_cand = ExICE.Candidate.new(:host, address: {192, 168, 0, 2}, port: 8445) - - ice_agent = - ICEAgent.new( - controlling_process: self(), - role: :controlling, - if_discovery_module: IfDiscovery.Mock, - transport_module: Transport.Mock - ) - |> ICEAgent.set_remote_credentials("someufrag", "somepwd") - |> ICEAgent.gather_candidates() - |> ICEAgent.add_remote_candidate(ExICE.Candidate.marshal(remote_cand)) - - [local_cand] = Map.values(ice_agent.local_cands) - - assert ice_agent.gathering_state == :complete - - # make ice_agent connected - ice_agent = ICEAgent.handle_timeout(ice_agent) - req = read_binding_request(local_cand.base.socket, ice_agent.remote_pwd) - resp = binding_response(req.transaction_id, local_cand, ice_agent.remote_pwd) - - ice_agent = - ICEAgent.handle_udp( - ice_agent, - local_cand.base.socket, - remote_cand.address, - remote_cand.port, - resp - ) - - assert [%CandidatePair{state: :succeeded}] = Map.values(ice_agent.checklist) - assert ice_agent.state == :connected - - # replace candidate with the mock one - mock_cand = %Candidate.Mock{base: local_cand.base} - [pair] = Map.values(ice_agent.checklist) - pair = %{pair | local_cand: mock_cand} - checklist = %{pair.id => pair} - - ice_agent = %{ - ice_agent - | local_cands: %{mock_cand.base.id => mock_cand}, - checklist: checklist - } - - %{ice_agent: ice_agent, mock_cand: mock_cand, remote_cand: remote_cand} - end - - test "fails to receive data when ice is connected ", %{ - ice_agent: ice_agent, - mock_cand: mock_cand, - remote_cand: remote_cand - } do - # try to receive some data - ice_agent = - ICEAgent.handle_udp( - ice_agent, - mock_cand.base.socket, - remote_cand.address, - remote_cand.port, - "somedata" - ) - - # assert that ice_agent removed failed candidate and moved to the failed state - assert ice_agent.local_cands == %{} - assert ice_agent.state == :failed - assert ice_agent.checklist == %{} - end - - test "fails to send data when ice is connected", %{ice_agent: ice_agent} do - # try to send some data - ice_agent = ICEAgent.send_data(ice_agent, "somedata") - - # assert that ice_agent removed failed candidate and moved to the failed state - assert ice_agent.local_cands == %{} - assert ice_agent.state == :failed - assert ice_agent.checklist == %{} - end + remote_cand = ExICE.Candidate.new(:host, address: {192, 168, 0, 2}, port: 8445) + + ice_agent = + ICEAgent.new( + controlling_process: self(), + role: :controlling, + if_discovery_module: IfDiscovery.Mock, + transport_module: Transport.Mock + ) + |> ICEAgent.set_remote_credentials("someufrag", "somepwd") + |> ICEAgent.gather_candidates() + |> ICEAgent.add_remote_candidate(ExICE.Candidate.marshal(remote_cand)) + + [local_cand] = Map.values(ice_agent.local_cands) + + assert ice_agent.gathering_state == :complete + + # make ice_agent connected + ice_agent = ICEAgent.handle_timeout(ice_agent) + req = read_binding_request(local_cand.base.socket, ice_agent.remote_pwd) + resp = binding_response(req.transaction_id, local_cand, ice_agent.remote_pwd) + + ice_agent = + ICEAgent.handle_udp( + ice_agent, + local_cand.base.socket, + remote_cand.address, + remote_cand.port, + resp + ) + + assert [%CandidatePair{state: :succeeded}] = Map.values(ice_agent.checklist) + assert ice_agent.state == :connected + + # replace candidate with the mock one + mock_cand = %Candidate.Mock{base: local_cand.base} + ice_agent = %{ice_agent | local_cands: %{mock_cand.base.id => mock_cand}} + + # try to send some data + ice_agent = ICEAgent.send_data(ice_agent, "somedata") + + # assert that ice_agent removed failed candidate and moved to the failed state + assert ice_agent.local_cands == %{} + assert ice_agent.state == :failed + assert ice_agent.checklist == %{} end defp binding_response(t_id, local_cand, remote_pwd) do diff --git a/test/test_helper.exs b/test/test_helper.exs index e1f54da..b5110c4 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,3 +1,3 @@ ExICE.Support.Transport.Mock.init() -ExUnit.start(capture_log: true) +ExUnit.start(capture_log: true, exclude: [:relay])