diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs
index b31359f48a56f..1f438df2d148c 100644
--- a/polkadot/node/network/bridge/src/network.rs
+++ b/polkadot/node/network/bridge/src/network.rs
@@ -204,6 +204,13 @@ pub trait Network: Clone + Send + 'static {
 		multiaddresses: HashSet<Multiaddr>,
 	) -> Result<(), String>;
 
+	/// Ask the network to extend the reserved set with these nodes.
+	async fn add_peers_to_reserved_set(
+		&mut self,
+		protocol: ProtocolName,
+		multiaddresses: HashSet<Multiaddr>,
+	) -> Result<(), String>;
+
 	/// Removes the peers for the protocol's peer set (both reserved and non-reserved).
 	async fn remove_from_peers_set(
 		&mut self,
@@ -240,6 +247,14 @@ impl Network for Arc<dyn NetworkService> {
 		<dyn NetworkService>::set_reserved_peers(&**self, protocol, multiaddresses)
 	}
 
+	async fn add_peers_to_reserved_set(
+		&mut self,
+		protocol: ProtocolName,
+		multiaddresses: HashSet<Multiaddr>,
+	) -> Result<(), String> {
+		<dyn NetworkService>::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
+	}
+
 	async fn remove_from_peers_set(
 		&mut self,
 		protocol: ProtocolName,
diff --git a/polkadot/node/network/bridge/src/rx/tests.rs b/polkadot/node/network/bridge/src/rx/tests.rs
index 392ff7391a1c1..601dca5cb8a3c 100644
--- a/polkadot/node/network/bridge/src/rx/tests.rs
+++ b/polkadot/node/network/bridge/src/rx/tests.rs
@@ -124,6 +124,14 @@ impl Network for TestNetwork {
 		Ok(())
 	}
 
+	async fn add_peers_to_reserved_set(
+		&mut self,
+		_protocol: ProtocolName,
+		_: HashSet<Multiaddr>,
+	) -> Result<(), String> {
+		Ok(())
+	}
+
 	async fn remove_from_peers_set(
 		&mut self,
 		_protocol: ProtocolName,
diff --git a/polkadot/node/network/bridge/src/tx/mod.rs b/polkadot/node/network/bridge/src/tx/mod.rs
index 7b6dea748572b..6c353195d41ad 100644
--- a/polkadot/node/network/bridge/src/tx/mod.rs
+++ b/polkadot/node/network/bridge/src/tx/mod.rs
@@ -370,6 +370,22 @@ where
 				.await;
 			return (network_service, authority_discovery_service)
 		},
+
+		NetworkBridgeTxMessage::AddToResolvedValidators { validator_addrs, peer_set } => {
+			gum::trace!(
+				target: LOG_TARGET,
+				action = "AddToResolvedValidators",
+				peer_set = ?peer_set,
+				?validator_addrs,
+				"Received a resolved validator connection request",
+			);
+
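+			// Flatten the per-validator address sets into one set; the validator discovery
+			// service will extend the reserved peers of this peer set with these addresses.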
+			let all_addrs = validator_addrs.into_iter().flatten().collect();
+			let network_service = validator_discovery
+				.on_add_to_resolved_request(all_addrs, peer_set, network_service)
+				.await;
+			return (network_service, authority_discovery_service)
+		},
 	}
 	(network_service, authority_discovery_service)
 }
diff --git a/polkadot/node/network/bridge/src/tx/tests.rs b/polkadot/node/network/bridge/src/tx/tests.rs
index 9265358196dbf..30b2c3421372a 100644
--- a/polkadot/node/network/bridge/src/tx/tests.rs
+++ b/polkadot/node/network/bridge/src/tx/tests.rs
@@ -148,6 +148,14 @@ impl Network for TestNetwork {
 		Ok(())
 	}
 
+	async fn add_peers_to_reserved_set(
+		&mut self,
+		_protocol: ProtocolName,
+		_: HashSet<Multiaddr>,
+	) -> Result<(), String> {
+		Ok(())
+	}
+
 	async fn remove_from_peers_set(
 		&mut self,
 		_protocol: ProtocolName,
diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs
index f0ef038d5eb40..9accd56d86ae6 100644
--- a/polkadot/node/network/bridge/src/validator_discovery.rs
+++ b/polkadot/node/network/bridge/src/validator_discovery.rs
@@ -92,6 +92,44 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
 		network_service
 	}
 
+	/// Connect to already resolved addresses.
+	pub async fn on_add_to_resolved_request(
+		&mut self,
+		newly_requested: HashSet<Multiaddr>,
+		peer_set: PeerSet,
+		mut network_service: N,
+	) -> N {
+		let state = &mut self.state[peer_set];
+		let new_peer_ids: HashSet<PeerId> = extract_peer_ids(newly_requested.iter().cloned());
+		let num_peers = new_peer_ids.len();
+
+		state.previously_requested.extend(new_peer_ids);
+
+		gum::debug!(
+			target: LOG_TARGET,
+			?peer_set,
+			?num_peers,
+			"New add to resolved validators request",
+		);
+
+		// ask the network to connect to these nodes and not disconnect
+		// from them until they are removed from the set.
+		//
+		// for peer-set management, the main protocol name should be used regardless of
+		// the negotiated version.
+		if let Err(e) = network_service
+			.add_peers_to_reserved_set(
+				self.peerset_protocol_names.get_main_name(peer_set),
+				newly_requested,
+			)
+			.await
+		{
+			gum::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
+		}
+
+		network_service
+	}
+
 	/// On a new connection request, a peer set update will be issued.
 	/// It will ask the network to connect to the validators and not disconnect
 	/// from them at least until the next request is issued for the same peer set.
@@ -222,6 +260,15 @@ mod tests {
 			Ok(())
 		}
 
+		async fn add_peers_to_reserved_set(
+			&mut self,
+			_protocol: ProtocolName,
+			multiaddresses: HashSet<Multiaddr>,
+		) -> Result<(), String> {
+			self.peers_set.extend(extract_peer_ids(multiaddresses.into_iter()));
+			Ok(())
+		}
+
 		async fn remove_from_peers_set(
 			&mut self,
 			_protocol: ProtocolName,
diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs
index 4dfdd1f7208f6..cd327c11e408c 100644
--- a/polkadot/node/network/gossip-support/src/lib.rs
+++ b/polkadot/node/network/gossip-support/src/lib.rs
@@ -69,6 +69,16 @@ const BACKOFF_DURATION: Duration = Duration::from_secs(5);
 #[cfg(test)]
 const BACKOFF_DURATION: Duration = Duration::from_millis(500);
 
+// The authority_discovery queries run every ten minutes,
+// so it makes sense to check a bit more often than that to
+// detect changes as early as we can, but not so often that
+// the extra checks bring no benefit.
+#[cfg(not(test))]
+const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(5 * 60);
+
+#[cfg(test)]
+const TRY_RERESOLVE_AUTHORITIES: Duration = Duration::from_secs(2);
+
 /// Duration after which we consider low connectivity a problem.
 ///
 /// Especially at startup low connectivity is expected (authority discovery cache needs to be
@@ -91,6 +101,14 @@ pub struct GossipSupport<AD> {
 	// `None` otherwise.
 	last_failure: Option<Instant>,
 
+	// Validators can restart during a session, and if they change
+	// their PeerId we would otherwise reconnect to them at best only
+	// at the next session, so we need to re-resolve peers more often
+	// and reconnect to them. The authority_discovery queries run every
+	// ten minutes, so we can't detect address changes more often
+	// than that.
+	last_connection_request: Option<Instant>,
+
 	/// First time we did not reach our connectivity threshold.
 	///
 	/// This is the time of the first failed attempt to connect to >2/3 of all validators in a
@@ -131,6 +149,7 @@ where
 			keystore,
 			last_session_index: None,
 			last_failure: None,
+			last_connection_request: None,
 			failure_start: None,
 			resolved_authorities: HashMap::new(),
 			connected_authorities: HashMap::new(),
@@ -196,15 +215,22 @@ where
 		for leaf in leaves {
 			let current_index = util::request_session_index_for_child(leaf, sender).await.await??;
 			let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
+			let since_last_reconnect =
+				self.last_connection_request.map(|i| i.elapsed()).unwrap_or_default();
+
 			let force_request = since_failure >= BACKOFF_DURATION;
+			let re_resolve_authorities = since_last_reconnect >= TRY_RERESOLVE_AUTHORITIES;
 			let leaf_session = Some((current_index, leaf));
 			let maybe_new_session = match self.last_session_index {
 				Some(i) if current_index <= i => None,
 				_ => leaf_session,
 			};
 
-			let maybe_issue_connection =
-				if force_request { leaf_session } else { maybe_new_session };
+			let maybe_issue_connection = if force_request || re_resolve_authorities {
+				leaf_session
+			} else {
+				maybe_new_session
+			};
 
 			if let Some((session_index, relay_parent)) = maybe_issue_connection {
 				let session_info =
@@ -248,7 +274,7 @@ where
 				// connections to a much broader set of validators.
 				{
 					let mut connections = authorities_past_present_future(sender, leaf).await?;
-
+					self.last_connection_request = Some(Instant::now());
 					// Remove all of our locally controlled validator indices so we don't connect to
 					// ourself.
 					let connections =
@@ -259,7 +285,12 @@ where
 							// to clean up all connections.
 							Vec::new()
 						};
-					self.issue_connection_request(sender, connections).await;
+
+					if force_request || is_new_session {
+						self.issue_connection_request(sender, connections).await;
+					} else if re_resolve_authorities {
+						self.issue_connection_request_to_changed(sender, connections).await;
+					}
 				}
 
 				if is_new_session {
@@ -324,17 +355,14 @@ where
 		authority_check_result
 	}
 
-	async fn issue_connection_request<Sender>(
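+	/// Resolve the given authorities via authority discovery.
+	///
+	/// Returns the resolved address sets, a map from authority id to its addresses and the
+	/// number of authorities that could not be resolved.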
+	async fn resolve_authorities(
 		&mut self,
-		sender: &mut Sender,
 		authorities: Vec<AuthorityDiscoveryId>,
-	) where
-		Sender: overseer::GossipSupportSenderTrait,
-	{
-		let num = authorities.len();
+	) -> (Vec<HashSet<Multiaddr>>, HashMap<AuthorityDiscoveryId, HashSet<Multiaddr>>, usize) {
 		let mut validator_addrs = Vec::with_capacity(authorities.len());
-		let mut failures = 0;
 		let mut resolved = HashMap::with_capacity(authorities.len());
+		let mut failures = 0;
+
 		for authority in authorities {
 			if let Some(addrs) =
 				self.authority_discovery.get_addresses_by_authority_id(authority.clone()).await
@@ -350,6 +378,67 @@ where
 				);
 			}
 		}
+		(validator_addrs, resolved, failures)
+	}
+
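+	/// Issue a connection request only for authorities whose newly resolved addresses contain
+	/// peer ids we did not know about before. This is what lets us reconnect quickly to
+	/// validators that changed their `PeerId` mid-session.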
+	async fn issue_connection_request_to_changed<Sender>(
+		&mut self,
+		sender: &mut Sender,
+		authorities: Vec<AuthorityDiscoveryId>,
+	) where
+		Sender: overseer::GossipSupportSenderTrait,
+	{
+		let (_, resolved, _) = self.resolve_authorities(authorities).await;
+
+		let mut changed = Vec::new();
+
+		for (authority, new_addresses) in &resolved {
+			let new_peer_ids = new_addresses
+				.iter()
+				.flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p))
+				.collect::<HashSet<_>>();
+			match self.resolved_authorities.get(authority) {
+				Some(old_addresses) => {
+					let old_peer_ids = old_addresses
+						.iter()
+						.flat_map(|addr| parse_addr(addr.clone()).ok().map(|(p, _)| p))
+						.collect::<HashSet<_>>();
+					if !old_peer_ids.is_superset(&new_peer_ids) {
+						changed.push(new_addresses.clone());
+					}
+				},
+				None => changed.push(new_addresses.clone()),
+			}
+		}
+		gum::debug!(
+			target: LOG_TARGET,
+			num_changed = ?changed.len(),
+			?changed,
+			"Issuing a connection request to changed validators"
+		);
+		if !changed.is_empty() {
+			self.resolved_authorities = resolved;
+
+			sender
+				.send_message(NetworkBridgeTxMessage::AddToResolvedValidators {
+					validator_addrs: changed,
+					peer_set: PeerSet::Validation,
+				})
+				.await;
+		}
+	}
+
+	async fn issue_connection_request<Sender>(
+		&mut self,
+		sender: &mut Sender,
+		authorities: Vec<AuthorityDiscoveryId>,
+	) where
+		Sender: overseer::GossipSupportSenderTrait,
+	{
+		let num = authorities.len();
+
+		let (validator_addrs, resolved, failures) = self.resolve_authorities(authorities).await;
+
 		self.resolved_authorities = resolved;
 		gum::debug!(target: LOG_TARGET, %num, "Issuing a connection request");
 
@@ -399,16 +488,24 @@ where
 	{
 		let mut authority_ids: HashMap<PeerId, HashSet<AuthorityDiscoveryId>> = HashMap::new();
 		for authority in authorities {
-			let peer_id = self
+			let peer_ids = self
 				.authority_discovery
 				.get_addresses_by_authority_id(authority.clone())
 				.await
 				.into_iter()
 				.flat_map(|list| list.into_iter())
-				.find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p));
+				.flat_map(|addr| parse_addr(addr).ok().map(|(p, _)| p))
+				.collect::<HashSet<_>>();
+
+			gum::trace!(
+				target: LOG_TARGET,
+				?peer_ids,
+				?authority,
+				"Resolved to peer ids"
+			);
 
-			if let Some(p) = peer_id {
-				authority_ids.entry(p).or_default().insert(authority);
+			for p in peer_ids {
+				authority_ids.entry(p).or_default().insert(authority.clone());
 			}
 		}
 
diff --git a/polkadot/node/network/gossip-support/src/tests.rs b/polkadot/node/network/gossip-support/src/tests.rs
index 42197d00e6f3d..09622254f523e 100644
--- a/polkadot/node/network/gossip-support/src/tests.rs
+++ b/polkadot/node/network/gossip-support/src/tests.rs
@@ -119,6 +119,14 @@ impl MockAuthorityDiscovery {
 		}
 	}
 
+	fn change_address_for_authority(&self, authority_id: AuthorityDiscoveryId) -> PeerId {
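+		// Simulate an authority restarting with a new PeerId: replace its known addresses
+		// and map the fresh peer id back to the authority.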
+		let new_peer_id = PeerId::random();
+		let addr = Multiaddr::empty().with(Protocol::P2p(new_peer_id.into()));
+		self.addrs.lock().insert(authority_id.clone(), HashSet::from([addr]));
+		self.authorities.lock().insert(new_peer_id, HashSet::from([authority_id]));
+		new_peer_id
+	}
+
 	fn authorities(&self) -> HashMap<PeerId, HashSet<AuthorityDiscoveryId>> {
 		self.authorities.lock().clone()
 	}
@@ -809,6 +817,313 @@ fn issues_update_authorities_after_session() {
 	);
 }
 
+// Test that we connect to authorities that changed their address at the `TRY_RERESOLVE_AUTHORITIES`
+// rate and that it is a no-op if no authority changed.
+#[test]
+fn test_quickly_connect_to_authorities_that_changed_address() {
+	let hash = Hash::repeat_byte(0xAA);
+
+	let authorities = PAST_PRESENT_FUTURE_AUTHORITIES.clone();
+	let authority_that_changes_address = authorities.get(5).unwrap().clone();
+
+	let mut authority_discovery_mock = MockAuthorityDiscovery::new(authorities);
+
+	test_harness(
+		make_subsystem_with_authority_discovery(authority_discovery_mock.clone()),
+		|mut virtual_overseer| async move {
+			let overseer = &mut virtual_overseer;
+			// 1. Initialize with the first leaf in the session.
+			overseer_signal_active_leaves(overseer, hash).await;
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionIndexForChild(tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					tx.send(Ok(1)).unwrap();
+				}
+			);
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionInfo(s, tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					assert_eq!(s, 1);
+					let mut session_info = make_session_info();
+					session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone();
+					tx.send(Ok(Some(session_info))).unwrap();
+				}
+			);
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::Authorities(tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap();
+				}
+			);
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToResolvedValidators {
+					validator_addrs,
+					peer_set,
+				}) => {
+					let all_without_ferdie: Vec<_> = PAST_PRESENT_FUTURE_AUTHORITIES
+						.iter()
+						.cloned()
+						.filter(|p| p != &Sr25519Keyring::Ferdie.public().into())
+						.collect();
+
+					let addrs = get_multiaddrs(all_without_ferdie, authority_discovery_mock.clone()).await;
+
+					assert_eq!(validator_addrs, addrs);
+					assert_eq!(peer_set, PeerSet::Validation);
+				}
+			);
+
+			// Ensure neighbors are unaffected
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					_,
+					RuntimeApiRequest::CurrentBabeEpoch(tx),
+				)) => {
+					let _ = tx.send(Ok(BabeEpoch {
+						epoch_index: 2 as _,
+						start_slot: 0.into(),
+						duration: 200,
+						authorities: vec![(Sr25519Keyring::Alice.public().into(), 1)],
+						randomness: [0u8; 32],
+						config: BabeEpochConfiguration {
+							c: (1, 4),
+							allowed_slots: AllowedSlots::PrimarySlots,
+						},
+					})).unwrap();
+				}
+			);
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::NewGossipTopology {
+					session: _,
+					local_index: _,
+					canonical_shuffling: _,
+					shuffled_indices: _,
+				}) => {
+
+				}
+			);
+
+			// 2. Connect all authorities that are known so far.
+			let known_authorities = authority_discovery_mock.authorities();
+			for (peer_id, _id) in known_authorities.iter() {
+				let msg =
+					GossipSupportMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected(
+						*peer_id,
+						ObservedRole::Authority,
+						ValidationVersion::V3.into(),
+						None,
+					));
+				overseer.send(FromOrchestra::Communication { msg }).await
+			}
+
+			// 3. Send a new leaf after TRY_RERESOLVE_AUTHORITIES; we should see
+			//    UpdatedAuthorityIds being emitted for all connected peers.
+			Delay::new(TRY_RERESOLVE_AUTHORITIES).await;
+			let hash = Hash::repeat_byte(0xBB);
+			overseer_signal_active_leaves(overseer, hash).await;
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionIndexForChild(tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					tx.send(Ok(1)).unwrap();
+				}
+			);
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionInfo(s, tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					assert_eq!(s, 1);
+					let mut session_info = make_session_info();
+					session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone();
+					tx.send(Ok(Some(session_info))).unwrap();
+
+				}
+			);
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::Authorities(tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap();
+				}
+			);
+
+			for _ in 0..known_authorities.len() {
+				assert_matches!(
+					overseer_recv(overseer).await,
+					AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds {
+						peer_id,
+						authority_ids,
+					}) => {
+						assert_eq!(authority_discovery_mock.get_authority_ids_by_peer_id(peer_id).await.unwrap_or_default(), authority_ids);
+					}
+				);
+			}
+
+			// 4. At the next re-resolve no authority has changed its address, so it should be a no-op.
+			Delay::new(TRY_RERESOLVE_AUTHORITIES).await;
+			let hash = Hash::repeat_byte(0xCC);
+			overseer_signal_active_leaves(overseer, hash).await;
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionIndexForChild(tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					tx.send(Ok(1)).unwrap();
+				}
+			);
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionInfo(s, tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					assert_eq!(s, 1);
+					let mut session_info = make_session_info();
+					session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone();
+					tx.send(Ok(Some(session_info))).unwrap();
+
+				}
+			);
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::Authorities(tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap();
+				}
+			);
+			assert!(overseer.recv().timeout(TIMEOUT).await.is_none());
+
+			// Change the address of one authority and check that we try to connect to it and
+			// that we emit UpdatedAuthorityIds for both the old PeerId and the new one.
+			Delay::new(TRY_RERESOLVE_AUTHORITIES).await;
+			let changed_peerid = authority_discovery_mock
+				.change_address_for_authority(authority_that_changes_address.clone());
+			let hash = Hash::repeat_byte(0xDD);
+			let msg = GossipSupportMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected(
+				changed_peerid,
+				ObservedRole::Authority,
+				ValidationVersion::V3.into(),
+				None,
+			));
+			overseer.send(FromOrchestra::Communication { msg }).await;
+
+			overseer_signal_active_leaves(overseer, hash).await;
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionIndexForChild(tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					tx.send(Ok(1)).unwrap();
+				}
+			);
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionInfo(s, tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					assert_eq!(s, 1);
+					let mut session_info = make_session_info();
+					session_info.discovery_keys = PAST_PRESENT_FUTURE_AUTHORITIES.clone();
+					tx.send(Ok(Some(session_info))).unwrap();
+
+				}
+			);
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::Authorities(tx),
+				)) => {
+					assert_eq!(relay_parent, hash);
+					tx.send(Ok(PAST_PRESENT_FUTURE_AUTHORITIES.clone())).unwrap();
+				}
+			);
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::AddToResolvedValidators {
+					validator_addrs,
+					peer_set,
+				}) => {
+					let expected = get_address_map(vec![authority_that_changes_address.clone()], authority_discovery_mock.clone()).await;
+					let expected: HashSet<Multiaddr> = expected.into_values().flat_map(|v| v.into_iter()).collect();
+					assert_eq!(validator_addrs.into_iter().flat_map(|v| v.into_iter()).collect::<HashSet<_>>(), expected);
+					assert_eq!(peer_set, PeerSet::Validation);
+				}
+			);
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds {
+					peer_id,
+					authority_ids,
+				}) => {
+					assert_eq!(authority_discovery_mock.get_authority_ids_by_peer_id(peer_id).await.unwrap(), HashSet::from([authority_that_changes_address.clone()]));
+					assert!(authority_ids.is_empty());
+				}
+			);
+
+			assert_matches!(
+				overseer_recv(overseer).await,
+				AllMessages::NetworkBridgeRx(NetworkBridgeRxMessage::UpdatedAuthorityIds {
+					peer_id,
+					authority_ids,
+				}) => {
+					assert_eq!(authority_ids, HashSet::from([authority_that_changes_address]));
+					assert_eq!(changed_peerid, peer_id);
+				}
+			);
+
+			assert!(overseer.recv().timeout(TIMEOUT).await.is_none());
+
+			virtual_overseer
+		},
+	);
+}
+
 #[test]
 fn disconnect_when_not_in_past_present_future() {
 	sp_tracing::try_init_simple();
diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs
index ee937bca05bfe..4d27ac9b70e33 100644
--- a/polkadot/node/subsystem-types/src/messages.rs
+++ b/polkadot/node/subsystem-types/src/messages.rs
@@ -444,6 +444,16 @@ pub enum NetworkBridgeTxMessage {
 		/// The peer set we want the connection on.
 		peer_set: PeerSet,
 	},
+
+	/// Extends the known validators set with new peers whose `Multiaddr`s we already know. This is
+	/// usually needed for validators that change their address mid-session. It is typically sent
+	/// after a `ConnectToResolvedValidators` at the beginning of the session.
+	AddToResolvedValidators {
+		/// Each entry corresponds to the addresses of an already resolved validator.
+		validator_addrs: Vec<HashSet<Multiaddr>>,
+		/// The peer set we want the connection on.
+		peer_set: PeerSet,
+	},
 }
 
 /// Availability Distribution Message.
diff --git a/prdoc/pr_3786.prdoc b/prdoc/pr_3786.prdoc
new file mode 100644
index 0000000000000..0bb9e6c23f75b
--- /dev/null
+++ b/prdoc/pr_3786.prdoc
@@ -0,0 +1,22 @@
+title: Make changing of peer-id while active a bit more robust
+
+doc:
+  - audience: Node Dev
+    description: |
+      Implementation of https://github.com/polkadot-fellows/RFCs/pull/91: use the `creation_time` field to determine
+      the newest DHT record and to update nodes known to have the old record.
+
+      Gossip-support is modified to try to re-resolve authority addresses every 5 minutes instead of once per session,
+      so that we pick up authorities that changed their address faster and try to connect to them.
+
+crates:
+- name: sc-authority-discovery
+  bump: major
+- name: polkadot-gossip-support
+  bump: major
+- name: polkadot-network-bridge
+  bump: major
+- name: polkadot-node-subsystem-types
+  bump: major
+- name: sc-network
+  bump: minor
\ No newline at end of file
diff --git a/substrate/client/authority-discovery/build.rs b/substrate/client/authority-discovery/build.rs
index 83076ac8c8937..cdabc1a74427d 100644
--- a/substrate/client/authority-discovery/build.rs
+++ b/substrate/client/authority-discovery/build.rs
@@ -18,7 +18,11 @@
 
 fn main() {
 	prost_build::compile_protos(
-		&["src/worker/schema/dht-v1.proto", "src/worker/schema/dht-v2.proto"],
+		&[
+			"src/worker/schema/dht-v1.proto",
+			"src/worker/schema/dht-v2.proto",
+			"src/worker/schema/dht-v3.proto",
+		],
 		&["src/worker/schema"],
 	)
 	.unwrap();
diff --git a/substrate/client/authority-discovery/src/worker.rs b/substrate/client/authority-discovery/src/worker.rs
index 1f1cce160786c..42994bbc7ea89 100644
--- a/substrate/client/authority-discovery/src/worker.rs
+++ b/substrate/client/authority-discovery/src/worker.rs
@@ -26,7 +26,7 @@ use std::{
 	collections::{HashMap, HashSet},
 	marker::PhantomData,
 	sync::Arc,
-	time::Duration,
+	time::{Duration, Instant, SystemTime, UNIX_EPOCH},
 };
 
 use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt};
@@ -34,16 +34,17 @@ use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt}
 use addr_cache::AddrCache;
 use codec::{Decode, Encode};
 use ip_network::IpNetwork;
+use libp2p::kad::{PeerRecord, Record};
 use linked_hash_set::LinkedHashSet;
 
-use log::{debug, error, log_enabled};
+use log::{debug, error};
 use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64};
 use prost::Message;
 use rand::{seq::SliceRandom, thread_rng};
 
 use sc_network::{
-	event::DhtEvent, multiaddr, KademliaKey, Multiaddr, NetworkDHTProvider, NetworkSigner,
-	NetworkStateInfo,
+	config::DEFAULT_KADEMLIA_REPLICATION_FACTOR, event::DhtEvent, multiaddr, KademliaKey,
+	Multiaddr, NetworkDHTProvider, NetworkSigner, NetworkStateInfo,
 };
 use sc_network_types::{multihash::Code, PeerId};
 use schema::PeerSignature;
@@ -62,7 +63,7 @@ mod schema {
 	#[cfg(test)]
 	mod tests;
 
-	include!(concat!(env!("OUT_DIR"), "/authority_discovery_v2.rs"));
+	include!(concat!(env!("OUT_DIR"), "/authority_discovery_v3.rs"));
 }
 #[cfg(test)]
 pub mod tests;
@@ -159,6 +160,16 @@ pub struct Worker<Client, Block: BlockT, DhtEventStream> {
 	/// Set of in-flight lookups.
 	in_flight_lookups: HashMap<KademliaKey, AuthorityId>,
 
+	/// Set of lookups for which we can still receive records.
+	/// These are the entries in `in_flight_lookups` for which
+	/// we got at least one successful result.
+	known_lookups: HashMap<KademliaKey, AuthorityId>,
+
+	/// Last known record per key. We always keep the record with
+	/// the highest creation time and don't accept records older
+	/// than that.
+	last_known_records: HashMap<KademliaKey, RecordInfo>,
+
 	addr_cache: addr_cache::AddrCache,
 
 	metrics: Option<Metrics>,
@@ -168,6 +179,17 @@ pub struct Worker<Client, Block: BlockT, DhtEventStream> {
 	phantom: PhantomData<Block>,
 }
 
+#[derive(Debug, Clone)]
+struct RecordInfo {
+	/// Time since UNIX_EPOCH in nanoseconds.
+	creation_time: u128,
+	/// Peers that we know have this record, bounded to no more than
+	/// `DEFAULT_KADEMLIA_REPLICATION_FACTOR` (20).
+	peers_with_record: HashSet<PeerId>,
+	/// The record itself.
+	record: Record,
+}
+
 /// Wrapper for [`AuthorityDiscoveryApi`](sp_authority_discovery::AuthorityDiscoveryApi). Can be
 /// be implemented by any struct without dependency on the runtime.
 #[async_trait::async_trait]
@@ -283,10 +305,12 @@ where
 			query_interval,
 			pending_lookups: Vec::new(),
 			in_flight_lookups: HashMap::new(),
+			known_lookups: HashMap::new(),
 			addr_cache,
 			role,
 			metrics,
 			phantom: PhantomData,
+			last_known_records: HashMap::new(),
 		}
 	}
 
@@ -444,7 +468,7 @@ where
 				.set(addresses.len().try_into().unwrap_or(std::u64::MAX));
 		}
 
-		let serialized_record = serialize_authority_record(addresses)?;
+		let serialized_record = serialize_authority_record(addresses, Some(build_creation_time()))?;
 		let peer_signature = sign_record_with_peer_id(&serialized_record, &self.network)?;
 
 		let keys_vec = keys.iter().cloned().collect::<Vec<_>>();
@@ -495,12 +519,17 @@ where
 		self.authorities_queried_at = Some(best_hash);
 
 		self.addr_cache.retain_ids(&authorities);
+		let now = Instant::now();
+		self.last_known_records.retain(|k, value| {
+			self.known_authorities.contains_key(k) && !value.record.is_expired(now)
+		});
 
 		authorities.shuffle(&mut thread_rng());
 		self.pending_lookups = authorities;
 		// Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as
 		// query interval ticks are far enough apart for all lookups to succeed.
 		self.in_flight_lookups.clear();
+		self.known_lookups.clear();
 
 		if let Some(metrics) = &self.metrics {
 			metrics
@@ -538,16 +567,12 @@ where
 					metrics.dht_event_received.with_label_values(&["value_found"]).inc();
 				}
 
-				if log_enabled!(log::Level::Debug) {
-					let hashes: Vec<_> = v.iter().map(|(hash, _value)| hash.clone()).collect();
-					debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", hashes);
-				}
+				debug!(target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", v.record.key);
 
 				if let Err(e) = self.handle_dht_value_found_event(v) {
 					if let Some(metrics) = &self.metrics {
 						metrics.handle_value_found_event_failure.inc();
 					}
-
 					debug!(target: LOG_TARGET, "Failed to handle Dht value found event: {}", e);
 				}
 			},
@@ -651,6 +676,31 @@ where
 			publisher,
 			authority_id,
 		)?;
+
+		let records_creation_time: u128 =
+			schema::AuthorityRecord::decode(signed_record.record.as_slice())
+				.map_err(Error::DecodingProto)?
+				.creation_time
+				.map(|creation_time| {
+					u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
+				})
+				.unwrap_or_default(); // 0 is a sane default for records that do not have creation time present.
+
+		let current_record_info = self.last_known_records.get(&record_key);
+		// If record creation time is older than the current record creation time,
+		// we don't store it since we want to give higher priority to newer records.
+		if let Some(current_record_info) = current_record_info {
+			if records_creation_time < current_record_info.creation_time {
+				debug!(
+					target: LOG_TARGET,
+					"Skip storing because record creation time {:?} is older than the current known record {:?}",
+					records_creation_time,
+					current_record_info.creation_time
+				);
+				return Ok(());
+			}
+		}
+
 		self.network.store_record(record_key, record_value, Some(publisher), expires);
 		Ok(())
 	}
@@ -701,67 +751,88 @@ where
 		Ok(())
 	}
 
-	fn handle_dht_value_found_event(&mut self, values: Vec<(KademliaKey, Vec<u8>)>) -> Result<()> {
+	fn handle_dht_value_found_event(&mut self, peer_record: PeerRecord) -> Result<()> {
 		// Ensure `values` is not empty and all its keys equal.
-		let remote_key = single(values.iter().map(|(key, _)| key.clone()))
-			.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentKeys)?
-			.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;
-
-		let authority_id: AuthorityId = self
-			.in_flight_lookups
-			.remove(&remote_key)
-			.ok_or(Error::ReceivingUnexpectedRecord)?;
+		let remote_key = peer_record.record.key.clone();
+
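+		// The first record for a lookup moves it from `in_flight_lookups` to `known_lookups`,
+		// so later records from other peers for the same key are still accepted; records for
+		// keys we never looked up are rejected.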
+		let authority_id: AuthorityId =
+			if let Some(authority_id) = self.in_flight_lookups.remove(&remote_key) {
+				self.known_lookups.insert(remote_key.clone(), authority_id.clone());
+				authority_id
+			} else if let Some(authority_id) = self.known_lookups.get(&remote_key) {
+				authority_id.clone()
+			} else {
+				return Err(Error::ReceivingUnexpectedRecord);
+			};
 
 		let local_peer_id = self.network.local_peer_id();
 
-		let remote_addresses: Vec<Multiaddr> = values
-			.into_iter()
-			.map(|(_k, v)| {
-				let schema::SignedAuthorityRecord { record, peer_signature, .. } =
-					Self::check_record_signed_with_authority_id(&v, &authority_id)?;
-
-				let addresses: Vec<Multiaddr> = schema::AuthorityRecord::decode(record.as_slice())
-					.map(|a| a.addresses)
-					.map_err(Error::DecodingProto)?
-					.into_iter()
-					.map(|a| a.try_into())
-					.collect::<std::result::Result<_, _>>()
-					.map_err(Error::ParsingMultiaddress)?;
-
-				let get_peer_id = |a: &Multiaddr| match a.iter().last() {
-					Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
-					_ => None,
-				};
-
-				// Ignore [`Multiaddr`]s without [`PeerId`] or with own addresses.
-				let addresses: Vec<Multiaddr> = addresses
-					.into_iter()
-					.filter(|a| get_peer_id(a).filter(|p| *p != local_peer_id).is_some())
-					.collect();
-
-				let remote_peer_id = single(addresses.iter().map(get_peer_id))
-					.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? // different peer_id in records
-					.flatten()
-					.ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; // no records with peer_id in them
-
-				// At this point we know all the valid multiaddresses from the record, know that
-				// each of them belong to the same PeerId, we just need to check if the record is
-				// properly signed by the owner of the PeerId
-				self.check_record_signed_with_network_key(
-					&record,
-					peer_signature,
-					remote_peer_id,
-					&authority_id,
-				)?;
-				Ok(addresses)
+		let schema::SignedAuthorityRecord { record, peer_signature, .. } =
+			Self::check_record_signed_with_authority_id(
+				peer_record.record.value.as_slice(),
+				&authority_id,
+			)?;
+
+		let authority_record =
+			schema::AuthorityRecord::decode(record.as_slice()).map_err(Error::DecodingProto)?;
+
+		let records_creation_time: u128 = authority_record
+			.creation_time
+			.as_ref()
+			.map(|creation_time| {
+				u128::decode(&mut &creation_time.timestamp[..]).unwrap_or_default()
 			})
-			.collect::<Result<Vec<Vec<Multiaddr>>>>()?
+			.unwrap_or_default(); // 0 is a sane default for records that do not have creation time present.
+
+		let addresses: Vec<Multiaddr> = authority_record
+			.addresses
 			.into_iter()
-			.flatten()
-			.take(MAX_ADDRESSES_PER_AUTHORITY)
+			.map(|a| a.try_into())
+			.collect::<std::result::Result<_, _>>()
+			.map_err(Error::ParsingMultiaddress)?;
+
+		let get_peer_id = |a: &Multiaddr| match a.iter().last() {
+			Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(),
+			_ => None,
+		};
+
+		// Ignore [`Multiaddr`]s without [`PeerId`] or with own addresses.
+		let addresses: Vec<Multiaddr> = addresses
+			.into_iter()
+			.filter(|a| get_peer_id(&a).filter(|p| *p != local_peer_id).is_some())
 			.collect();
 
-		if !remote_addresses.is_empty() {
+		let remote_peer_id = single(addresses.iter().map(|a| get_peer_id(&a)))
+			.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentPeerIds)? // different peer_id in records
+			.flatten()
+			.ok_or(Error::ReceivingDhtValueFoundEventWithNoPeerIds)?; // no records with peer_id in them
+
+		// At this point we know all the valid multiaddresses from the record, know that
+		// each of them belong to the same PeerId, we just need to check if the record is
+		// properly signed by the owner of the PeerId
+		self.check_record_signed_with_network_key(
+			&record,
+			peer_signature,
+			remote_peer_id,
+			&authority_id,
+		)?;
+
+		let remote_addresses: Vec<Multiaddr> =
+			addresses.into_iter().take(MAX_ADDRESSES_PER_AUTHORITY).collect();
+
+		let answering_peer_id = peer_record.peer.map(|peer| peer.into());
+
+		let addr_cache_needs_update = self.handle_new_record(
+			&authority_id,
+			remote_key.clone(),
+			RecordInfo {
+				creation_time: records_creation_time,
+				peers_with_record: answering_peer_id.into_iter().collect(),
+				record: peer_record.record,
+			},
+		);
+
+		if !remote_addresses.is_empty() && addr_cache_needs_update {
 			self.addr_cache.insert(authority_id, remote_addresses);
 			if let Some(metrics) = &self.metrics {
 				metrics
@@ -772,6 +843,68 @@ where
 		Ok(())
 	}
 
+	// Handles receiving a new DHT record for the authority.
+	// Returns `true` if the record was new, `false` if the record was older than the current one.
+	fn handle_new_record(
+		&mut self,
+		authority_id: &AuthorityId,
+		kademlia_key: KademliaKey,
+		new_record: RecordInfo,
+	) -> bool {
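+		// The first record seen for a key becomes the baseline; it then falls into the
+		// equal-creation-time branch below and is accepted.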
+		let current_record_info = self
+			.last_known_records
+			.entry(kademlia_key.clone())
+			.or_insert_with(|| new_record.clone());
+
+		if new_record.creation_time > current_record_info.creation_time {
+			let peers_that_need_updating = current_record_info.peers_with_record.clone();
+			self.network.put_record_to(
+				new_record.record.clone(),
+				peers_that_need_updating.clone(),
+				// If this is empty it means we received the answer from our node's local
+				// storage, so we need to update that as well.
+				current_record_info.peers_with_record.is_empty(),
+			);
+			debug!(
+					target: LOG_TARGET,
+					"Found a newer record for {:?} new record creation time {:?} old record creation time {:?}",
+					authority_id, new_record.creation_time, current_record_info.creation_time
+			);
+			self.last_known_records.insert(kademlia_key, new_record);
+			return true
+		}
+
+		if new_record.creation_time == current_record_info.creation_time {
+			// Same record; just update the set of peers known to have it, e.g. when this is
+			// a record from old nodes that don't have a timestamp.
+			debug!(
+					target: LOG_TARGET,
+					"Found same record for {:?} record creation time {:?}",
+					authority_id, new_record.creation_time
+			);
+			if current_record_info.peers_with_record.len() + new_record.peers_with_record.len() <=
+				DEFAULT_KADEMLIA_REPLICATION_FACTOR
+			{
+				current_record_info.peers_with_record.extend(new_record.peers_with_record);
+			}
+			return true
+		}
+
+		debug!(
+				target: LOG_TARGET,
+				"Found old record for {:?} received record creation time {:?} current record creation time {:?}",
+				authority_id, new_record.creation_time, current_record_info.creation_time,
+		);
+		self.network.put_record_to(
+			current_record_info.record.clone(),
+			new_record.peers_with_record.clone(),
+			// If this is empty it means we received the answer from our node's local
+			// storage, so we need to update that as well.
+			new_record.peers_with_record.is_empty(),
+		);
+		return false
+	}
+
 	/// Retrieve our public keys within the current and next authority set.
 	// A node might have multiple authority discovery keys within its keystore, e.g. an old one and
 	// one for the upcoming session. In addition it could be participating in the current and (/ or)
@@ -838,9 +971,21 @@ fn serialize_addresses(addresses: impl Iterator<Item = Multiaddr>) -> Vec<Vec<u8
 	addresses.map(|a| a.to_vec()).collect()
 }
 
-fn serialize_authority_record(addresses: Vec<Vec<u8>>) -> Result<Vec<u8>> {
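+/// Build the creation time info for a record: time since `UNIX_EPOCH` in nanoseconds,
+/// SCALE encoded.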
+fn build_creation_time() -> schema::TimestampInfo {
+	let creation_time = SystemTime::now()
+		.duration_since(UNIX_EPOCH)
+		.map(|time| time.as_nanos())
+		.unwrap_or_default();
+	schema::TimestampInfo { timestamp: creation_time.encode() }
+}
+
+fn serialize_authority_record(
+	addresses: Vec<Vec<u8>>,
+	creation_time: Option<schema::TimestampInfo>,
+) -> Result<Vec<u8>> {
 	let mut serialized_record = vec![];
-	schema::AuthorityRecord { addresses }
+
+	schema::AuthorityRecord { addresses, creation_time }
 		.encode(&mut serialized_record)
 		.map_err(Error::EncodingProto)?;
 	Ok(serialized_record)
@@ -876,7 +1021,6 @@ fn sign_record_with_authority_ids(
 
 		// Scale encode
 		let auth_signature = auth_signature.encode();
-
 		let signed_record = schema::SignedAuthorityRecord {
 			record: serialized_record.clone(),
 			auth_signature,
diff --git a/substrate/client/authority-discovery/src/worker/schema/dht-v3.proto b/substrate/client/authority-discovery/src/worker/schema/dht-v3.proto
new file mode 100644
index 0000000000000..547237573af29
--- /dev/null
+++ b/substrate/client/authority-discovery/src/worker/schema/dht-v3.proto
@@ -0,0 +1,31 @@
+syntax = "proto3";
+
+package authority_discovery_v3;
+
+// First we need to serialize the addresses in order to be able to sign them.
+message AuthorityRecord {
+	// Possibly multiple `MultiAddress`es through which the node can be reached.
+	repeated bytes addresses = 1;
+	// Information about the creation time of the record
+	TimestampInfo creation_time = 2;
+}
+
+message PeerSignature {
+	bytes signature = 1;
+	bytes public_key = 2;
+}
+
+// Information regarding the creation time of the record
+message TimestampInfo {
+	// Time since UNIX_EPOCH in nanoseconds, scale encoded
+	bytes timestamp = 1;
+}
+
+// Then we need to serialize the authority record and signature to send them over the wire.
+message SignedAuthorityRecord {
+	bytes record = 1;
+	bytes auth_signature = 2;
+	// Even if there are multiple `record.addresses`, all of them have the same peer id.
+	// Old versions are missing this field. It is optional in order to provide compatibility both ways.
+	PeerSignature peer_signature = 3;
+}
diff --git a/substrate/client/authority-discovery/src/worker/schema/tests.rs b/substrate/client/authority-discovery/src/worker/schema/tests.rs
index ef06ed7d336b0..557fa9641f97b 100644
--- a/substrate/client/authority-discovery/src/worker/schema/tests.rs
+++ b/substrate/client/authority-discovery/src/worker/schema/tests.rs
@@ -20,7 +20,12 @@ mod schema_v1 {
 	include!(concat!(env!("OUT_DIR"), "/authority_discovery_v1.rs"));
 }
 
+mod schema_v2 {
+	include!(concat!(env!("OUT_DIR"), "/authority_discovery_v2.rs"));
+}
+
 use super::*;
+use codec::Encode;
 use libp2p::identity::Keypair;
 use prost::Message;
 use sc_network::{Multiaddr, PeerId};
@@ -65,7 +70,7 @@ fn v1_decodes_v2() {
 	let vec_auth_signature = b"Totally valid signature, I promise!".to_vec();
 	let vec_peer_signature = b"Surprisingly hard to crack crypto".to_vec();
 
-	let record_v2 = AuthorityRecord { addresses: vec_addresses.clone() };
+	let record_v2 = schema_v2::AuthorityRecord { addresses: vec_addresses.clone() };
 	let mut vec_record_v2 = vec![];
 	record_v2.encode(&mut vec_record_v2).unwrap();
 	let vec_peer_public = peer_public.encode_protobuf();
@@ -85,6 +90,82 @@ fn v1_decodes_v2() {
 	assert_eq!(&signed_addresses_v1_decoded.addresses, &vec_record_v2);
 	assert_eq!(&signed_addresses_v1_decoded.signature, &vec_auth_signature);
 
-	let addresses_v2_decoded = AuthorityRecord::decode(vec_record_v2.as_slice()).unwrap();
+	let addresses_v2_decoded =
+		schema_v2::AuthorityRecord::decode(vec_record_v2.as_slice()).unwrap();
+	assert_eq!(&addresses_v2_decoded.addresses, &vec_addresses);
+}
+
+#[test]
+fn v1_decodes_v3() {
+	let peer_secret = Keypair::generate_ed25519();
+	let peer_public = peer_secret.public();
+	let peer_id = peer_public.to_peer_id();
+	let multiaddress: Multiaddr =
+		format!("/ip4/127.0.0.1/tcp/3003/p2p/{}", peer_id).parse().unwrap();
+	let vec_addresses = vec![multiaddress.to_vec()];
+	let vec_auth_signature = b"Totally valid signature, I promise!".to_vec();
+	let vec_peer_signature = b"Surprisingly hard to crack crypto".to_vec();
+
+	let record_v3 = AuthorityRecord {
+		addresses: vec_addresses.clone(),
+		creation_time: Some(TimestampInfo { timestamp: Encode::encode(&55) }),
+	};
+	let mut vec_record_v3 = vec![];
+	record_v3.encode(&mut vec_record_v3).unwrap();
+	let vec_peer_public = peer_public.encode_protobuf();
+	let peer_signature_v3 =
+		PeerSignature { public_key: vec_peer_public, signature: vec_peer_signature };
+	let signed_record_v3 = SignedAuthorityRecord {
+		record: vec_record_v3.clone(),
+		auth_signature: vec_auth_signature.clone(),
+		peer_signature: Some(peer_signature_v3.clone()),
+	};
+	let mut vec_signed_record_v3 = vec![];
+	signed_record_v3.encode(&mut vec_signed_record_v3).unwrap();
+
+	let signed_addresses_v1_decoded =
+		schema_v1::SignedAuthorityAddresses::decode(vec_signed_record_v3.as_slice()).unwrap();
+
+	assert_eq!(&signed_addresses_v1_decoded.addresses, &vec_record_v3);
+	assert_eq!(&signed_addresses_v1_decoded.signature, &vec_auth_signature);
+
+	let addresses_v2_decoded =
+		schema_v2::AuthorityRecord::decode(vec_record_v3.as_slice()).unwrap();
 	assert_eq!(&addresses_v2_decoded.addresses, &vec_addresses);
 }
+
+#[test]
+fn v3_decodes_v2() {
+	let peer_secret = Keypair::generate_ed25519();
+	let peer_public = peer_secret.public();
+	let peer_id = peer_public.to_peer_id();
+	let multiaddress: Multiaddr =
+		format!("/ip4/127.0.0.1/tcp/3003/p2p/{}", peer_id).parse().unwrap();
+	let vec_addresses = vec![multiaddress.to_vec()];
+	let vec_auth_signature = b"Totally valid signature, I promise!".to_vec();
+	let vec_peer_signature = b"Surprisingly hard to crack crypto".to_vec();
+
+	let record_v2 = schema_v2::AuthorityRecord { addresses: vec_addresses.clone() };
+	let mut vec_record_v2 = vec![];
+	record_v2.encode(&mut vec_record_v2).unwrap();
+	let vec_peer_public = peer_public.encode_protobuf();
+	let peer_signature_v2 =
+		schema_v2::PeerSignature { public_key: vec_peer_public, signature: vec_peer_signature };
+	let signed_record_v2 = schema_v2::SignedAuthorityRecord {
+		record: vec_record_v2.clone(),
+		auth_signature: vec_auth_signature.clone(),
+		peer_signature: Some(peer_signature_v2.clone()),
+	};
+	let mut vec_signed_record_v2 = vec![];
+	signed_record_v2.encode(&mut vec_signed_record_v2).unwrap();
+
+	let signed_addresses_v3_decoded =
+		SignedAuthorityRecord::decode(vec_signed_record_v2.as_slice()).unwrap();
+
+	assert_eq!(&signed_addresses_v3_decoded.record, &vec_record_v2);
+	assert_eq!(&signed_addresses_v3_decoded.auth_signature, &vec_auth_signature);
+
+	let addresses_v3_decoded = AuthorityRecord::decode(vec_record_v2.as_slice()).unwrap();
+	assert_eq!(&addresses_v3_decoded.addresses, &vec_addresses);
+	assert_eq!(&addresses_v3_decoded.creation_time, &None);
+}
diff --git a/substrate/client/authority-discovery/src/worker/tests.rs b/substrate/client/authority-discovery/src/worker/tests.rs
index de7443d634fa7..b49615382b8a2 100644
--- a/substrate/client/authority-discovery/src/worker/tests.rs
+++ b/substrate/client/authority-discovery/src/worker/tests.rs
@@ -119,6 +119,7 @@ sp_api::mock_impl_runtime_apis! {
 pub enum TestNetworkEvent {
 	GetCalled(KademliaKey),
 	PutCalled(KademliaKey, Vec<u8>),
+	PutToCalled(Record, HashSet<sc_network_types::PeerId>, bool),
 	StoreRecordCalled(KademliaKey, Vec<u8>, Option<sc_network_types::PeerId>, Option<Instant>),
 }
 
@@ -129,6 +130,7 @@ pub struct TestNetwork {
 	// Whenever functions on `TestNetwork` are called, the function arguments are added to the
 	// vectors below.
 	pub put_value_call: Arc<Mutex<Vec<(KademliaKey, Vec<u8>)>>>,
+	pub put_value_to_call: Arc<Mutex<Vec<(Record, HashSet<sc_network_types::PeerId>, bool)>>>,
 	pub get_value_call: Arc<Mutex<Vec<KademliaKey>>>,
 	pub store_value_call:
 		Arc<Mutex<Vec<(KademliaKey, Vec<u8>, Option<sc_network_types::PeerId>, Option<Instant>)>>>,
@@ -153,6 +155,7 @@ impl Default for TestNetwork {
 			external_addresses: vec!["/ip6/2001:db8::/tcp/30333".parse().unwrap()],
 			put_value_call: Default::default(),
 			get_value_call: Default::default(),
+			put_value_to_call: Default::default(),
 			store_value_call: Default::default(),
 			event_sender: tx,
 			event_receiver: Some(rx),
@@ -200,6 +203,23 @@ impl NetworkDHTProvider for TestNetwork {
 			.unwrap();
 	}
 
+	fn put_record_to(
+		&self,
+		record: Record,
+		peers: HashSet<sc_network_types::PeerId>,
+		update_local_storage: bool,
+	) {
+		self.put_value_to_call.lock().unwrap().push((
+			record.clone(),
+			peers.clone(),
+			update_local_storage,
+		));
+		self.event_sender
+			.clone()
+			.unbounded_send(TestNetworkEvent::PutToCalled(record, peers, update_local_storage))
+			.unwrap();
+	}
+
 	fn store_record(
 		&self,
 		key: KademliaKey,
@@ -262,9 +282,11 @@ fn build_dht_event<Signer: NetworkSigner>(
 	public_key: AuthorityId,
 	key_store: &MemoryKeystore,
 	network: Option<&Signer>,
+	creation_time: Option<schema::TimestampInfo>,
 ) -> Vec<(KademliaKey, Vec<u8>)> {
 	let serialized_record =
-		serialize_authority_record(serialize_addresses(addresses.into_iter())).unwrap();
+		serialize_authority_record(serialize_addresses(addresses.into_iter()), creation_time)
+			.unwrap();
 
 	let peer_signature = network.map(|n| sign_record_with_peer_id(&serialized_record, n).unwrap());
 	let kv_pairs = sign_record_with_authority_ids(
@@ -372,7 +394,10 @@ fn publish_discover_cycle() {
 
 			let dht_event = {
 				let (key, value) = network.put_value_call.lock().unwrap().pop().unwrap();
-				DhtEvent::ValueFound(vec![(key, value)])
+				DhtEvent::ValueFound(PeerRecord {
+					peer: None,
+					record: Record { key, value, publisher: None, expires: None },
+				})
 			};
 
 			// Node B discovering node A's address.
@@ -515,21 +540,39 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() {
 
 		// Send an event that should generate an error
 		dht_event_tx
-			.send(DhtEvent::ValueFound(Default::default()))
+			.send(DhtEvent::ValueFound(PeerRecord {
+				peer: None,
+				record: Record {
+					key: vec![0x9u8].into(),
+					value: Default::default(),
+					publisher: None,
+					expires: None,
+				},
+			}))
 			.await
 			.expect("Channel has capacity of 1.");
 
 		// Make previously triggered lookup succeed.
-		let dht_event = {
-			let kv_pairs = build_dht_event::<TestNetwork>(
-				vec![remote_multiaddr.clone()],
-				remote_public_key.clone(),
-				&remote_key_store,
-				None,
-			);
-			DhtEvent::ValueFound(kv_pairs)
-		};
-		dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1.");
+		let kv_pairs: Vec<PeerRecord> = build_dht_event::<TestNetwork>(
+			vec![remote_multiaddr.clone()],
+			remote_public_key.clone(),
+			&remote_key_store,
+			None,
+			Some(build_creation_time()),
+		)
+		.into_iter()
+		.map(|(key, value)| PeerRecord {
+			peer: None,
+			record: Record { key, value, publisher: None, expires: None },
+		})
+		.collect();
+
+		for kv_pair in kv_pairs {
+			dht_event_tx
+				.send(DhtEvent::ValueFound(kv_pair))
+				.await
+				.expect("Channel has capacity of 1.");
+		}
 
 		// Expect authority discovery to function normally, now knowing the
 		// address for the remote node.
@@ -581,37 +624,51 @@ impl DhtValueFoundTester {
 		&mut self,
 		strict_record_validation: bool,
 		values: Vec<(KademliaKey, Vec<u8>)>,
-	) -> Option<&HashSet<Multiaddr>> {
+	) -> (Option<HashSet<Multiaddr>>, Option<Arc<TestNetwork>>) {
 		let (_dht_event_tx, dht_event_rx) = channel(1);
 		let local_test_api =
 			Arc::new(TestApi { authorities: vec![self.remote_authority_public.into()] });
-		let local_network: Arc<TestNetwork> = Arc::new(Default::default());
 		let local_key_store = MemoryKeystore::new();
 
 		let (_to_worker, from_service) = mpsc::channel(0);
-		let mut local_worker = Worker::new(
-			from_service,
-			local_test_api,
-			local_network.clone(),
-			Box::pin(dht_event_rx),
-			Role::PublishAndDiscover(Arc::new(local_key_store)),
-			None,
-			WorkerConfig { strict_record_validation, ..Default::default() },
-		);
+		let (local_worker, local_network) = if let Some(local_work) = self.local_worker.as_mut() {
+			(local_work, None)
+		} else {
+			let local_network: Arc<TestNetwork> = Arc::new(Default::default());
+
+			self.local_worker = Some(Worker::new(
+				from_service,
+				local_test_api,
+				local_network.clone(),
+				Box::pin(dht_event_rx),
+				Role::PublishAndDiscover(Arc::new(local_key_store)),
+				None,
+				WorkerConfig { strict_record_validation, ..Default::default() },
+			));
+			(self.local_worker.as_mut().unwrap(), Some(local_network))
+		};
 
 		block_on(local_worker.refill_pending_lookups_queue()).unwrap();
 		local_worker.start_new_lookups();
 
-		drop(local_worker.handle_dht_value_found_event(values));
-
-		self.local_worker = Some(local_worker);
+		for record in values.into_iter().map(|(key, value)| PeerRecord {
+			peer: Some(PeerId::random().into()),
+			record: Record { key, value, publisher: None, expires: None },
+		}) {
+			drop(local_worker.handle_dht_value_found_event(record))
+		}
 
-		self.local_worker
-			.as_ref()
-			.map(|w| {
-				w.addr_cache.get_addresses_by_authority_id(&self.remote_authority_public.into())
-			})
-			.unwrap()
+		(
+			self.local_worker
+				.as_ref()
+				.map(|w| {
+					w.addr_cache
+						.get_addresses_by_authority_id(&self.remote_authority_public.into())
+						.cloned()
+				})
+				.unwrap(),
+			local_network,
+		)
 	}
 }
 
@@ -625,9 +682,10 @@ fn limit_number_of_addresses_added_to_cache_per_authority() {
 		tester.remote_authority_public.into(),
 		&tester.remote_key_store,
 		None,
+		Some(build_creation_time()),
 	);
 
-	let cached_remote_addresses = tester.process_value_found(false, kv_pairs);
+	let cached_remote_addresses = tester.process_value_found(false, kv_pairs).0;
 	assert_eq!(MAX_ADDRESSES_PER_AUTHORITY, cached_remote_addresses.unwrap().len());
 }
 
@@ -640,17 +698,242 @@ fn strict_accept_address_with_peer_signature() {
 		tester.remote_authority_public.into(),
 		&tester.remote_key_store,
 		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
 	);
 
-	let cached_remote_addresses = tester.process_value_found(true, kv_pairs);
+	let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0;
 
 	assert_eq!(
-		Some(&HashSet::from([addr])),
+		Some(HashSet::from([addr])),
 		cached_remote_addresses,
 		"Expect worker to only cache `Multiaddr`s with `PeerId`s.",
 	);
 }
 
+#[test]
+fn strict_accept_address_without_creation_time() {
+	let mut tester = DhtValueFoundTester::new();
+	let addr = tester.multiaddr_with_peer_id(1);
+	let kv_pairs = build_dht_event(
+		vec![addr.clone()],
+		tester.remote_authority_public.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		None,
+	);
+
+	let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0;
+
+	assert_eq!(
+		Some(HashSet::from([addr])),
+		cached_remote_addresses,
+		"Expect worker to cache address without creation time",
+	);
+}
+
+#[test]
+fn keep_last_received_if_no_creation_time() {
+	let mut tester: DhtValueFoundTester = DhtValueFoundTester::new();
+	let addr = tester.multiaddr_with_peer_id(1);
+	let kv_pairs = build_dht_event(
+		vec![addr.clone()],
+		tester.remote_authority_public.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		None,
+	);
+
+	let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs);
+
+	assert_eq!(
+		Some(HashSet::from([addr])),
+		cached_remote_addresses,
+		"Expect worker to cache address without creation time",
+	);
+
+	assert!(network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().is_empty())
+		.unwrap_or_default());
+
+	let addr2 = tester.multiaddr_with_peer_id(2);
+	let kv_pairs = build_dht_event(
+		vec![addr2.clone()],
+		tester.remote_authority_public.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		None,
+	);
+
+	let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0;
+
+	assert_eq!(
+		Some(HashSet::from([addr2])),
+		cached_remote_addresses,
+		"Expect worker to cache the last received address when there is no creation time",
+	);
+	assert!(network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().is_empty())
+		.unwrap_or_default());
+}
+
+#[test]
+fn records_with_incorrectly_signed_creation_time_are_ignored() {
+	let mut tester: DhtValueFoundTester = DhtValueFoundTester::new();
+	let addr = tester.multiaddr_with_peer_id(1);
+	let kv_pairs = build_dht_event(
+		vec![addr.clone()],
+		tester.remote_authority_public.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
+	);
+
+	let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs);
+
+	assert_eq!(
+		Some(HashSet::from([addr.clone()])),
+		cached_remote_addresses,
+		"Expect worker to cache record with creation time",
+	);
+	assert!(network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().is_empty())
+		.unwrap_or_default());
+
+	let alternative_key = tester
+		.remote_key_store
+		.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
+		.unwrap();
+
+	let addr2 = tester.multiaddr_with_peer_id(2);
+	let mut kv_pairs = build_dht_event(
+		vec![addr2.clone()],
+		alternative_key.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
+	);
+	let kademlia_key = hash_authority_id(tester.remote_authority_public.as_slice());
+	for key in kv_pairs.iter_mut() {
+		key.0 = kademlia_key.clone();
+	}
+	let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0;
+
+	assert_eq!(
+		Some(HashSet::from([addr])),
+		cached_remote_addresses,
+		"Expect `Multiaddr` to remain the same",
+	);
+	assert!(network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().is_empty())
+		.unwrap_or_default());
+}
+
+#[test]
+fn newer_records_overwrite_older_ones() {
+	let mut tester: DhtValueFoundTester = DhtValueFoundTester::new();
+	let old_record = tester.multiaddr_with_peer_id(1);
+	let kv_pairs = build_dht_event(
+		vec![old_record.clone()],
+		tester.remote_authority_public.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
+	);
+
+	let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs);
+
+	assert_eq!(
+		Some(HashSet::from([old_record])),
+		cached_remote_addresses,
+		"Expect worker to cache record with creation time",
+	);
+
+	let nothing_updated = network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().is_empty())
+		.unwrap();
+	assert!(nothing_updated);
+
+	let new_record = tester.multiaddr_with_peer_id(2);
+	let kv_pairs = build_dht_event(
+		vec![new_record.clone()],
+		tester.remote_authority_public.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
+	);
+
+	let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0;
+
+	assert_eq!(
+		Some(HashSet::from([new_record])),
+		cached_remote_addresses,
+		"Expect worker to store the newest record",
+	);
+
+	let result = network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().first().unwrap().clone())
+		.unwrap();
+	assert!(matches!(result, (_, _, false)));
+	assert_eq!(result.1.len(), 1);
+}
+
+#[test]
+fn older_records_dont_affect_newer_ones() {
+	let mut tester: DhtValueFoundTester = DhtValueFoundTester::new();
+	let old_record = tester.multiaddr_with_peer_id(1);
+	let old_kv_pairs = build_dht_event(
+		vec![old_record.clone()],
+		tester.remote_authority_public.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
+	);
+
+	let new_record = tester.multiaddr_with_peer_id(2);
+	let kv_pairs = build_dht_event(
+		vec![new_record.clone()],
+		tester.remote_authority_public.into(),
+		&tester.remote_key_store,
+		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
+	);
+
+	let (cached_remote_addresses, network) = tester.process_value_found(true, kv_pairs);
+
+	assert_eq!(
+		Some(HashSet::from([new_record.clone()])),
+		cached_remote_addresses,
+		"Expect worker to store new record",
+	);
+
+	let nothing_updated = network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().is_empty())
+		.unwrap();
+	assert!(nothing_updated);
+
+	let cached_remote_addresses = tester.process_value_found(true, old_kv_pairs).0;
+
+	assert_eq!(
+		Some(HashSet::from([new_record])),
+		cached_remote_addresses,
+		"Expect worker to not update stored record",
+	);
+
+	let update_peers_info = network
+		.as_ref()
+		.map(|network| network.put_value_to_call.lock().unwrap().remove(0))
+		.unwrap();
+	assert!(matches!(update_peers_info, (_, _, false)));
+	assert_eq!(update_peers_info.1.len(), 1);
+}
+
 #[test]
 fn reject_address_with_rogue_peer_signature() {
 	let mut tester = DhtValueFoundTester::new();
@@ -660,9 +943,10 @@ fn reject_address_with_rogue_peer_signature() {
 		tester.remote_authority_public.into(),
 		&tester.remote_key_store,
 		Some(&TestSigner { keypair: &rogue_remote_node_key }),
+		Some(build_creation_time()),
 	);
 
-	let cached_remote_addresses = tester.process_value_found(false, kv_pairs);
+	let cached_remote_addresses = tester.process_value_found(false, kv_pairs).0;
 
 	assert!(
 		cached_remote_addresses.is_none(),
@@ -678,13 +962,14 @@ fn reject_address_with_invalid_peer_signature() {
 		tester.remote_authority_public.into(),
 		&tester.remote_key_store,
 		Some(&TestSigner { keypair: &tester.remote_node_key }),
+		Some(build_creation_time()),
 	);
 	// tamper with the signature
 	let mut record = schema::SignedAuthorityRecord::decode(kv_pairs[0].1.as_slice()).unwrap();
 	record.peer_signature.as_mut().map(|p| p.signature[1] = !p.signature[1]);
 	record.encode(&mut kv_pairs[0].1).unwrap();
 
-	let cached_remote_addresses = tester.process_value_found(false, kv_pairs);
+	let cached_remote_addresses = tester.process_value_found(false, kv_pairs).0;
 
 	assert!(
 		cached_remote_addresses.is_none(),
@@ -700,9 +985,10 @@ fn reject_address_without_peer_signature() {
 		tester.remote_authority_public.into(),
 		&tester.remote_key_store,
 		None,
+		Some(build_creation_time()),
 	);
 
-	let cached_remote_addresses = tester.process_value_found(true, kv_pairs);
+	let cached_remote_addresses = tester.process_value_found(true, kv_pairs).0;
 
 	assert!(cached_remote_addresses.is_none(), "Expected worker to ignore unsigned record.",);
 }
@@ -718,12 +1004,13 @@ fn do_not_cache_addresses_without_peer_id() {
 		tester.remote_authority_public.into(),
 		&tester.remote_key_store,
 		None,
+		Some(build_creation_time()),
 	);
 
-	let cached_remote_addresses = tester.process_value_found(false, kv_pairs);
+	let cached_remote_addresses = tester.process_value_found(false, kv_pairs).0;
 
 	assert_eq!(
-		Some(&HashSet::from([multiaddr_with_peer_id])),
+		Some(HashSet::from([multiaddr_with_peer_id])),
 		cached_remote_addresses,
 		"Expect worker to only cache `Multiaddr`s with `PeerId`s.",
 	);
@@ -861,16 +1148,24 @@ fn lookup_throttling() {
 			// Make first lookup succeed.
 			let remote_hash = network.get_value_call.lock().unwrap().pop().unwrap();
 			let remote_key: AuthorityId = remote_hash_to_key.get(&remote_hash).unwrap().clone();
-			let dht_event = {
-				let kv_pairs = build_dht_event::<TestNetwork>(
-					vec![remote_multiaddr.clone()],
-					remote_key,
-					&remote_key_store,
-					None,
-				);
-				DhtEvent::ValueFound(kv_pairs)
-			};
-			dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1.");
+			let kv_pairs = build_dht_event::<TestNetwork>(
+				vec![remote_multiaddr.clone()],
+				remote_key,
+				&remote_key_store,
+				None,
+				Some(build_creation_time()),
+			)
+			.into_iter()
+			.map(|(key, value)| PeerRecord {
+				peer: None,
+				record: Record { key, value, publisher: None, expires: None },
+			});
+			for kv_pair in kv_pairs {
+				dht_event_tx
+					.send(DhtEvent::ValueFound(kv_pair))
+					.await
+					.expect("Channel has capacity of 1.");
+			}
 
 			// Assert worker to trigger another lookup.
 			assert!(matches!(receiver.next().await, Some(TestNetworkEvent::GetCalled(_))));
@@ -899,14 +1194,17 @@ fn lookup_throttling() {
 
 #[test]
 fn test_handle_put_record_request() {
-	let network = TestNetwork::default();
-	let peer_id = network.peer_id;
+	let local_node_network = TestNetwork::default();
+	let remote_node_network = TestNetwork::default();
+	let peer_id = remote_node_network.peer_id;
 
 	let remote_multiaddr = {
 		let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
 
-		address.with(multiaddr::Protocol::P2p(peer_id.into()))
+		address.with(multiaddr::Protocol::P2p(remote_node_network.peer_id.into()))
 	};
 	let remote_key_store = MemoryKeystore::new();
 	let remote_public_keys: Vec<AuthorityId> = (0..20)
 		.map(|_| {
@@ -928,7 +1226,7 @@ fn test_handle_put_record_request() {
 
 	let (_dht_event_tx, dht_event_rx) = channel(1);
 	let (_to_worker, from_service) = mpsc::channel(0);
-	let network = Arc::new(network);
+	let network = Arc::new(local_node_network);
 	let mut worker = Worker::new(
 		from_service,
 		Arc::new(TestApi { authorities: remote_public_keys.clone() }),
@@ -944,10 +1242,11 @@ fn test_handle_put_record_request() {
 	let valid_authorithy_key = remote_public_keys.first().unwrap().clone();
 
 	let kv_pairs = build_dht_event(
-		vec![remote_multiaddr],
-		valid_authorithy_key.into(),
+		vec![remote_multiaddr.clone()],
+		valid_authorithy_key.clone().into(),
 		&remote_key_store,
-		Some(&TestSigner { keypair: &network.identity }),
+		Some(&TestSigner { keypair: &remote_node_network.identity }),
+		Some(build_creation_time()),
 	);
 
 	pool.run_until(
@@ -986,7 +1285,7 @@ fn test_handle_put_record_request() {
 			let key = hash_authority_id(another_authorithy_id.as_ref());
 
 			// Valid record signed with a different key should return error.
-			for (_, value) in kv_pairs {
+			for (_, value) in kv_pairs.clone() {
 				assert!(matches!(
 					worker
 						.handle_put_record_requested(key.clone(), value, Some(peer_id), None)
@@ -995,6 +1294,57 @@ fn test_handle_put_record_request() {
 				));
 			}
 			assert_eq!(network.store_value_call.lock().unwrap().len(), 1);
+			let newer_kv_pairs = build_dht_event(
+				vec![remote_multiaddr],
+				valid_authorithy_key.clone().into(),
+				&remote_key_store,
+				Some(&TestSigner { keypair: &remote_node_network.identity }),
+				Some(build_creation_time()),
+			);
+
+			// A valid but older record should not return an error, yet it should not be
+			// stored either, since a newer one already exists.
+			for (new_key, new_value) in newer_kv_pairs.clone() {
+				worker.in_flight_lookups.insert(new_key.clone(), valid_authorithy_key.clone());
+
+				let found = PeerRecord {
+					peer: Some(peer_id.into()),
+					record: Record {
+						key: new_key,
+						value: new_value,
+						publisher: Some(peer_id.into()),
+						expires: None,
+					},
+				};
+				assert!(worker.handle_dht_value_found_event(found).is_ok());
+			}
+
+			for (key, value) in kv_pairs.clone() {
+				assert!(worker
+					.handle_put_record_requested(key, value, Some(peer_id), None)
+					.await
+					.is_ok());
+			}
+			assert_eq!(network.store_value_call.lock().unwrap().len(), 1);
+
+			// Newer kv pairs should always be stored.
+			for (key, value) in newer_kv_pairs.clone() {
+				assert!(worker
+					.handle_put_record_requested(key, value, Some(peer_id), None)
+					.await
+					.is_ok());
+			}
+
+			assert_eq!(network.store_value_call.lock().unwrap().len(), 2);
+
+			worker.refill_pending_lookups_queue().await.unwrap();
+			assert_eq!(worker.last_known_records.len(), 1);
+
+			// Check that known records get cleaned up when an authority drops out of the
+			// active set.
+			worker.client = Arc::new(TestApi { authorities: Default::default() });
+			worker.refill_pending_lookups_queue().await.unwrap();
+			assert_eq!(worker.last_known_records.len(), 0);
 		}
 		.boxed_local(),
 	);
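
The new tests above pin down how competing authority-discovery records are reconciled: when neither record carries a creation time the last received one wins, and when both are timestamped the newest wins while stale copies are ignored and the newer record is pushed back to the peer that served the stale one. Below is a minimal, std-only sketch of that precedence rule; `CachedRecord` and `select_record` are illustrative names rather than the worker's real types, and the behaviour for a timestamped record meeting an untimestamped one is an assumption the tests do not cover.

use std::collections::HashSet;

#[derive(Clone, Debug, PartialEq)]
struct CachedRecord {
    addresses: HashSet<String>,
    creation_time: Option<u128>,
}

// Decide which record stays in the cache when a new one arrives.
fn select_record(current: Option<CachedRecord>, incoming: CachedRecord) -> CachedRecord {
    match current {
        None => incoming,
        Some(current) => match (current.creation_time, incoming.creation_time) {
            // Neither record is timestamped: keep the last received one.
            (None, None) => incoming,
            // Assumption: a timestamped record wins over an untimestamped one.
            (None, Some(_)) => incoming,
            (Some(_), None) => current,
            // Both are timestamped: the newest wins, ties keep the cached one.
            (Some(old), Some(new)) if new > old => incoming,
            _ => current,
        },
    }
}

fn main() {
    let older = CachedRecord {
        addresses: HashSet::from(["/ip4/1.2.3.4/tcp/30333".to_string()]),
        creation_time: Some(1),
    };
    let newer = CachedRecord {
        addresses: HashSet::from(["/ip4/5.6.7.8/tcp/30333".to_string()]),
        creation_time: Some(2),
    };
    // Arrival order does not matter once creation times are present.
    assert_eq!(select_record(Some(older.clone()), newer.clone()), newer);
    assert_eq!(select_record(Some(newer.clone()), older), newer);
}
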
diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs
index 68816a10980d4..9a6324dafd37d 100644
--- a/substrate/client/network/src/behaviour.rs
+++ b/substrate/client/network/src/behaviour.rs
@@ -31,8 +31,13 @@ use crate::{
 
 use futures::channel::oneshot;
 use libp2p::{
-	connection_limits::ConnectionLimits, core::Multiaddr, identify::Info as IdentifyInfo,
-	identity::PublicKey, kad::RecordKey, swarm::NetworkBehaviour, PeerId, StreamProtocol,
+	connection_limits::ConnectionLimits,
+	core::Multiaddr,
+	identify::Info as IdentifyInfo,
+	identity::PublicKey,
+	kad::{Record, RecordKey},
+	swarm::NetworkBehaviour,
+	PeerId, StreamProtocol,
 };
 
 use parking_lot::Mutex;
@@ -289,6 +294,16 @@ impl<B: BlockT> Behaviour<B> {
 		self.discovery.put_value(key, value);
 	}
 
+	/// Puts a record into the DHT on the provided peers.
+	pub fn put_record_to(
+		&mut self,
+		record: Record,
+		peers: HashSet<sc_network_types::PeerId>,
+		update_local_storage: bool,
+	) {
+		self.discovery.put_record_to(record, peers, update_local_storage);
+	}
+
 	/// Stores value in DHT
 	pub fn store_record(
 		&mut self,
diff --git a/substrate/client/network/src/discovery.rs b/substrate/client/network/src/discovery.rs
index 3145b891a8d3c..86c66c22701cc 100644
--- a/substrate/client/network/src/discovery.rs
+++ b/substrate/client/network/src/discovery.rs
@@ -58,7 +58,8 @@ use libp2p::{
 		self,
 		record::store::{MemoryStore, RecordStore},
 		Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
-		GetClosestPeersError, GetRecordOk, QueryId, QueryResult, Quorum, Record, RecordKey,
+		GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
+		RecordKey,
 	},
 	mdns::{self, tokio::Behaviour as TokioMdns},
 	multiaddr::Protocol,
@@ -92,8 +93,12 @@ const MAX_KNOWN_EXTERNAL_ADDRESSES: usize = 32;
 /// record is replicated to.
 pub const DEFAULT_KADEMLIA_REPLICATION_FACTOR: usize = 20;
 
+// The minimum number of peers from which we expect an answer before we terminate the request.
+const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;
+
 /// `DiscoveryBehaviour` configuration.
 ///
 /// Note: In order to discover nodes or load and store values via Kademlia one has to add
 ///       Kademlia protocol via [`DiscoveryConfig::with_kademlia`].
 pub struct DiscoveryConfig {
@@ -234,7 +239,6 @@ impl DiscoveryConfig {
 			// auto-insertion and instead add peers manually.
 			config.set_kbucket_inserts(BucketInserts::Manual);
 			config.disjoint_query_paths(kademlia_disjoint_query_paths);
-
 			let store = MemoryStore::new(local_peer_id);
 			let mut kad = Kademlia::with_config(local_peer_id, store, config);
 			kad.set_mode(Some(kad::Mode::Server));
@@ -437,6 +441,31 @@ impl DiscoveryBehaviour {
 		}
 	}
 
+	/// Puts a record into the DHT on the provided `peers`.
+	///
+	/// If `update_local_storage` is true, the local storage is updated as well.
+	pub fn put_record_to(
+		&mut self,
+		record: Record,
+		peers: HashSet<sc_network_types::PeerId>,
+		update_local_storage: bool,
+	) {
+		if let Some(kad) = self.kademlia.as_mut() {
+			if update_local_storage {
+				if let Err(_e) = kad.store_mut().put(record.clone()) {
+					warn!(target: "sub-libp2p", "Failed to update local storage");
+				}
+			}
+
+			if !peers.is_empty() {
+				kad.put_record_to(
+					record,
+					peers.into_iter().map(|peer_id| peer_id.into()),
+					Quorum::All,
+				);
+			}
+		}
+	}
+
 	/// Store a record in the Kademlia record store.
 	pub fn store_record(
 		&mut self,
@@ -527,7 +556,7 @@ pub enum DiscoveryOut {
 	/// The DHT yielded results for the record request.
 	///
 	/// Returning the result grouped in (key, value) pairs as well as the request duration.
-	ValueFound(Vec<(RecordKey, Vec<u8>)>, Duration),
+	ValueFound(PeerRecord, Duration),
 
 	/// The DHT received a put record request.
 	PutRecordRequest(
@@ -860,16 +889,24 @@ impl NetworkBehaviour for DiscoveryBehaviour {
 							Ok(GetRecordOk::FoundRecord(r)) => {
 								debug!(
 									target: "sub-libp2p",
-									"Libp2p => Found record ({:?}) with value: {:?}",
+									"Libp2p => Found record ({:?}) with value: {:?} id {:?} stats {:?}",
 									r.record.key,
 									r.record.value,
+									id,
+									stats,
 								);
 
-								// Let's directly finish the query, as we are only interested in a
-								// quorum of 1.
-								if let Some(kad) = self.kademlia.as_mut() {
-									if let Some(mut query) = kad.query_mut(&id) {
-										query.finish();
+								// Directly finish the query once more than
+								// GET_RECORD_REDUNDANCY_FACTOR peers have answered. This
+								// number is small enough not to flood the network with
+								// unnecessary queries, but high enough to also reach peers
+								// which might still hold an old record, so we can update
+								// them once we notice their copy is stale.
+								if stats.num_successes() > GET_RECORD_REDUNDANCY_FACTOR {
+									if let Some(kad) = self.kademlia.as_mut() {
+										if let Some(mut query) = kad.query_mut(&id) {
+											query.finish();
+										}
 									}
 								}
 
@@ -877,14 +914,18 @@ impl NetworkBehaviour for DiscoveryBehaviour {
 								// `FinishedWithNoAdditionalRecord`.
 								self.records_to_publish.insert(id, r.record.clone());
 
-								DiscoveryOut::ValueFound(
-									vec![(r.record.key, r.record.value)],
-									stats.duration().unwrap_or_default(),
-								)
+								DiscoveryOut::ValueFound(r, stats.duration().unwrap_or_default())
 							},
 							Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
 								cache_candidates,
 							}) => {
+								debug!(
+									target: "sub-libp2p",
+									"Libp2p => Query {:?} finished with no additional record, stats {:?}, took {:?} ms",
+									id,
+									stats,
+									stats.duration().map(|val| val.as_millis())
+								);
 								// We always need to remove the record to not leak any data!
 								if let Some(record) = self.records_to_publish.remove(&id) {
 									if cache_candidates.is_empty() {
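
The comment in the hunk above explains why a `GET_RECORD` query is no longer finished on the first answer. A std-only sketch of that early-termination rule follows: the query keeps running until more than `GET_RECORD_REDUNDANCY_FACTOR` peers have responded. The real code reads the success count from libp2p's query stats and calls `query.finish()`; this stand-in just counts answers itself.

const GET_RECORD_REDUNDANCY_FACTOR: u32 = 4;

struct GetRecordQuery {
    successes: u32,
    finished: bool,
}

impl GetRecordQuery {
    fn on_record_found(&mut self) {
        self.successes += 1;
        // Not stopping at the first answer lets the query touch a few extra
        // peers, so nodes still serving an outdated record can be detected
        // (and later corrected with a targeted put).
        if self.successes > GET_RECORD_REDUNDANCY_FACTOR {
            self.finished = true;
        }
    }
}

fn main() {
    let mut query = GetRecordQuery { successes: 0, finished: false };
    for _ in 0..GET_RECORD_REDUNDANCY_FACTOR {
        query.on_record_found();
    }
    assert!(!query.finished, "four answers are not enough to stop");
    query.on_record_found();
    assert!(query.finished, "the fifth answer finishes the query");
}
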
diff --git a/substrate/client/network/src/event.rs b/substrate/client/network/src/event.rs
index b518a2094d766..5400d11cb6ac2 100644
--- a/substrate/client/network/src/event.rs
+++ b/substrate/client/network/src/event.rs
@@ -22,7 +22,10 @@
 use crate::types::ProtocolName;
 
 use bytes::Bytes;
-use libp2p::{kad::record::Key, PeerId};
+use libp2p::{
+	kad::{record::Key, PeerRecord},
+	PeerId,
+};
 
 use sc_network_common::role::ObservedRole;
 
@@ -31,7 +34,7 @@ use sc_network_common::role::ObservedRole;
 #[must_use]
 pub enum DhtEvent {
 	/// The value was found.
-	ValueFound(Vec<(Key, Vec<u8>)>),
+	ValueFound(PeerRecord),
 
 	/// The requested record has not been found in the DHT.
 	ValueNotFound(Key),
diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs
index 6ff05e6af327a..22285e7906c6c 100644
--- a/substrate/client/network/src/litep2p/discovery.rs
+++ b/substrate/client/network/src/litep2p/discovery.rs
@@ -50,6 +50,7 @@ use schnellru::{ByLength, LruMap};
 use std::{
 	cmp,
 	collections::{HashMap, HashSet, VecDeque},
+	num::NonZeroUsize,
 	pin::Pin,
 	sync::Arc,
 	task::{Context, Poll},
@@ -68,6 +69,9 @@ const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30);
 /// Minimum number of confirmations received before an address is verified.
 const MIN_ADDRESS_CONFIRMATIONS: usize = 5;
 
+// The minimum number of peers from which we expect an answer before we terminate the request.
+const GET_RECORD_REDUNDANCY_FACTOR: usize = 4;
+
 /// Discovery events.
 #[derive(Debug)]
 pub enum DiscoveryEvent {
@@ -340,7 +344,10 @@ impl Discovery {
 	/// Start Kademlia `GET_VALUE` query for `key`.
 	pub async fn get_value(&mut self, key: KademliaKey) -> QueryId {
 		self.kademlia_handle
-			.get_record(RecordKey::new(&key.to_vec()), Quorum::One)
+			.get_record(
+				RecordKey::new(&key.to_vec()),
+				Quorum::N(NonZeroUsize::new(GET_RECORD_REDUNDANCY_FACTOR).unwrap()),
+			)
 			.await
 	}
 
@@ -351,6 +358,22 @@ impl Discovery {
 			.await
 	}
 
+	/// Put record to given peers.
+	pub async fn put_value_to_peers(
+		&mut self,
+		record: Record,
+		peers: Vec<sc_network_types::PeerId>,
+		update_local_storage: bool,
+	) -> QueryId {
+		self.kademlia_handle
+			.put_record_to_peers(
+				record,
+				peers.into_iter().map(|peer| peer.into()).collect(),
+				update_local_storage,
+			)
+			.await
+	}
+
 	/// Store record in the local DHT store.
 	pub async fn store_record(
 		&mut self,
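
The litep2p backend above now asks for a quorum of `GET_RECORD_REDUNDANCY_FACTOR` instead of `Quorum::One`, wrapping the factor in `NonZeroUsize`. A small, std-only sketch of that construction is below; the diff simply unwraps the constant, and the fallback to a quorum of one shown here is just one defensive alternative, with `Quorum` as a stand-in enum rather than litep2p's type.

use std::num::NonZeroUsize;

#[derive(Debug, PartialEq)]
enum Quorum {
    One,
    N(NonZeroUsize),
}

// Build the quorum for a GET_RECORD query from a configured redundancy factor.
fn get_record_quorum(redundancy_factor: usize) -> Quorum {
    NonZeroUsize::new(redundancy_factor).map(Quorum::N).unwrap_or(Quorum::One)
}

fn main() {
    assert_eq!(
        get_record_quorum(4),
        Quorum::N(NonZeroUsize::new(4).expect("4 is non-zero"))
    );
    // A misconfigured factor of zero degrades to a quorum of one.
    assert_eq!(get_record_quorum(0), Quorum::One);
}
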
diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs
index 4f3cc57894311..475046cb9dbab 100644
--- a/substrate/client/network/src/litep2p/mod.rs
+++ b/substrate/client/network/src/litep2p/mod.rs
@@ -50,7 +50,7 @@ use crate::{
 
 use codec::Encode;
 use futures::StreamExt;
-use libp2p::kad::RecordKey;
+use libp2p::kad::{PeerRecord, Record as P2PRecord, RecordKey};
 use litep2p::{
 	config::ConfigBuilder,
 	crypto::ed25519::Keypair,
@@ -703,6 +703,12 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
 							let query_id = self.discovery.put_value(key.clone(), value).await;
 							self.pending_put_values.insert(query_id, (key, Instant::now()));
 						}
+						NetworkServiceCommand::PutValueTo { record, peers, update_local_storage } => {
+							let kademlia_key = record.key.to_vec().into();
+							let query_id = self.discovery.put_value_to_peers(record, peers, update_local_storage).await;
+							self.pending_put_values.insert(query_id, (kademlia_key, Instant::now()));
+						}
+
 						NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
 							self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
 						}
@@ -819,19 +825,15 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
 									"`GET_VALUE` for {:?} ({query_id:?}) succeeded",
 									key,
 								);
-
-								let value_found = match records {
-									RecordsType::LocalStore(record) => vec![
-										(libp2p::kad::RecordKey::new(&record.key), record.value)
-									],
-									RecordsType::Network(records) => records.into_iter().map(|peer_record| {
-										(libp2p::kad::RecordKey::new(&peer_record.record.key), peer_record.record.value)
-									}).collect(),
-								};
-
-								self.event_streams.send(Event::Dht(
-									DhtEvent::ValueFound(value_found)
-								));
+								for record in litep2p_to_libp2p_peer_record(records) {
+									self.event_streams.send(
+										Event::Dht(
+											DhtEvent::ValueFound(
+												record
+											)
+										)
+									);
+								}
 
 								if let Some(ref metrics) = self.metrics {
 									metrics
@@ -1026,3 +1028,42 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
 		}
 	}
 }
+
+// Glue code to convert from the litep2p records type to a libp2p `PeerRecord`.
+fn litep2p_to_libp2p_peer_record(records: RecordsType) -> Vec<PeerRecord> {
+	match records {
+		litep2p::protocol::libp2p::kademlia::RecordsType::LocalStore(record) => {
+			vec![PeerRecord {
+				record: P2PRecord {
+					key: record.key.to_vec().into(),
+					value: record.value,
+					publisher: record.publisher.map(|peer_id| {
+						let peer_id: sc_network_types::PeerId = peer_id.into();
+						peer_id.into()
+					}),
+					expires: record.expires,
+				},
+				peer: None,
+			}]
+		},
+		litep2p::protocol::libp2p::kademlia::RecordsType::Network(records) => records
+			.into_iter()
+			.map(|record| {
+				let peer_id: sc_network_types::PeerId = record.peer.into();
+
+				PeerRecord {
+					record: P2PRecord {
+						key: record.record.key.to_vec().into(),
+						value: record.record.value,
+						publisher: record.record.publisher.map(|peer_id| {
+							let peer_id: sc_network_types::PeerId = peer_id.into();
+							peer_id.into()
+						}),
+						expires: record.record.expires,
+					},
+					peer: Some(peer_id.into()),
+				}
+			})
+			.collect::<Vec<_>>(),
+	}
+}
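
The conversion above distinguishes where a record came from: a record served out of the local store carries no originating peer, while records fetched from the network keep the peer that served them. A std-only sketch of that shape follows, with simplified stand-in types instead of the litep2p and libp2p record types.

#[derive(Clone, Debug, PartialEq)]
struct SimpleRecord {
    key: Vec<u8>,
    value: Vec<u8>,
}

enum RecordsSource {
    LocalStore(SimpleRecord),
    Network(Vec<(String /* serving peer */, SimpleRecord)>),
}

#[derive(Debug, PartialEq)]
struct SimplePeerRecord {
    record: SimpleRecord,
    peer: Option<String>,
}

// Flatten both sources into per-record results, keeping the serving peer when known.
fn to_peer_records(records: RecordsSource) -> Vec<SimplePeerRecord> {
    match records {
        RecordsSource::LocalStore(record) => vec![SimplePeerRecord { record, peer: None }],
        RecordsSource::Network(records) => records
            .into_iter()
            .map(|(peer, record)| SimplePeerRecord { record, peer: Some(peer) })
            .collect(),
    }
}

fn main() {
    let local = to_peer_records(RecordsSource::LocalStore(SimpleRecord { key: vec![1], value: vec![2] }));
    assert_eq!(local[0].peer, None);

    let from_network = to_peer_records(RecordsSource::Network(vec![(
        "peer-1".to_string(),
        SimpleRecord { key: vec![1], value: vec![3] },
    )]));
    assert_eq!(from_network[0].peer.as_deref(), Some("peer-1"));
}
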
diff --git a/substrate/client/network/src/litep2p/service.rs b/substrate/client/network/src/litep2p/service.rs
index 7d972bbeee5c7..67fc44e6bfe0e 100644
--- a/substrate/client/network/src/litep2p/service.rs
+++ b/substrate/client/network/src/litep2p/service.rs
@@ -32,6 +32,7 @@ use crate::{
 	RequestFailure, Signature,
 };
 
+use crate::litep2p::Record;
 use codec::DecodeAll;
 use futures::{channel::oneshot, stream::BoxStream};
 use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
@@ -76,6 +77,15 @@ pub enum NetworkServiceCommand {
 		value: Vec<u8>,
 	},
 
+	/// Put a record to the DHT on the provided peers.
+	PutValueTo {
+		/// Record.
+		record: Record,
+		/// Peers we want to put the record to.
+		peers: Vec<sc_network_types::PeerId>,
+		/// Whether we should update the local storage or not.
+		update_local_storage: bool,
+	},
 	/// Store record in the local DHT store.
 	StoreRecord {
 		/// Record key.
@@ -253,6 +263,27 @@ impl NetworkDHTProvider for Litep2pNetworkService {
 		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValue { key, value });
 	}
 
+	fn put_record_to(
+		&self,
+		record: libp2p::kad::Record,
+		peers: HashSet<PeerId>,
+		update_local_storage: bool,
+	) {
+		let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValueTo {
+			record: Record {
+				key: record.key.to_vec().into(),
+				value: record.value,
+				publisher: record.publisher.map(|peer_id| {
+					let peer_id: sc_network_types::PeerId = peer_id.into();
+					peer_id.into()
+				}),
+				expires: record.expires,
+			},
+			peers: peers.into_iter().collect(),
+			update_local_storage,
+		});
+	}
+
 	fn store_record(
 		&self,
 		key: KademliaKey,
diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs
index 550e00a6cf2fb..71d0b45aa06da 100644
--- a/substrate/client/network/src/service.rs
+++ b/substrate/client/network/src/service.rs
@@ -68,7 +68,7 @@ use libp2p::{
 	core::{upgrade, ConnectedPoint, Endpoint},
 	identify::Info as IdentifyInfo,
 	identity::ed25519,
-	kad::record::Key as KademliaKey,
+	kad::{record::Key as KademliaKey, Record},
 	multiaddr::{self, Multiaddr},
 	swarm::{
 		Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
@@ -946,6 +946,19 @@ where
 		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
 	}
 
+	fn put_record_to(
+		&self,
+		record: Record,
+		peers: HashSet<sc_network_types::PeerId>,
+		update_local_storage: bool,
+	) {
+		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
+			record,
+			peers,
+			update_local_storage,
+		});
+	}
+
 	fn store_record(
 		&self,
 		key: KademliaKey,
@@ -1314,6 +1327,11 @@ impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
 enum ServiceToWorkerMsg {
 	GetValue(KademliaKey),
 	PutValue(KademliaKey, Vec<u8>),
+	PutRecordTo {
+		record: Record,
+		peers: HashSet<sc_network_types::PeerId>,
+		update_local_storage: bool,
+	},
 	StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
 	AddKnownAddress(PeerId, Multiaddr),
 	EventStream(out_events::Sender),
@@ -1440,6 +1458,10 @@ where
 				self.network_service.behaviour_mut().get_value(key),
 			ServiceToWorkerMsg::PutValue(key, value) =>
 				self.network_service.behaviour_mut().put_value(key, value),
+			ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
+				.network_service
+				.behaviour_mut()
+				.put_record_to(record, peers, update_local_storage),
 			ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
 				.network_service
 				.behaviour_mut()
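
The worker plumbing above forwards `put_record_to` from the public service down to the behaviour; the trait callers actually see is extended in the next file. Below is a hedged caller-side sketch: the import paths and the `stale_peers` set are assumptions for illustration, only the method signature comes from the `NetworkDHTProvider` changes.

use std::collections::HashSet;

use libp2p::kad::Record;
use sc_network::NetworkDHTProvider;
use sc_network_types::PeerId;

// Republish a record we know to be the newest to the peers that answered with
// a stale copy, without rewriting our own local store.
fn republish_to_stale_peers<N: NetworkDHTProvider>(
    network: &N,
    record: Record,
    stale_peers: HashSet<PeerId>,
) {
    // `update_local_storage` is false: our copy is already the newest one,
    // only the remote peers need to be corrected.
    network.put_record_to(record, stale_peers, false);
}
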
diff --git a/substrate/client/network/src/service/traits.rs b/substrate/client/network/src/service/traits.rs
index 69c9fbc3a6186..bd4f83c7fd440 100644
--- a/substrate/client/network/src/service/traits.rs
+++ b/substrate/client/network/src/service/traits.rs
@@ -32,6 +32,7 @@ use crate::{
 };
 
 use futures::{channel::oneshot, Stream};
+use libp2p::kad::Record;
 use prometheus_endpoint::Registry;
 
 use sc_client_api::BlockBackend;
@@ -217,6 +218,11 @@ pub trait NetworkDHTProvider {
 	/// Start putting a value in the DHT.
 	fn put_value(&self, key: KademliaKey, value: Vec<u8>);
 
+	/// Start putting the record to `peers`.
+	///
+	/// If `update_local_storage` is true, the local storage is updated as well.
+	fn put_record_to(&self, record: Record, peers: HashSet<PeerId>, update_local_storage: bool);
+
 	/// Store a record in the DHT memory store.
 	fn store_record(
 		&self,
@@ -240,6 +246,10 @@ where
 		T::put_value(self, key, value)
 	}
 
+	fn put_record_to(&self, record: Record, peers: HashSet<PeerId>, update_local_storage: bool) {
+		T::put_record_to(self, record, peers, update_local_storage)
+	}
+
 	fn store_record(
 		&self,
 		key: KademliaKey,