diff --git a/src/py/flwr/common/constant.py b/src/py/flwr/common/constant.py index 72d95ba220d..8be195ed385 100644 --- a/src/py/flwr/common/constant.py +++ b/src/py/flwr/common/constant.py @@ -147,6 +147,8 @@ class ErrorCode: UNKNOWN = 0 LOAD_CLIENT_APP_EXCEPTION = 1 CLIENT_APP_RAISED_EXCEPTION = 2 + MESSAGE_UNAVAILABLE = 3 + REPLY_MESSAGE_UNAVAILABLE = 4 def __new__(cls) -> ErrorCode: """Prevent instantiation.""" diff --git a/src/py/flwr/server/driver/inmemory_driver_test.py b/src/py/flwr/server/driver/inmemory_driver_test.py index 9b5b7353199..d755454391e 100644 --- a/src/py/flwr/server/driver/inmemory_driver_test.py +++ b/src/py/flwr/server/driver/inmemory_driver_test.py @@ -277,6 +277,6 @@ def test_task_store_consistency_after_push_pull_inmemory_state(self) -> None: reply_tos = get_replies(self.driver, msg_ids, node_id) # Assert - self.assertEqual(reply_tos, msg_ids) + self.assertEqual(set(reply_tos), set(msg_ids)) self.assertEqual(len(state.task_res_store), 0) self.assertEqual(len(state.task_ins_store), 0) diff --git a/src/py/flwr/server/superlink/linkstate/in_memory_linkstate.py b/src/py/flwr/server/superlink/linkstate/in_memory_linkstate.py index b689fb8b079..c2273a36a5d 100644 --- a/src/py/flwr/server/superlink/linkstate/in_memory_linkstate.py +++ b/src/py/flwr/server/superlink/linkstate/in_memory_linkstate.py @@ -40,6 +40,8 @@ generate_rand_int_from_bytes, has_valid_sub_status, is_valid_transition, + verify_found_taskres, + verify_taskins_ids, ) @@ -67,12 +69,13 @@ def __init__(self) -> None: self.federation_options: dict[int, ConfigsRecord] = {} self.task_ins_store: dict[UUID, TaskIns] = {} self.task_res_store: dict[UUID, TaskRes] = {} + self.task_ins_id_to_task_res_id: dict[UUID, UUID] = {} self.node_public_keys: set[bytes] = set() self.server_public_key: Optional[bytes] = None self.server_private_key: Optional[bytes] = None - self.lock = threading.Lock() + self.lock = threading.RLock() def store_task_ins(self, task_ins: TaskIns) -> Optional[UUID]: """Store one TaskIns.""" @@ -222,42 +225,50 @@ def store_task_res(self, task_res: TaskRes) -> Optional[UUID]: task_res.task_id = str(task_id) with self.lock: self.task_res_store[task_id] = task_res + self.task_ins_id_to_task_res_id[UUID(task_ins_id)] = task_id # Return the new task_id return task_id def get_task_res(self, task_ids: set[UUID]) -> list[TaskRes]: - """Get all TaskRes that have not been delivered yet.""" + """Get TaskRes for the given TaskIns IDs.""" + ret: dict[UUID, TaskRes] = {} + with self.lock: - # Find TaskRes that were not delivered yet - task_res_list: list[TaskRes] = [] - replied_task_ids: set[UUID] = set() - for _, task_res in self.task_res_store.items(): - reply_to = UUID(task_res.task.ancestry[0]) - - # Check if corresponding TaskIns exists and is not expired - task_ins = self.task_ins_store.get(reply_to) - if task_ins is None: - log(WARNING, "TaskIns with task_id %s does not exist.", reply_to) - task_ids.remove(reply_to) - continue - - if task_ins.task.created_at + task_ins.task.ttl <= time.time(): - log(WARNING, "TaskIns with task_id %s is expired.", reply_to) - task_ids.remove(reply_to) - continue - - if reply_to in task_ids and task_res.task.delivered_at == "": - task_res_list.append(task_res) - replied_task_ids.add(reply_to) - - # Mark all of them as delivered + current = time.time() + + # Verify TaskIns IDs + ret = verify_taskins_ids( + inquired_taskins_ids=task_ids, + found_taskins_dict=self.task_ins_store, + current_time=current, + ) + + # Find all TaskRes + task_res_found: list[TaskRes] = [] + for task_id in task_ids: + # If TaskRes exists and is not delivered, add it to the list + if task_res_id := self.task_ins_id_to_task_res_id.get(task_id): + task_res = self.task_res_store[task_res_id] + if task_res.task.delivered_at == "": + task_res_found.append(task_res) + tmp_ret_dict = verify_found_taskres( + inquired_taskins_ids=task_ids, + found_taskins_dict=self.task_ins_store, + found_taskres_list=task_res_found, + current_time=current, + ) + ret.update(tmp_ret_dict) + + # Mark existing TaskRes to be returned as delivered delivered_at = now().isoformat() - for task_res in task_res_list: + for task_res in task_res_found: task_res.task.delivered_at = delivered_at - # Return TaskRes - return task_res_list + # Cleanup + self._force_delete_tasks_by_ids(set(ret.keys())) + + return list(ret.values()) def delete_tasks(self, task_ids: set[UUID]) -> None: """Delete all delivered TaskIns/TaskRes pairs.""" @@ -278,9 +289,25 @@ def delete_tasks(self, task_ids: set[UUID]) -> None: for task_id in task_ins_to_be_deleted: del self.task_ins_store[task_id] + del self.task_ins_id_to_task_res_id[task_id] for task_id in task_res_to_be_deleted: del self.task_res_store[task_id] + def _force_delete_tasks_by_ids(self, task_ids: set[UUID]) -> None: + """Delete tasks based on a set of TaskIns IDs.""" + if not task_ids: + return + + with self.lock: + for task_id in task_ids: + # Delete TaskIns + if task_id in self.task_ins_store: + del self.task_ins_store[task_id] + # Delete TaskRes + if task_id in self.task_ins_id_to_task_res_id: + task_res_id = self.task_ins_id_to_task_res_id.pop(task_id) + del self.task_res_store[task_res_id] + def num_task_ins(self) -> int: """Calculate the number of task_ins in store. diff --git a/src/py/flwr/server/superlink/linkstate/linkstate.py b/src/py/flwr/server/superlink/linkstate/linkstate.py index a9965d430a3..05fb3c2f0cc 100644 --- a/src/py/flwr/server/superlink/linkstate/linkstate.py +++ b/src/py/flwr/server/superlink/linkstate/linkstate.py @@ -101,13 +101,27 @@ def store_task_res(self, task_res: TaskRes) -> Optional[UUID]: @abc.abstractmethod def get_task_res(self, task_ids: set[UUID]) -> list[TaskRes]: - """Get TaskRes for task_ids. + """Get TaskRes for the given TaskIns IDs. - Usually, the ServerAppIo API calls this method to get results for instructions - it has previously scheduled. + This method is typically called by the ServerAppIo API to obtain + results (TaskRes) for previously scheduled instructions (TaskIns). + For each task_id provided, this method returns one of the following responses: - Retrieves all TaskRes for the given `task_ids` and returns and empty list of - none could be found. + - An error TaskRes if the corresponding TaskIns does not exist or has expired. + - An error TaskRes if the corresponding TaskRes exists but has expired. + - The valid TaskRes if the TaskIns has a corresponding valid TaskRes. + - Nothing if the TaskIns is still valid and waiting for a TaskRes. + + Parameters + ---------- + task_ids : set[UUID] + A set of TaskIns IDs for which to retrieve results (TaskRes). + + Returns + ------- + list[TaskRes] + A list of TaskRes corresponding to the given task IDs. If no + TaskRes could be found for any of the task IDs, an empty list is returned. """ @abc.abstractmethod diff --git a/src/py/flwr/server/superlink/linkstate/linkstate_test.py b/src/py/flwr/server/superlink/linkstate/linkstate_test.py index d53b1ee51f6..202fdf38727 100644 --- a/src/py/flwr/server/superlink/linkstate/linkstate_test.py +++ b/src/py/flwr/server/superlink/linkstate/linkstate_test.py @@ -350,13 +350,6 @@ def test_store_and_delete_tasks(self) -> None: # - State has three TaskIns, all of them delivered # - State has two TaskRes, one of the delivered, the other not - assert state.num_task_ins() == 3 - assert state.num_task_res() == 2 - - # Execute - state.delete_tasks(task_ids={task_id_0, task_id_1, task_id_2}) - - # Assert assert state.num_task_ins() == 2 assert state.num_task_res() == 1 @@ -932,8 +925,8 @@ def test_get_task_ins_not_return_expired(self) -> None: task_ins_list = state.get_task_ins(node_id=1, limit=None) assert len(task_ins_list) == 0 - def test_get_task_res_not_return_expired(self) -> None: - """Test get_task_res not to return TaskRes if its TaskIns is expired.""" + def test_get_task_res_expired_task_ins(self) -> None: + """Test get_task_res to return error TaskRes if its TaskIns has expired.""" # Prepare state = self.state_factory() node_id = state.create_node(1e3) @@ -961,7 +954,9 @@ def test_get_task_res_not_return_expired(self) -> None: task_res_list = state.get_task_res(task_ids={task_id}) # Assert - assert len(task_res_list) == 0 + assert len(task_res_list) == 1 + assert task_res_list[0].task.HasField("error") + assert state.num_task_ins() == state.num_task_res() == 0 def test_get_task_res_returns_empty_for_missing_taskins(self) -> None: """Test that get_task_res returns an empty result when the corresponding TaskIns @@ -983,7 +978,9 @@ def test_get_task_res_returns_empty_for_missing_taskins(self) -> None: task_res_list = state.get_task_res(task_ids={UUID(task_ins_id)}) # Assert - assert len(task_res_list) == 0 + assert len(task_res_list) == 1 + assert task_res_list[0].task.HasField("error") + assert state.num_task_ins() == state.num_task_res() == 0 def test_get_task_res_return_if_not_expired(self) -> None: """Test get_task_res to return TaskRes if its TaskIns exists and is not diff --git a/src/py/flwr/server/superlink/linkstate/sqlite_linkstate.py b/src/py/flwr/server/superlink/linkstate/sqlite_linkstate.py index ed0acf4213c..cf5f4e8f8f4 100644 --- a/src/py/flwr/server/superlink/linkstate/sqlite_linkstate.py +++ b/src/py/flwr/server/superlink/linkstate/sqlite_linkstate.py @@ -57,6 +57,8 @@ generate_rand_int_from_bytes, has_valid_sub_status, is_valid_transition, + verify_found_taskres, + verify_taskins_ids, ) SQL_CREATE_TABLE_NODE = """ @@ -510,136 +512,67 @@ def store_task_res(self, task_res: TaskRes) -> Optional[UUID]: # pylint: disable-next=R0912,R0915,R0914 def get_task_res(self, task_ids: set[UUID]) -> list[TaskRes]: - """Get TaskRes for task_ids. + """Get TaskRes for the given TaskIns IDs.""" + ret: dict[UUID, TaskRes] = {} - Usually, the ServerAppIo API calls this method to get results for instructions - it has previously scheduled. - - Retrieves all TaskRes for the given `task_ids` and returns and empty list if - none could be found. - - Constraints - ----------- - If `limit` is not `None`, return, at most, `limit` number of TaskRes. The limit - will only take effect if enough task_ids are in the set AND are currently - available. If `limit` is set, it has to be greater than zero. - """ - # Check if corresponding TaskIns exists and is not expired - task_ids_placeholders = ",".join([f":id_{i}" for i in range(len(task_ids))]) + # Verify TaskIns IDs + current = time.time() query = f""" SELECT * FROM task_ins - WHERE task_id IN ({task_ids_placeholders}) - AND (created_at + ttl) > CAST(strftime('%s', 'now') AS REAL) + WHERE task_id IN ({",".join(["?"] * len(task_ids))}); """ - query += ";" - - task_ins_data = {} - for index, task_id in enumerate(task_ids): - task_ins_data[f"id_{index}"] = str(task_id) - - task_ins_rows = self.query(query, task_ins_data) - - if not task_ins_rows: - return [] - - for row in task_ins_rows: - # Convert values from sint64 to uint64 + rows = self.query(query, tuple(str(task_id) for task_id in task_ids)) + found_task_ins_dict: dict[UUID, TaskIns] = {} + for row in rows: convert_sint64_values_in_dict_to_uint64( row, ["run_id", "producer_node_id", "consumer_node_id"] ) - task_ins = dict_to_task_ins(row) - if task_ins.task.created_at + task_ins.task.ttl <= time.time(): - log(WARNING, "TaskIns with task_id %s is expired.", task_ins.task_id) - task_ids.remove(UUID(task_ins.task_id)) + found_task_ins_dict[UUID(row["task_id"])] = dict_to_task_ins(row) - # Retrieve all anonymous Tasks - if len(task_ids) == 0: - return [] + ret = verify_taskins_ids( + inquired_taskins_ids=task_ids, + found_taskins_dict=found_task_ins_dict, + current_time=current, + ) - placeholders = ",".join([f":id_{i}" for i in range(len(task_ids))]) + # Find all TaskRes query = f""" SELECT * FROM task_res - WHERE ancestry IN ({placeholders}) - AND delivered_at = "" + WHERE ancestry IN ({",".join(["?"] * len(task_ids))}) + AND delivered_at = ""; """ + rows = self.query(query, tuple(str(task_id) for task_id in task_ids)) + for row in rows: + convert_sint64_values_in_dict_to_uint64( + row, ["run_id", "producer_node_id", "consumer_node_id"] + ) + tmp_ret_dict = verify_found_taskres( + inquired_taskins_ids=task_ids, + found_taskins_dict=found_task_ins_dict, + found_taskres_list=[dict_to_task_res(row) for row in rows], + current_time=current, + ) + ret.update(tmp_ret_dict) - data: dict[str, Union[str, float, int]] = {} - - query += ";" - - for index, task_id in enumerate(task_ids): - data[f"id_{index}"] = str(task_id) - - rows = self.query(query, data) - - if rows: - # Prepare query - found_task_ids = [row["task_id"] for row in rows] - placeholders = ",".join([f":id_{i}" for i in range(len(found_task_ids))]) - query = f""" - UPDATE task_res - SET delivered_at = :delivered_at - WHERE task_id IN ({placeholders}) - RETURNING *; - """ - - # Prepare data for query - delivered_at = now().isoformat() - data = {"delivered_at": delivered_at} - for index, task_id in enumerate(found_task_ids): - data[f"id_{index}"] = str(task_id) - - # Run query - rows = self.query(query, data) - - for row in rows: - # Convert values from sint64 to uint64 - convert_sint64_values_in_dict_to_uint64( - row, ["run_id", "producer_node_id", "consumer_node_id"] - ) - - result = [dict_to_task_res(row) for row in rows] - - # 1. Query: Fetch consumer_node_id of remaining task_ids - # Assume the ancestry field only contains one element - data.clear() - replied_task_ids: set[UUID] = {UUID(str(row["ancestry"])) for row in rows} - remaining_task_ids = task_ids - replied_task_ids - placeholders = ",".join([f":id_{i}" for i in range(len(remaining_task_ids))]) - query = f""" - SELECT consumer_node_id - FROM task_ins - WHERE task_id IN ({placeholders}); - """ - for index, task_id in enumerate(remaining_task_ids): - data[f"id_{index}"] = str(task_id) - node_ids = [int(row["consumer_node_id"]) for row in self.query(query, data)] - - # 2. Query: Select offline nodes - placeholders = ",".join([f":id_{i}" for i in range(len(node_ids))]) + # Mark existing TaskRes to be returned as delivered + delivered_at = now().isoformat() + for task_res in ret.values(): + task_res.task.delivered_at = delivered_at + task_res_ids = [task_res.task_id for task_res in ret.values()] query = f""" - SELECT node_id - FROM node - WHERE node_id IN ({placeholders}) - AND online_until < :time; + UPDATE task_res + SET delivered_at = ? + WHERE task_id IN ({",".join(["?"] * len(task_res_ids))}); """ - data = {f"id_{i}": str(node_id) for i, node_id in enumerate(node_ids)} - data["time"] = time.time() - offline_node_ids = [int(row["node_id"]) for row in self.query(query, data)] + data: list[Any] = [delivered_at] + task_res_ids + self.query(query, data) - # 3. Query: Select TaskIns for offline nodes - placeholders = ",".join([f":id_{i}" for i in range(len(offline_node_ids))]) - query = f""" - SELECT * - FROM task_ins - WHERE consumer_node_id IN ({placeholders}); - """ - data = {f"id_{i}": str(node_id) for i, node_id in enumerate(offline_node_ids)} - task_ins_rows = self.query(query, data) + # Cleanup + self._force_delete_tasks_by_ids(set(ret.keys())) - return result + return list(ret.values()) def num_task_ins(self) -> int: """Calculate the number of task_ins in store. @@ -699,6 +632,32 @@ def delete_tasks(self, task_ids: set[UUID]) -> None: return None + def _force_delete_tasks_by_ids(self, task_ids: set[UUID]) -> None: + """Delete tasks based on a set of TaskIns IDs.""" + if not task_ids: + return + if self.conn is None: + raise AttributeError("LinkState not initialized") + + placeholders = ",".join([f":id_{index}" for index in range(len(task_ids))]) + data = {f"id_{index}": str(task_id) for index, task_id in enumerate(task_ids)} + + # Delete task_ins + query_1 = f""" + DELETE FROM task_ins + WHERE task_id IN ({placeholders}); + """ + + # Delete task_res + query_2 = f""" + DELETE FROM task_res + WHERE ancestry IN ({placeholders}); + """ + + with self.conn: + self.conn.execute(query_1, data) + self.conn.execute(query_2, data) + def create_node( self, ping_interval: float, public_key: Optional[bytes] = None ) -> int: diff --git a/src/py/flwr/server/superlink/linkstate/utils.py b/src/py/flwr/server/superlink/linkstate/utils.py index 5fd672227c9..2b73221eae3 100644 --- a/src/py/flwr/server/superlink/linkstate/utils.py +++ b/src/py/flwr/server/superlink/linkstate/utils.py @@ -15,15 +15,23 @@ """Utility functions for State.""" +from logging import ERROR from os import urandom +from typing import Optional, Union +from uuid import UUID, uuid4 -from flwr.common import ConfigsRecord, Context, serde -from flwr.common.constant import Status, SubStatus +from flwr.common import ConfigsRecord, Context, log, now, serde +from flwr.common.constant import ErrorCode, Status, SubStatus from flwr.common.typing import RunStatus -from flwr.proto.message_pb2 import Context as ProtoContext # pylint: disable=E0611 # pylint: disable=E0611 +from flwr.proto.error_pb2 import Error +from flwr.proto.message_pb2 import Context as ProtoContext +from flwr.proto.node_pb2 import Node from flwr.proto.recordset_pb2 import ConfigsRecord as ProtoConfigsRecord +from flwr.proto.task_pb2 import Task, TaskIns, TaskRes + +# pylint: enable=E0611 NODE_UNAVAILABLE_ERROR_REASON = ( "Error: Node Unavailable - The destination node is currently unavailable. " @@ -43,6 +51,13 @@ SubStatus.FAILED, SubStatus.STOPPED, } +MESSAGE_UNAVAILABLE_ERROR_REASON = ( + "Error: Message Unavailable - The requested message could not be found in the " + "database. It may have expired due to its TTL or never existed." +) +REPLY_MESSAGE_UNAVAILABLE_ERROR_REASON = ( + "Error: Reply Message Unavailable - The reply message has expired." +) def generate_rand_int_from_bytes(num_bytes: int) -> int: @@ -208,3 +223,167 @@ def has_valid_sub_status(status: RunStatus) -> bool: if status.status == Status.FINISHED: return status.sub_status in VALID_RUN_SUB_STATUSES return status.sub_status == "" + + +def create_taskres_for_unavailable_taskins(taskins_id: Union[str, UUID]) -> TaskRes: + """Generate a TaskRes with a TaskIns unavailable error. + + Parameters + ---------- + taskins_id : Union[str, UUID] + The ID of the unavailable TaskIns. + + Returns + ------- + TaskRes + A TaskRes with an error code MESSAGE_UNAVAILABLE to indicate that the + inquired TaskIns ID cannot be found (due to non-existence or expiration). + """ + current_time = now().timestamp() + return TaskRes( + task_id=str(uuid4()), + group_id="", # Unknown group ID + run_id=0, # Unknown run ID + task=Task( + # This function is only called by SuperLink, and thus it's the producer. + producer=Node(node_id=0, anonymous=False), + consumer=Node(node_id=0, anonymous=False), + created_at=current_time, + ttl=0, + ancestry=[str(taskins_id)], + task_type="", # Unknown message type + error=Error( + code=ErrorCode.MESSAGE_UNAVAILABLE, + reason=MESSAGE_UNAVAILABLE_ERROR_REASON, + ), + ), + ) + + +def create_taskres_for_unavailable_taskres(ref_taskins: TaskIns) -> TaskRes: + """Generate a TaskRes with a reply message unavailable error from a TaskIns. + + Parameters + ---------- + ref_taskins : TaskIns + The reference TaskIns object. + + Returns + ------- + TaskRes + The generated TaskRes with an error code REPLY_MESSAGE_UNAVAILABLE_ERROR_REASON, + indicating that the original TaskRes has expired. + """ + current_time = now().timestamp() + ttl = ref_taskins.task.ttl - (current_time - ref_taskins.task.created_at) + if ttl < 0: + log(ERROR, "Creating TaskRes for TaskIns that exceeds its TTL.") + ttl = 0 + return TaskRes( + task_id=str(uuid4()), + group_id=ref_taskins.group_id, + run_id=ref_taskins.run_id, + task=Task( + # This function is only called by SuperLink, and thus it's the producer. + producer=Node(node_id=0, anonymous=False), + consumer=Node(node_id=0, anonymous=False), + created_at=current_time, + ttl=ttl, + ancestry=[ref_taskins.task_id], + task_type=ref_taskins.task.task_type, + error=Error( + code=ErrorCode.REPLY_MESSAGE_UNAVAILABLE, + reason=REPLY_MESSAGE_UNAVAILABLE_ERROR_REASON, + ), + ), + ) + + +def has_expired(task_ins_or_res: Union[TaskIns, TaskRes], current_time: float) -> bool: + """Check if the TaskIns/TaskRes has expired.""" + return task_ins_or_res.task.ttl + task_ins_or_res.task.created_at < current_time + + +def verify_taskins_ids( + inquired_taskins_ids: set[UUID], + found_taskins_dict: dict[UUID, TaskIns], + current_time: Optional[float] = None, + update_set: bool = True, +) -> dict[UUID, TaskRes]: + """Verify found TaskIns and generate error TaskRes for invalid ones. + + Parameters + ---------- + inquired_taskins_ids : set[UUID] + Set of TaskIns IDs for which to generate error TaskRes if invalid. + found_taskins_dict : dict[UUID, TaskIns] + Dictionary containing all found TaskIns indexed by their IDs. + current_time : Optional[float] (default: None) + The current time to check for expiration. If set to `None`, the current time + will automatically be set to the current timestamp using `now().timestamp()`. + update_set : bool (default: True) + If True, the `inquired_taskins_ids` will be updated to remove invalid ones, + by default True. + + Returns + ------- + dict[UUID, TaskRes] + A dictionary of error TaskRes indexed by the corresponding TaskIns ID. + """ + ret_dict = {} + current = current_time if current_time else now().timestamp() + for taskins_id in list(inquired_taskins_ids): + # Generate error TaskRes if the task_ins doesn't exist or has expired + taskins = found_taskins_dict.get(taskins_id) + if taskins is None or has_expired(taskins, current): + if update_set: + inquired_taskins_ids.remove(taskins_id) + taskres = create_taskres_for_unavailable_taskins(taskins_id) + ret_dict[taskins_id] = taskres + return ret_dict + + +def verify_found_taskres( + inquired_taskins_ids: set[UUID], + found_taskins_dict: dict[UUID, TaskIns], + found_taskres_list: list[TaskRes], + current_time: Optional[float] = None, + update_set: bool = True, +) -> dict[UUID, TaskRes]: + """Verify found TaskRes and generate error TaskRes for invalid ones. + + Parameters + ---------- + inquired_taskins_ids : set[UUID] + Set of TaskIns IDs for which to generate error TaskRes if invalid. + found_taskins_dict : dict[UUID, TaskIns] + Dictionary containing all found TaskIns indexed by their IDs. + found_taskres_list : dict[TaskIns, TaskRes] + List of found TaskRes to be verified. + current_time : Optional[float] (default: None) + The current time to check for expiration. If set to `None`, the current time + will automatically be set to the current timestamp using `now().timestamp()`. + update_set : bool (default: True) + If True, the `inquired_taskins_ids` will be updated to remove ones + that have a TaskRes, by default True. + + Returns + ------- + dict[UUID, TaskRes] + A dictionary of TaskRes indexed by the corresponding TaskIns ID. + """ + ret_dict: dict[UUID, TaskRes] = {} + current = current_time if current_time else now().timestamp() + for taskres in found_taskres_list: + taskins_id = UUID(taskres.task.ancestry[0]) + if update_set: + inquired_taskins_ids.remove(taskins_id) + # Check if the TaskRes has expired + if has_expired(taskres, current): + # No need to insert the error TaskRes + taskres = create_taskres_for_unavailable_taskres( + found_taskins_dict[taskins_id] + ) + taskres.task.delivered_at = now().isoformat() + ret_dict[taskins_id] = taskres + return ret_dict