From 70b58a3fdb3034be5bb41c1d9540806c168537bd Mon Sep 17 00:00:00 2001 From: vangheem Date: Mon, 21 Sep 2015 23:19:41 -0500 Subject: [PATCH] get auto-csrf protection working on the zope root --- CHANGES.rst | 3 +- plone/protect/authenticator.py | 17 +++++----- plone/protect/auto.py | 57 ++++++++------------------------- plone/protect/subscribers.py | 29 ++++++++--------- plone/protect/tests/testAuto.py | 48 ++++++++++++++++++--------- plone/protect/utils.py | 46 ++++++++++++++++++++++++-- 6 files changed, 115 insertions(+), 85 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 37f8d6a..e10e034 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -4,7 +4,8 @@ Changelog 3.0.9 (unreleased) ------------------ -- Nothing changed yet. +- get auto-csrf protection working on the zope root + [vangheem] 3.0.8 (2015-09-20) diff --git a/plone/protect/authenticator.py b/plone/protect/authenticator.py index 066f812..2267ba7 100644 --- a/plone/protect/authenticator.py +++ b/plone/protect/authenticator.py @@ -42,8 +42,9 @@ def _is_equal(val1, val2): return result == 0 -def _getKeyring(username): - manager = getUtility(IKeyManager) +def _getKeyring(username, manager=None): + if manager is None: + manager = getUtility(IKeyManager) if username == ANONYMOUS_USER: try: ring = manager[u'_anon'] @@ -59,7 +60,7 @@ def _getKeyring(username): return ring -def _verify_request(request, extra='', name='_authenticator'): +def _verify_request(request, extra='', name='_authenticator', manager=None): auth = request.get(name) if auth is None: auth = request.getHeader('X-CSRF-TOKEN') @@ -70,7 +71,7 @@ def _verify_request(request, extra='', name='_authenticator'): auth = auth[0] user = _getUserName() - ring = _getKeyring(user) + ring = _getKeyring(user, manager=manager) for key in ring: if key is None: @@ -85,9 +86,9 @@ def _verify_request(request, extra='', name='_authenticator'): _verify = _verify_request -def createToken(extra=''): +def createToken(extra='', manager=None): user = _getUserName() - ring = _getKeyring(user) + ring = _getKeyring(user, manager=manager) secret = ring.random() return hmac.new(secret, user + extra, sha).hexdigest() @@ -106,9 +107,9 @@ def verify(self, extra='', name="_authenticator"): return _verify_request(self.request, extra=extra, name=name) -def check(request, extra='', name="_authenticator"): +def check(request, extra='', name="_authenticator", manager=None): if isinstance(request, HTTPRequest): - if not _verify_request(request, extra=extra, name=name): + if not _verify_request(request, extra=extra, name=name, manager=manager): raise Forbidden('Form authenticator is invalid.') diff --git a/plone/protect/auto.py b/plone/protect/auto.py index 6374558..c27051b 100644 --- a/plone/protect/auto.py +++ b/plone/protect/auto.py @@ -7,7 +7,6 @@ from AccessControl import getSecurityManager from Acquisition import aq_parent -from OFS.interfaces import IApplication from lxml import etree from plone.keyring.interfaces import IKeyManager from plone.portlets.interfaces import IPortletAssignment @@ -16,40 +15,28 @@ from plone.protect.authenticator import isAnonymousUser from plone.protect.interfaces import IConfirmView from plone.protect.interfaces import IDisableCSRFProtection +from plone.protect.utils import SAFE_WRITE_KEY +from plone.protect.utils import getRoot +from plone.protect.utils import getRootKeyManager +from plone.protect.utils import safeWrite # noqa b/w compat import from plone.transformchain.interfaces import ITransform from repoze.xmliter.utils import getHTMLSerializer import transaction +import types from zExceptions import Forbidden from zope.component import ComponentLookupError from zope.component import adapts from zope.component import getUtility from zope.component.hooks import getSite -from zope.globalrequest import getRequest from zope.interface import implements, Interface LOGGER = logging.getLogger('plone.protect') -SAFE_WRITE_KEY = 'plone.protect.safe_oids' X_FRAME_OPTIONS = os.environ.get('PLONE_X_FRAME_OPTIONS', 'SAMEORIGIN') CSRF_DISABLED = os.environ.get('PLONE_CSRF_DISABLED', 'false') == 'true' -def safeWrite(obj, request=None): - if request is None: - request = getRequest() - if request is None: - LOGGER.debug('could not mark object as a safe write') - return - if SAFE_WRITE_KEY not in request.environ: - request.environ[SAFE_WRITE_KEY] = [] - try: - if obj._p_oid not in request.environ[SAFE_WRITE_KEY]: - request.environ[SAFE_WRITE_KEY].append(obj._p_oid) - except AttributeError: - LOGGER.debug('object you attempted to mark safe does not have an oid') - - class ProtectTransform(object): """ XXX Need to be extremely careful with everything we do in here @@ -103,6 +90,7 @@ def transformUnicode(self, result, encoding): def transformIterable(self, result, encoding): """Apply the transform if required """ + # before anything, do the clickjacking protection self.request.response.setHeader('X-Frame-Options', X_FRAME_OPTIONS) @@ -130,7 +118,8 @@ def transformIterable(self, result, encoding): try: self.key_manager = getUtility(IKeyManager) except ComponentLookupError: - pass + root = getRoot(context) + self.key_manager = getRootKeyManager(root) if self.site is None and self.key_manager is None: # key manager not installed and no site object. @@ -146,6 +135,8 @@ def transformIterable(self, result, encoding): def getContext(self): published = self.request.get('PUBLISHED') + if isinstance(published, types.MethodType): + return published.im_self return aq_parent(published) def check(self): @@ -176,16 +167,11 @@ def _check(self): not IDisableCSRFProtection.providedBy(self.request): # Okay, we're writing here, we need to protect! try: - check(self.request) + check(self.request, manager=self.key_manager) return True except ComponentLookupError: - # okay, it's possible we're at the zope root and the KeyManager - # hasn't been installed yet. Let's check and carry on - # if this is the case - if IApplication.providedBy(self.getContext()): - LOGGER.info('skipping csrf protection on zope root until ' - 'keymanager gets installed') - return True + LOGGER.info('Can not find key manager for CSRF protection. ' + 'This should not happen.') raise except Forbidden: # XXX @@ -250,7 +236,7 @@ def transform(self, result): root = result.tree.getroot() url = urlparse(self.request.URL) try: - token = createToken() + token = createToken(manager=self.key_manager) except ComponentLookupError: if self.site is not None: LOGGER.warn('Keyring not found on site. This should not happen', exc_info=True) @@ -282,18 +268,3 @@ def transform(self, result): form.append(hidden) return result - - def __call__(self, result, encoding): - if CSRF_DISABLED: - return result - if isinstance(result, unicode): - newResult = self.transformUnicode(result, encoding) - elif isinstance(result, str): - newResult = self.transformBytes(result, encoding) - else: - newResult = self.transformIterable(result, encoding) - - if newResult is not None: - result = newResult - - return result diff --git a/plone/protect/subscribers.py b/plone/protect/subscribers.py index 8b02f92..2589ca7 100644 --- a/plone/protect/subscribers.py +++ b/plone/protect/subscribers.py @@ -1,16 +1,19 @@ -from Acquisition import aq_parent +import logging +import time + +from Products.PluggableAuthService.interfaces.events import IUserLoggedInEvent from plone.keyring.interfaces import IKeyManager from plone.protect.interfaces import IDisableCSRFProtection -from Products.PluggableAuthService.interfaces.events import IUserLoggedInEvent from zope.component import ComponentLookupError from zope.component import adapter from zope.component import getUtility -from zope.component import getSiteManager from zope.component.hooks import getSite +from zope.globalrequest import getRequest from zope.interface import alsoProvides +from plone.protect.utils import getRootKeyManager +from plone.protect.utils import getRoot + -import time -import logging LOGGER = logging.getLogger('plone.protect') _ring_rotation_schedules = ( @@ -38,20 +41,16 @@ def onUserLogsIn(event): since we already write to the database when a user logs in, let's check for key rotation here """ - try: - # disable csrf protection on login requests - req = event.object.REQUEST - alsoProvides(req, IDisableCSRFProtection) - except AttributeError: - req = None + # disable csrf protection on login requests + req = getRequest() + alsoProvides(req, IDisableCSRFProtection) try: manager = getUtility(IKeyManager) _rotate(manager) # also check rotation of zope root keyring - app = aq_parent(getSite()) - sm = getSiteManager(app) - manager = sm.getUtility(IKeyManager) + root = getRoot(getSite()) + manager = getRootKeyManager(root) if manager: _rotate(manager) except ComponentLookupError: @@ -59,4 +58,4 @@ def onUserLogsIn(event): url = req.URL else: url = '' - LOGGER.warn('cannot find key manager for url %s' % url) + LOGGER.warn('cannot find key manager for url %s' % url) \ No newline at end of file diff --git a/plone/protect/tests/testAuto.py b/plone/protect/tests/testAuto.py index bbaedbf..33f19e5 100644 --- a/plone/protect/tests/testAuto.py +++ b/plone/protect/tests/testAuto.py @@ -1,5 +1,7 @@ from plone.app.testing import TEST_USER_NAME from plone.app.testing import TEST_USER_PASSWORD +from plone.app.testing import SITE_OWNER_NAME +from plone.app.testing import SITE_OWNER_PASSWORD from plone.app.testing import login from plone.app.testing import logout from plone.keyring.interfaces import IKeyManager @@ -15,23 +17,9 @@ from zope.component import getUtility -class AutoCSRFProtectTests(unittest.TestCase): +class _BaseAutoTest(object): layer = PROTECT_FUNCTIONAL_TESTING - def setUp(self): - self.portal = self.layer['portal'] - self.browser = Browser(self.layer['app']) - self.request = self.layer['request'] - login(self.portal, TEST_USER_NAME) - self.open('login_form') - self.browser.getControl(name='__ac_name').value = TEST_USER_NAME - self.browser.getControl( - name='__ac_password').value = TEST_USER_PASSWORD - self.browser.getControl(name='submit').click() - - def open(self, path): - self.browser.open(self.portal.absolute_url() + '/' + path) - def test_adds_csrf_protection_input(self): self.open('test-unprotected') self.assertTrue('name="_authenticator"' in self.browser.contents) @@ -96,6 +84,23 @@ def test_forbidden_raised_if_auth_failure(self): self.browser.getControl('submit1').click() self.assertFalse(hasattr(self.portal, "foo")) + +class AutoCSRFProtectTests(unittest.TestCase, _BaseAutoTest): + + def setUp(self): + self.portal = self.layer['portal'] + self.browser = Browser(self.layer['app']) + self.request = self.layer['request'] + login(self.portal, TEST_USER_NAME) + self.browser.open(self.portal.absolute_url() + '/login_form') + self.browser.getControl(name='__ac_name').value = TEST_USER_NAME + self.browser.getControl( + name='__ac_password').value = TEST_USER_PASSWORD + self.browser.getControl(name='submit').click() + + def open(self, path): + self.browser.open(self.portal.absolute_url() + '/' + path) + def test_CSRF_header(self): self.request.environ['HTTP_X_CSRF_TOKEN'] = createToken() view = AuthenticatorView(None, self.request) @@ -117,6 +122,19 @@ def test_only_add_auth_when_user_logged_in(self): pass +class TestRoot(unittest.TestCase, _BaseAutoTest): + + def setUp(self): + self.portal = self.layer['portal'] + self.browser = Browser(self.layer['app']) + self.request = self.layer['request'] + self.browser.addHeader( + 'Authorization', 'Basic %s:%s' % (SITE_OWNER_NAME, SITE_OWNER_PASSWORD,)) + + def open(self, path): + self.browser.open(self.portal.aq_parent.absolute_url() + '/' + path) + + class AutoRotateTests(unittest.TestCase): layer = PROTECT_FUNCTIONAL_TESTING diff --git a/plone/protect/utils.py b/plone/protect/utils.py index 738cc67..8476fb9 100644 --- a/plone/protect/utils.py +++ b/plone/protect/utils.py @@ -1,8 +1,15 @@ +import inspect +import logging +from Acquisition import aq_parent from AccessControl.requestmethod import _buildFacade +from plone.keyring.keymanager import KeyManager from plone.protect.authenticator import createToken from zope.globalrequest import getRequest -import inspect +from OFS.interfaces import IApplication + +SAFE_WRITE_KEY = 'plone.protect.safe_oids' +LOGGER = logging.getLogger('plone.protect') _default = [] @@ -59,7 +66,7 @@ def _curried(*args, **kw): return facade_globs[name] -def addTokenToUrl(url, req=None): +def addTokenToUrl(url, req=None, manager=None): if not url: return url if req is None: @@ -70,7 +77,7 @@ def addTokenToUrl(url, req=None): if '_auth_token' not in req.environ: # let's cache this value since this could be called # many times for one request - req.environ['_auth_token'] = createToken() + req.environ['_auth_token'] = createToken(manager=manager) token = req.environ['_auth_token'] if '_authenticator' not in url: @@ -80,3 +87,36 @@ def addTokenToUrl(url, req=None): url += '&' url += '_authenticator=' + token return url + + +def getRootKeyManager(root): + if not IApplication.providedBy(root): + return + try: + manager = root._key_manager + except AttributeError: + manager = root._key_manager = KeyManager() + safeWrite(root._key_manager) + safeWrite(root) + return manager + + +def getRoot(context): + while not IApplication.providedBy(context) and context is not None: + context = aq_parent(context) + return context + + +def safeWrite(obj, request=None): + if request is None: + request = getRequest() + if request is None: + LOGGER.debug('could not mark object as a safe write') + return + if SAFE_WRITE_KEY not in request.environ: + request.environ[SAFE_WRITE_KEY] = [] + try: + if obj._p_oid not in request.environ[SAFE_WRITE_KEY]: + request.environ[SAFE_WRITE_KEY].append(obj._p_oid) + except AttributeError: + LOGGER.debug('object you attempted to mark safe does not have an oid') \ No newline at end of file