diff --git a/luigi/contrib/sge.py b/luigi/contrib/sge.py index 253b3a796f..2e6358ad96 100755 --- a/luigi/contrib/sge.py +++ b/luigi/contrib/sge.py @@ -278,9 +278,11 @@ def _dump(self, out_dir=''): d = pickle.dumps(self) module_name = os.path.basename(sys.argv[0]).rsplit('.', 1)[0] d = d.replace('(c__main__', "(c" + module_name) - open(self.job_file, "w").write(d) + with open(self.job_file, "w") as f: + f.write(d) else: - pickle.dump(self, open(self.job_file, "w")) + with open(self.job_file, "wb") as f: + pickle.dump(self, f) def _run_job(self): diff --git a/luigi/tools/range.py b/luigi/tools/range.py index b3c31d3aa3..9553dd082b 100755 --- a/luigi/tools/range.py +++ b/luigi/tools/range.py @@ -466,7 +466,7 @@ def finite_datetimes(self, finite_start, finite_stop): # Validate that the minutes_interval can divide 60 and it is greater than 0 and lesser than 60 if not (0 < self.minutes_interval < 60): raise ParameterException('minutes-interval must be within 0..60') - if (60 / self.minutes_interval) * self.minutes_interval != 60: + if 60 % self.minutes_interval != 0: raise ParameterException('minutes-interval does not evenly divide 60') # start of a complete interval, e.g. 20:13 and the interval is 5 -> 20:10 start_minute = int(finite_start.minute/self.minutes_interval)*self.minutes_interval diff --git a/test/contrib/sge_test.py b/test/contrib/sge_test.py old mode 100755 new mode 100644 index 8b768c4360..0f81cbadc3 --- a/test/contrib/sge_test.py +++ b/test/contrib/sge_test.py @@ -53,7 +53,7 @@ def on_sge_master(): class TestSGEWrappers(unittest.TestCase): def test_track_job(self): - '''`track_job` returns the state using qstat''' + """`track_job` returns the state using qstat""" self.assertEqual(_parse_qstat_state(QSTAT_OUTPUT, 1), 'r') self.assertEqual(_parse_qstat_state(QSTAT_OUTPUT, 2), 'qw') self.assertEqual(_parse_qstat_state(QSTAT_OUTPUT, 3), 't') @@ -63,7 +63,7 @@ def test_track_job(self): class TestJobTask(SGEJobTask): - '''Simple SGE job: write a test file to NSF shared drive and waits a minute''' + """Simple SGE job: write a test file to NSF shared drive and waits a minute""" i = luigi.Parameter() @@ -79,7 +79,7 @@ def output(self): @attr('contrib') class TestSGEJob(unittest.TestCase): - '''Test from SGE master node''' + """Test from SGE master node""" def test_run_job(self): if on_sge_master(): @@ -94,7 +94,7 @@ def test_run_job_with_dump(self, mock_check_output): 'Your job 12345 ("test_job") has been submitted', '' ] - task = TestJobTask(i=1, n_cpu=1, shared_tmp_dir='/tmp') + task = TestJobTask(i="1", n_cpu=1, shared_tmp_dir='/tmp') luigi.build([task], local_scheduler=True) self.assertEqual(mock_check_output.call_count, 2) diff --git a/test/range_test.py b/test/range_test.py old mode 100755 new mode 100644 index 0f1cf790be..4be8760a54 --- a/test/range_test.py +++ b/test/range_test.py @@ -1100,7 +1100,7 @@ class MyTask(luigi.Task): def complete(self): return False - range_task = RangeMonthly(now=datetime_to_epoch(datetime.datetime(2015, 12, 2)), + range_task = RangeMonthly(now=datetime_to_epoch(datetime.datetime(2016, 1, 1)), of=MyTask, start=datetime.date(2015, 12, 1), stop=datetime.date(2016, 1, 1)) @@ -1125,7 +1125,7 @@ def run(self): self.comp = True MyTask.secret = 'yay' - now = str(int(datetime_to_epoch(datetime.datetime(2015, 12, 2)))) + now = str(int(datetime_to_epoch(datetime.datetime(2016, 1, 1)))) self.run_locally_split('RangeMonthly --of wohoo.MyTask --now {now} --start 2015-12 --stop 2016-01'.format(now=now)) self.assertEqual(MyTask(month_param=datetime.date(1934, 12, 1)).secret, 'yay') @@ -1137,7 +1137,7 @@ class MyTask(luigi.Task): def complete(self): return False - range_task = RangeMonthly(now=datetime_to_epoch(datetime.datetime(2015, 12, 2)), + range_task = RangeMonthly(now=datetime_to_epoch(datetime.datetime(2016, 1, 1)), of=MyTask, start=datetime.date(2015, 12, 1), stop=datetime.date(2016, 1, 1), @@ -1153,7 +1153,7 @@ class MyTask(luigi.Task): def output(self): return MockTarget(self.month_param.strftime('/n2000y01a05n/%Y_%m-aww/21mm%Hdara21/ooo')) - range_task = RangeMonthly(now=datetime_to_epoch(datetime.datetime(2015, 12, 2)), + range_task = RangeMonthly(now=datetime_to_epoch(datetime.datetime(2016, 1, 1)), of=MyTask, start=datetime.date(2015, 12, 1), stop=datetime.date(2016, 1, 1), @@ -1197,7 +1197,7 @@ def run(self): self.comp = True MyTask.state = (self.arbitrary_param, self.arbitrary_integer_param) - now = str(int(datetime_to_epoch(datetime.datetime(2015, 12, 2)))) + now = str(int(datetime_to_epoch(datetime.datetime(2016, 1, 1)))) self.run_locally(['RangeMonthly', '--of', 'wohoo.MyTask', '--of-params', '{"arbitrary_param":"bar","arbitrary_integer_param":5}', '--now', '{0}'.format(now), '--start', '2015-12', '--stop', '2016-01']) @@ -1444,7 +1444,7 @@ class BulkCompleteByMinutesTask(luigi.Task): @classmethod def bulk_complete(cls, parameter_tuples): - return parameter_tuples[:-2] + return list(parameter_tuples)[:-2] def output(self): raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete") @@ -1471,9 +1471,10 @@ class BulkCompleteByMinutesTask(luigi.Task): @classmethod def bulk_complete(cls, parameter_tuples): + ptuples = list(parameter_tuples) for t in map(cls, parameter_tuples): assert t.arbitrary_argument - return parameter_tuples[:-2] + return ptuples[:-2] def output(self): raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete")