Skip to content

Commit

Permalink
Merge branch '1.x' into avara1986/APPSEC-8131-rcm-forks-error
Browse files Browse the repository at this point in the history
  • Loading branch information
emmettbutler authored Feb 17, 2023
2 parents 8e67129 + 066688c commit 70ff85c
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 146 deletions.

This file was deleted.

27 changes: 19 additions & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,21 @@ def find(self, file):
return t


def is_stream_ok(stream, expected):
if expected is None:
return True

if isinstance(expected, str):
ex = expected.encode("utf-8")
elif isinstance(expected, bytes):
ex = expected
else:
# Assume it's a callable condition
return expected(stream.decode("utf-8"))

return stream == ex


def run_function_from_file(item, params=None):
file, _, func = item.location
marker = item.get_closest_marker("subprocess")
Expand All @@ -188,14 +203,8 @@ def run_function_from_file(item, params=None):
env.update(params)

expected_status = marker.kwargs.get("status", 0)

expected_out = marker.kwargs.get("out", "")
if expected_out is not None:
expected_out = expected_out.encode("utf-8")

expected_err = marker.kwargs.get("err", "")
if expected_err is not None:
expected_err = expected_err.encode("utf-8")

with NamedTemporaryFile(mode="wb", suffix=".pyc") as fp:
dump_code_to_file(compile(FunctionDefFinder(func).find(file), file, "exec"), fp.file)
Expand All @@ -222,9 +231,11 @@ def _subprocess_wrapper():
"\n=== Captured STDERR ===\n%s=== End of captured STDERR ==="
% (expected_status, status, out.decode("utf-8"), err.decode("utf-8"))
)
elif expected_out is not None and out != expected_out:

if not is_stream_ok(out, expected_out):
raise AssertionError("STDOUT: Expected [%s] got [%s]" % (expected_out, out))
elif expected_err is not None and err != expected_err:

if not is_stream_ok(err, expected_err):
raise AssertionError("STDERR: Expected [%s] got [%s]" % (expected_err, err))

return TestReport.from_item_and_call(item, CallInfo.from_call(_subprocess_wrapper, "call"))
Expand Down
188 changes: 188 additions & 0 deletions tests/contrib/django/test_django_appsec_snapshots.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
from contextlib import contextmanager
import os
import subprocess

import django
import pytest

from ddtrace.internal.compat import PY3
import ddtrace.internal.constants as constants
from tests.appsec.test_processor import RULES_GOOD_PATH
from tests.appsec.test_processor import _ALLOWED_IP
from tests.appsec.test_processor import _BLOCKED_IP
from tests.utils import snapshot
from tests.webclient import Client


SERVER_PORT = 8000


@contextmanager
def daphne_client(django_asgi, additional_env=None):
"""Runs a django app hosted with a daphne webserver in a subprocess and
returns a client which can be used to query it.
Traces are flushed by invoking a tracer.shutdown() using a /shutdown-tracer route
at the end of the testcase.
"""

# Make sure to copy the environment as we need the PYTHONPATH and _DD_TRACE_WRITER_ADDITIONAL_HEADERS (for the test
# token) propagated to the new process.
env = os.environ.copy()
env.update(additional_env or {})
assert "_DD_TRACE_WRITER_ADDITIONAL_HEADERS" in env, "Client fixture needs test token in headers"
env.update(
{
"DJANGO_SETTINGS_MODULE": "tests.contrib.django.django_app.settings",
}
)

# ddtrace-run uses execl which replaces the process but the webserver process itself might spawn new processes.
# Right now it doesn't but it's possible that it might in the future (ex. uwsgi).
cmd = ["ddtrace-run", "daphne", "-p", str(SERVER_PORT), "tests.contrib.django.asgi:%s" % django_asgi]
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True,
env=env,
)

client = Client("http://localhost:%d" % SERVER_PORT)

# Wait for the server to start up
client.wait()

try:
yield client
finally:
resp = client.get_ignored("/shutdown-tracer")
assert resp.status_code == 200
proc.terminate()


@pytest.mark.skipif(django.VERSION < (3, 2, 0), reason="Only want to test with latest Django")
@snapshot(
ignores=[
"meta.error.stack",
"meta.http.request.headers.user-agent",
"meta.http.useragent",
"metrics._dd.appsec.waf.duration",
"metrics._dd.appsec.waf.duration_ext",
]
)
def test_appsec_enabled():
with daphne_client("application", additional_env={"DD_APPSEC_ENABLED": "true"}) as client:
resp = client.get("/")
assert resp.status_code == 200
assert resp.content == b"Hello, test app."


@pytest.mark.skipif(django.VERSION < (3, 2, 0), reason="Only want to test with latest Django")
@snapshot(
ignores=[
"meta.error.stack",
"meta.http.request.headers.user-agent",
"meta.http.useragent",
"metrics._dd.appsec.waf.duration",
"metrics._dd.appsec.waf.duration_ext",
]
)
def test_appsec_enabled_attack():
with daphne_client("application", additional_env={"DD_APPSEC_ENABLED": "true"}) as client:
resp = client.get("/.git")
assert resp.status_code == 404


@pytest.mark.skipif(django.VERSION < (3, 2, 0), reason="Only want to test with latest Django")
@snapshot(
ignores=[
"meta.http.request.headers.accept-encoding",
"meta.http.request.headers.user-agent",
"meta.http.useragent",
"metrics._dd.appsec.waf.duration",
"metrics._dd.appsec.waf.duration_ext",
"metrics._dd.appsec.event_rules.loaded",
]
)
def test_request_ipblock_nomatch_200():
with daphne_client(
"application",
additional_env={
"DD_DJANGO_INSTRUMENT_TEMPLATES": "false",
"DD_APPSEC_ENABLED": "true",
"DD_APPSEC_RULES": RULES_GOOD_PATH,
},
) as client:
result = client.get("/", headers={"Via": _ALLOWED_IP})
assert result.status_code == 200
assert result.content == b"Hello, test app."


@pytest.mark.skipif(django.VERSION < (3, 2, 0), reason="Only want to test with latest Django")
@snapshot(
ignores=[
"meta._dd.appsec.waf.duration",
"meta._dd.appsec.waf.duration_ext",
"meta.http.request.headers.accept-encoding",
"meta.http.request.headers.user-agent",
"meta.http.useragent",
"metrics._dd.appsec.waf.duration",
"metrics._dd.appsec.waf.duration_ext",
"metrics._dd.appsec.event_rules.loaded",
]
)
def test_request_ipblock_match_403():
with daphne_client(
"application",
additional_env={
"DD_APPSEC_ENABLED": "true",
"DD_APPSEC_RULES": RULES_GOOD_PATH,
},
) as client:
result = client.get(
"/",
headers={
"Via": _BLOCKED_IP,
"Accept": "text/html",
},
)
assert result.status_code == 403
as_bytes = (
bytes(constants.APPSEC_BLOCKED_RESPONSE_HTML, "utf-8") if PY3 else constants.APPSEC_BLOCKED_RESPONSE_HTML
)
assert result.content == as_bytes


@pytest.mark.skipif(django.VERSION < (3, 2, 0), reason="Only want to test with latest Django")
@snapshot(
ignores=[
"meta._dd.appsec.waf.duration",
"meta._dd.appsec.waf.duration_ext",
"meta.http.request.headers.accept-encoding",
"meta.http.request.headers.user-agent",
"meta.http.useragent",
"metrics._dd.appsec.waf.duration",
"metrics._dd.appsec.waf.duration_ext",
"metrics._dd.appsec.event_rules.loaded",
]
)
def test_request_ipblock_match_403_json():
with daphne_client(
"application",
additional_env={
"DD_APPSEC_ENABLED": "true",
"DD_APPSEC_RULES": RULES_GOOD_PATH,
},
) as client:
result = client.get(
"/",
headers={
"Via": _BLOCKED_IP,
},
)
assert result.status_code == 403
as_bytes = (
bytes(constants.APPSEC_BLOCKED_RESPONSE_JSON, "utf-8") if PY3 else constants.APPSEC_BLOCKED_RESPONSE_JSONP
)
assert result.content == as_bytes
132 changes: 0 additions & 132 deletions tests/contrib/django/test_django_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
import django
import pytest

from ddtrace.internal.compat import PY3
import ddtrace.internal.constants as constants
from tests.appsec.test_processor import RULES_GOOD_PATH
from tests.appsec.test_processor import _ALLOWED_IP
from tests.appsec.test_processor import _BLOCKED_IP
from tests.utils import snapshot
from tests.webclient import Client

Expand Down Expand Up @@ -224,39 +219,6 @@ def test_asgi_500():
assert resp.status_code == 500


@pytest.mark.skipif(django.VERSION < (3, 2, 0), reason="Only want to test with latest Django")
@snapshot(
ignores=[
"meta.error.stack",
"meta.http.request.headers.user-agent",
"meta.http.useragent",
"metrics._dd.appsec.waf.duration",
"metrics._dd.appsec.waf.duration_ext",
]
)
def test_appsec_enabled():
with daphne_client("application", additional_env={"DD_APPSEC_ENABLED": "true"}) as client:
resp = client.get("/")
assert resp.status_code == 200
assert resp.content == b"Hello, test app."


@pytest.mark.skipif(django.VERSION < (3, 2, 0), reason="Only want to test with latest Django")
@snapshot(
ignores=[
"meta.error.stack",
"meta.http.request.headers.user-agent",
"meta.http.useragent",
"metrics._dd.appsec.waf.duration",
"metrics._dd.appsec.waf.duration_ext",
]
)
def test_appsec_enabled_attack():
with daphne_client("application", additional_env={"DD_APPSEC_ENABLED": "true"}) as client:
resp = client.get("/.git")
assert resp.status_code == 404


@pytest.mark.skipif(django.VERSION < (3, 0, 0), reason="ASGI not supported in django<3")
@snapshot(
ignores=["meta.http.useragent"],
Expand Down Expand Up @@ -289,97 +251,3 @@ def test_templates_disabled():
resp = client.get("/template-view/")
assert resp.status_code == 200
assert resp.content == b"some content\n"


@pytest.mark.skipif(django.VERSION < (3, 2, 0), reason="Only want to test with latest Django")
@snapshot(
ignores=[
"meta.http.request.headers.accept-encoding",
"meta.http.request.headers.user-agent",
"meta.http.useragent",
"metrics._dd.appsec.waf.duration",
"metrics._dd.appsec.waf.duration_ext",
"metrics._dd.appsec.event_rules.loaded",
]
)
def test_request_ipblock_nomatch_200():
with daphne_client(
"application",
additional_env={
"DD_DJANGO_INSTRUMENT_TEMPLATES": "false",
"DD_APPSEC_ENABLED": "true",
"DD_APPSEC_RULES": RULES_GOOD_PATH,
},
) as client:
result = client.get("/", headers={"Via": _ALLOWED_IP})
assert result.status_code == 200
assert result.content == b"Hello, test app."


@pytest.mark.skipif(django.VERSION < (3, 2, 0), reason="Only want to test with latest Django")
@snapshot(
ignores=[
"meta._dd.appsec.waf.duration",
"meta._dd.appsec.waf.duration_ext",
"meta.http.request.headers.accept-encoding",
"meta.http.request.headers.user-agent",
"meta.http.useragent",
"metrics._dd.appsec.waf.duration",
"metrics._dd.appsec.waf.duration_ext",
"metrics._dd.appsec.event_rules.loaded",
]
)
def test_request_ipblock_match_403():
with daphne_client(
"application",
additional_env={
"DD_APPSEC_ENABLED": "true",
"DD_APPSEC_RULES": RULES_GOOD_PATH,
},
) as client:
result = client.get(
"/",
headers={
"Via": _BLOCKED_IP,
"Accept": "text/html",
},
)
assert result.status_code == 403
as_bytes = (
bytes(constants.APPSEC_BLOCKED_RESPONSE_HTML, "utf-8") if PY3 else constants.APPSEC_BLOCKED_RESPONSE_HTML
)
assert result.content == as_bytes


@pytest.mark.skipif(django.VERSION < (3, 2, 0), reason="Only want to test with latest Django")
@snapshot(
ignores=[
"meta._dd.appsec.waf.duration",
"meta._dd.appsec.waf.duration_ext",
"meta.http.request.headers.accept-encoding",
"meta.http.request.headers.user-agent",
"meta.http.useragent",
"metrics._dd.appsec.waf.duration",
"metrics._dd.appsec.waf.duration_ext",
"metrics._dd.appsec.event_rules.loaded",
]
)
def test_request_ipblock_match_403_json():
with daphne_client(
"application",
additional_env={
"DD_APPSEC_ENABLED": "true",
"DD_APPSEC_RULES": RULES_GOOD_PATH,
},
) as client:
result = client.get(
"/",
headers={
"Via": _BLOCKED_IP,
},
)
assert result.status_code == 403
as_bytes = (
bytes(constants.APPSEC_BLOCKED_RESPONSE_JSON, "utf-8") if PY3 else constants.APPSEC_BLOCKED_RESPONSE_JSONP
)
assert result.content == as_bytes

0 comments on commit 70ff85c

Please sign in to comment.