Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Anonymous CSRF cookie fix, test fixes, and ANON_ALWAYS cleanup #11

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 20 additions & 35 deletions session_csrf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,7 @@ def process_request(self, request):
else:
request.csrf_token = request.session['csrf_token']
else:
key = None
token = ''
if ANON_COOKIE in request.COOKIES:
key = request.COOKIES[ANON_COOKIE]
token = cache.get(PREFIX + key, '')
if ANON_ALWAYS:
if not key:
key = django_csrf._get_new_csrf_key()
if not token:
token = django_csrf._get_new_csrf_key()
request._anon_csrf_key = key
cache.set(PREFIX + key, token, ANON_TIMEOUT)
request.csrf_token = token
request.csrf_token = '' # to be filled in later if applicable

def process_view(self, request, view_func, args, kwargs):
"""Check the CSRF token if this is a POST."""
Expand All @@ -73,6 +61,22 @@ def process_view(self, request, view_func, args, kwargs):
and not request.user.is_authenticated()):
return

if hasattr(request, 'user') and not request.user.is_authenticated():
if ANON_ALWAYS or getattr(view_func, 'anonymous_csrf', False):
key = None
token = ''
if ANON_COOKIE in request.COOKIES:
key = request.COOKIES[ANON_COOKIE]
token = cache.get(PREFIX + key, '')
if not key:
key = django_csrf._get_new_csrf_key()
if not token:
token = django_csrf._get_new_csrf_key()

request._anon_csrf_key = key
cache.set(PREFIX + key, token, ANON_TIMEOUT)
request.csrf_token = token

# Bail if this is a safe method.
if request.method in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):
return self._accept(request)
Expand Down Expand Up @@ -104,34 +108,15 @@ def process_response(self, request, response):
if hasattr(request, '_anon_csrf_key'):
# Set or reset the cache and cookie timeouts.
response.set_cookie(ANON_COOKIE, request._anon_csrf_key,
max_age=ANON_TIMEOUT, httponly=True,
secure=request.is_secure())
httponly=True, secure=request.is_secure())
patch_vary_headers(response, ['Cookie'])
return response


def anonymous_csrf(f):
"""Decorator that assigns a CSRF token to an anonymous user."""
@functools.wraps(f)
def wrapper(request, *args, **kw):
use_anon_cookie = not (request.user.is_authenticated() or ANON_ALWAYS)
if use_anon_cookie:
if ANON_COOKIE in request.COOKIES:
key = request.COOKIES[ANON_COOKIE]
token = cache.get(PREFIX + key) or django_csrf._get_new_csrf_key()
else:
key = django_csrf._get_new_csrf_key()
token = django_csrf._get_new_csrf_key()
cache.set(PREFIX + key, token, ANON_TIMEOUT)
request.csrf_token = token
response = f(request, *args, **kw)
if use_anon_cookie:
# Set or reset the cache and cookie timeouts.
response.set_cookie(ANON_COOKIE, key, max_age=ANON_TIMEOUT,
httponly=True, secure=request.is_secure())
patch_vary_headers(response, ['Cookie'])
return response
return wrapper
f.anonymous_csrf = True
return f


def anonymous_csrf_exempt(f):
Expand Down
21 changes: 20 additions & 1 deletion session_csrf/tests.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import django.test
from django import http
from django.conf import settings
from django.conf.urls.defaults import patterns
from django.contrib.auth import logout
from django.contrib.auth.middleware import AuthenticationMiddleware
from django.contrib.auth.models import User
from django.contrib.sessions.models import Session
from django.core import signals
Expand All @@ -26,6 +28,7 @@


class TestCsrfToken(django.test.TestCase):
urls = 'session_csrf.tests'

def setUp(self):
self.client.handler = ClientHandler()
Expand Down Expand Up @@ -77,6 +80,14 @@ def setUp(self):
self.token = 'a' * 32
self.rf = django.test.RequestFactory()
self.mw = CsrfMiddleware()
self.save_ANON_ALWAYS = session_csrf.ANON_ALWAYS
session_csrf.ANON_ALWAYS = False
self.save_CSRF_FAILURE_VIEW = settings.CSRF_FAILURE_VIEW
settings.CSRF_FAILURE_VIEW = 'django.views.csrf.csrf_failure'

def tearDown(self):
session_csrf.ANON_ALWAYS = self.save_ANON_ALWAYS
settings.CSRF_FAILURE_VIEW = self.save_CSRF_FAILURE_VIEW

def process_view(self, request, view=None):
return self.mw.process_view(request, view, None, None)
Expand All @@ -92,7 +103,9 @@ def test_anon_token_from_cookie(self):
}
# Hack to set up request middleware.
ClientHandler()(self.rf._base_environ(**r))
self.mw.process_request(request)
auth_mw = AuthenticationMiddleware()
auth_mw.process_request(request)
self.mw.process_view(request, anonymous_csrf(lambda: None), [], {})
self.assertEqual(request.csrf_token, 'woo')

def test_set_csrftoken_once(self):
Expand Down Expand Up @@ -175,9 +188,12 @@ def setUp(self):
self.client.handler = ClientHandler(enforce_csrf_checks=True)
self.save_ANON_ALWAYS = session_csrf.ANON_ALWAYS
session_csrf.ANON_ALWAYS = False
self.save_CSRF_FAILURE_VIEW = settings.CSRF_FAILURE_VIEW
settings.CSRF_FAILURE_VIEW = 'django.views.csrf.csrf_failure'

def tearDown(self):
session_csrf.ANON_ALWAYS = self.save_ANON_ALWAYS
settings.CSRF_FAILURE_VIEW = self.save_CSRF_FAILURE_VIEW

def login(self):
assert self.client.login(username='jbalogh', password='password')
Expand Down Expand Up @@ -272,9 +288,12 @@ def setUp(self):
self.client.handler = ClientHandler(enforce_csrf_checks=True)
self.save_ANON_ALWAYS = session_csrf.ANON_ALWAYS
session_csrf.ANON_ALWAYS = True
self.save_CSRF_FAILURE_VIEW = settings.CSRF_FAILURE_VIEW
settings.CSRF_FAILURE_VIEW = 'django.views.csrf.csrf_failure'

def tearDown(self):
session_csrf.ANON_ALWAYS = self.save_ANON_ALWAYS
settings.CSRF_FAILURE_VIEW = self.save_CSRF_FAILURE_VIEW

def login(self):
assert self.client.login(username='jbalogh', password='password')
Expand Down