Skip to content

chore(iast): check secure marks for SQLi #13239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 23, 2025
4 changes: 2 additions & 2 deletions ddtrace/appsec/_iast/_ast/iastpatch.c
Original file line number Diff line number Diff line change
@@ -17,8 +17,8 @@ static char** cached_packages = NULL;
static size_t cached_packages_count = 0;

/* Static Lists */
static const char* static_allowlist[] = { "jinja2.", "pygments.", "multipart.", "sqlalchemy.",
"python_multipart.", "attrs", "jsonschema", "s3fs" };
static const char* static_allowlist[] = { "jinja2.", "pygments.", "multipart.", "sqlalchemy.", "python_multipart.",
"attrs.", "jsonschema.", "s3fs.", "mysql.", "pymysql." };
static const size_t static_allowlist_count = sizeof(static_allowlist) / sizeof(static_allowlist[0]);

static const char* static_denylist[] = { "django.apps.config.",
30 changes: 16 additions & 14 deletions ddtrace/appsec/_iast/_patch_modules.py
Original file line number Diff line number Diff line change
@@ -34,28 +34,30 @@ def patch_iast(patch_modules=IAST_PATCH):
cmdi_sanitizer,
)
)
when_imported("psycopg2.adapt")(
when_imported("mysql.connector.conversion")(
lambda _: try_wrap_function_wrapper(
"psycopg2.adapt",
"quote_ident",
"mysql.connector.conversion",
"MySQLConverter.escape",
sqli_sanitizer,
)
)
when_imported("psycopg2.extensions")(
when_imported("pymysql")(
lambda _: try_wrap_function_wrapper(
"psycopg2.extensions",
"quote_ident",
"pymysql.connections",
"Connection.escape_string",
sqli_sanitizer,
)
)
# TODO: APPSEC-56947 task
# when_imported("mysql.connector.conversion")(
# lambda _: try_wrap_function_wrapper(
# "mysql.connector.conversion",
# "MySQLConverter.escape",
# sqli_sanitizer,
# )
# )

# Additional MySQL sanitizers
when_imported("pymysql.converters")(
lambda _: try_wrap_function_wrapper(
"pymysql.converters",
"escape_string",
sqli_sanitizer,
)
)

# when_imported("werkzeug.utils")(
# lambda _: try_wrap_function_wrapper(
# "werkzeug.utils",
2 changes: 1 addition & 1 deletion hatch.toml
Original file line number Diff line number Diff line change
@@ -293,7 +293,7 @@ dependencies = [
"pymysql",
"mysqlclient==2.1.1",
"grpcio",
"mysqlclient==2.1.1",
"mysql-connector-python",
]

[envs.appsec_iast_default.env-vars]
9 changes: 8 additions & 1 deletion tests/appsec/iast/db_utils.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
import os

import psycopg2
import pymysql


POSTGRES_HOST = os.getenv("TEST_POSTGRES_HOST", "127.0.0.1")
MYSQL_HOST = os.getenv("TEST_MYSQL_HOST", "127.0.0.1")


def get_connection():
def get_psycopg2_connection():
connection = psycopg2.connect(
user="postgres", password="postgres", host=POSTGRES_HOST, port="5432", database="postgres"
)
return connection


def get_pymysql_connection():
connection = pymysql.connect(user="test", password="test", host=MYSQL_HOST, port=3306, database="test")
return connection


def close_connection(connection):
if connection:
connection.close()
Original file line number Diff line number Diff line change
@@ -2,11 +2,11 @@

from ddtrace.appsec._iast._taint_tracking._taint_objects import get_tainted_ranges
from ddtrace.appsec._iast._taint_tracking._taint_objects import is_pyobject_tainted
from tests.appsec.iast.db_utils import get_connection
from tests.appsec.iast.db_utils import get_psycopg2_connection


def sqli_simple(table):
connection = get_connection()
connection = get_psycopg2_connection()
cur = connection.cursor()
try:
cur.execute("CREATE TABLE students (name TEXT, addr TEXT, city TEXT, pin TEXT)")
19 changes: 2 additions & 17 deletions tests/appsec/iast/fixtures/taint_sinks/sql_injection_pymysql.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,12 @@
import os

import pymysql
from pymysql.err import OperationalError

from ddtrace.appsec._iast._taint_tracking._taint_objects import get_tainted_ranges
from ddtrace.appsec._iast._taint_tracking._taint_objects import is_pyobject_tainted


MYSQL_HOST = os.getenv("TEST_MYSQL_HOST", "127.0.0.1")


def get_connection():
connection = pymysql.connect(user="test", password="test", host=MYSQL_HOST, port=3306, database="test")
return connection


def close_connection(connection):
if connection:
connection.close()
from tests.appsec.iast.db_utils import get_pymysql_connection


def sqli_simple(table):
connection = get_connection()
connection = get_pymysql_connection()
cur = connection.cursor()
try:
cur.execute("CREATE TABLE students (name TEXT, addr TEXT, city TEXT, pin TEXT)")
5 changes: 4 additions & 1 deletion tests/appsec/iast/iast_utils.py
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
from ddtrace.appsec._constants import IAST
from ddtrace.appsec._iast._ast.ast_patching import astpatch_module
from ddtrace.appsec._iast._ast.ast_patching import iastpatch
from ddtrace.appsec._iast._patch_modules import patch_iast


# Check if the log contains "iast::" to raise an error if that’s the case BUT, if the logs contains
@@ -58,7 +59,9 @@ def _iast_patched_module_and_patched_source(module_name, new_module_object=False
return module_changed, patched_source


def _iast_patched_module(module_name, new_module_object=False):
def _iast_patched_module(module_name, new_module_object=False, should_patch_iast=False):
if should_patch_iast:
patch_iast()
iastpatch.build_list_from_env(IAST.PATCH_MODULES)
iastpatch.build_list_from_env(IAST.DENY_MODULES)
res = iastpatch.should_iast_patch(module_name)
116 changes: 116 additions & 0 deletions tests/appsec/iast/test_iast_dbs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import pytest

from ddtrace.appsec._iast._taint_tracking import OriginType
from ddtrace.appsec._iast._taint_tracking import VulnerabilityType
from ddtrace.appsec._iast._taint_tracking._taint_objects import get_tainted_ranges
from ddtrace.appsec._iast._taint_tracking._taint_objects import is_pyobject_tainted
from ddtrace.appsec._iast._taint_tracking._taint_objects import taint_pyobject
from ddtrace.appsec._iast._taint_utils import LazyTaintList
from tests.appsec.iast.conftest import _end_iast_context_and_oce
from tests.appsec.iast.conftest import _start_iast_context_and_oce
from tests.appsec.iast.iast_utils import _iast_patched_module
from tests.utils import override_global_config


_ = _iast_patched_module("pymysql.connections")
_ = _iast_patched_module("pymysql.converters")
_ = _iast_patched_module("mysql.connector.conversion")
mod = _iast_patched_module("tests.appsec.integrations.fixtures.patch_dbs", should_patch_iast=True)


@pytest.fixture(autouse=True)
def iast_create_context():
with override_global_config(
dict(_iast_enabled=True, _iast_deduplication_enabled=False, _iast_request_sampling=100.0)
):
_start_iast_context_and_oce()
yield
_end_iast_context_and_oce()


def test_adapt_list():
obj_list = [1, "word", True]

value = mod.adapt_list(obj_list)
assert value == b"ARRAY[1,'word',true]"
assert not get_tainted_ranges(value)
assert not is_pyobject_tainted(value)


def test_lazy_taint_list():
obj_list = [1, "word", True]
lazy_list = LazyTaintList(obj_list, origins=(OriginType.PARAMETER, OriginType.PARAMETER))

value = mod.adapt_list(lazy_list)
assert value == b"ARRAY[1,'word',true]"
assert get_tainted_ranges(value)
assert is_pyobject_tainted(value)


def test_sanitize_quote_ident():
sql = "'; DROP TABLE users; --"
tainted = taint_pyobject(
pyobject=sql,
source_name="test_add_aspect_tainting_left_hand",
source_value=sql,
source_origin=OriginType.PARAMETER,
)

value = mod.sanitize_quote_ident(tainted)
assert value == 'a-"\'; DROP TABLE users; --"'
assert not is_pyobject_tainted(value)


def test_sanitize_mysql_connector_scape():
sql = "'; DROP TABLE users; --"
tainted = taint_pyobject(
pyobject=sql,
source_name="test_add_aspect_tainting_left_hand",
source_value=sql,
source_origin=OriginType.PARAMETER,
)

value = mod.mysql_connector_scape(tainted)
ranges = get_tainted_ranges(value)
assert value == "a-\\'; DROP TABLE users; --"
assert len(ranges) > 0
for _range in ranges:
assert _range.has_secure_mark(VulnerabilityType.SQL_INJECTION)
assert is_pyobject_tainted(value)


def test_sanitize_pymysql_escape_string():
sql = "'; DROP TABLE users; --"
tainted = taint_pyobject(
pyobject=sql,
source_name="test_add_aspect_tainting_left_hand",
source_value=sql,
source_origin=OriginType.PARAMETER,
)

value = mod.pymysql_escape_string(tainted)
ranges = get_tainted_ranges(value)
assert value == "a-\\'; DROP TABLE users; --"
assert len(ranges) > 0
for _range in ranges:
assert _range.has_secure_mark(VulnerabilityType.SQL_INJECTION)
assert is_pyobject_tainted(value)


def test_sanitize_pymysql_converters_escape():
sql = "'; DROP TABLE users; --"
tainted = taint_pyobject(
pyobject=sql,
source_name="test_sanitize_pymysql_converters",
source_value=sql,
source_origin=OriginType.PARAMETER,
)

value = mod.pymysql_converters_escape_string(tainted)
ranges = get_tainted_ranges(value)
assert value == "a-\\'; DROP TABLE users; --"
assert len(ranges) > 0
print(ranges)
for _range in ranges:
assert _range.has_secure_mark(VulnerabilityType.SQL_INJECTION)
assert is_pyobject_tainted(value)
54 changes: 0 additions & 54 deletions tests/appsec/iast/test_iast_psycopg2.py

This file was deleted.

6 changes: 5 additions & 1 deletion tests/appsec/iast/test_telemetry.py
Original file line number Diff line number Diff line change
@@ -142,7 +142,11 @@ def test_metric_instrumented_propagation(no_request_sampling, telemetry_writer):
generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value]
# Remove potential sinks from internal usage of the lib (like http.client, used to communicate with
# the agent)
filtered_metrics = [metric["metric"] for metric in generate_metrics if metric["metric"] != "executed.sink"]
filtered_metrics = [
metric["metric"]
for metric in generate_metrics
if metric["metric"] not in ["executed.sink", "instrumented.sink"]
]
assert filtered_metrics == ["instrumented.propagation"]


32 changes: 32 additions & 0 deletions tests/appsec/integrations/fixtures/patch_dbs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from mysql.connector.conversion import MySQLConverter
from psycopg2.extensions import adapt
from psycopg2.extensions import quote_ident
from pymysql.converters import escape_string

from tests.appsec.iast.db_utils import get_psycopg2_connection
from tests.appsec.iast.db_utils import get_pymysql_connection


def adapt_list(obj_list):
value = adapt(obj_list)
return value.getquoted()


def sanitize_quote_ident(tainted_value):
connection = get_psycopg2_connection()
cur = connection.cursor()
return "a-" + quote_ident(tainted_value, cur)


def mysql_connector_scape(tainted_value):
converter = MySQLConverter()
return "a-" + converter.escape(tainted_value)


def pymysql_escape_string(tainted_value):
mock_conn = get_pymysql_connection()
return "a-" + mock_conn.escape_string(tainted_value)


def pymysql_converters_escape_string(tainted_value):
return "a-" + escape_string(tainted_value)
15 changes: 0 additions & 15 deletions tests/appsec/integrations/fixtures/patch_psycopg2.py

This file was deleted.