From 6ce540977410e1159acc40e4735e099cb13fe7ca Mon Sep 17 00:00:00 2001 From: Barthelemy Date: Tue, 3 Sep 2024 13:28:23 +0200 Subject: [PATCH 1/4] [QC-1229] Repocleaner policy for the moving windows --- .../RepoCleaner/qcrepocleaner/config.yaml | 4 ++++ .../qcrepocleaner/rules/multiple_per_run.py | 19 +++++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/Framework/script/RepoCleaner/qcrepocleaner/config.yaml b/Framework/script/RepoCleaner/qcrepocleaner/config.yaml index e6766c18c8..c2741ad1dd 100644 --- a/Framework/script/RepoCleaner/qcrepocleaner/config.yaml +++ b/Framework/script/RepoCleaner/qcrepocleaner/config.yaml @@ -8,6 +8,10 @@ Rules: delay: 0 policy: 1_per_run to_timestamp: 1674700609718 + - object_path: qc/.*/mw/.* + delay: 1 + policy: multiple_per_run + mw_deletion_delay: 15 # - object_path: qc/TST/MO/QcTask-barth/example3[/.*]{0,1} # delay: 0 # policy: none_kept diff --git a/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py b/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py index 2923fcef07..3398216aff 100644 --- a/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py +++ b/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py @@ -26,6 +26,9 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t - interval_between_versions: Period in minutes between the versions we will keep. (default: 90) - period_pass: Keep 1 version for a combination of run+pass+period if true. (default: false) - delete_first_last: delete the first and last of the run[+pass+period] before actually applying the rule. + Useful to keep the second and second to last instead of first and last. + - mw_deletion_delay: delete moving windows data entirely after this number of minutes. If not present or negative, don't delete. + As an extra safety, and because it is designed for Moving Windows, we only delete if the object has `mw` in the path. It is implemented like this : Map of buckets: run[+pass+period] -> list of versions @@ -33,8 +36,11 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t Sort the versions in the bucket Remove the empty run from the map (we ignore objects without a run) Go through the map: for each run (resp. run+pass+period) - Get SOR (validity of first object) + + if SOR < now - mw_deletion_delay + delete the data for this run + if SOR < now - delay if delete_first_last Get flag cleaner_2nd from first object (if there) @@ -75,6 +81,8 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t logger.debug(f"migrate_to_EOS : {migrate_to_EOS}") delete_first_last = (extra_params.get("delete_first_last", False) is True) logger.debug(f"delete_first_last : {delete_first_last}") + mw_deletion_delay = int(extra_params.get("mw_deletion_delay", -1)) + logger.info(f"mw_deletion_delay : {mw_deletion_delay}") # Find all the runs and group the versions (by run or by a combination of multiple attributes) policies_utils.group_versions(ccdb, object_path, period_pass, versions_buckets_dict) @@ -95,9 +103,16 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t if policies_utils.in_grace_period(first_object, delay): logger.debug(f" in grace period, skip this bucket") preservation_list.extend(run_versions) - elif not (from_timestamp < first_object.createdAt < to_timestamp): # in the allowed period + elif not (from_timestamp < first_object.createdAt < to_timestamp): # not in the allowed period logger.debug(f" not in the allowed period, skip this bucket") preservation_list.extend(run_versions) + elif first_object.createdAtDt < datetime.now() - timedelta(minutes=mw_deletion_delay): + logger.debug(f" after mw_deletion_delay period, delete this bucket") + for v in run_versions: + logger.debug(f"process {v}") + if "mw" in v.path: # this is because we really don't want to take the risk of batch deleting non moving windows + deletion_list.append(v) + ccdb.deleteVersion(v) else: logger.debug(f" not in the grace period") From bffb9d4753f0a33626d204190cf68ea452609aa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Barth=C3=A9l=C3=A9my=20von=20Haller?= Date: Tue, 3 Sep 2024 13:36:03 +0200 Subject: [PATCH 2/4] Update multiple_per_run.py --- .../script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py b/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py index 3398216aff..760a9baa16 100644 --- a/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py +++ b/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py @@ -82,7 +82,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t delete_first_last = (extra_params.get("delete_first_last", False) is True) logger.debug(f"delete_first_last : {delete_first_last}") mw_deletion_delay = int(extra_params.get("mw_deletion_delay", -1)) - logger.info(f"mw_deletion_delay : {mw_deletion_delay}") + logger.debug(f"mw_deletion_delay : {mw_deletion_delay}") # Find all the runs and group the versions (by run or by a combination of multiple attributes) policies_utils.group_versions(ccdb, object_path, period_pass, versions_buckets_dict) From 6e2007df158014ca7dd1eca87dc259f08015f762 Mon Sep 17 00:00:00 2001 From: Barthelemy Date: Tue, 3 Sep 2024 13:56:40 +0200 Subject: [PATCH 3/4] print info if an object is preserved --- Framework/script/RepoCleaner/qcrepocleaner/config.yaml | 4 ++++ .../RepoCleaner/qcrepocleaner/rules/multiple_per_run.py | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/Framework/script/RepoCleaner/qcrepocleaner/config.yaml b/Framework/script/RepoCleaner/qcrepocleaner/config.yaml index c2741ad1dd..9540efa651 100644 --- a/Framework/script/RepoCleaner/qcrepocleaner/config.yaml +++ b/Framework/script/RepoCleaner/qcrepocleaner/config.yaml @@ -12,6 +12,10 @@ Rules: delay: 1 policy: multiple_per_run mw_deletion_delay: 15 + - object_path: qc/TST/MO/QcTask + delay: 1440 + policy: multiple_per_run + mw_deletion_delay: 15 # - object_path: qc/TST/MO/QcTask-barth/example3[/.*]{0,1} # delay: 0 # policy: none_kept diff --git a/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py b/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py index 760a9baa16..55f75f5c05 100644 --- a/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py +++ b/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py @@ -109,10 +109,13 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t elif first_object.createdAtDt < datetime.now() - timedelta(minutes=mw_deletion_delay): logger.debug(f" after mw_deletion_delay period, delete this bucket") for v in run_versions: - logger.debug(f"process {v}") if "mw" in v.path: # this is because we really don't want to take the risk of batch deleting non moving windows + logger.debug(f" deleting {v}") deletion_list.append(v) ccdb.deleteVersion(v) + else: + logger.debug(f" deletion is aborted as path does not contain `mw` ({v})") + preservation_list.append(v) else: logger.debug(f" not in the grace period") From 78c4a58f73a4981883cd787d039bbe8f40fd3ee7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Barth=C3=A9l=C3=A9my=20von=20Haller?= Date: Wed, 11 Sep 2024 17:55:13 +0200 Subject: [PATCH 4/4] Update Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py Co-authored-by: Piotr Konopka --- .../script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py b/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py index 55f75f5c05..f93f3e6de8 100644 --- a/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py +++ b/Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py @@ -109,7 +109,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t elif first_object.createdAtDt < datetime.now() - timedelta(minutes=mw_deletion_delay): logger.debug(f" after mw_deletion_delay period, delete this bucket") for v in run_versions: - if "mw" in v.path: # this is because we really don't want to take the risk of batch deleting non moving windows + if "/mw/" in v.path: # this is because we really don't want to take the risk of batch deleting non moving windows logger.debug(f" deleting {v}") deletion_list.append(v) ccdb.deleteVersion(v)