Skip to content

Commit

Permalink
[luigi.contrib.external_program.ExternalProgramTask] logs_output_patt…
Browse files Browse the repository at this point in the history
…ern_to_url provided (#2822)

* logs_output_pattern_to_url provided

* no odd line

* build_tracking_url

* 2.7 mentions removed from README and setup.py
  • Loading branch information
drowoseque authored and honnix committed Dec 3, 2019
1 parent 3f2da3b commit 63bb867
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
12 changes: 11 additions & 1 deletion luigi/contrib/external_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ def _clean_output_file(self, file_object):
file_object.seek(0)
return ''.join(map(lambda s: s.decode('utf-8'), file_object.readlines()))

def build_tracking_url(self, logs_output):
"""
This method is intended for transforming pattern match in logs to an URL
:param logs_output: Found match of `self.tracking_url_pattern`
:return: a tracking URL for the task
"""
return logs_output

def run(self):
args = list(map(str, self.program_args()))

Expand Down Expand Up @@ -180,7 +188,9 @@ def _track_url_by_pattern():
file_to_write.write(new_line)
match = re.search(pattern, new_line.decode('utf-8'))
if match:
self.set_tracking_url(match.group(1))
self.set_tracking_url(
self.build_tracking_url(match.group(1))
)
else:
sleep(time_to_sleep)

Expand Down
25 changes: 25 additions & 0 deletions test/contrib/external_program_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,31 @@ def fake_set_tracking_url(val, url):
proc.wait()
self.assertEqual(test_val.value, 1)

def test_tracking_url_context_works_correctly_when_logs_output_pattern_to_url_is_not_default(self):

class _Task(TestEchoTask):
def build_tracking_url(self, logs_output):
return 'The {} is mine'.format(logs_output)

test_val = Value('i', 0)

def fake_set_tracking_url(val, url):
if url == "The world is mine":
val.value += 1

task = _Task(
capture_output=False,
stream_for_searching_tracking_url='stdout',
tracking_url_pattern=r"Hello, (.*)!"
)

test_args = list(map(str, task.program_args()))

with mock.patch.object(task, 'set_tracking_url', new=partial(fake_set_tracking_url, test_val)):
with task._proc_with_tracking_url_context(proc_args=test_args, proc_kwargs={}) as proc:
proc.wait()
self.assertEqual(test_val.value, 1)


class TestExternalPythonProgramTask(ExternalPythonProgramTask):
virtualenv = '/path/to/venv'
Expand Down

0 comments on commit 63bb867

Please sign in to comment.