[QC-996] policy multiple_per_run can delete first and last #1921

Merged · 3 commits · Nov 1, 2023
17 changes: 17 additions & 0 deletions Framework/script/RepoCleaner/qcrepocleaner/Ccdb.py
@@ -200,7 +200,24 @@ def updateValidity(self, version: ObjectVersion, valid_from: int, valid_to: int,
except requests.exceptions.RequestException as e:
logging.error(f"Exception in updateValidity: {traceback.format_exc()}")

@dryable.Dryable()
def updateMetadata(self, version: ObjectVersion, metadata):
logger.debug(f"update metadata : {metadata}")
full_path = self.url + '/' + version.path + '/' + str(version.validFrom) + '/' + str(version.uuid) + '?'
if metadata is not None:
for key in metadata:
full_path += key + "=" + metadata[key] + "&"
if self.set_adjustable_eov:
logger.debug(f"As the parameter force is set, we add metadata adjustableEOV")
full_path += "adjustableEOV=1&"
try:
headers = {'Connection': 'close'}
r = requests.put(full_path, headers=headers)
r.raise_for_status()
except requests.exceptions.RequestException as e:
logging.error(f"Exception in updateMetadata: {traceback.format_exc()}")

@dryable.Dryable()
def putVersion(self, version: ObjectVersion, data):
'''
:param version: An ObjectVersion that describes the data to be uploaded.
@@ -24,6 +24,7 @@ def get_run(v: ObjectVersion) -> str:
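As a quick reference for the new method above, here is a minimal sketch of the PUT URL that updateMetadata assembles: each metadata pair becomes a query parameter appended after the object path, validFrom and uuid. The concrete values below are made up for illustration; the real method additionally sets the Connection header, issues requests.put and handles RequestException as shown in the diff.

# Illustrative values only; path, timestamp and uuid are made up.
url = "http://ccdb-test.cern.ch:8080"
object_path = "qc/TST/MO/repo/test"
valid_from = 1698000000000                      # version.validFrom, in ms
uuid = "0f9c2a4e-0000-0000-0000-000000000000"   # version.uuid
metadata = {"cleaner_2nd": "true"}

full_path = url + '/' + object_path + '/' + str(valid_from) + '/' + uuid + '?'
for key in metadata:
    full_path += key + "=" + metadata[key] + "&"

print(full_path)
# prints the base URL, object path, validFrom and uuid followed by ?cleaner_2nd=true&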
def group_versions(ccdb, object_path, period_pass, versions_buckets_dict: DefaultDict[str, List[ObjectVersion]]):
# Find all the runs and group the versions (by run or by a combination of multiple attributes)
versions = ccdb.getVersionsList(object_path)
logger.debug(f"group_versions: found {len(versions)} versions")
for v in versions:
logger.debug(f"Assigning {v} to a bucket")
run = get_run(v)
@@ -11,6 +11,7 @@
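The grouping itself is unchanged by this PR; for orientation, a simplified sketch of the bucketing that group_versions performs is shown below. The FakeVersion class and group_by_run helper are stand-ins invented for illustration, not the real ObjectVersion or policies_utils code.

from collections import defaultdict
from typing import DefaultDict, List

class FakeVersion:
    """Simplified stand-in for qcrepocleaner.Ccdb.ObjectVersion (illustration only)."""
    def __init__(self, run):
        self.metadata = {"RunNumber": str(run)}

def group_by_run(versions: List[FakeVersion]) -> DefaultDict[str, List[FakeVersion]]:
    # One bucket per run number; in the real code the key can also combine
    # period and pass when period_pass is set.
    buckets: DefaultDict[str, List[FakeVersion]] = defaultdict(list)
    for v in versions:
        buckets[v.metadata.get("RunNumber", "none")].append(v)
    return buckets

buckets = group_by_run([FakeVersion(123), FakeVersion(123), FakeVersion(124)])
print({k: len(v) for k, v in buckets.items()})   # {'123': 2, '124': 1}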

logger = logging # default logger


def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_timestamp: int,
extra_params: Dict[str, str]):
'''
@@ -24,12 +25,22 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
- migrate_to_EOS: Migrate the object to EOS. (default: false)
- interval_between_versions: Period in minutes between the versions we will keep. (default: 90)
- period_pass: Keep 1 version for a combination of run+pass+period if true. (default: false)
- delete_first_last: delete the first and last of the run[+pass+period] before actually applying the rule.

It is implemented like this:
Map of buckets: run[+pass+period] -> list of versions
Go through all objects: Add the object to the corresponding key (run[+pass+period])
Sort the versions in the bucket
Remove the empty run from the map (we ignore objects without a run)
Go through the map: for each run (resp. run+pass+period)

if delete_first_last
Get flag cleaner_2nd from first object (if there)
if cleaner_2nd
continue # we do not want to reprocess the same run twice
flag second with `cleaner_2nd`
delete first and last versions in the bucket

Get SOR (validity of first object)
if SOR < now - delay
do
@@ -62,6 +73,8 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
logger.debug(f"interval_between_versions : {interval_between_versions}")
migrate_to_EOS = (extra_params.get("migrate_to_EOS", False) is True)
logger.debug(f"migrate_to_EOS : {migrate_to_EOS}")
delete_first_last = (extra_params.get("delete_first_last", False) is True)
logger.debug(f"delete_first_last : {delete_first_last}")

# Find all the runs and group the versions (by run or by a combination of multiple attributes)
policies_utils.group_versions(ccdb, object_path, period_pass, versions_buckets_dict)
@@ -85,7 +98,28 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
logger.debug(f" not in the allowed period, skip this bucket")
preservation_list.extend(run_versions)
else:
logger.debug(f" not in the grace period")
logger.debug(f" not in the grace period")

if delete_first_last:
logger.debug(f" delete_first_last is set")
run_versions.sort(key=lambda x: x.createdAt)
# Get flag cleaner_2nd from first object (if there)
cleaner_2nd = "cleaner_2nd" in run_versions[0].metadata
if cleaner_2nd or len(run_versions) < 4:
logger.debug(f" first version has flag cleaner_2nd or there are less than 4 version, "
f"we continue to next bucket")
preservation_list.extend(run_versions)
continue
# flag second with `cleaner_2nd`
ccdb.updateMetadata(run_versions[1], {'cleaner_2nd': 'true'})
# delete first and last versions in the bucket
logger.debug(f" delete the first and last versions")
deletion_list.append(run_versions[-1])
ccdb.deleteVersion(run_versions[-1])
del run_versions[-1]
deletion_list.append(run_versions[0])
ccdb.deleteVersion(run_versions[0])
del run_versions[0]

last_preserved: ObjectVersion = None
for v in run_versions:
@@ -98,7 +132,7 @@ def process(ccdb: Ccdb, object_path: str, delay: int, from_timestamp: int, to_t
logger.debug(f" --> preserve")
last_preserved = v
if migrate_to_EOS:
ccdb.updateValidity(v, v.validFrom, v.validTo, metadata_for_preservation)
ccdb.updateMetadata(v, metadata_for_preservation)
preservation_list.append(last_preserved)
else: # in between period --> delete
logger.debug(f" --> delete")
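A condensed, standalone sketch of the new first/last handling added above, operating on a plain list sorted by creation time. The Version class and trim_first_last name are illustrative stand-ins; the real code works on ObjectVersion buckets and calls ccdb.updateMetadata and ccdb.deleteVersion as shown in the diff.

from typing import List

class Version:
    """Minimal stand-in for ObjectVersion (illustration only)."""
    def __init__(self, created_at: int, metadata=None):
        self.createdAt = created_at
        self.metadata = metadata or {}

def trim_first_last(run_versions: List[Version]) -> List[Version]:
    """Sketch of the delete_first_last step: flag the second version so a later
    re-run can see that this bucket was already processed, then drop the first
    and last versions. Buckets already flagged, or holding fewer than 4
    versions, are left untouched."""
    run_versions.sort(key=lambda v: v.createdAt)
    if "cleaner_2nd" in run_versions[0].metadata or len(run_versions) < 4:
        return run_versions                               # keep everything
    run_versions[1].metadata["cleaner_2nd"] = "true"      # real code: ccdb.updateMetadata(...)
    del run_versions[-1]                                  # real code: ccdb.deleteVersion(last)
    del run_versions[0]                                   # real code: ccdb.deleteVersion(first)
    return run_versions

survivors = trim_first_last([Version(t) for t in (0, 1, 2, 3, 4)])
print([v.createdAt for v in survivors])   # [1, 2, 3]; version 1 now carries cleaner_2nd

Calling the same step again on the survivors is a no-op, because the flagged version is now first in the bucket; that is the property the re-run in test_5_runs relies on.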
5 changes: 3 additions & 2 deletions Framework/script/RepoCleaner/tests/test_MultiplePerRun.py
@@ -21,7 +21,7 @@ class TestProduction(unittest.TestCase):
one_minute = 60000

def setUp(self):
self.ccdb = Ccdb('http://ccdb-test.cern.ch:8080')
self.ccdb = Ccdb('http://137.138.47.222:8080')
self.extra = {"interval_between_versions": "90", "migrate_to_EOS": False}
self.path = "qc/TST/MO/repo/test"

@@ -77,7 +77,8 @@ def test_5_runs(self):

# Prepare data
test_path = self.path + "/test_5_runs"
self.prepare_data(test_path, [3, 3, 3, 3, 3], [60, 120, 190, 240, 24*60], 123)
self.prepare_data(test_path, [1*60, 2*60, 3*60+10, 4*60, 5*60],
[60, 120, 190, 240, 24*60], 123)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)
@@ -0,0 +1,214 @@
import logging
import time
import unittest
from datetime import timedelta, date, datetime
from typing import List

from qcrepocleaner.Ccdb import Ccdb, ObjectVersion
from qcrepocleaner.rules import multiple_per_run


class TestProduction(unittest.TestCase):
"""
This test pushes data to the CCDB, runs the rule multiple_per_run and then checks the result.
It does this for several use cases.
One should truncate /qc/TST/MO/repo/test before running it.
"""

thirty_minutes = 1800000
one_hour = 3600000
in_ten_years = 1975323342000
one_minute = 60000

def setUp(self):
self.ccdb = Ccdb('http://137.138.47.222:8080')
self.extra = {"interval_between_versions": "90", "migrate_to_EOS": False, "delete_first_last": True}
self.path = "qc/TST/MO/repo/test"

def test_1_finished_run(self):
"""
1 run of 2.5 hours finished 22 hours ago.
Expected output: SOR, EOR, 1 in the middle
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_1_finished_run"
self.prepare_data(test_path, [150], [22*60], 123)
objectsBefore = self.ccdb.getVersionsList(test_path)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)
objectsAfter = self.ccdb.getVersionsList(test_path)

self.assertEqual(stats["deleted"], 147)
self.assertEqual(stats["preserved"], 3)
self.assertEqual(stats["updated"], 0)

self.assertEqual(objectsAfter[0].validFrom, objectsBefore[1].validFrom)
self.assertEqual(objectsAfter[2].validFrom, objectsBefore[-2].validFrom)

def test_2_runs(self):
"""
2 runs of 2.5 hours, separated by 3 hours, second finished 20h ago.
Expected output: SOR, EOR, 1 in the middle for the first one, all for the second
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_2_runs"
self.prepare_data(test_path, [150, 150], [3*60, 20*60], 123)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)

self.assertEqual(stats["deleted"], 147)
self.assertEqual(stats["preserved"], 3+150)
self.assertEqual(stats["updated"], 0)

def test_5_runs(self):
"""
1 hour Run - 1h - 2 hours Run - 2h - 3h10 run - 3h10 - 4 hours run - 4 hours - 5 hours run - 5 h
All more than 24 hours
Expected output: 2 + 3 + 4 + 4 + 5
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_5_runs"
self.prepare_data(test_path, [1*60, 2*60, 3*60+10, 4*60, 5*60],
[60, 120, 190, 240, 24*60], 123)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)
self.assertEqual(stats["deleted"], 60+120+190+240+300-18)
self.assertEqual(stats["preserved"], 18)
self.assertEqual(stats["updated"], 0)

# and now re-run it to make sure we preserve the state
stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)

self.assertEqual(stats["deleted"], 0)
self.assertEqual(stats["preserved"], 18)
self.assertEqual(stats["updated"], 0)

def test_run_one_object(self):
"""
A run with a single object
Expected output: keep the object
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_run_one_object"
self.prepare_data(test_path, [1], [25*60], 123)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)

self.assertEqual(stats["deleted"], 0)
self.assertEqual(stats["preserved"], 1)
self.assertEqual(stats["updated"], 0)

def test_run_two_object(self):
"""
A run with 2 objects
Expected output: keep the 2 objects
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_run_two_object"
self.prepare_data(test_path, [2], [25*60], 123)

stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=1,
to_timestamp=self.in_ten_years, extra_params=self.extra)

self.assertEqual(stats["deleted"], 0)
self.assertEqual(stats["preserved"], 2)
self.assertEqual(stats["updated"], 0)

def test_3_runs_with_period(self):
"""
3 runs more than 24h in the past but only the middle one starts in the period that is allowed.
Expected output: second run is trimmed, not the other
"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))

# Prepare data
test_path = self.path + "/test_3_runs_with_period"
self.prepare_data(test_path, [30, 30, 30], [120, 120, 25*60], 123)

current_timestamp = int(time.time() * 1000)
stats = multiple_per_run.process(self.ccdb, test_path, delay=60*24, from_timestamp=current_timestamp-29*60*60*1000,
to_timestamp=current_timestamp-26*60*60*1000, extra_params=self.extra)

self.assertEqual(stats["deleted"], 28)
self.assertEqual(stats["preserved"], 90-28)
self.assertEqual(stats["updated"], 0)

def test_asdf(self):
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(int(10))
test_path = self.path + "/asdf"
self.prepare_data(test_path, [70, 70, 70], [6*60, 6*60, 25*60], 55555)

def prepare_data(self, path, run_durations: List[int], time_till_next_run: List[int], first_run_number: int):
"""
Prepare a data set populated with a number of runs.
run_durations contains the duration of each of these runs in minutes
time_till_next_run is the time between two runs in minutes.
The first element of time_till_next_run is used to separate the first two runs.
Both lists must have the same number of elements.
"""

if len(run_durations) != len(time_till_next_run):
logging.error(f"run_durations and time_till_next_run must have the same length")
exit(1)

total_duration = 0
for a, b in zip(run_durations, time_till_next_run):
total_duration += a + b
logging.info(f"Total duration : {total_duration}")

current_timestamp = int(time.time() * 1000)
cursor = current_timestamp - total_duration * 60 * 1000
first_ts = cursor
data = {'part': 'part'}
run = first_run_number

for run_duration, time_till_next in zip(run_durations, time_till_next_run):
metadata = {'RunNumber': str(run)}
logging.debug(f"cursor: {cursor}")
logging.debug(f"time_till_next: {time_till_next}")

for i in range(run_duration):
to_ts = cursor + 24 * 60 * 60 * 1000 # a day
metadata2 = {**metadata, 'Created': str(cursor)}
version_info = ObjectVersion(path=path, validFrom=cursor, validTo=to_ts, metadata=metadata2,
createdAt=cursor)
self.ccdb.putVersion(version=version_info, data=data)
cursor += 1 * 60 * 1000

run += 1
cursor += time_till_next * 60 * 1000

return first_ts


if __name__ == '__main__':
unittest.main()
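For orientation when reading the expected numbers above: prepare_data creates one version per minute for each entry of run_durations, then advances the cursor by the matching time_till_next_run gap, anchoring the timeline so the final gap ends roughly at the current time. Below is a small dry run of the same arithmetic for test_1_finished_run, a pure calculation with no CCDB access; the interpretation in the comments follows the test's own assertions.

# Reproduce the cursor arithmetic of prepare_data for test_1_finished_run:
# one run of 150 one-minute versions, finishing 22 hours before "now".
run_durations = [150]            # minutes -> one version per minute
time_till_next_run = [22 * 60]   # minutes between the end of the run and "now"

total_minutes = sum(a + b for a, b in zip(run_durations, time_till_next_run))
print(total_minutes)             # 1470 minutes, i.e. the first version is ~24.5 h old

# With delay=60*24 the whole run lies outside the grace period, so the rule may
# trim it; with delete_first_last the original SOR and EOR versions are removed
# before the 90-minute thinning is applied, which is why the test expects
# 3 preserved and 147 deleted versions.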