Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix params hashing #2540

Merged
merged 8 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions luigi/configuration/toml_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
toml = False

from .base_parser import BaseParser
from ..freezing import recursively_freeze
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we referencing relative paths? can we not from luigi.configuration.freezing import recursively_freeze?



class LuigiTomlParser(BaseParser):
Expand Down Expand Up @@ -51,6 +52,13 @@ def read(self, config_paths):
for path in config_paths:
if os.path.isfile(path):
self.data = self._update_data(self.data, toml.load(path))

# freeze dict params
for section, content in self.data.items():
for key, value in content.items():
if isinstance(value, dict):
self.data[section][key] = recursively_freeze(value)

return self.data

def get(self, section, option, default=NO_DEFAULT, **kwargs):
Expand Down
54 changes: 54 additions & 0 deletions luigi/freezing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Internal-only module with immutable data structures.

Please, do not use it outside of Luigi codebase itself.
"""


from collections import OrderedDict, Mapping
orsinium marked this conversation as resolved.
Show resolved Hide resolved
import operator
import functools


class FrozenOrderedDict(Mapping):
orsinium marked this conversation as resolved.
Show resolved Hide resolved
"""
It is an immutable wrapper around ordered dictionaries that implements the complete :py:class:`collections.Mapping`
interface. It can be used as a drop-in replacement for dictionaries where immutability and ordering are desired.
"""

def __init__(self, *args, **kwargs):
self.__dict = OrderedDict(*args, **kwargs)
self.__hash = None

def __getitem__(self, key):
return self.__dict[key]

def __iter__(self):
return iter(self.__dict)

def __len__(self):
return len(self.__dict)

def __repr__(self):
# We should use short representation for beautiful console output
return repr(dict(self.__dict))

def __hash__(self):
if self.__hash is None:
hashes = map(hash, self.items())
self.__hash = functools.reduce(operator.xor, hashes, 0)

return self.__hash

def get_wrapped(self):
return self.__dict


def recursively_freeze(value):
"""
Recursively walks ``Mapping``s and ``list``s and converts them to ``FrozenOrderedDict`` and ``tuples``, respectively.
"""
if isinstance(value, Mapping):
return FrozenOrderedDict(((k, recursively_freeze(v)) for k, v in value.items()))
elif isinstance(value, list) or isinstance(value, tuple):
return tuple(recursively_freeze(v) for v in value)
return value
74 changes: 15 additions & 59 deletions luigi/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,7 @@
from enum import IntEnum
import json
from json import JSONEncoder
from collections import OrderedDict
try:
from collections.abc import Mapping
except ImportError:
from collections import Mapping
import operator
import functools
from ast import literal_eval

try:
Expand All @@ -46,6 +40,9 @@
from luigi import configuration
from luigi.cmdline_parser import CmdlineParser

from .freezing import recursively_freeze, FrozenOrderedDict


_no_value = object()


Expand Down Expand Up @@ -893,57 +890,13 @@ def serialize(self, e):
return e.name


class _FrozenOrderedDict(Mapping):
"""
It is an immutable wrapper around ordered dictionaries that implements the complete :py:class:`collections.Mapping`
interface. It can be used as a drop-in replacement for dictionaries where immutability and ordering are desired.
"""

def __init__(self, *args, **kwargs):
self.__dict = OrderedDict(*args, **kwargs)
self.__hash = None

def __getitem__(self, key):
return self.__dict[key]

def __iter__(self):
return iter(self.__dict)

def __len__(self):
return len(self.__dict)

def __repr__(self):
return '<FrozenOrderedDict %s>' % repr(self.__dict)

def __hash__(self):
if self.__hash is None:
hashes = map(hash, self.items())
self.__hash = functools.reduce(operator.xor, hashes, 0)

return self.__hash

def get_wrapped(self):
return self.__dict


def _recursively_freeze(value):
"""
Recursively walks ``Mapping``s and ``list``s and converts them to ``_FrozenOrderedDict`` and ``tuples``, respectively.
"""
if isinstance(value, Mapping):
return _FrozenOrderedDict(((k, _recursively_freeze(v)) for k, v in value.items()))
elif isinstance(value, list) or isinstance(value, tuple):
return tuple(_recursively_freeze(v) for v in value)
return value


class _DictParamEncoder(JSONEncoder):
"""
JSON encoder for :py:class:`~DictParameter`, which makes :py:class:`~_FrozenOrderedDict` JSON serializable.
JSON encoder for :py:class:`~DictParameter`, which makes :py:class:`~FrozenOrderedDict` JSON serializable.
"""

def default(self, obj):
if isinstance(obj, _FrozenOrderedDict):
if isinstance(obj, FrozenOrderedDict):
return obj.get_wrapped()
return json.JSONEncoder.default(self, obj)

Expand Down Expand Up @@ -983,11 +936,11 @@ def run(self):

def normalize(self, value):
"""
Ensure that dictionary parameter is converted to a _FrozenOrderedDict so it can be hashed.
Ensure that dictionary parameter is converted to a FrozenOrderedDict so it can be hashed.
"""
return _recursively_freeze(value)
return recursively_freeze(value)

def parse(self, s):
def parse(self, source):
"""
Parses an immutable and ordered ``dict`` from a JSON string using standard JSON library.

Expand All @@ -998,7 +951,10 @@ def parse(self, s):

:param s: String to be parse
"""
return json.loads(s, object_pairs_hook=_FrozenOrderedDict)
# TOML based config convert params to python types itself.
if not isinstance(source, six.string_types):
return source
return json.loads(source, object_pairs_hook=FrozenOrderedDict)

def serialize(self, x):
return json.dumps(x, cls=_DictParamEncoder)
Expand Down Expand Up @@ -1042,7 +998,7 @@ def normalize(self, x):
:param str x: the value to parse.
:return: the normalized (hashable/immutable) value.
"""
return _recursively_freeze(x)
return recursively_freeze(x)

def parse(self, x):
"""
Expand All @@ -1051,7 +1007,7 @@ def parse(self, x):
:param str x: the value to parse.
:return: the parsed value.
"""
return list(json.loads(x, object_pairs_hook=_FrozenOrderedDict))
return list(json.loads(x, object_pairs_hook=FrozenOrderedDict))

def serialize(self, x):
"""
Expand Down Expand Up @@ -1114,7 +1070,7 @@ def parse(self, x):
# ast.literal_eval(t_str) == t
try:
# loop required to parse tuple of tuples
return tuple(tuple(x) for x in json.loads(x, object_pairs_hook=_FrozenOrderedDict))
return tuple(tuple(x) for x in json.loads(x, object_pairs_hook=FrozenOrderedDict))
except (ValueError, TypeError):
return tuple(literal_eval(x)) # if this causes an error, let that error be raised.

Expand Down
3 changes: 3 additions & 0 deletions test/testconfig/luigi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ logging_conf_file = "test/testconfig/logging.cfg"
client = "hadoopcli"
snakebite_autoconfig = false
namenode_host = "must be overridden in local config"

[SomeTask]
param = {key1 = "value1", key2 = "value2"}
orsinium marked this conversation as resolved.
Show resolved Hide resolved