From 277c551afede83eaef934cd28e2eb4399e8b44a8 Mon Sep 17 00:00:00 2001 From: Julian Santander Date: Fri, 23 Sep 2016 18:15:29 +0200 Subject: [PATCH 1/6] Add support to RangeByMinutes backfilling tasks --- luigi/tools/range.py | 106 ++++++++++++ test/range_test.py | 372 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 476 insertions(+), 2 deletions(-) mode change 100644 => 100755 luigi/tools/range.py mode change 100644 => 100755 test/range_test.py diff --git a/luigi/tools/range.py b/luigi/tools/range.py old mode 100644 new mode 100755 index ba1c83cf2d..96db6886a7 --- a/luigi/tools/range.py +++ b/luigi/tools/range.py @@ -402,6 +402,84 @@ def _format_datetime(self, dt): return luigi.DateHourParameter().serialize(dt) +class RangeByMinutesBase(RangeBase): + """ + Produces a contiguous completed range of an recurring tasks separated a specified number of minutes. + """ + start = luigi.DateMinuteParameter( + default=None, + description="beginning date-hour-minute, inclusive. Default: None - work backward forever (requires reverse=True)") + stop = luigi.DateMinuteParameter( + default=None, + description="ending date-hour-minute, exclusive. Default: None - work forward forever") + minutes_back = luigi.IntParameter( + default=60*24, # one day + description=("extent to which contiguousness is to be assured into " + "past, in minutes from current time. Prevents infinite " + "loop when start is none. If the dataset has limited " + "retention (i.e. old outputs get removed), this should " + "be set shorter to that, too, to prevent the oldest " + "outputs flapping. Increase freely if you intend to " + "process old dates - worker's memory is the limit")) + minutes_forward = luigi.IntParameter( + default=0, + description="extent to which contiguousness is to be assured into future, " + "in minutes from current time. Prevents infinite loop when stop is none") + + minutes_interval = luigi.IntParameter( + default=5, + description="separation between events in minutes" + ) + + def datetime_to_parameter(self, dt): + return dt + + def parameter_to_datetime(self, p): + return p + + def datetime_to_parameters(self, dt): + """ + Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter + """ + return self._task_parameters(dt) + + def parameters_to_datetime(self, p): + """ + Given a dictionary of parameters, will extract the ranged task parameter value + """ + dt = p[self._param_name] + return datetime(dt.year, dt.month, dt.day, dt.hour, dt.minute) + + def moving_start(self, now): + return now - timedelta(minutes=self.minutes_back) + + def moving_stop(self, now): + return now + timedelta(minutes=self.minutes_forward) + + def finite_datetimes(self, finite_start, finite_stop): + """ + Simply returns the points in time that correspond to a whole number of minutes intervals. + """ + # start of a complete interval, e.g. 20:13 and the interval is 5 -> 20:10 + start_minute = int(finite_start.minute/self.minutes_interval)*self.minutes_interval + datehour_start = datetime( + year=finite_start.year, + month=finite_start.month, + day=finite_start.day, + hour=finite_start.hour, + minute=start_minute) + datehours = [] + for i in itertools.count(): + t = datehour_start + timedelta(minutes=i*self.minutes_interval) + if t >= finite_stop: + return datehours + if t >= finite_start: + datehours.append(t) + + def _format_datetime(self, dt): + return luigi.DateMinuteParameter().serialize(dt) + + def _constrain_glob(glob, paths, limit=5): """ Tweaks glob into a list of more specific globs that together still cover paths and not too much extra. @@ -613,3 +691,31 @@ def missing_datetimes(self, finite_datetimes): finite_datetimes, lambda d: self._instantiate_task_cls(self.datetime_to_parameter(d)), lambda d: d.strftime('(%Y).*(%m).*(%d).*(%H)')) + + +class RangeByMinutes(RangeByMinutesBase): + """Efficiently produces a contiguous completed range of an recurring + task every interval minutes that takes a single DateMinuteParameter. + + Benefits from bulk_complete information to efficiently cover gaps. + + Falls back to infer it from output filesystem listing to facilitate the + common case usage. + + Convenient to use even from command line, like: + + .. code-block:: console + + luigi --module your.module RangeByMinutes --of YourActualTask --start 2014-01-01T0123 + """ + + def missing_datetimes(self, finite_datetimes): + try: + cls_with_params = functools.partial(self.of, **self.of_params) + complete_parameters = self.of.bulk_complete.__func__(cls_with_params, map(self.datetime_to_parameter, finite_datetimes)) + return set(finite_datetimes) - set(map(self.parameter_to_datetime, complete_parameters)) + except NotImplementedError: + return infer_bulk_complete_from_fs( + finite_datetimes, + lambda d: self._instantiate_task_cls(self.datetime_to_parameter(d)), + lambda d: d.strftime('(%Y).*(%m).*(%d).*(%H).*(%M)')) diff --git a/test/range_test.py b/test/range_test.py old mode 100644 new mode 100755 index 2a64af04da..b479bf1be3 --- a/test/range_test.py +++ b/test/range_test.py @@ -22,8 +22,17 @@ import luigi import mock from luigi.mock import MockTarget, MockFileSystem -from luigi.tools.range import (RangeDaily, RangeDailyBase, RangeEvent, RangeHourly, RangeHourlyBase, _constrain_glob, - _get_filesystems_and_globs) +from luigi.tools.range import (RangeDaily, RangeDailyBase, RangeEvent, + RangeHourly, RangeHourlyBase, + RangeByMinutes, RangeByMinutesBase, + _constrain_glob, _get_filesystems_and_globs) + + +class CommonDateMinuteTask(luigi.Task): + dh = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.dh.strftime('/n2000y01a05n/%Y_%m-_-%daww/21mm%H%Mdara21/ooo')) class CommonDateHourTask(luigi.Task): @@ -118,6 +127,36 @@ def requires(self): yield TaskB(dh=self.dh, complicator='no/worries') # str(self.dh) would complicate beyond working +class TaskMinutesA(luigi.Task): + dm = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.dm.strftime('TaskA/%Y-%m-%d/%H%M')) + + +class TaskMinutesB(luigi.Task): + dm = luigi.DateMinuteParameter() + complicator = luigi.Parameter() + + def output(self): + return MockTarget(self.dm.strftime('TaskB/%%s%Y-%m-%d/%H%M') % self.complicator) + + +class TaskMinutesC(luigi.Task): + dm = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.dm.strftime('not/a/real/path/%Y-%m-%d/%H%M')) + + +class CommonWrapperTaskMinutes(luigi.WrapperTask): + dm = luigi.DateMinuteParameter() + + def requires(self): + yield TaskMinutesA(dm=self.dm) + yield TaskMinutesB(dm=self.dm, complicator='no/worries') # str(self.dh) would complicate beyond working + + def mock_listdir(contents): def contents_listdir(_, glob): for path in fnmatch.filter(contents, glob + '*'): @@ -466,6 +505,210 @@ def test_start_long_before_long_hours_back_and_with_long_hours_forward(self): ) +class RangeByMinutesBaseTest(unittest.TestCase): + maxDiff = None + + def setUp(self): + # yucky to create separate callbacks; would be nicer if the callback + # received an instance of a subclass of Event, so one callback could + # accumulate all types + @RangeByMinutesBase.event_handler(RangeEvent.DELAY) + def callback_delay(*args): + self.events.setdefault(RangeEvent.DELAY, []).append(args) + + @RangeByMinutesBase.event_handler(RangeEvent.COMPLETE_COUNT) + def callback_complete_count(*args): + self.events.setdefault(RangeEvent.COMPLETE_COUNT, []).append(args) + + @RangeByMinutesBase.event_handler(RangeEvent.COMPLETE_FRACTION) + def callback_complete_fraction(*args): + self.events.setdefault(RangeEvent.COMPLETE_FRACTION, []).append(args) + + self.events = {} + + def test_consistent_formatting(self): + task = RangeByMinutesBase(of=CommonDateMinuteTask, + start=datetime.datetime(2016, 1, 1, 13)) + self.assertEqual(task._format_range( + [datetime.datetime(2016, 1, 2, 13, 10), datetime.datetime(2016, 2, 29, 23, 20)]), + '[2016-01-02T1310, 2016-02-29T2320]') + + def _empty_subcase(self, kwargs, expected_events): + calls = [] + + class RangeByMinutesDerived(RangeByMinutesBase): + def missing_datetimes(a, b, c): + args = [a, b, c] + calls.append(args) + return args[-1][:5] + + task = RangeByMinutesDerived(of=CommonDateMinuteTask, **kwargs) + self.assertEqual(task.requires(), []) + self.assertEqual(calls, []) + self.assertEqual(task.requires(), []) + self.assertEqual(calls, []) # subsequent requires() should return the cached result, never call missing_datetimes + self.assertEqual(self.events, expected_events) + self.assertTrue(task.complete()) + + def test_start_after_minutes_forward(self): + # nothing to do because start is later + self._empty_subcase( + { + 'now': datetime_to_epoch(datetime.datetime(2000, 1, 1, 4)), + 'start': datetime.datetime(2014, 3, 20, 17, 10), + 'minutes_back': 4, + 'minutes_forward': 20, + }, + { + 'event.tools.range.delay': [ + ('CommonDateMinuteTask', 0), + ], + 'event.tools.range.complete.count': [ + ('CommonDateMinuteTask', 0), + ], + 'event.tools.range.complete.fraction': [ + ('CommonDateMinuteTask', 1.), + ], + } + ) + + def _nonempty_subcase(self, kwargs, expected_finite_datetimes_range, expected_requires, expected_events): + calls = [] + + class RangeByMinutesDerived(RangeByMinutesBase): + def missing_datetimes(a, b, c): + args = [a, b, c] + calls.append(args) + return args[-1][:7] + + task = RangeByMinutesDerived(of=CommonDateMinuteTask, **kwargs) + self.assertEqual(list(map(str, task.requires())), expected_requires) + self.assertEqual(calls[0][1], CommonDateMinuteTask) + self.assertEqual((min(calls[0][2]), max(calls[0][2])), expected_finite_datetimes_range) + self.assertEqual(list(map(str, task.requires())), expected_requires) + self.assertEqual(len(calls), 1) # subsequent requires() should return the cached result, not call missing_datetimes again + self.assertEqual(self.events, expected_events) + self.assertFalse(task.complete()) + + def test_start_and_minutes_period(self): + self._nonempty_subcase( + { + 'now': datetime_to_epoch(datetime.datetime(2016, 9, 1, 12, 0, 0)), + 'start': datetime.datetime(2016, 9, 1, 11, 0, 0), + 'minutes_back': 24 * 60, + 'minutes_forward': 0, + 'minutes_interval': 3, + }, + (datetime.datetime(2016, 9, 1, 11, 0), datetime.datetime(2016, 9, 1, 11, 57, 0)), + [ + 'CommonDateMinuteTask(dh=2016-09-01T1100)', + 'CommonDateMinuteTask(dh=2016-09-01T1103)', + 'CommonDateMinuteTask(dh=2016-09-01T1106)', + 'CommonDateMinuteTask(dh=2016-09-01T1109)', + 'CommonDateMinuteTask(dh=2016-09-01T1112)', + 'CommonDateMinuteTask(dh=2016-09-01T1115)', + 'CommonDateMinuteTask(dh=2016-09-01T1118)', + ], + { + 'event.tools.range.delay': [ + ('CommonDateMinuteTask', 20), # First missing is the 20th + ], + 'event.tools.range.complete.count': [ + ('CommonDateMinuteTask', 13), # 20 intervals - 7 missing + ], + 'event.tools.range.complete.fraction': [ + ('CommonDateMinuteTask', 13. / (13 + 7)), # (exptected - missing) / expected + ], + } + ) + + def test_start_long_before_minutes_back(self): + self._nonempty_subcase( + { + 'now': datetime_to_epoch(datetime.datetime(2000, 1, 1, 0, 3, 0)), + 'start': datetime.datetime(1960, 1, 1, 0, 0, 0), + 'minutes_back': 5, + 'minutes_forward': 20, + }, + (datetime.datetime(2000, 1, 1, 0, 0), datetime.datetime(2000, 1, 1, 0, 20, 0)), + [ + 'CommonDateMinuteTask(dh=2000-01-01T0000)', + 'CommonDateMinuteTask(dh=2000-01-01T0005)', + 'CommonDateMinuteTask(dh=2000-01-01T0010)', + 'CommonDateMinuteTask(dh=2000-01-01T0015)', + 'CommonDateMinuteTask(dh=2000-01-01T0020)', + ], + { + 'event.tools.range.delay': [ + ('CommonDateMinuteTask', 5), # because of short minutes_back we're oblivious to those 40 preceding years + ], + 'event.tools.range.complete.count': [ + ('CommonDateMinuteTask', 4207680), # expected intervals - missing. + ], + 'event.tools.range.complete.fraction': [ + ('CommonDateMinuteTask', 4207680. / 4207685), # (expected - missing) / expected + ], + } + ) + + def test_start_after_long_minutes_back(self): + self._nonempty_subcase( + { + 'now': datetime_to_epoch(datetime.datetime(2014, 3, 20, 18, 4, 29)), + 'start': datetime.datetime(2014, 3, 20, 17, 10), + 'task_limit': 4, + 'minutes_back': 365 * 24 * 60, + }, + (datetime.datetime(2014, 3, 20, 17, 10, 0), datetime.datetime(2014, 3, 20, 18, 0, 0)), + [ + 'CommonDateMinuteTask(dh=2014-03-20T1710)', + 'CommonDateMinuteTask(dh=2014-03-20T1715)', + 'CommonDateMinuteTask(dh=2014-03-20T1720)', + 'CommonDateMinuteTask(dh=2014-03-20T1725)', + ], + { + 'event.tools.range.delay': [ + ('CommonDateMinuteTask', 11), + ], + 'event.tools.range.complete.count': [ + ('CommonDateMinuteTask', 4), + ], + 'event.tools.range.complete.fraction': [ + ('CommonDateMinuteTask', 4. / 11), + ], + } + ) + + def test_start_long_before_long_minutes_back_and_with_long_minutes_forward(self): + self._nonempty_subcase( + { + 'now': datetime_to_epoch(datetime.datetime(2017, 3, 22, 20, 4, 29)), + 'start': datetime.datetime(2011, 3, 20, 17, 10, 0), + 'task_limit': 4, + 'minutes_back': 365 * 24 * 60, + 'minutes_forward': 365 * 24 * 60, + }, + (datetime.datetime(2016, 3, 22, 20, 5), datetime.datetime(2018, 3, 22, 20, 0)), + [ + 'CommonDateMinuteTask(dh=2016-03-22T2005)', + 'CommonDateMinuteTask(dh=2016-03-22T2010)', + 'CommonDateMinuteTask(dh=2016-03-22T2015)', + 'CommonDateMinuteTask(dh=2016-03-22T2020)', + ], + { + 'event.tools.range.delay': [ + ('CommonDateMinuteTask', 210240), + ], + 'event.tools.range.complete.count': [ + ('CommonDateMinuteTask', 737020), + ], + 'event.tools.range.complete.fraction': [ + ('CommonDateMinuteTask', 737020. / (737020 + 7)), + ], + } + ) + + class FilesystemInferenceTest(unittest.TestCase): def _test_filesystems_and_globs(self, datetime_to_task, datetime_to_re, expected): @@ -493,6 +736,15 @@ def test_datehour_glob_successfully_inferred(self): ] ) + def test_dateminute_glob_successfully_inferred(self): + self._test_filesystems_and_globs( + lambda d: CommonDateMinuteTask(d), + lambda d: d.strftime('(%Y).*(%m).*(%d).*(%H).*(%M)'), + [ + (MockFileSystem, '/n2000y01a05n/[0-9][0-9][0-9][0-9]_[0-9][0-9]-_-[0-9][0-9]aww/21mm[0-9][0-9][0-9][0-9]dara21'), + ] + ) + def test_wrapped_datehour_globs_successfully_inferred(self): self._test_filesystems_and_globs( lambda d: CommonWrapperTask(d), @@ -724,6 +976,122 @@ def test_missing_directory(self): self.assertEqual([str(t) for t in task.requires()], expected) +class RangeByMinutesTest(unittest.TestCase): + + # fishy to mock the mock, but MockFileSystem doesn't support globs yet + @mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir(mock_contents)) + @mock.patch('luigi.mock.MockFileSystem.exists', + new=mock_exists_always_true) + def test_missing_tasks_correctly_required(self): + expected_tasks = [ + 'SomeByMinutesTask(d=2016-03-31T0000)', + 'SomeByMinutesTask(d=2016-03-31T0005)', + 'SomeByMinutesTask(d=2016-03-31T0010)'] + + class SomeByMinutesTask(luigi.Task): + d = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.d.strftime('/data/2014/p/v/z/%Y_/_%m-_-%doctor/20/%HZ%MOOO')) + + for task_path in task_a_paths: + MockTarget(task_path) + # this test takes a few seconds. Since stop is not defined, + # finite_datetimes constitute many years to consider + task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2016, 4, 1)), + of=SomeByMinutesTask, + start=datetime.datetime(2014, 3, 20, 17), + task_limit=3, + minutes_back=24 * 60) + actual = [str(t) for t in task.requires()] + self.assertEqual(actual, expected_tasks) + + @mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir(mock_contents)) + @mock.patch('luigi.mock.MockFileSystem.exists', + new=mock_exists_always_true) + def test_missing_wrapper_tasks_correctly_required(self): + expected_wrapper = [ + 'CommonWrapperTaskMinutes(dm=2014-03-20T2300)', + 'CommonWrapperTaskMinutes(dm=2014-03-20T2305)', + 'CommonWrapperTaskMinutes(dm=2014-03-20T2310)', + 'CommonWrapperTaskMinutes(dm=2014-03-20T2315)'] + task = RangeByMinutes( + now=datetime_to_epoch(datetime.datetime(2040, 4, 1, 0, 0, 0)), + of=CommonWrapperTaskMinutes, + start=datetime.datetime(2014, 3, 20, 23, 0, 0), + stop=datetime.datetime(2014, 3, 20, 23, 20, 0), + minutes_back=30 * 365 * 24 * 60) + actual = [str(t) for t in task.requires()] + self.assertEqual(actual, expected_wrapper) + + def test_bulk_complete_correctly_interfaced(self): + class BulkCompleteByMinutesTask(luigi.Task): + dh = luigi.DateMinuteParameter() + + @classmethod + def bulk_complete(cls, parameter_tuples): + return parameter_tuples[:-2] + + def output(self): + raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete") + + task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2015, 12, 1)), + of=BulkCompleteByMinutesTask, + start=datetime.datetime(2015, 11, 1), + stop=datetime.datetime(2015, 12, 1)) + + expected = [ + 'BulkCompleteByMinutesTask(dh=2015-11-30T2350)', + 'BulkCompleteByMinutesTask(dh=2015-11-30T2355)', + ] + + actual = [str(t) for t in task.requires()] + self.assertEqual(actual, expected) + + def test_bulk_complete_of_params(self): + class BulkCompleteByMinutesTask(luigi.Task): + non_positional_arbitrary_argument = luigi.Parameter(default="whatever", positional=False, significant=False) + dh = luigi.DateMinuteParameter() + arbitrary_argument = luigi.BoolParameter() + + @classmethod + def bulk_complete(cls, parameter_tuples): + for t in map(cls, parameter_tuples): + assert t.arbitrary_argument + return parameter_tuples[:-2] + + def output(self): + raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete") + + task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2015, 12, 1)), + of=BulkCompleteByMinutesTask, + of_params=dict(arbitrary_argument=True), + start=datetime.datetime(2015, 11, 1), + stop=datetime.datetime(2015, 12, 1)) + + expected = [ + 'BulkCompleteByMinutesTask(dh=2015-11-30T2350, arbitrary_argument=True)', + 'BulkCompleteByMinutesTask(dh=2015-11-30T2355, arbitrary_argument=True)', + ] + + actual = [str(t) for t in task.requires()] + self.assertEqual(actual, expected) + + @mock.patch('luigi.mock.MockFileSystem.exists', + new=mock_exists_always_false) + def test_missing_directory(self): + task = RangeByMinutes(now=datetime_to_epoch( + datetime.datetime(2014, 3, 21, 0, 0)), + of=TaskMinutesC, + start=datetime.datetime(2014, 3, 20, 23, 11), + stop=datetime.datetime(2014, 3, 20, 23, 21)) + self.assertFalse(task.complete()) + expected = [ + 'TaskMinutesC(dm=2014-03-20T2315)', + 'TaskMinutesC(dm=2014-03-20T2320)'] + self.assertEqual([str(t) for t in task.requires()], expected) + + class RangeInstantiationTest(LuigiTestCase): def test_old_instantiation(self): From 899807cd09c61b7d422bf92d2753c50237e3143c Mon Sep 17 00:00:00 2001 From: Julian Santander Date: Mon, 26 Sep 2016 08:47:33 +0200 Subject: [PATCH 2/6] Modify default value of RangeByMinute's minute-interval to 1 minute --- luigi/tools/range.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/luigi/tools/range.py b/luigi/tools/range.py index 96db6886a7..e504074aa8 100755 --- a/luigi/tools/range.py +++ b/luigi/tools/range.py @@ -427,7 +427,7 @@ class RangeByMinutesBase(RangeBase): "in minutes from current time. Prevents infinite loop when stop is none") minutes_interval = luigi.IntParameter( - default=5, + default=1, description="separation between events in minutes" ) From 4a1a823909d6aa954a52829d3af052c0060d65ec Mon Sep 17 00:00:00 2001 From: Julian Santander Date: Mon, 26 Sep 2016 08:58:39 +0200 Subject: [PATCH 3/6] Update unit tests after modifying the default minutes_interval in RangeByMinutes tasks --- test/range_test.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/test/range_test.py b/test/range_test.py index b479bf1be3..d216e02582 100755 --- a/test/range_test.py +++ b/test/range_test.py @@ -528,7 +528,8 @@ def callback_complete_fraction(*args): def test_consistent_formatting(self): task = RangeByMinutesBase(of=CommonDateMinuteTask, - start=datetime.datetime(2016, 1, 1, 13)) + start=datetime.datetime(2016, 1, 1, 13), + minutes_interval=5) self.assertEqual(task._format_range( [datetime.datetime(2016, 1, 2, 13, 10), datetime.datetime(2016, 2, 29, 23, 20)]), '[2016-01-02T1310, 2016-02-29T2320]') @@ -558,6 +559,7 @@ def test_start_after_minutes_forward(self): 'start': datetime.datetime(2014, 3, 20, 17, 10), 'minutes_back': 4, 'minutes_forward': 20, + 'minutes_interval': 5, }, { 'event.tools.range.delay': [ @@ -629,6 +631,7 @@ def test_start_long_before_minutes_back(self): 'start': datetime.datetime(1960, 1, 1, 0, 0, 0), 'minutes_back': 5, 'minutes_forward': 20, + 'minutes_interval': 5, }, (datetime.datetime(2000, 1, 1, 0, 0), datetime.datetime(2000, 1, 1, 0, 20, 0)), [ @@ -658,6 +661,7 @@ def test_start_after_long_minutes_back(self): 'start': datetime.datetime(2014, 3, 20, 17, 10), 'task_limit': 4, 'minutes_back': 365 * 24 * 60, + 'minutes_interval': 5, }, (datetime.datetime(2014, 3, 20, 17, 10, 0), datetime.datetime(2014, 3, 20, 18, 0, 0)), [ @@ -687,6 +691,7 @@ def test_start_long_before_long_minutes_back_and_with_long_minutes_forward(self) 'task_limit': 4, 'minutes_back': 365 * 24 * 60, 'minutes_forward': 365 * 24 * 60, + 'minutes_interval': 5, }, (datetime.datetime(2016, 3, 22, 20, 5), datetime.datetime(2018, 3, 22, 20, 0)), [ @@ -1002,7 +1007,8 @@ def output(self): of=SomeByMinutesTask, start=datetime.datetime(2014, 3, 20, 17), task_limit=3, - minutes_back=24 * 60) + minutes_back=24 * 60, + minutes_interval=5) actual = [str(t) for t in task.requires()] self.assertEqual(actual, expected_tasks) @@ -1020,7 +1026,8 @@ def test_missing_wrapper_tasks_correctly_required(self): of=CommonWrapperTaskMinutes, start=datetime.datetime(2014, 3, 20, 23, 0, 0), stop=datetime.datetime(2014, 3, 20, 23, 20, 0), - minutes_back=30 * 365 * 24 * 60) + minutes_back=30 * 365 * 24 * 60, + minutes_interval=5) actual = [str(t) for t in task.requires()] self.assertEqual(actual, expected_wrapper) @@ -1038,7 +1045,8 @@ def output(self): task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2015, 12, 1)), of=BulkCompleteByMinutesTask, start=datetime.datetime(2015, 11, 1), - stop=datetime.datetime(2015, 12, 1)) + stop=datetime.datetime(2015, 12, 1), + minutes_interval=5) expected = [ 'BulkCompleteByMinutesTask(dh=2015-11-30T2350)', @@ -1067,7 +1075,8 @@ def output(self): of=BulkCompleteByMinutesTask, of_params=dict(arbitrary_argument=True), start=datetime.datetime(2015, 11, 1), - stop=datetime.datetime(2015, 12, 1)) + stop=datetime.datetime(2015, 12, 1), + minutes_interval=5) expected = [ 'BulkCompleteByMinutesTask(dh=2015-11-30T2350, arbitrary_argument=True)', @@ -1084,7 +1093,8 @@ def test_missing_directory(self): datetime.datetime(2014, 3, 21, 0, 0)), of=TaskMinutesC, start=datetime.datetime(2014, 3, 20, 23, 11), - stop=datetime.datetime(2014, 3, 20, 23, 21)) + stop=datetime.datetime(2014, 3, 20, 23, 21), + minutes_interval=5) self.assertFalse(task.complete()) expected = [ 'TaskMinutesC(dm=2014-03-20T2315)', From 927ba15b27b6c43a171c92929d7bcad0e9e28f81 Mon Sep 17 00:00:00 2001 From: Julian Santander Date: Mon, 26 Sep 2016 10:28:32 +0200 Subject: [PATCH 4/6] Add validations on minutes_interval parameter --- luigi/tools/range.py | 7 ++++++- test/range_test.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/luigi/tools/range.py b/luigi/tools/range.py index e504074aa8..7abbf5ab0a 100755 --- a/luigi/tools/range.py +++ b/luigi/tools/range.py @@ -428,7 +428,7 @@ class RangeByMinutesBase(RangeBase): minutes_interval = luigi.IntParameter( default=1, - description="separation between events in minutes" + description="separation between events in minutes. It must evenly divide 60" ) def datetime_to_parameter(self, dt): @@ -460,6 +460,11 @@ def finite_datetimes(self, finite_start, finite_stop): """ Simply returns the points in time that correspond to a whole number of minutes intervals. """ + # Validate that the minutes_interval can divide 60 and it is greater than 0. + if self.minutes_interval <= 0: + raise ParameterException('minutes-interval must be > 0') + if (60 / self.minutes_interval) * self.minutes_interval != 60: + raise ParameterException('minutes-interval does not evenly divide 60') # start of a complete interval, e.g. 20:13 and the interval is 5 -> 20:10 start_minute = int(finite_start.minute/self.minutes_interval)*self.minutes_interval datehour_start = datetime( diff --git a/test/range_test.py b/test/range_test.py index d216e02582..56ec6e5399 100755 --- a/test/range_test.py +++ b/test/range_test.py @@ -592,6 +592,49 @@ def missing_datetimes(a, b, c): self.assertEqual(self.events, expected_events) self.assertFalse(task.complete()) + def test_negative_interval(self): + class SomeByMinutesTask(luigi.Task): + d = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.d.strftime('/data/2014/p/v/z/%Y_/_%m-_-%doctor/20/%HZ%MOOO')) + + for task_path in task_a_paths: + MockTarget(task_path) + # this test takes a few seconds. Since stop is not defined, + # finite_datetimes constitute many years to consider + task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2016, 4, 1)), + of=SomeByMinutesTask, + start=datetime.datetime(2014, 3, 20, 17), + minutes_interval=-1) + try: + task.requires() + except luigi.parameter.ParameterException: + return + self.fail("Expected a parameter exception") + + def test_non_dividing_interval(self): + class SomeByMinutesTask(luigi.Task): + d = luigi.DateMinuteParameter() + + def output(self): + return MockTarget(self.d.strftime('/data/2014/p/v/z/%Y_/_%m-_-%doctor/20/%HZ%MOOO')) + + for task_path in task_a_paths: + MockTarget(task_path) + # this test takes a few seconds. Since stop is not defined, + # finite_datetimes constitute many years to consider + task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2016, 4, 1)), + of=SomeByMinutesTask, + start=datetime.datetime(2014, 3, 20, 17), + minutes_interval=8) + try: + task.requires() + except luigi.parameter.ParameterException: + return + self.fail("Expected a parameter exception") + + def test_start_and_minutes_period(self): self._nonempty_subcase( { From dc7e203674527db140b824b38622610f6008542d Mon Sep 17 00:00:00 2001 From: Julian Santander Date: Mon, 26 Sep 2016 11:22:47 +0200 Subject: [PATCH 5/6] Fix flake8 validations --- test/range_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/range_test.py b/test/range_test.py index 56ec6e5399..d21189c700 100755 --- a/test/range_test.py +++ b/test/range_test.py @@ -634,7 +634,6 @@ def output(self): return self.fail("Expected a parameter exception") - def test_start_and_minutes_period(self): self._nonempty_subcase( { From fa4587f888185bb8c680251ca438c8a52426c661 Mon Sep 17 00:00:00 2001 From: Julian Santander Date: Mon, 26 Sep 2016 12:27:42 +0200 Subject: [PATCH 6/6] Address review comments --- luigi/tools/range.py | 6 +++--- test/range_test.py | 20 ++------------------ 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/luigi/tools/range.py b/luigi/tools/range.py index 7abbf5ab0a..eaf58af33f 100755 --- a/luigi/tools/range.py +++ b/luigi/tools/range.py @@ -460,9 +460,9 @@ def finite_datetimes(self, finite_start, finite_stop): """ Simply returns the points in time that correspond to a whole number of minutes intervals. """ - # Validate that the minutes_interval can divide 60 and it is greater than 0. - if self.minutes_interval <= 0: - raise ParameterException('minutes-interval must be > 0') + # Validate that the minutes_interval can divide 60 and it is greater than 0 and lesser than 60 + if not (0 < self.minutes_interval < 60): + raise ParameterException('minutes-interval must be within 0..60') if (60 / self.minutes_interval) * self.minutes_interval != 60: raise ParameterException('minutes-interval does not evenly divide 60') # start of a complete interval, e.g. 20:13 and the interval is 5 -> 20:10 diff --git a/test/range_test.py b/test/range_test.py index d21189c700..9d56a24dfa 100755 --- a/test/range_test.py +++ b/test/range_test.py @@ -599,19 +599,11 @@ class SomeByMinutesTask(luigi.Task): def output(self): return MockTarget(self.d.strftime('/data/2014/p/v/z/%Y_/_%m-_-%doctor/20/%HZ%MOOO')) - for task_path in task_a_paths: - MockTarget(task_path) - # this test takes a few seconds. Since stop is not defined, - # finite_datetimes constitute many years to consider task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2016, 4, 1)), of=SomeByMinutesTask, start=datetime.datetime(2014, 3, 20, 17), minutes_interval=-1) - try: - task.requires() - except luigi.parameter.ParameterException: - return - self.fail("Expected a parameter exception") + self.assertRaises(luigi.parameter.ParameterException, task.requires) def test_non_dividing_interval(self): class SomeByMinutesTask(luigi.Task): @@ -620,19 +612,11 @@ class SomeByMinutesTask(luigi.Task): def output(self): return MockTarget(self.d.strftime('/data/2014/p/v/z/%Y_/_%m-_-%doctor/20/%HZ%MOOO')) - for task_path in task_a_paths: - MockTarget(task_path) - # this test takes a few seconds. Since stop is not defined, - # finite_datetimes constitute many years to consider task = RangeByMinutes(now=datetime_to_epoch(datetime.datetime(2016, 4, 1)), of=SomeByMinutesTask, start=datetime.datetime(2014, 3, 20, 17), minutes_interval=8) - try: - task.requires() - except luigi.parameter.ParameterException: - return - self.fail("Expected a parameter exception") + self.assertRaises(luigi.parameter.ParameterException, task.requires) def test_start_and_minutes_period(self): self._nonempty_subcase(