[QC-1229] Repocleaner policy for the moving windows (#2407)
* [QC-1229] Repocleaner policy for the moving windows

* Update multiple_per_run.py

* print info if an object is preserved

* Update Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py

Co-authored-by: Piotr Konopka <piotr.jan.konopka@cern.ch>

---------

Co-authored-by: Piotr Konopka <piotr.jan.konopka@cern.ch>
Barthelemy and knopers8 authored Sep 13, 2024
1 parent ff2ce9a commit 4d3b576
Showing 2 changed files with 28 additions and 2 deletions.
8 changes: 8 additions & 0 deletions Framework/script/RepoCleaner/qcrepocleaner/config.yaml
@@ -8,6 +8,14 @@ Rules:
    delay: 0
    policy: 1_per_run
    to_timestamp: 1674700609718
  - object_path: qc/.*/mw/.*
    delay: 1
    policy: multiple_per_run
    mw_deletion_delay: 15
  - object_path: qc/TST/MO/QcTask
    delay: 1440
    policy: multiple_per_run
    mw_deletion_delay: 15
  # - object_path: qc/TST/MO/QcTask-barth/example3[/.*]{0,1}
  #   delay: 0
  #   policy: none_kept
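
To illustrate which objects the first new rule targets, here is a minimal sketch that treats `object_path` as a Python regular expression matched from the start of the object path. That matching behaviour is an assumption made for illustration (this diff does not show how RepoCleaner selects a rule), and both `rule_applies` and the sample paths are hypothetical.

```python
import re

# Pattern copied from the new rule in config.yaml.
MW_RULE_PATTERN = "qc/.*/mw/.*"


def rule_applies(pattern: str, object_path: str) -> bool:
    """Hypothetical helper: True if the rule pattern matches from the start of the path."""
    return re.match(pattern, object_path) is not None


# Made-up object paths, for illustration only.
sample_paths = [
    "qc/TPC/MO/Clusters/mw/N_Clusters",  # moving-window object
    "qc/TPC/MO/Clusters/N_Clusters",     # regular object, no /mw/ component
    "qc/TST/MO/QcTask/mw/example",       # moving-window object
]
matches = [p for p in sample_paths if rule_applies(MW_RULE_PATTERN, p)]
print(matches)
```

Under this assumption, only the paths with an `mw` component are selected by the first rule. The second rule (`qc/TST/MO/QcTask`) has no `mw` in its pattern, so the `/mw/` path check in `multiple_per_run.py` is what decides whether its objects can be batch deleted.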
22 changes: 20 additions & 2 deletions Framework/script/RepoCleaner/qcrepocleaner/rules/multiple_per_run.py
@@ -26,15 +26,21 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
- interval_between_versions: Period in minutes between the versions we will keep. (default: 90)
- period_pass: Keep 1 version for a combination of run+pass+period if true. (default: false)
- delete_first_last: delete the first and last of the run[+pass+period] before actually applying the rule.
  Useful to keep the second and second to last instead of first and last.
- mw_deletion_delay: delete moving windows data entirely after this number of minutes. If not present or negative, don't delete.
  As an extra safety, and because it is designed for Moving Windows, we only delete if the object has `mw` in the path.

It is implemented like this:
    Map of buckets: run[+pass+period] -> list of versions
    Go through all objects: Add the object to the corresponding key (run[+pass+period])
    Sort the versions in the bucket
    Remove the empty run from the map (we ignore objects without a run)
    Go through the map: for each run (resp. run+pass+period)
        Get SOR (validity of first object)
        if SOR < now - mw_deletion_delay
            delete the data for this run
        if SOR < now - delay
            if delete_first_last
                Get flag cleaner_2nd from first object (if there)
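
The first steps of the docstring (build the map of buckets, sort each bucket, drop objects without a run, compare the SOR with a delay) can be sketched in isolation. This is a simplified illustration, not the code of `policies_utils.group_versions`: the `Version` class, `group_by_run` and `runs_past_delay` are hypothetical names, and the sketch assumes that a missing run is represented by an empty string.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Version:
    """Hypothetical stand-in for an object version stored in the repository."""
    path: str
    run: str              # assumption: empty string when the object has no run
    valid_from: datetime


def group_by_run(versions):
    """Build the map run -> versions sorted by validity, ignoring objects without a run."""
    buckets = defaultdict(list)
    for v in versions:
        buckets[v.run].append(v)
    buckets.pop("", None)  # remove the bucket of objects without a run
    for run_versions in buckets.values():
        run_versions.sort(key=lambda v: v.valid_from)
    return dict(buckets)


def runs_past_delay(buckets, delay_minutes, now):
    """Return the runs whose SOR (validity of the first version) is older than now - delay."""
    limit = now - timedelta(minutes=delay_minutes)
    return [run for run, vs in buckets.items() if vs[0].valid_from < limit]


now = datetime(2024, 9, 13, 12, 0)
versions = [
    Version("qc/TST/MO/QcTask/mw/h", "1001", datetime(2024, 9, 13, 11, 0)),
    Version("qc/TST/MO/QcTask/mw/h", "", datetime(2024, 9, 13, 9, 0)),
    Version("qc/TST/MO/QcTask/mw/h", "1002", datetime(2024, 9, 13, 11, 55)),
    Version("qc/TST/MO/QcTask/mw/h", "1001", datetime(2024, 9, 13, 10, 30)),
]
buckets = group_by_run(versions)
old_runs = runs_past_delay(buckets, delay_minutes=15, now=now)
print(sorted(buckets), old_runs)
```

With a 15 minute delay, only the run that started more than 15 minutes before `now` is selected. Passing `now` explicitly keeps the sketch deterministic, whereas the diff calls `datetime.now()` directly.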
@@ -75,6 +81,8 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
logger.debug(f"migrate_to_EOS : {migrate_to_EOS}")
delete_first_last = (extra_params.get("delete_first_last", False) is True)
logger.debug(f"delete_first_last : {delete_first_last}")
mw_deletion_delay = int(extra_params.get("mw_deletion_delay", -1))
logger.debug(f"mw_deletion_delay : {mw_deletion_delay}")

# Find all the runs and group the versions (by run or by a combination of multiple attributes)
policies_utils.group_versions(ccdb, object_path, period_pass, versions_buckets_dict)
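
As a small illustration of how the new parameter is read, the sketch below isolates the expression from the diff in a hypothetical helper, `read_mw_deletion_delay`. It shows that an absent parameter yields -1 and that the `int(...)` call also accepts a value given as a string. Treating any negative value as "do not delete" follows the docstring; `deletion_enabled` is a name introduced here for illustration only.

```python
def read_mw_deletion_delay(extra_params: dict) -> int:
    """Hypothetical helper: same expression as in the diff, isolated for illustration."""
    return int(extra_params.get("mw_deletion_delay", -1))


def deletion_enabled(mw_deletion_delay: int) -> bool:
    """Per the docstring, a missing or negative delay means that nothing is deleted."""
    return mw_deletion_delay >= 0


absent = read_mw_deletion_delay({})                             # parameter not configured
from_int = read_mw_deletion_delay({"mw_deletion_delay": 15})    # value given as an integer
from_str = read_mw_deletion_delay({"mw_deletion_delay": "15"})  # value given as a string
print(absent, from_int, from_str)
print(deletion_enabled(absent), deletion_enabled(from_int))
```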
@@ -95,9 +103,19 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
if policies_utils.in_grace_period(first_object, delay):
    logger.debug(f" in grace period, skip this bucket")
    preservation_list.extend(run_versions)
-elif not (from_timestamp < first_object.createdAt < to_timestamp): # in the allowed period
+elif not (from_timestamp < first_object.createdAt < to_timestamp): # not in the allowed period
    logger.debug(f" not in the allowed period, skip this bucket")
    preservation_list.extend(run_versions)
elif mw_deletion_delay >= 0 and first_object.createdAtDt < datetime.now() - timedelta(minutes=mw_deletion_delay):
    # a negative mw_deletion_delay (the default when the parameter is absent) disables this branch
    logger.debug(f" after mw_deletion_delay period, delete this bucket")
    for v in run_versions:
        if "/mw/" in v.path:  # extra safety: never batch delete objects that are not moving windows
            logger.debug(f" deleting {v}")
            deletion_list.append(v)
            ccdb.deleteVersion(v)
        else:
            logger.debug(f" deletion is aborted as path does not contain `mw` ({v})")
            preservation_list.append(v)
else:
    logger.debug(f" not in the grace period")
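
The order of the checks in this hunk matters, so here is a self-contained sketch of the decision taken for one bucket. It is an illustration under stated assumptions, not the RepoCleaner code: `bucket_action` and `split_bucket` are hypothetical names, the grace period is assumed to mean "first object created less than `delay` minutes ago", all timestamps are plain `datetime` values, and the `mw_deletion_delay >= 0` guard reflects the documented rule that a missing or negative value means no deletion.

```python
from datetime import datetime, timedelta


def bucket_action(first_created: datetime, now: datetime, delay: int,
                  mw_deletion_delay: int, allowed_from: datetime,
                  allowed_to: datetime) -> str:
    """Hypothetical helper mirroring the if/elif chain for one bucket."""
    if first_created >= now - timedelta(minutes=delay):
        return "preserve"      # still in the grace period (assumed semantics)
    if not (allowed_from < first_created < allowed_to):
        return "preserve"      # not in the allowed period
    if mw_deletion_delay >= 0 and first_created < now - timedelta(minutes=mw_deletion_delay):
        return "delete_mw"     # old enough: delete the moving-window versions
    return "apply_policy"      # continue with the regular multiple_per_run logic


def split_bucket(paths):
    """Only paths containing '/mw/' are deleted; all the others are preserved."""
    to_delete = [p for p in paths if "/mw/" in p]
    to_keep = [p for p in paths if "/mw/" not in p]
    return to_delete, to_keep


now = datetime(2024, 9, 13, 12, 0)
period = (datetime(2020, 1, 1), datetime(2030, 1, 1))

recent = bucket_action(datetime(2024, 9, 13, 11, 59, 30), now, 1, 15, *period)
young = bucket_action(datetime(2024, 9, 13, 11, 50), now, 1, 15, *period)
old = bucket_action(datetime(2024, 9, 13, 11, 0), now, 1, 15, *period)
disabled = bucket_action(datetime(2024, 9, 13, 11, 0), now, 1, -1, *period)
print(recent, young, old, disabled)
print(split_bucket(["qc/TST/MO/QcTask/mw/h1", "qc/TST/MO/QcTask/h1"]))
```

Because the grace-period check comes first, a bucket younger than `delay` is always preserved, even when `mw_deletion_delay` is smaller than `delay`.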
