Skip to content

Commit

Permalink
Fix race condition in Salt loader.
Browse files Browse the repository at this point in the history
There was a race condition in the salt loader when injecting global
values (e.g. "__pillar__" or "__salt__") into modules. One effect of
this race condition was that in a setup with multiple threads, some
threads may see pillar data intended for other threads or the pillar
data seen by a thread might even change spuriously.

There have been earlier attempts to fix this problem (saltstack#27937, saltstack#29397).
These patches tried to fix the problem by storing the dictionary that
keeps the relevant data in a thread-local variable and referencing this
thread-local variable from the variables that are injected into the
modules.

These patches did not fix the problem completely because they only
work when a module is loaded through a single loader instance only.
When there is more than one loader, there is more than one
thread-local variable and the variable injected into a module is
changed to point to another thread-local variable when the module is
loaded again. Thus, the problem resurfaced while working on saltstack#39670.

This patch attempts to solve the problem from a slightly different
angle, complementing the earlier patches: The value injected into the
modules now is a proxy that internally uses a thread-local variable to
decide to which object it points. This means that when loading a module
again through a different loader (possibly passing different pillar
data), the data is actually only changed in the thread in which the
loader is used. Other threads are not affected by such a change.

This means that it will work correctly in the current situation where
loaders are possibly created by many different modules and these
modules do not necessary know in which context they are executed. Thus
it is much more flexible and reliable than the more explicit approach
used by the two earlier patches.

Unfortunately, the stand JSON and Msgpack serialization code cannot
handle proxied objects, so they have to be unwrapped before passing them
to that code.

The salt.utils.json and salt.utils.msgpack modules have been modified to
take care of unwrapping objects that are proxied using the
ThreadLocalProxy.
  • Loading branch information
smarsching committed Nov 27, 2018
1 parent 0133353 commit c00d9a4
Show file tree
Hide file tree
Showing 5 changed files with 741 additions and 5 deletions.
74 changes: 73 additions & 1 deletion salt/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import logging
import inspect
import tempfile
import threading
import functools
import threading
import traceback
Expand All @@ -33,6 +34,7 @@
import salt.utils.lazy
import salt.utils.odict
import salt.utils.platform
import salt.utils.thread_local_proxy
import salt.utils.versions
from salt.exceptions import LoaderError
from salt.template import check_render_pipe_str
Expand Down Expand Up @@ -1007,6 +1009,76 @@ def _mod_type(module_path):
return 'ext'


def _inject_into_mod(mod, name, value, force_lock=False):
'''
Inject a variable into a module. This is used to inject "globals" like
``__salt__``, ``__pillar``, or ``grains``.
Instead of injecting the value directly, a ``ThreadLocalProxy`` is created.
If such a proxy is already present under the specified name, it is updated
with the new value. This update only affects the current thread, so that
the same name can refer to different values depending on the thread of
execution.
This is important for data that is not truly global. For example, pillar
data might be dynamically overriden through function parameters and thus
the actual values available in pillar might depend on the thread that is
calling a module.
mod:
module object into which the value is going to be injected.
name:
name of the variable that is injected into the module.
value:
value that is injected into the variable. The value is not injected
directly, but instead set as the new reference of the proxy that has
been created for the variable.
force_lock:
whether the lock should be acquired before checking whether a proxy
object for the specified name has already been injected into the
module. If ``False`` (the default), this function checks for the
module's variable without acquiring the lock and only acquires the lock
if a new proxy has to be created and injected.
'''
from salt.utils.thread_local_proxy import ThreadLocalProxy
old_value = getattr(mod, name, None)
# We use a double-checked locking scheme in order to avoid taking the lock
# when a proxy object has already been injected.
# In most programming languages, double-checked locking is considered
# unsafe when used without explicit memory barriers because one might read
# an uninitialized value. In CPython it is safe due to the global
# interpreter lock (GIL). In Python implementations that do not have the
# GIL, it could be unsafe, but at least Jython also guarantees that (for
# Python objects) memory is not corrupted when writing and reading without
# explicit synchronization
# (http://www.jython.org/jythonbook/en/1.0/Concurrency.html).
# Please note that in order to make this code safe in a runtime environment
# that does not make this guarantees, it is not sufficient. The
# ThreadLocalProxy must also be created with fallback_to_shared set to
# False or a lock must be added to the ThreadLocalProxy.
if force_lock:
with _inject_into_mod.lock:
if isinstance(old_value, ThreadLocalProxy):
ThreadLocalProxy.set_reference(old_value, value)
else:
setattr(mod, name, ThreadLocalProxy(value, True))
else:
if isinstance(old_value, ThreadLocalProxy):
ThreadLocalProxy.set_reference(old_value, value)
else:
_inject_into_mod(mod, name, value, True)


# Lock used when injecting globals. This is needed to avoid a race condition
# when two threads try to load the same module concurrently. This must be
# outside the loader because there might be more than one loader for the same
# namespace.
_inject_into_mod.lock = threading.RLock()


# TODO: move somewhere else?
class FilterDictWrapper(MutableMapping):
'''
Expand Down Expand Up @@ -1560,7 +1632,7 @@ def _load_module(self, name):

# pack whatever other globals we were asked to
for p_name, p_value in six.iteritems(self.pack):
setattr(mod, p_name, p_value)
_inject_into_mod(mod, p_name, p_value)

module_name = mod.__name__.rsplit('.', 1)[-1]

Expand Down
17 changes: 15 additions & 2 deletions salt/utils/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# Import Salt libs
import salt.utils.data
import salt.utils.stringutils
from salt.utils.thread_local_proxy import ThreadLocalProxy

# Import 3rd-party libs
from salt.ext import six
Expand Down Expand Up @@ -114,11 +115,17 @@ def dump(obj, fp, **kwargs):
using the _json_module argument)
'''
json_module = kwargs.pop('_json_module', json)
orig_enc_func = kwargs.pop('default', lambda x: x)

def _enc_func(obj):
obj = ThreadLocalProxy.unproxy(obj)
return orig_enc_func(obj)

if 'ensure_ascii' not in kwargs:
kwargs['ensure_ascii'] = False
if six.PY2:
obj = salt.utils.data.encode(obj)
return json_module.dump(obj, fp, **kwargs) # future lint: blacklisted-function
return json_module.dump(obj, fp, default=_enc_func, **kwargs) # future lint: blacklisted-function


def dumps(obj, **kwargs):
Expand All @@ -138,8 +145,14 @@ def dumps(obj, **kwargs):
'''
import sys
json_module = kwargs.pop('_json_module', json)
orig_enc_func = kwargs.pop('default', lambda x: x)

def _enc_func(obj):
obj = ThreadLocalProxy.unproxy(obj)
return orig_enc_func(obj)

if 'ensure_ascii' not in kwargs:
kwargs['ensure_ascii'] = False
if six.PY2:
obj = salt.utils.data.encode(obj)
return json_module.dumps(obj, **kwargs) # future lint: blacklisted-function
return json_module.dumps(obj, default=_enc_func, **kwargs) # future lint: blacklisted-function
19 changes: 17 additions & 2 deletions salt/utils/msgpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# Fall back to msgpack_pure
import msgpack_pure as msgpack # pylint: disable=import-error

# Import Salt libs
from salt.utils.thread_local_proxy import ThreadLocalProxy


def pack(o, stream, **kwargs):
'''
Expand All @@ -26,7 +29,13 @@ def pack(o, stream, **kwargs):
msgpack module using the _msgpack_module argument.
'''
msgpack_module = kwargs.pop('_msgpack_module', msgpack)
return msgpack_module.pack(o, stream, **kwargs)
orig_enc_func = kwargs.pop('default', lambda x: x)

def _enc_func(obj):
obj = ThreadLocalProxy.unproxy(obj)
return orig_enc_func(obj)

return msgpack_module.pack(o, stream, default=_enc_func, **kwargs)


def packb(o, **kwargs):
Expand All @@ -41,7 +50,13 @@ def packb(o, **kwargs):
msgpack module using the _msgpack_module argument.
'''
msgpack_module = kwargs.pop('_msgpack_module', msgpack)
return msgpack_module.packb(o, **kwargs)
orig_enc_func = kwargs.pop('default', lambda x: x)

def _enc_func(obj):
obj = ThreadLocalProxy.unproxy(obj)
return orig_enc_func(obj)

return msgpack_module.packb(o, default=_enc_func, **kwargs)


def unpack(stream, **kwargs):
Expand Down
Loading

0 comments on commit c00d9a4

Please sign in to comment.