Summary of this patch (free text before the first "diff --git" header):

luigi/contrib/spark.py: in run(), every character of self.name that does not
match \w is replaced with '_' (via re.sub), and the cleaned string is used both
as the tempfile.mkdtemp() prefix and as the base name of the pickle file.
Before this change the raw name was passed as the mkdtemp prefix and only
spaces were replaced in the pickle file name.

test/contrib/spark_test.py: adds MessyNamePySparkTask, whose name contains
parentheses, commas, a colon and a slash, and test_name_cleanup, which checks
that the cleaned name appears in job.run_path.

NOTE(review): the line breaks and indentation below were reconstructed from a
whitespace-collapsed copy of the patch; confirm against the original commit
before applying.

diff --git a/luigi/contrib/spark.py b/luigi/contrib/spark.py
index d84d08312c..3282b61bfa 100644
--- a/luigi/contrib/spark.py
+++ b/luigi/contrib/spark.py
@@ -17,6 +17,7 @@
 
 import logging
 import os
+import re
 import sys
 import tempfile
 import shutil
@@ -298,8 +299,9 @@ def app_command(self):
         return [self.app, pickle_loc] + self.app_options()
 
     def run(self):
-        self.run_path = tempfile.mkdtemp(prefix=self.name)
-        self.run_pickle = os.path.join(self.run_path, '.'.join([self.name.replace(' ', '_'), 'pickle']))
+        path_name_fragment = re.sub(r'[^\w]', '_', self.name)
+        self.run_path = tempfile.mkdtemp(prefix=path_name_fragment)
+        self.run_pickle = os.path.join(self.run_path, '.'.join([path_name_fragment, 'pickle']))
         with open(self.run_pickle, 'wb') as fd:
             # Copy module file to run path.
             module_path = os.path.abspath(inspect.getfile(self.__class__))
diff --git a/test/contrib/spark_test.py b/test/contrib/spark_test.py
index 74344d188a..45341051e3 100644
--- a/test/contrib/spark_test.py
+++ b/test/contrib/spark_test.py
@@ -97,6 +97,10 @@ def main(self, sc, *args):
         sc.textFile(self.input().path).saveAsTextFile(self.output().path)
 
 
+class MessyNamePySparkTask(TestPySparkTask):
+    name = 'AppName(a,b,c,1:2,3/4)'
+
+
 @attr('apache')
 class SparkSubmitTaskTest(unittest.TestCase):
     ss = 'ss-stub'
@@ -289,3 +293,10 @@ def mock_spark_submit(task):
 
         sc.textFile.assert_called_with('input')
         sc.textFile.return_value.saveAsTextFile.assert_called_with('output')
+
+    @patch('luigi.contrib.external_program.subprocess.Popen')
+    def test_name_cleanup(self, proc):
+        setup_run_process(proc)
+        job = MessyNamePySparkTask()
+        job.run()
+        assert 'AppName_a_b_c_1_2_3_4_' in job.run_path