Luigi task does not accept dict defined in TOML Config file. #2538

Closed
adamist521 opened this issue Sep 28, 2018 · 5 comments · Fixed by #2540

Overview

I'm using TOML for the configuration file and hit errors when parsing a dict.

I tried three ways to read a dict (key-value pairs) from the TOML config:

  • Case 1: Parsing with luigi.Parameter inside luigi.Task.
    -> The CLI reports a failure, but the task still writes the expected output.

  • Case 2: Parsing with luigi.Parameter inside luigi.Config.
    -> The task succeeds with a warning and writes the expected output.

  • Case 3: Parsing with luigi.DictParameter inside luigi.Config.
    -> The task fails and creates no output.

I'm using Case 2 for now, because it works despite the warnings.
The problem seems to be in how a dict coming from TOML is handled where a string or JSON is expected.

Any suggestions on what the problem is, or where to start if I wanted to contribute a fix?

Reproduction

All of the cases use the test.py and config.toml shown below.

Environment

  • python 3.7.0
  • luigi 2.7.8
  • toml 0.9.6

Files

Luigi task Python script (includes all tasks)

test.py

import luigi


class TestCase1(luigi.Task):
    text = luigi.Parameter()
    dic = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget("data/case1.txt")

    def requires(self):
        return None

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write('text: {}, dict: {}'.format(self.text, self.dic))


class TaskConfig2(luigi.Config):
    dic = luigi.Parameter()


class TestCase2(luigi.Task):
    text = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget("data/case2.txt")

    def requires(self):
        return None

    def run(self):
        dic = TaskConfig2().dic
        with self.output().open('w') as out_file:
            out_file.write('text: {}, dict: {}'.format(self.text, dic))


class TaskConfig3(luigi.Config):
    dic = luigi.DictParameter()


class TestCase3(luigi.Task):
    text = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget("data/case3.txt")

    def requires(self):
        return None

    def run(self):
        dic = TaskConfig3().dic
        with self.output().open('w') as out_file:
            out_file.write('text: {}, dict: {}'.format(self.text, dic))

TOML config file

config.toml

[TestCase1]
text = "sample text"

[TestCase1.dic]
key1 = 'value1'
key2 = 'value2'

[TestCase2]
text = "sample text"

[TaskConfig2.dic]
ckey1 = 'value1'
ckey2 = 'value2'


[TestCase3]
text = "sample text"

[TaskConfig3.dic]
ckey1 = 'value1'
ckey2 = 'value2'

Case 1.

Parsing with luigi.Parameter inside luigi.Task.

Command:

PYTHONPATH='.' LUIGI_CONFIG_PARSER='toml' LUIGI_CONFIG_PATH='config.toml' pipenv run luigi --module test TestCase1 --local-scheduler

CLI Output:

/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/parameter.py:284: UserWarning: Parameter "dic" with value "{'key1': 'value1', 'key2': 'value2'}" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/parameter.py:284: UserWarning: Parameter "task_process_context" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
DEBUG: Checking if TestCase1(text=sample text, dic={'key1': 'value1', 'key2': 'value2'}) is complete
INFO: Informed scheduler that task   TestCase1___key1____value1_sample_text_7b550efa06   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 64804] Worker Worker(salt=932163268, workers=1, host=CA1952.local, username=****, pid=64804) running   TestCase1(text=sample text, dic={'key1': 'value1', 'key2': 'value2'})
INFO: [pid 64804] Worker Worker(salt=932163268, workers=1, host=CA1952.local, username=****, pid=64804) done      TestCase1(text=sample text, dic={'key1': 'value1', 'key2': 'value2'})
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   TestCase1___key1____value1_sample_text_7b550efa06   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=932163268, workers=1, host=CA1952.local, username=****, pid=64804) was stopped. Shutting down Keep-Alive thread
ERROR: Uncaught exception in luigi
Traceback (most recent call last):
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/retcodes.py", line 74, in run_with_retcodes
    worker = luigi.interface._run(argv)['worker']
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/interface.py", line 248, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/interface.py", line 208, in _schedule_and_run
    logger.info(execution_summary.summary(worker))
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/execution_summary.py", line 424, in summary
    return _summary_wrap(_summary_format(_summary_dict(worker), worker))
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/execution_summary.py", line 358, in _summary_format
    str_output += '{0}\n'.format(_get_str(group_tasks[status], status in _PENDING_SUB_STATUSES))
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/execution_summary.py", line 148, in _get_str
    params = _get_set_of_params(tasks)
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/execution_summary.py", line 191, in _get_set_of_params
    params[param] = {getattr(task, param[0]) for task in tasks}
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/execution_summary.py", line 191, in <setcomp>
    params[param] = {getattr(task, param[0]) for task in tasks}
TypeError: unhashable type: 'dict'

Result:

The CLI reports a failure, but the task writes the expected text file.
cat case1.txt
-> text: sample text, dict: {'key1': 'value1', 'key2': 'value2'}
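
The traceback ends in a set comprehension over parameter values, so the same TypeError can be reproduced without Luigi: a plain dict cannot be a set element. A minimal sketch; `freeze` is a hypothetical helper for illustration only and is not part of Luigi:

```python
def freeze(value):
    """Hypothetical helper: turn a (possibly nested) dict into a hashable tuple."""
    if isinstance(value, dict):
        return tuple(sorted((k, freeze(v)) for k, v in value.items()))
    return value


dic = {"key1": "value1", "key2": "value2"}

# Building a set that contains a dict fails, as in execution_summary.py.
try:
    {dic}
    error = None
except TypeError as exc:
    error = str(exc)
print(error)

# A frozen (hashable) representation can be stored in a set.
frozen = {freeze(dic)}
print(frozen)
```

This would explain why the task itself runs and writes its output, while the execution summary printed afterwards crashes.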

Case 2.

Parsing with luigi.Parameter inside luigi.Config.

Command:

CLI Output:

/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/parameter.py:284: UserWarning: Parameter "task_process_context" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
DEBUG: Checking if TestCase2(text=sample text) is complete
INFO: Informed scheduler that task   TestCase2_sample_text_543f36b081   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 65217] Worker Worker(salt=964552474, workers=1, host=CA1952.local, username=****, pid=65217) running   TestCase2(text=sample text)
DEBUG: Not all parameter values are hashable so instance isn't coming from the cache
/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/parameter.py:284: UserWarning: Parameter "dic" with value "{'ckey1': 'value1', 'ckey2': 'value2'}" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
INFO: [pid 65217] Worker Worker(salt=964552474, workers=1, host=CA1952.local, username=****, pid=65217) done      TestCase2(text=sample text)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   TestCase2_sample_text_543f36b081   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=964552474, workers=1, host=CA1952.local, username=****, pid=65217) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 TestCase2(text=sample text)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

Result:

The CLI prints a warning, but the task works as expected.

cat case2.txt
-> text: sample text, dict: {'ckey1': 'value1', 'ckey2': 'value2'}

Case 3.

Parsing with luigi.DictParameter inside luigi.Config.

Command:

PYTHONPATH='.' LUIGI_CONFIG_PARSER='toml' LUIGI_CONFIG_PATH='config.toml' pipenv run luigi --module test TestCase3 --local-scheduler

CLI Output:

/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/parameter.py:284: UserWarning: Parameter "task_process_context" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
DEBUG: Checking if TestCase3(text=sample text) is complete
INFO: Informed scheduler that task   TestCase3_sample_text_543f36b081   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 65260] Worker Worker(salt=050177714, workers=1, host=CA1952.local, username=****, pid=65260) running   TestCase3(text=sample text)
ERROR: [pid 65260] Worker Worker(salt=050177714, workers=1, host=CA1952.local, username=****, pid=65260) failed    TestCase3(text=sample text)
Traceback (most recent call last):
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/worker.py", line 199, in run
    new_deps = self._run_get_new_deps()
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/worker.py", line 139, in _run_get_new_deps
    task_gen = self.task.run()
  File "/Users/****/tmp/*****/test.py", line 52, in run
    dic = TaskConfig3().dic
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/task_register.py", line 88, in __call__
    param_values = cls.get_param_values(params, args, kwargs)
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/task.py", line 420, in get_param_values
    if not param_obj.has_task_value(task_family, param_name):
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/parameter.py", line 229, in has_task_value
    return self._get_value(task_name, param_name) != _no_value
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/parameter.py", line 200, in _get_value
    for value, warn in self._value_iterator(task_name, param_name):
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/parameter.py", line 218, in _value_iterator
    yield (self._get_value_from_config(task_name, param_name), None)
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/parameter.py", line 197, in _get_value_from_config
    return self.parse(value)
  File "/Users/****/.local/share/virtualenvs/*****-6FWU2-1k/lib/python3.7/site-packages/luigi/parameter.py", line 991, in parse
    return json.loads(s, object_pairs_hook=_FrozenOrderedDict)
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/__init__.py", line 341, in loads
    raise TypeError(f'the JSON object must be str, bytes or bytearray, '
TypeError: the JSON object must be str, bytes or bytearray, not dict
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   TestCase3_sample_text_543f36b081   has status   FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 1 pending tasks possibly being run by other workers
DEBUG: There are 1 pending tasks unique to this worker
DEBUG: There are 1 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=050177714, workers=1, host=CA1952.local, username=****, pid=65260) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 failed:
    - 1 TestCase3(text=sample text)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

Result:

The task fails and produces no output.
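
The final TypeError comes from `json.loads` being handed the already-parsed dict. The sketch below reproduces that outside Luigi and shows one possible way to tolerate both kinds of input. `parse_dict_value` is a hypothetical function: it is not Luigi's `DictParameter.parse`, and it is not necessarily how #2540 resolves the problem.

```python
import json


def parse_dict_value(value):
    """Hypothetical parser: accept a JSON string or an already-parsed mapping."""
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return dict(value)


# What a TOML sub-table looks like once the config has been parsed.
from_toml = {"ckey1": "value1", "ckey2": "value2"}

# json.loads only accepts str, bytes or bytearray, so a dict is rejected.
try:
    json.loads(from_toml)
    error = None
except TypeError as exc:
    error = str(exc)
print(error)

# The tolerant version handles both the dict and its JSON string form.
print(parse_dict_value(from_toml))
print(parse_dict_value('{"ckey1": "value1", "ckey2": "value2"}'))
```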

@dlstadther
Collaborator

dlstadther commented Sep 28, 2018

Thanks for opening this bug @adamist521

@orsinium Can you look into this since you added the TOML config support?

@orsinium
Contributor

@dlstadther, assign me this task, please. I'll debug this later.

@adamist521, thank you for the detailed report. This is very helpful.

@orsinium
Contributor

orsinium commented Sep 28, 2018

@adamist521, you can use the fixes from #2540. Your test cases work with it. Thank you for submitting the issue :)

@stale

stale bot commented Jan 30, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions.

@stale stale bot added the wontfix label Jan 30, 2019
@orsinium
Contributor

We have to merge #2540...
