From c4c81336085975ea55561ca4f717998db19de5f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Thu, 25 Jul 2024 12:24:58 +0200 Subject: [PATCH] Ignore data frome stale sockets, timers, etc. Clean up state when connection is failed or completed --- .../priv/conn_check_handler/controlling.ex | 7 +- lib/ex_ice/priv/ice_agent.ex | 210 +++++++++++++++--- test/priv/ice_agent_test.exs | 171 ++++++++++++++ 3 files changed, 348 insertions(+), 40 deletions(-) diff --git a/lib/ex_ice/priv/conn_check_handler/controlling.ex b/lib/ex_ice/priv/conn_check_handler/controlling.ex index ce8638a..8c670a4 100644 --- a/lib/ex_ice/priv/conn_check_handler/controlling.ex +++ b/lib/ex_ice/priv/conn_check_handler/controlling.ex @@ -52,12 +52,10 @@ defmodule ExICE.Priv.ConnCheckHandler.Controlling do @impl true def update_nominated_flag(%ICEAgent{eoc: true} = ice_agent, pair_id, true) do Logger.debug("Nomination succeeded. Selecting pair: #{inspect(pair_id)}") - ice_agent = ICEAgent.change_connection_state(ice_agent, :completed) pair = Map.fetch!(ice_agent.checklist, pair_id) pair = %CandidatePair{pair | nominate?: false, nominated?: true} - checklist = Map.put(ice_agent.checklist, pair.id, pair) - ice_agent = %ICEAgent{ice_agent | checklist: checklist} + ice_agent = put_in(ice_agent.checklist[pair.id], pair) # the controlling agent could nominate only when eoc was set # and checklist finished @@ -65,6 +63,7 @@ defmodule ExICE.Priv.ConnCheckHandler.Controlling do Logger.warning("Nomination succeeded but checklist hasn't finished.") end - %ICEAgent{ice_agent | nominating?: {false, nil}, selected_pair_id: pair.id} + ice_agent = %ICEAgent{ice_agent | nominating?: {false, nil}, selected_pair_id: pair.id} + ICEAgent.change_connection_state(ice_agent, :completed) end end diff --git a/lib/ex_ice/priv/ice_agent.ex b/lib/ex_ice/priv/ice_agent.ex index 892c393..432208e 100644 --- a/lib/ex_ice/priv/ice_agent.ex +++ b/lib/ex_ice/priv/ice_agent.ex @@ -220,6 +220,11 @@ defmodule ExICE.Priv.ICEAgent do end @spec set_remote_credentials(t(), binary(), binary()) :: t() + def set_remote_credentials(%__MODULE__{state: :failed} = ice_agent, _, _) do + Logger.debug("Tried to set remote credentials in failed state. ICE restart needed. Ignoring.") + ice_agent + end + def set_remote_credentials( %__MODULE__{remote_ufrag: nil, remote_pwd: nil} = ice_agent, ufrag, @@ -249,6 +254,11 @@ defmodule ExICE.Priv.ICEAgent do end @spec gather_candidates(t()) :: t() + def gather_candidates(%__MODULE__{state: :failed} = ice_agent) do + Logger.warning("Can't gather candidates in state failed. ICE restart needed. Ignoring.") + ice_agent + end + def gather_candidates(%__MODULE__{gathering_state: :gathering} = ice_agent) do Logger.warning("Can't gather candidates. Gathering already in progress. Ignoring.") ice_agent @@ -311,6 +321,12 @@ defmodule ExICE.Priv.ICEAgent do end @spec add_remote_candidate(t(), Candidate.t()) :: t() + def add_remote_candidate(%__MODULE__{state: :failed} = ice_agent, _) do + # Completed state will be caught by the next clause + Logger.debug("Can't add remote candidate in state failed. ICE restart needed. Ignoring.") + ice_agent + end + def add_remote_candidate(%__MODULE__{eoc: true} = ice_agent, remote_cand) do Logger.warning(""" Received remote candidate after end-of-candidates. Ignoring. @@ -361,6 +377,11 @@ defmodule ExICE.Priv.ICEAgent do end @spec end_of_candidates(t()) :: t() + def end_of_candidates(%__MODULE__{state: :failed} = ice_agent) do + Logger.debug("Can't set end-of-candidates flag in state failed. Ignoring.") + ice_agent + end + def end_of_candidates(%__MODULE__{role: :controlled} = ice_agent) do Logger.debug("Setting end-of-candidates flag.") ice_agent = %{ice_agent | eoc: true} @@ -418,7 +439,7 @@ defmodule ExICE.Priv.ICEAgent do @spec handle_ta_timeout(t()) :: t() def handle_ta_timeout(%__MODULE__{state: state} = ice_agent) - when state.state in [:completed, :failed] do + when state in [:completed, :failed] do Logger.warning(""" Ta timer fired in unexpected state: #{state}. Trying to update gathering and connection states. @@ -477,6 +498,11 @@ defmodule ExICE.Priv.ICEAgent do end @spec handle_eoc_timeout(t()) :: t() + def handle_eoc_timeout(%__MODULE__{state: :failed} = ice_agent) do + Logger.debug("EOC timer fired but we are in the failed state. Ignoring.") + %{ice_agent | eoc_timer: nil} + end + def handle_eoc_timeout(%{eoc: true} = ice_agent) do Logger.debug("EOC timer fired but EOC flag is already set. Ignoring.") %{ice_agent | eoc_timer: nil} @@ -564,8 +590,16 @@ defmodule ExICE.Priv.ICEAgent do {:ok, _pair} -> ice_agent + :error when ice_agent.state in [:failed, :completed] -> + Logger.warning(""" + Received keepalive request for non-existant candidate pair but we are in state: #{ice_agent.state}. \ + Ignoring.\ + """) + + ice_agent + :error -> - Logger.warning("Received keepalive request for non-existant candidate pair") + Logger.warning("Received keepalive request for non-existant candidate pair. Ignoring.") ice_agent end end @@ -588,19 +622,58 @@ defmodule ExICE.Priv.ICEAgent do handle_turn_gathering_transaction_response(ice_agent, turn_tr_id, turn_tr, packet) from_turn?(ice_agent, src_ip, src_port) -> - handle_turn_message(ice_agent, socket, src_ip, src_port, packet) + local_cands = Map.values(ice_agent.local_cands) + + case find_relay_cand_by_socket(local_cands, socket) do + nil -> + Logger.debug(""" + Couldn't find relay candidate for: + socket: #{inspect(socket)} + src address: #{inspect({src_ip, src_port})}. + Ignoring incoming TURN message. + """) + + ice_agent + + relay_cand -> + handle_turn_message(ice_agent, relay_cand, src_ip, src_port, packet) + end ExSTUN.stun?(packet) -> - handle_stun_message(ice_agent, socket, src_ip, src_port, packet) + local_cands = Map.values(ice_agent.local_cands) + + case find_host_cand(local_cands, socket) do + nil -> + Logger.debug(""" + Couldn't find host candidate for #{inspect(src_ip)}:#{src_port}. \ + Ignoring incoming STUN message.\ + """) + + ice_agent + + host_cand -> + handle_stun_message(ice_agent, host_cand, src_ip, src_port, packet) + end true -> - local_cand = find_host_cand(Map.values(ice_agent.local_cands), socket) - remote_cand = find_remote_cand(Map.values(ice_agent.remote_cands), src_ip, src_port) + with remote_cands <- Map.values(ice_agent.remote_cands), + %_{} = local_cand <- find_host_cand(Map.values(ice_agent.local_cands), socket), + %_{} = remote_cand <- find_remote_cand(remote_cands, src_ip, src_port) do + %CandidatePair{} = + pair = Checklist.find_pair(ice_agent.checklist, local_cand.base.id, remote_cand.id) - %CandidatePair{} = - pair = Checklist.find_pair(ice_agent.checklist, local_cand.base.id, remote_cand.id) + handle_data_message(ice_agent, pair, packet) + else + _ -> + Logger.debug(""" + Couldn't find host or remote candidate for: + socket: #{inspect(socket)} + src address: #{inspect({src_ip, src_port})}. + Ignoring incoming data message. + """) - handle_data_message(ice_agent, pair, packet) + ice_agent + end end end @@ -900,9 +973,7 @@ defmodule ExICE.Priv.ICEAgent do end end - defp handle_turn_message(ice_agent, socket, src_ip, src_port, packet) do - %cand_mod{} = cand = find_relay_cand_by_socket(Map.values(ice_agent.local_cands), socket) - + defp handle_turn_message(ice_agent, %cand_mod{} = cand, src_ip, src_port, packet) do case cand_mod.receive_data(cand, src_ip, src_port, packet) do {:ok, cand} -> put_in(ice_agent.local_cands[cand.base.id], cand) @@ -935,23 +1006,10 @@ defmodule ExICE.Priv.ICEAgent do end end - defp handle_stun_message(ice_agent, socket, src_ip, src_port, packet) do + defp handle_stun_message(ice_agent, host_cand, src_ip, src_port, packet) do case ExSTUN.Message.decode(packet) do {:ok, msg} -> - local_cands = Map.values(ice_agent.local_cands) - - case find_host_cand(local_cands, socket) do - nil -> - Logger.debug(""" - Couldn't find host candidate for #{inspect(src_ip)}:#{src_port}. \ - Ignoring incoming STUN message.\ - """) - - ice_agent - - local_cand -> - do_handle_stun_message(ice_agent, local_cand, src_ip, src_port, msg) - end + do_handle_stun_message(ice_agent, host_cand, src_ip, src_port, msg) {:error, reason} -> Logger.warning("Couldn't decode stun message: #{inspect(reason)}") @@ -1787,14 +1845,10 @@ defmodule ExICE.Priv.ICEAgent do end defp do_restart(ice_agent) do - ice_agent.local_cands - |> Enum.uniq_by(fn {_id, cand} -> cand.base.socket end) - |> Enum.each(fn {_id, cand} -> - Logger.debug(""" - Closing local candidate's socket: #{inspect(cand.base.base_address)}:#{cand.base.base_port}. - """) - - :ok = ice_agent.transport_module.close(cand.base.socket) + Enum.each(ice_agent.sockets, fn socket -> + {:ok, {ip, port}} = ice_agent.transport_module.sockname(socket) + Logger.debug("Closing socket: #{inspect(ip)}:#{port}.") + :ok = ice_agent.transport_module.close(socket) end) {ufrag, pwd} = generate_credentials() @@ -1816,7 +1870,8 @@ defmodule ExICE.Priv.ICEAgent do %__MODULE__{ ice_agent - | gathering_transactions: %{}, + | sockets: [], + gathering_transactions: %{}, selected_pair_id: nil, conn_checks: %{}, checklist: %{}, @@ -1957,7 +2012,88 @@ defmodule ExICE.Priv.ICEAgent do @doc false @spec change_connection_state(t(), atom()) :: t() + def change_connection_state(ice_agent, :failed) do + Enum.each(ice_agent.sockets, fn socket -> + :ok = ice_agent.transport_module.close(socket) + end) + + %{ + ice_agent + | sockets: [], + gathering_transactions: %{}, + selected_pair_id: nil, + conn_checks: %{}, + checklist: %{}, + local_cands: %{}, + remote_cands: %{}, + local_ufrag: nil, + local_pwd: nil, + remote_ufrag: nil, + remote_pwd: nil, + eoc: false, + nominating?: {false, nil} + } + |> disable_timer() + |> do_change_connection_state(:failed) + end + + def change_connection_state(ice_agent, :completed) do + selected_pair = Map.fetch!(ice_agent.checklist, ice_agent.selected_pair_id) + succeeded_pair = Map.fetch!(ice_agent.checklist, selected_pair.succeeded_pair_id) + + if selected_pair.id != selected_pair.discovered_pair_id do + raise """ + Selected pair isn't also discovered pair. This should never happen. + Selected pair: #{inspect(selected_pair)}\ + """ + end + + local_cand = Map.fetch!(ice_agent.local_cands, succeeded_pair.local_cand_id) + + Enum.each(ice_agent.sockets, fn socket -> + unless socket == local_cand.base.socket do + :ok = ice_agent.transport_module.close(socket) + end + end) + + # We need to use Map.filter as selected_pair might not + # be the same as succeeded pair + local_cands = + Map.filter( + ice_agent.local_cands, + fn {cand_id, _cand} -> + cand_id in [selected_pair.local_cand_id, succeeded_pair.local_cand_id] + end + ) + + remote_cands = + Map.filter( + ice_agent.remote_cands, + fn {cand_id, _cand} -> + cand_id in [selected_pair.remote_cand_id, succeeded_pair.remote_cand_id] + end + ) + + checklist = + Map.filter(ice_agent.checklist, fn {pair_id, _pair} -> + pair_id in [selected_pair.id, succeeded_pair.id] + end) + + %{ + ice_agent + | sockets: [local_cand.base.socket], + local_cands: local_cands, + remote_cands: remote_cands, + checklist: checklist + } + |> do_change_connection_state(:completed) + end + def change_connection_state(ice_agent, new_conn_state) do + do_change_connection_state(ice_agent, new_conn_state) + end + + defp do_change_connection_state(ice_agent, new_conn_state) do Logger.debug("Connection state change: #{ice_agent.state} -> #{new_conn_state}") notify(ice_agent.on_connection_state_change, {:connection_state_change, new_conn_state}) %__MODULE__{ice_agent | state: new_conn_state} @@ -2135,6 +2271,8 @@ defmodule ExICE.Priv.ICEAgent do %{ice_agent | ta_timer: timer} end + defp disable_timer(%{ta_timer: nil} = ice_agent), do: ice_agent + defp disable_timer(ice_agent) do Process.cancel_timer(ice_agent.ta_timer) diff --git a/test/priv/ice_agent_test.exs b/test/priv/ice_agent_test.exs index ab7cfcc..debe652 100644 --- a/test/priv/ice_agent_test.exs +++ b/test/priv/ice_agent_test.exs @@ -952,6 +952,155 @@ defmodule ExICE.Priv.ICEAgentTest do assert ice_agent.state == :failed end + test "cleans up agent state when the connection fails" do + remote_cand = ExICE.Candidate.new(:host, address: {192, 168, 0, 3}, port: 8445) + + ice_agent = + ICEAgent.new( + controlling_process: self(), + role: :controlling, + transport_module: Transport.Mock, + if_discovery_module: IfDiscovery.Mock + ) + |> ICEAgent.set_remote_credentials("remoteufrag", "remotepwd") + |> ICEAgent.gather_candidates() + |> ICEAgent.add_remote_candidate(remote_cand) + + # save creds as they will be cleared after moving to the failed state + local_ufrag = ice_agent.local_ufrag + local_pwd = ice_agent.local_pwd + + [socket] = ice_agent.sockets + + # mark pair as failed + [pair] = Map.values(ice_agent.checklist) + ice_agent = put_in(ice_agent.checklist[pair.id], %{pair | state: :failed}) + + # set eoc flag + ice_agent = ICEAgent.end_of_candidates(ice_agent) + + # agent should have moved to the failed state + assert ice_agent.state == :failed + assert ice_agent.sockets == [] + assert ice_agent.local_cands == %{} + assert ice_agent.remote_cands == %{} + assert ice_agent.gathering_transactions == %{} + assert ice_agent.selected_pair_id == nil + assert ice_agent.conn_checks == %{} + assert ice_agent.checklist == %{} + assert ice_agent.local_ufrag == nil + assert ice_agent.local_pwd == nil + assert ice_agent.remote_ufrag == nil + assert ice_agent.remote_pwd == nil + assert ice_agent.eoc == false + assert ice_agent.nominating? == {false, nil} + + # assert that handle_udp ignores incoming data i.e. the state of ice agent didn't change + new_ice_agent = + ICEAgent.handle_udp(ice_agent, socket, remote_cand.address, remote_cand.port, "some data") + + assert ice_agent == new_ice_agent + + # the same with incoming binding request + req = + binding_request( + ice_agent.role, + ice_agent.tiebreaker, + "remoteufrag", + local_ufrag, + local_pwd + ) + + new_ice_agent = + ICEAgent.handle_udp(ice_agent, socket, remote_cand.address, remote_cand.port, req) + + assert ice_agent == new_ice_agent + + # and handle_ta_timeout + new_ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + assert ice_agent == new_ice_agent + end + + test "cleans up agent state when the connection completes" do + r_cand1 = ExICE.Candidate.new(:host, address: {192, 168, 0, 3}, port: 8445) + r_cand2 = ExICE.Candidate.new(:srflx, address: {192, 168, 0, 4}, port: 8445) + + ice_agent = + ICEAgent.new( + controlling_process: self(), + role: :controlled, + transport_module: Transport.Mock, + if_discovery_module: IfDiscovery.Mock + ) + |> ICEAgent.set_remote_credentials("remoteufrag", "remotepwd") + |> ICEAgent.gather_candidates() + |> ICEAgent.add_remote_candidate(r_cand1) + + [socket] = ice_agent.sockets + + raw_req = + binding_request( + ice_agent.role, + ice_agent.tiebreaker, + "remoteufrag", + ice_agent.local_ufrag, + ice_agent.local_pwd + ) + + ice_agent = ICEAgent.handle_udp(ice_agent, socket, r_cand1.address, r_cand1.port, raw_req) + # read binding response + _ = Transport.Mock.recv(socket) + # assert there is nothing else on socket + assert nil == Transport.Mock.recv(socket) + + # execute conn-check + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + assert req = Transport.Mock.recv(socket) + {:ok, req} = ExSTUN.Message.decode(req) + + resp = binding_response(req.transaction_id, ice_agent.transport_module, socket, "remotepwd") + ice_agent = ICEAgent.handle_udp(ice_agent, socket, r_cand1.address, r_cand1.port, resp) + + # add second candidate and repeat + ice_agent = ICEAgent.add_remote_candidate(ice_agent, r_cand2) + ice_agent = ICEAgent.handle_udp(ice_agent, socket, r_cand2.address, r_cand2.port, raw_req) + _ = Transport.Mock.recv(socket) + assert nil == Transport.Mock.recv(socket) + + ice_agent = ICEAgent.handle_ta_timeout(ice_agent) + assert req = Transport.Mock.recv(socket) + {:ok, req} = ExSTUN.Message.decode(req) + resp = binding_response(req.transaction_id, ice_agent.transport_module, socket, "remotepwd") + ice_agent = ICEAgent.handle_udp(ice_agent, socket, r_cand2.address, r_cand2.port, resp) + + # assert we have two succeeded pairs + assert [%{state: :succeeded}, %{state: :succeeded}] = Map.values(ice_agent.checklist) + + {_id, srflx_pair} = + Enum.find(ice_agent.checklist, fn {_pair_id, pair} -> pair.remote_cand_id == r_cand2.id end) + + assert :connected == ice_agent.state + + # set end-of-candidates + ice_agent = ICEAgent.end_of_candidates(ice_agent) + + # assert ice agent changed its state to completed + # and we have one pair and one remote cand + assert ice_agent.state == :completed + assert [%{state: :succeeded}] = Map.values(ice_agent.checklist) + assert [%{type: :host}] = Map.values(ice_agent.remote_cands) + + # try to feed data from the srflx remote cand + new_ice_agent = + ICEAgent.handle_udp(ice_agent, socket, r_cand2.address, r_cand2.port, "some data") + + assert ice_agent == new_ice_agent + + # try to handle keepalive on the srflx pair + new_ice_agent = ICEAgent.handle_keepalive(ice_agent, srflx_pair.id) + assert ice_agent == new_ice_agent + end + @stun_ip {192, 168, 0, 3} @stun_ip_str :inet.ntoa(@stun_ip) @stun_port 19_302 @@ -1439,6 +1588,28 @@ defmodule ExICE.Priv.ICEAgentTest do Message.new(%Type{class: :indication, method: :binding}) |> Message.encode() end + defp binding_request(role, tiebreaker, local_ufrag, remote_ufrag, remote_pwd) do + ice_attrs = + if role == :controlled do + [%ICEControlling{tiebreaker: tiebreaker + 1}, %UseCandidate{}] + else + [%ICEControlled{tiebreaker: tiebreaker - 1}] + end + + attrs = + [ + %Username{value: "#{remote_ufrag}:#{local_ufrag}"}, + %Priority{priority: 1234} + ] ++ ice_attrs + + request = + Message.new(%Type{class: :request, method: :binding}, attrs) + |> Message.with_integrity(remote_pwd) + |> Message.with_fingerprint() + + Message.encode(request) + end + defp binding_response(t_id, transport_module, socket, remote_pwd) do {:ok, {sock_ip, sock_port}} = transport_module.sockname(socket)