From 39870a417e02eaabdec754a379f537287ff6efec Mon Sep 17 00:00:00 2001
From: "Lerer, Eran"
Date: Thu, 9 Jan 2025 10:50:30 +0200
Subject: [PATCH] Handling next round model tensors

---
 openfl/component/aggregator/aggregator.py | 102 ++++++----
 openfl/databases/persistent_db.py         | 221 +++++++++++++----
 openfl/databases/tensor_db.py             |  35 ++++
 3 files changed, 235 insertions(+), 123 deletions(-)

diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py
index 99b20ec7fc..1489222705 100644
--- a/openfl/component/aggregator/aggregator.py
+++ b/openfl/component/aggregator/aggregator.py
@@ -114,6 +114,7 @@ def __init__(
             callbacks: List of callbacks to be used during the experiment.
         """
         self.round_number = 0
+        self.next_model_round_number = 0
 
         if single_col_cert_common_name:
             logger.warning(
@@ -145,6 +146,7 @@ def __init__(
             logger.info("Persistent checkpoint is enabled")
             self.persistent_db = PersistentTensorDB(persistent_db_path)
         else:
+            logger.info("Persistent checkpoint is disabled")
             self.persistent_db = None
         # FIXME: I think next line generates an error on the second round
         # if it is set to 1 for the aggregator.
@@ -204,37 +206,61 @@ def __init__(
         self.callbacks.on_round_begin(self.round_number)
 
     def _recover(self):
-        if self.persistent_db.is_task_table_empty():
-            return False
-        # load tensors persistent DB
-        logger.info("Recovering previous state from persistent storage")
-        tensor_key_dict = self.persistent_db.load_tensors()
-        if len(tensor_key_dict) > 0:
-            self.tensor_db.cache_tensor(tensor_key_dict)
-            logger.debug("Recovery - this is the tensor_db after recovery: %s", self.tensor_db)
-            logger.info("Recovery - Finished populating tensor DB")
+        """Restores the aggregator state to what it was prior to a restart.
+        """
+        recovered = False
+        # Load model tensors from the persistent DB
+        logger.info("Recovering previous state from persistent storage")
+        tensor_key_dict = self.persistent_db.load_tensors(self.persistent_db.get_tensors_table_name())
+        if len(tensor_key_dict) > 0:
+            logger.info(f"Recovering {len(tensor_key_dict)} model tensors")
+            recovered = True
+            self.tensor_db.cache_tensor(tensor_key_dict)
         committed_round_number, self.best_model_score = self.persistent_db.get_round_and_best_score()
+        logger.info("Recovery - Setting model proto")
+        to_proto_tensor_dict = {}
+        for tk in tensor_key_dict:
+            tk_name, _, _, _, _ = tk
+            to_proto_tensor_dict[tk_name] = tensor_key_dict[tk]
+        self.model = utils.construct_model_proto(
+            to_proto_tensor_dict, committed_round_number, self.compression_pipeline
+        )
         # The round number is the current round, which is still in progress, i.e. committed_round_number + 1
         self.round_number = committed_round_number + 1
         logger.info("Recovery - loaded round number %s and best score %s", self.round_number,self.best_model_score)
-        logger.info("Recovery - Replaying saved task results")
-        task_id = 1
-        while True:
-            task_result = self.persistent_db.get_task_result_by_id(task_id)
-            if not task_result:
-                break
-            collaborator_name = task_result["collaborator_name"]
-            round_number = task_result["round_number"]
-            task_name = task_result["task_name"]
-            data_size = task_result["data_size"]
-            serialized_tensors = task_result["named_tensors"]
-            named_tensors = [
-                NamedTensor.FromString(serialized_tensor)
-                for serialized_tensor in serialized_tensors
-            ]
-            logger.info("Recovery - Replaying task results %s %s %s",collaborator_name ,round_number, task_name )
-            self.process_task_results(collaborator_name, round_number, task_name, data_size, named_tensors)
-            task_id += 1
+
+        next_round_tensor_key_dict = self.persistent_db.load_tensors(self.persistent_db.get_next_round_tensors_table_name())
+        if len(next_round_tensor_key_dict) > 0:
+            logger.info(f"Recovering {len(next_round_tensor_key_dict)} next round model tensors")
+            recovered = True
+            self.tensor_db.cache_tensor(next_round_tensor_key_dict)
+
+        logger.info("Recovery - Finished populating tensor DB")
+        logger.debug("Recovery - this is the tensor_db after recovery: %s", self.tensor_db)
+
+        if self.persistent_db.is_task_table_empty():
+            logger.debug("Task table is empty")
+            return recovered
+
+        logger.info("Recovery - Replaying saved task results")
+        task_id = 1
+        while True:
+            task_result = self.persistent_db.get_task_result_by_id(task_id)
+            if not task_result:
+                break
+            collaborator_name = task_result["collaborator_name"]
+            round_number = task_result["round_number"]
+            task_name = task_result["task_name"]
+            data_size = task_result["data_size"]
+            serialized_tensors = task_result["named_tensors"]
+            named_tensors = [
+                NamedTensor.FromString(serialized_tensor)
+                for serialized_tensor in serialized_tensors
+            ]
+            logger.info("Recovery - Replaying task results %s %s %s", collaborator_name, round_number, task_name)
+            self.process_task_results(collaborator_name, round_number, task_name, data_size, named_tensors)
+            task_id += 1
+        return recovered
 
     def _load_initial_tensors(self):
         """Load all of the tensors required to begin federated learning.
@@ -308,15 +334,14 @@ def _save_model(self, round_number, file_path):
                 round_number,
             )
             return
-        #E.L here we can save the tensor_dict as well. as transaction.
-        # we can omit the proto save, at the end of the experiment to write the last and best model tensors as proto
-        # and clean all the db.
         if file_path == self.best_state_path:
             self.best_tensor_dict = tensor_dict
         if file_path == self.last_state_path:
             # Transaction to persist/delete all data needed to increment the round
             if self.persistent_db:
-                self.persistent_db.finalize_round(tensor_tuple_dict,self.round_number,self.best_model_score)
+                if self.next_model_round_number > 0:
+                    next_round_tensors = self.tensor_db.get_tensors_by_round_and_tags(self.next_model_round_number, ("model",))
+                    self.persistent_db.finalize_round(tensor_tuple_dict, next_round_tensors, self.round_number, self.best_model_score)
                 logger.info(
                     "Persist model and clean task result for round %s",
                     round_number,
                 )
@@ -662,7 +687,13 @@ def send_local_task_results(
         """
         # Save task and its metadata for recovery
         serialized_tensors = [tensor.SerializeToString() for tensor in named_tensors]
-        self.persistent_db and self.persistent_db.save_task_results(collaborator_name,round_number,task_name,data_size,serialized_tensors)
+        if self.persistent_db:
+            self.persistent_db.save_task_results(collaborator_name, round_number, task_name, data_size, serialized_tensors)
+            logger.debug(f"Persisting task results {task_name} from {collaborator_name}, round {round_number}")
+        logger.info(
+            f"Collaborator {collaborator_name} is sending task results "
+            f"for {task_name}, round {round_number}"
+        )
         self.process_task_results(collaborator_name,round_number,task_name,data_size,named_tensors)
 
     def process_task_results(
@@ -687,11 +718,6 @@ def process_task_results(
             )
             return
 
-        logger.info(
-            f"Collaborator {collaborator_name} is sending task results "
-            f"for {task_name}, round {round_number}"
-        )
-
         task_key = TaskResultKey(task_name, collaborator_name, round_number)
 
         # we mustn't have results already
@@ -931,7 +957,7 @@ def _prepare_trained(self, tensor_name, origin, round_number, report, agg_result
             new_model_report,
             ("model",),
         )
-
+        self.next_model_round_number = new_model_round_number
         # Finally, cache the updated model tensor
         self.tensor_db.cache_tensor({final_model_tk: new_model_nparray})
 
diff --git a/openfl/databases/persistent_db.py b/openfl/databases/persistent_db.py
index 79881f486e..2453880759 100644
--- a/openfl/databases/persistent_db.py
+++ b/openfl/databases/persistent_db.py
@@ -16,14 +16,17 @@ class PersistentTensorDB:
     """
-    The PersistentTensorDB class implements a database for storing tensors using SQLite.
+    The PersistentTensorDB class implements a database for storing tensors and metadata using SQLite.
 
     Attributes:
         conn: The SQLite connection object.
-        cursor: The SQLite cursor object.
         lock: A threading Lock object used to ensure thread-safe operations.
""" - + TENSORS_TABLE = "tensors" + NEXT_ROUND_TENSORS_TABLE = "next_round_tensors" + TASK_RESULT_TABLE = "task_results" + KEY_VALUE_TABLE = "key_value_store" def __init__(self, db_path: str = "") -> None: """Initializes a new instance of the PersistentTensorDB class.""" full_path = "tensordb.sqlite" @@ -31,16 +34,19 @@ def __init__(self, db_path: str = "") -> None: full_path = os.path.join(db_path, full_path) logger.info("Initializing persistent db at %s",full_path) self.conn = sqlite3.connect(full_path, check_same_thread=False) - self.cursor = self.conn.cursor() self.lock = Lock() - self._create_model_tensors_table() - self._create_task_results_table() - self._create_key_value_store() + + cursor = self.conn.cursor() + self._create_model_tensors_table(cursor,PersistentTensorDB.TENSORS_TABLE) + self._create_model_tensors_table(cursor,PersistentTensorDB.NEXT_ROUND_TENSORS_TABLE) + self._create_task_results_table(cursor) + self._create_key_value_store(cursor) + self.conn.commit() - def _create_model_tensors_table(self) -> None: + def _create_model_tensors_table(self,cursor,table_name) -> None: """Create the database table for storing tensors if it does not exist.""" - self.cursor.execute(""" - CREATE TABLE IF NOT EXISTS tensors ( + query = f""" + CREATE TABLE IF NOT EXISTS {table_name} ( id INTEGER PRIMARY KEY AUTOINCREMENT, tensor_name TEXT NOT NULL, origin TEXT NOT NULL, @@ -49,33 +55,33 @@ def _create_model_tensors_table(self) -> None: tags TEXT, nparray BLOB NOT NULL ) - """) - self.conn.commit() + """ + cursor.execute(query) + - def _create_task_results_table(self) -> None: + def _create_task_results_table(self,cursor) -> None: """Creates a table for storing task results.""" - create_table_query = """ - CREATE TABLE IF NOT EXISTS task_results ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - collaborator_name TEXT NOT NULL, - round_number INTEGER NOT NULL, - task_name TEXT NOT NULL, - data_size INTEGER NOT NULL, - named_tensors BLOB NOT NULL - ); + query = f""" + CREATE TABLE IF NOT EXISTS {PersistentTensorDB.TASK_RESULT_TABLE} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + collaborator_name TEXT NOT NULL, + round_number INTEGER NOT NULL, + task_name TEXT NOT NULL, + data_size INTEGER NOT NULL, + named_tensors BLOB NOT NULL + ) """ - self.cursor.execute(create_table_query) - self.conn.commit() + cursor.execute(query) - def _create_key_value_store(self) -> None: + def _create_key_value_store(self,cursor) -> None: """Create a key-value store table for storing additional metadata.""" - self.cursor.execute(""" - CREATE TABLE IF NOT EXISTS key_value_store ( + query = f""" + CREATE TABLE IF NOT EXISTS {PersistentTensorDB.KEY_VALUE_TABLE} ( key TEXT PRIMARY KEY, value REAL NOT NULL ) - """) - self.conn.commit() + """ + cursor.execute(query) def save_task_results( self, @@ -99,16 +105,18 @@ def save_task_results( # Insert into the database - insert_query = """ - INSERT INTO task_results + insert_query = f""" + INSERT INTO {PersistentTensorDB.TASK_RESULT_TABLE} (collaborator_name, round_number, task_name, data_size, named_tensors) VALUES (?, ?, ?, ?, ?); """ - self.cursor.execute( - insert_query, - (collaborator_name, round_number, task_name, data_size, serialized_blob), - ) - self.conn.commit() + with self.lock: + cursor = self.conn.cursor() + cursor.execute( + insert_query, + (collaborator_name, round_number, task_name, data_size, serialized_blob), + ) + self.conn.commit() def get_task_result_by_id(self, task_result_id: int): """ @@ -121,12 +129,14 @@ def get_task_result_by_id(self, task_result_id: 
             A dictionary containing the task result details, or None if not found.
         """
         with self.lock:
-            self.cursor.execute("""
+            cursor = self.conn.cursor()
+            query = f"""
                 SELECT collaborator_name, round_number, task_name, data_size, named_tensors
-                FROM task_results
+                FROM {PersistentTensorDB.TASK_RESULT_TABLE}
                 WHERE id = ?
-            """, (task_result_id,))
-            result = self.cursor.fetchone()
+            """
+            cursor.execute(query, (task_result_id,))
+            result = cursor.fetchone()
             if result:
                 collaborator_name, round_number, task_name, data_size, serialized_blob = result
                 serialized_tensors = pickle.loads(serialized_blob)
@@ -140,89 +150,126 @@ def get_task_result_by_id(self, task_result_id: int):
             return None
 
     def _serialize_array(self, array: np.ndarray) -> bytes:
-        """Serialize a NumPy array into bytes for storing in SQLite."""
-        return array.tobytes()
+        """Serialize a NumPy array into bytes for storing in SQLite.
+        Note: pickle is used because in some cases the stored value is actually a scalar.
+        """
+        return pickle.dumps(array)
 
     def _deserialize_array(self, blob: bytes, dtype: Optional[np.dtype] = None) -> np.ndarray:
         """Deserialize bytes from SQLite into a NumPy array."""
-        return np.frombuffer(blob, dtype=dtype)
+        try:
+            return pickle.loads(blob)
+        except Exception as e:
+            raise ValueError(f"Failed to deserialize array: {e}")
 
     def __repr__(self) -> str:
         """Returns a string representation of the PersistentTensorDB."""
         with self.lock:
-            self.cursor.execute("SELECT tensor_name, origin, round, report, tags FROM tensors")
-            rows = self.cursor.fetchall()
+            cursor = self.conn.cursor()
+            cursor.execute("SELECT tensor_name, origin, round, report, tags FROM tensors")
+            rows = cursor.fetchall()
             return f"PersistentTensorDB contents:\n{rows}"
 
-    def finalize_round(self,tensor_key_dict: Dict[TensorKey, np.ndarray],round_number: int, best_score: float):
+    def finalize_round(self, tensor_key_dict: Dict[TensorKey, np.ndarray], next_round_tensor_key_dict: Dict[TensorKey, np.ndarray], round_number: int, best_score: float):
+        """Finalize a training round by saving tensors, preparing for the next round,
+        and updating metadata in the database.
+
+        This function performs the following steps as a single transaction:
+        1. Persist the tensors of the current round into the database.
+        2. Persist the tensors for the next training round into the database.
+        3. Reinitialize the task results table to prepare for new tasks.
+        4. Update the round number and best score in the key-value store.
+
+        If any step fails, the transaction is rolled back to ensure data integrity.
+
+        Args:
+            tensor_key_dict (Dict[TensorKey, np.ndarray]):
+                A dictionary mapping tensor keys to their corresponding NumPy arrays for the current round.
+            next_round_tensor_key_dict (Dict[TensorKey, np.ndarray]):
+                A dictionary mapping tensor keys to their corresponding NumPy arrays for the next round.
+            round_number (int):
+                The current training round number.
+            best_score (float):
+                The best score achieved during the current round.
+
+        Raises:
+            RuntimeError: If an error occurs during the transaction, the transaction is rolled back,
+                and a RuntimeError is raised with the details of the failure.
+ """ with self.lock: try: # Begin transaction - self.cursor.execute("BEGIN TRANSACTION") - self._persist_tensors(tensor_key_dict) - self._init_task_results_table() - self._save_round_and_best_score(round_number,best_score) + cursor = self.conn.cursor() + cursor.execute("BEGIN TRANSACTION") + self._persist_tensors(cursor,PersistentTensorDB.TENSORS_TABLE,tensor_key_dict) + self._persist_next_round_tensors(cursor,next_round_tensor_key_dict) + self._init_task_results_table(cursor) + self._save_round_and_best_score(cursor,round_number,best_score) # Commit transaction self.conn.commit() - logger.info(f"Committed model for round {round_number}, saved {len(tensor_key_dict)} model tensors with best_score {best_score}") + logger.info(f"Committed model for round {round_number}, saved {len(tensor_key_dict)} model tensors and {len(next_round_tensor_key_dict)} next round model tensors with best_score {best_score}") except Exception as e: # Rollback transaction in case of an error self.conn.rollback() raise RuntimeError(f"Failed to finalize round: {e}") - def _persist_tensors(self, tensor_key_dict: Dict[TensorKey, np.ndarray]) -> None: - """Insert a dictionary of tensors into the SQLite database in a single transaction.""" + def _persist_tensors(self,cursor,table_name, tensor_key_dict: Dict[TensorKey, np.ndarray]) -> None: + """Insert a dictionary of tensors into the SQLite as part of transaction""" for tensor_key, nparray in tensor_key_dict.items(): tensor_name, origin, fl_round, report, tags = tensor_key serialized_array = self._serialize_array(nparray) serialized_tags = json.dumps(tags) - self.cursor.execute(""" - INSERT INTO tensors (tensor_name, origin, round, report, tags, nparray) - VALUES (?, ?, ?, ?, ?, ?) - """, (tensor_name, origin, fl_round, int(report), serialized_tags, serialized_array)) + query = f""" + INSERT INTO {table_name} (tensor_name, origin, round, report, tags, nparray) + VALUES (?, ?, ?, ?, ?, ?) + """ + cursor.execute(query, (tensor_name, origin, fl_round, int(report), serialized_tags, serialized_array)) + + def _persist_next_round_tensors(self,cursor, tensor_key_dict: Dict[TensorKey, np.ndarray]) -> None: + """Persisting the last round next_round tensors.""" + drop_table_query = f"DROP TABLE IF EXISTS {PersistentTensorDB.NEXT_ROUND_TENSORS_TABLE}" + cursor.execute(drop_table_query) + self._create_model_tensors_table(cursor,PersistentTensorDB.NEXT_ROUND_TENSORS_TABLE) + self._persist_tensors(cursor,PersistentTensorDB.NEXT_ROUND_TENSORS_TABLE,tensor_key_dict) + - def _init_task_results_table(self): + def _init_task_results_table(self,cursor): """ Creates a table for storing task results. Drops the table first if it already exists. 
""" drop_table_query = "DROP TABLE IF EXISTS task_results" - self.cursor.execute(drop_table_query) - - create_table_query = """ - CREATE TABLE IF NOT EXISTS task_results ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - collaborator_name TEXT NOT NULL, - round_number INTEGER NOT NULL, - task_name TEXT NOT NULL, - data_size INTEGER NOT NULL, - named_tensors BLOB NOT NULL - ); - """ - self.cursor.execute(create_table_query) + cursor.execute(drop_table_query) + self._create_task_results_table(cursor) - def _save_round_and_best_score(self, round_number: int, best_score: float) -> None: + def _save_round_and_best_score(self,cursor, round_number: int, best_score: float) -> None: """Save the round number and best score as key-value pairs in the database.""" # Create a table with key-value structure where values can be integer or float # Insert or update the round_number - self.cursor.execute(""" + cursor.execute(""" INSERT OR REPLACE INTO key_value_store (key, value) VALUES (?, ?) """, ("round_number", float(round_number))) # Insert or update the best_score - self.cursor.execute(""" + cursor.execute(""" INSERT OR REPLACE INTO key_value_store (key, value) VALUES (?, ?) """, ("best_score", float(best_score))) + def get_tensors_table_name(self) -> str: + return PersistentTensorDB.TENSORS_TABLE + def get_next_round_tensors_table_name(self) -> str: + return PersistentTensorDB.NEXT_ROUND_TENSORS_TABLE - def load_tensors(self) -> Dict[TensorKey, np.ndarray]: + def load_tensors(self,tensor_table) -> Dict[TensorKey, np.ndarray]: """Load all tensors from the SQLite database and return them as a dictionary.""" tensor_dict = {} with self.lock: - self.cursor.execute("SELECT tensor_name, origin, round, report, tags, nparray FROM tensors") - rows = self.cursor.fetchall() + cursor = self.conn.cursor() + query = f"SELECT tensor_name, origin, round, report, tags, nparray FROM {tensor_table}" + cursor.execute(query) + rows = cursor.fetchall() for row in rows: tensor_name, origin, fl_round, report, tags, nparray = row # Deserialize the JSON string back to a Python list @@ -235,21 +282,22 @@ def load_tensors(self) -> Dict[TensorKey, np.ndarray]: def get_round_and_best_score(self) -> tuple[int, float]: """Retrieve the round number and best score from the database.""" with self.lock: + cursor = self.conn.cursor() # Fetch the round_number - self.cursor.execute(""" + cursor.execute(""" SELECT value FROM key_value_store WHERE key = ? """, ("round_number",)) - round_number = self.cursor.fetchone() + round_number = cursor.fetchone() if round_number is None: round_number = -1 else: round_number = int(round_number[0]) # Cast to int # Fetch the best_score - self.cursor.execute(""" + cursor.execute(""" SELECT value FROM key_value_store WHERE key = ? """, ("best_score",)) - best_score = self.cursor.fetchone() + best_score = cursor.fetchone() if best_score is None: best_score = 0 else: @@ -262,11 +310,13 @@ def clean_up(self, remove_older_than: int = 1) -> None: if remove_older_than < 0: return with self.lock: - self.cursor.execute("SELECT MAX(round) FROM tensors") - current_round = self.cursor.fetchone()[0] + cursor = self.conn.cursor() + query = f"SELECT MAX(round) FROM {PersistentTensorDB.TENSORS_TABLE}" + cursor.execute(query) + current_round = cursor.fetchone()[0] if current_round is None: return - self.cursor.execute(""" + cursor.execute(""" DELETE FROM tensors WHERE round <= ? 
                 AND report = 0
             """, (current_round - remove_older_than,))
@@ -280,6 +330,7 @@ def close(self) -> None:
 
     def is_task_table_empty(self) -> bool:
         """Check if the task table is empty."""
         with self.lock:
-            self.cursor.execute("SELECT COUNT(*) FROM task_results")
-            count = self.cursor.fetchone()[0]
+            cursor = self.conn.cursor()
+            cursor.execute(f"SELECT COUNT(*) FROM {PersistentTensorDB.TASK_RESULT_TABLE}")
+            count = cursor.fetchone()[0]
             return count == 0
 
diff --git a/openfl/databases/tensor_db.py b/openfl/databases/tensor_db.py
index 1b9d5ea132..6952a0e327 100644
--- a/openfl/databases/tensor_db.py
+++ b/openfl/databases/tensor_db.py
@@ -150,6 +150,41 @@ def get_tensor_from_cache(self, tensor_key: TensorKey) -> Optional[np.ndarray]:
         if len(df) == 0:
             return None
         return np.array(df["nparray"].iloc[0])
+
+    def get_tensors_by_round_and_tags(self, fl_round: int, tags: tuple) -> dict:
+        """Retrieve all tensors that match the specified round and tags.
+
+        Args:
+            fl_round (int): The round number to filter tensors.
+            tags (tuple): The tags to filter tensors.
+
+        Returns:
+            dict: A dictionary where the keys are TensorKey objects and the values are numpy arrays.
+        """
+        # Filter the DataFrame based on the round and tags
+        df = self.tensor_db[
+            (self.tensor_db["round"] == fl_round) &
+            (self.tensor_db["tags"] == tags)
+        ]
+
+        # Check if any tensors match the criteria
+        if len(df) == 0:
+            return {}
+
+        # Construct a dictionary mapping TensorKey to np.ndarray
+        tensor_dict = {}
+        for _, row in df.iterrows():
+            tensor_key = TensorKey(
+                tensor_name=row["tensor_name"],
+                origin=row["origin"],
+                round_number=row["round"],
+                report=row["report"],
+                tags=row["tags"]
+            )
+            tensor_dict[tensor_key] = np.array(row["nparray"])
+
+        return tensor_dict
+
     def get_aggregated_tensor(
         self,
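
Usage note (not part of the patch): the persistence round-trip introduced above can be exercised roughly as follows. This is a minimal sketch, assuming the patched PersistentTensorDB API, that TensorKey is the five-field key (tensor_name, origin, round_number, report, tags) importable from openfl.utilities, and that the database directory already exists; tensor names and scores are illustrative.

    import numpy as np

    from openfl.databases.persistent_db import PersistentTensorDB
    from openfl.utilities import TensorKey  # assumed import location

    db = PersistentTensorDB(db_path=".")  # creates/opens ./tensordb.sqlite

    # TensorKey layout used throughout the patch:
    # (tensor_name, origin, round, report, tags)
    current_round = {
        TensorKey("conv1.weight", "aggregator", 0, False, ("model",)): np.zeros((3, 3)),
    }
    next_round = {
        TensorKey("conv1.weight", "aggregator", 1, False, ("model",)): np.ones((3, 3)),
    }

    # Single transaction: persist round-0 tensors, replace the staged
    # next-round tensors, reset task_results, and record round/best score.
    db.finalize_round(current_round, next_round, round_number=0, best_score=0.92)

    # On restart, _recover() reads both tensor tables plus the metadata back:
    restored = db.load_tensors(db.get_tensors_table_name())
    staged = db.load_tensors(db.get_next_round_tensors_table_name())
    committed_round, best_score = db.get_round_and_best_score()  # -> (0, 0.92)
    assert db.is_task_table_empty()
    db.close()

Because finalize_round drops and recreates the next_round_tensors table inside the same transaction as the current-round write, a crash between rounds leaves either the previous committed state or the new one, never a mix.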