Skip to content

Commit

Permalink
Merge pull request #16 from plone/zope-root-auto-csrf
Browse files Browse the repository at this point in the history
get auto-csrf protection working on the zope root
  • Loading branch information
vangheem committed Sep 22, 2015
2 parents fb21786 + 70b58a3 commit 7979ee6
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 85 deletions.
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ Changelog
3.0.9 (unreleased)
------------------

- Nothing changed yet.
- get auto-csrf protection working on the zope root
[vangheem]


3.0.8 (2015-09-20)
Expand Down
17 changes: 9 additions & 8 deletions plone/protect/authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ def _is_equal(val1, val2):
return result == 0


def _getKeyring(username):
manager = getUtility(IKeyManager)
def _getKeyring(username, manager=None):
if manager is None:
manager = getUtility(IKeyManager)
if username == ANONYMOUS_USER:
try:
ring = manager[u'_anon']
Expand All @@ -59,7 +60,7 @@ def _getKeyring(username):
return ring


def _verify_request(request, extra='', name='_authenticator'):
def _verify_request(request, extra='', name='_authenticator', manager=None):
auth = request.get(name)
if auth is None:
auth = request.getHeader('X-CSRF-TOKEN')
Expand All @@ -70,7 +71,7 @@ def _verify_request(request, extra='', name='_authenticator'):
auth = auth[0]

user = _getUserName()
ring = _getKeyring(user)
ring = _getKeyring(user, manager=manager)

for key in ring:
if key is None:
Expand All @@ -85,9 +86,9 @@ def _verify_request(request, extra='', name='_authenticator'):
_verify = _verify_request


def createToken(extra=''):
def createToken(extra='', manager=None):
user = _getUserName()
ring = _getKeyring(user)
ring = _getKeyring(user, manager=manager)
secret = ring.random()
return hmac.new(secret, user + extra, sha).hexdigest()

Expand All @@ -106,9 +107,9 @@ def verify(self, extra='', name="_authenticator"):
return _verify_request(self.request, extra=extra, name=name)


def check(request, extra='', name="_authenticator"):
def check(request, extra='', name="_authenticator", manager=None):
if isinstance(request, HTTPRequest):
if not _verify_request(request, extra=extra, name=name):
if not _verify_request(request, extra=extra, name=name, manager=manager):
raise Forbidden('Form authenticator is invalid.')


Expand Down
57 changes: 14 additions & 43 deletions plone/protect/auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from AccessControl import getSecurityManager
from Acquisition import aq_parent
from OFS.interfaces import IApplication
from lxml import etree
from plone.keyring.interfaces import IKeyManager
from plone.portlets.interfaces import IPortletAssignment
Expand All @@ -16,40 +15,28 @@
from plone.protect.authenticator import isAnonymousUser
from plone.protect.interfaces import IConfirmView
from plone.protect.interfaces import IDisableCSRFProtection
from plone.protect.utils import SAFE_WRITE_KEY
from plone.protect.utils import getRoot
from plone.protect.utils import getRootKeyManager
from plone.protect.utils import safeWrite # noqa b/w compat import
from plone.transformchain.interfaces import ITransform
from repoze.xmliter.utils import getHTMLSerializer
import transaction
import types
from zExceptions import Forbidden
from zope.component import ComponentLookupError
from zope.component import adapts
from zope.component import getUtility
from zope.component.hooks import getSite
from zope.globalrequest import getRequest
from zope.interface import implements, Interface

LOGGER = logging.getLogger('plone.protect')

SAFE_WRITE_KEY = 'plone.protect.safe_oids'

X_FRAME_OPTIONS = os.environ.get('PLONE_X_FRAME_OPTIONS', 'SAMEORIGIN')
CSRF_DISABLED = os.environ.get('PLONE_CSRF_DISABLED', 'false') == 'true'


def safeWrite(obj, request=None):
if request is None:
request = getRequest()
if request is None:
LOGGER.debug('could not mark object as a safe write')
return
if SAFE_WRITE_KEY not in request.environ:
request.environ[SAFE_WRITE_KEY] = []
try:
if obj._p_oid not in request.environ[SAFE_WRITE_KEY]:
request.environ[SAFE_WRITE_KEY].append(obj._p_oid)
except AttributeError:
LOGGER.debug('object you attempted to mark safe does not have an oid')


class ProtectTransform(object):
"""
XXX Need to be extremely careful with everything we do in here
Expand Down Expand Up @@ -103,6 +90,7 @@ def transformUnicode(self, result, encoding):
def transformIterable(self, result, encoding):
"""Apply the transform if required
"""

# before anything, do the clickjacking protection
self.request.response.setHeader('X-Frame-Options', X_FRAME_OPTIONS)

Expand Down Expand Up @@ -130,7 +118,8 @@ def transformIterable(self, result, encoding):
try:
self.key_manager = getUtility(IKeyManager)
except ComponentLookupError:
pass
root = getRoot(context)
self.key_manager = getRootKeyManager(root)

if self.site is None and self.key_manager is None:
# key manager not installed and no site object.
Expand All @@ -146,6 +135,8 @@ def transformIterable(self, result, encoding):

def getContext(self):
published = self.request.get('PUBLISHED')
if isinstance(published, types.MethodType):
return published.im_self
return aq_parent(published)

def check(self):
Expand Down Expand Up @@ -176,16 +167,11 @@ def _check(self):
not IDisableCSRFProtection.providedBy(self.request):
# Okay, we're writing here, we need to protect!
try:
check(self.request)
check(self.request, manager=self.key_manager)
return True
except ComponentLookupError:
# okay, it's possible we're at the zope root and the KeyManager
# hasn't been installed yet. Let's check and carry on
# if this is the case
if IApplication.providedBy(self.getContext()):
LOGGER.info('skipping csrf protection on zope root until '
'keymanager gets installed')
return True
LOGGER.info('Can not find key manager for CSRF protection. '
'This should not happen.')
raise
except Forbidden:
# XXX
Expand Down Expand Up @@ -250,7 +236,7 @@ def transform(self, result):
root = result.tree.getroot()
url = urlparse(self.request.URL)
try:
token = createToken()
token = createToken(manager=self.key_manager)
except ComponentLookupError:
if self.site is not None:
LOGGER.warn('Keyring not found on site. This should not happen', exc_info=True)
Expand Down Expand Up @@ -282,18 +268,3 @@ def transform(self, result):
form.append(hidden)

return result

def __call__(self, result, encoding):
if CSRF_DISABLED:
return result
if isinstance(result, unicode):
newResult = self.transformUnicode(result, encoding)
elif isinstance(result, str):
newResult = self.transformBytes(result, encoding)
else:
newResult = self.transformIterable(result, encoding)

if newResult is not None:
result = newResult

return result
29 changes: 14 additions & 15 deletions plone/protect/subscribers.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
from Acquisition import aq_parent
import logging
import time

from Products.PluggableAuthService.interfaces.events import IUserLoggedInEvent
from plone.keyring.interfaces import IKeyManager
from plone.protect.interfaces import IDisableCSRFProtection
from Products.PluggableAuthService.interfaces.events import IUserLoggedInEvent
from zope.component import ComponentLookupError
from zope.component import adapter
from zope.component import getUtility
from zope.component import getSiteManager
from zope.component.hooks import getSite
from zope.globalrequest import getRequest
from zope.interface import alsoProvides
from plone.protect.utils import getRootKeyManager
from plone.protect.utils import getRoot


import time
import logging
LOGGER = logging.getLogger('plone.protect')

_ring_rotation_schedules = (
Expand Down Expand Up @@ -38,25 +41,21 @@ def onUserLogsIn(event):
since we already write to the database when a user logs in,
let's check for key rotation here
"""
try:
# disable csrf protection on login requests
req = event.object.REQUEST
alsoProvides(req, IDisableCSRFProtection)
except AttributeError:
req = None
# disable csrf protection on login requests
req = getRequest()
alsoProvides(req, IDisableCSRFProtection)

try:
manager = getUtility(IKeyManager)
_rotate(manager)
# also check rotation of zope root keyring
app = aq_parent(getSite())
sm = getSiteManager(app)
manager = sm.getUtility(IKeyManager)
root = getRoot(getSite())
manager = getRootKeyManager(root)
if manager:
_rotate(manager)
except ComponentLookupError:
if req:
url = req.URL
else:
url = ''
LOGGER.warn('cannot find key manager for url %s' % url)
LOGGER.warn('cannot find key manager for url %s' % url)
48 changes: 33 additions & 15 deletions plone/protect/tests/testAuto.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from plone.app.testing import TEST_USER_NAME
from plone.app.testing import TEST_USER_PASSWORD
from plone.app.testing import SITE_OWNER_NAME
from plone.app.testing import SITE_OWNER_PASSWORD
from plone.app.testing import login
from plone.app.testing import logout
from plone.keyring.interfaces import IKeyManager
Expand All @@ -15,23 +17,9 @@
from zope.component import getUtility


class AutoCSRFProtectTests(unittest.TestCase):
class _BaseAutoTest(object):
layer = PROTECT_FUNCTIONAL_TESTING

def setUp(self):
self.portal = self.layer['portal']
self.browser = Browser(self.layer['app'])
self.request = self.layer['request']
login(self.portal, TEST_USER_NAME)
self.open('login_form')
self.browser.getControl(name='__ac_name').value = TEST_USER_NAME
self.browser.getControl(
name='__ac_password').value = TEST_USER_PASSWORD
self.browser.getControl(name='submit').click()

def open(self, path):
self.browser.open(self.portal.absolute_url() + '/' + path)

def test_adds_csrf_protection_input(self):
self.open('test-unprotected')
self.assertTrue('name="_authenticator"' in self.browser.contents)
Expand Down Expand Up @@ -96,6 +84,23 @@ def test_forbidden_raised_if_auth_failure(self):
self.browser.getControl('submit1').click()
self.assertFalse(hasattr(self.portal, "foo"))


class AutoCSRFProtectTests(unittest.TestCase, _BaseAutoTest):

def setUp(self):
self.portal = self.layer['portal']
self.browser = Browser(self.layer['app'])
self.request = self.layer['request']
login(self.portal, TEST_USER_NAME)
self.browser.open(self.portal.absolute_url() + '/login_form')
self.browser.getControl(name='__ac_name').value = TEST_USER_NAME
self.browser.getControl(
name='__ac_password').value = TEST_USER_PASSWORD
self.browser.getControl(name='submit').click()

def open(self, path):
self.browser.open(self.portal.absolute_url() + '/' + path)

def test_CSRF_header(self):
self.request.environ['HTTP_X_CSRF_TOKEN'] = createToken()
view = AuthenticatorView(None, self.request)
Expand All @@ -117,6 +122,19 @@ def test_only_add_auth_when_user_logged_in(self):
pass


class TestRoot(unittest.TestCase, _BaseAutoTest):

def setUp(self):
self.portal = self.layer['portal']
self.browser = Browser(self.layer['app'])
self.request = self.layer['request']
self.browser.addHeader(
'Authorization', 'Basic %s:%s' % (SITE_OWNER_NAME, SITE_OWNER_PASSWORD,))

def open(self, path):
self.browser.open(self.portal.aq_parent.absolute_url() + '/' + path)


class AutoRotateTests(unittest.TestCase):
layer = PROTECT_FUNCTIONAL_TESTING

Expand Down
Loading

0 comments on commit 7979ee6

Please sign in to comment.