Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
- MCP docs: Used Hishel for transparently caching documentation resources,
by default for one hour. Use the `CRATEDB_MCP_DOCS_CACHE_TTL` environment
variable to adjust (default: 3600)
- SQL: Stronger read-only mode, using `sqlparse`
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ LLMs can still hallucinate and give incorrect answers.
* What is the storage consumption of my tables, give it in a graph.
* How can I format a timestamp column to '2019 Jan 21'.

# Data integrity
We do not recommend letting the LLMs insert data or modify columns by itself, as such we tell the
LLMs to only allow 'SELECT' statements in the `cratedb_sql` tool, you can jailbreak this against
our recommendation.
# Read-only access
We do not recommend letting LLM-based agents insert or modify data by itself.
As such, only `SELECT` statements are permitted and forwarded to the database.
All other operations will raise a `ValueError` exception, unless the
`CRATEDB_MCP_PERMIT_ALL_STATEMENTS` environment variable is set to a
truthy value. This is **not** recommended.

# Install
```shell
Expand Down
3 changes: 2 additions & 1 deletion cratedb_mcp/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .knowledge import DocumentationIndex, Queries, documentation_url_permitted
from .settings import DOCS_CACHE_TTL, HTTP_TIMEOUT, HTTP_URL
from .util.sql import sql_is_permitted

# Configure Hishel, an httpx client with caching.
# Define one hour of caching time.
Expand All @@ -25,7 +26,7 @@ def query_cratedb(query: str) -> list[dict]:
@mcp.tool(description="Send a SQL query to CrateDB, only 'SELECT' queries are allows, queries that"
" modify data, columns or are otherwise deemed un-safe are rejected.")
def query_sql(query: str):
if 'select' not in query.lower():
if not sql_is_permitted(query):
raise ValueError('Only queries that have a SELECT statement are allowed.')
return query_cratedb(query)

Expand Down
10 changes: 10 additions & 0 deletions cratedb_mcp/settings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
import warnings

from attr.converters import to_bool

HTTP_URL: str = os.getenv("CRATEDB_MCP_HTTP_URL", "http://localhost:4200")

# Configure cache lifetime for documentation resources.
Expand All @@ -15,3 +17,11 @@

# Configure HTTP timeout for all conversations.
HTTP_TIMEOUT = 10.0

# Whether to permit all statements. By default, only SELECT operations are permitted.
PERMIT_ALL_STATEMENTS: bool = to_bool(os.getenv("CRATEDB_MCP_PERMIT_ALL_STATEMENTS", "false"))

# TODO: Refactor into code which is not on the module level. Use OOM early.
if PERMIT_ALL_STATEMENTS: # pragma: no cover
warnings.warn("All types of SQL statements are permitted. This means the LLM "
"agent can write and modify the connected database", category=UserWarning, stacklevel=2)
Empty file added cratedb_mcp/util/__init__.py
Empty file.
126 changes: 126 additions & 0 deletions cratedb_mcp/util/sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import dataclasses
import logging
import typing as t

import sqlparse
from sqlparse.tokens import Keyword

from cratedb_mcp.settings import PERMIT_ALL_STATEMENTS

logger = logging.getLogger(__name__)


def sql_is_permitted(expression: str) -> bool:
"""
Validate the SQL expression, only permit read queries by default.

When the `CRATEDB_MCP_PERMIT_ALL_STATEMENTS` environment variable is set,
allow all types of statements. This is **not** recommended.

FIXME: Revisit implementation, it might be too naive or weak.
Issue: https://github.com/crate/cratedb-mcp/issues/10
Question: Does SQLAlchemy provide a solid read-only mode, or any other library?
"""
is_dql = SqlStatementClassifier(expression=expression, permit_all=PERMIT_ALL_STATEMENTS).is_dql
if is_dql:
logger.info(f"Permitted SQL expression: {expression and expression[:50]}...")
else:
logger.warning(f"Denied SQL expression: {expression and expression[:50]}...")
return is_dql


@dataclasses.dataclass
class SqlStatementClassifier:
"""
Helper to classify an SQL statement.

Here, most importantly: Provide the `is_dql` property that
signals truthfulness for read-only SQL SELECT statements only.
"""
expression: str
permit_all: bool = False

_parsed_sqlparse: t.Any = dataclasses.field(init=False, default=None)

def __post_init__(self) -> None:
if self.expression is None:
self.expression = ""
if self.expression:
self.expression = self.expression.strip()

def parse_sqlparse(self) -> t.List[sqlparse.sql.Statement]:
"""
Parse expression using traditional `sqlparse` library.
"""
if self._parsed_sqlparse is None:
self._parsed_sqlparse = sqlparse.parse(self.expression)
return self._parsed_sqlparse

@property
def is_dql(self) -> bool:
"""
Whether the statement is a DQL statement, which effectively invokes read-only operations only.
"""

if not self.expression:
return False

if self.permit_all:
return True

# Check if the expression is valid and if it's a DQL/SELECT statement,
# also trying to consider `SELECT ... INTO ...` and evasive
# `SELECT * FROM users; \uff1b DROP TABLE users` statements.
return self.is_select and not self.is_camouflage

@property
def is_select(self) -> bool:
"""
Whether the expression is an SQL SELECT statement.
"""
return self.operation == 'SELECT'

@property
def operation(self) -> str:
"""
The SQL operation: SELECT, INSERT, UPDATE, DELETE, CREATE, etc.
"""
parsed = self.parse_sqlparse()
return parsed[0].get_type().upper()

@property
def is_camouflage(self) -> bool:
"""
Innocent-looking `SELECT` statements can evade filters.
"""
return self.is_select_into or self.is_evasive

@property
def is_select_into(self) -> bool:
"""
Use traditional `sqlparse` for catching `SELECT ... INTO ...` statements.
Examples:
SELECT * INTO foobar FROM bazqux
SELECT * FROM bazqux INTO foobar
"""
# Flatten all tokens (including nested ones) and match on type+value.
statement = self.parse_sqlparse()[0]
return any(
token.ttype is Keyword and token.value.upper() == "INTO"
for token in statement.flatten()
)

@property
def is_evasive(self) -> bool:
"""
Use traditional `sqlparse` for catching evasive SQL statements.

A practice picked up from CodeRabbit was to reject multiple statements
to prevent potential SQL injections. Is it a viable suggestion?

Examples:

SELECT * FROM users; \uff1b DROP TABLE users
"""
parsed = self.parse_sqlparse()
return len(parsed) > 1
9 changes: 9 additions & 0 deletions docs/backlog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# CrateDB MCP backlog

## Iteration +1
- Docs: HTTP caching
- Docs: Load documentation index from custom outline file
- SQL: Extract `SqlFilter` or `SqlGateway` functionality to `cratedb-sqlparse`

## Done
- SQL: Stronger read-only mode
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ dynamic = [
"version",
]
dependencies = [
"attrs",
"cachetools<6",
"cratedb-about==0.0.4",
"hishel<0.2",
"mcp[cli]>=1.5.0",
"sqlparse<0.6",
]
optional-dependencies.develop = [
"mypy<1.16",
Expand Down
2 changes: 2 additions & 0 deletions tests/test_knowledge.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ def test_queries():
assert "sys.health" in Queries.TABLES_METADATA
assert "WITH partitions_health" in Queries.TABLES_METADATA
assert "LEFT JOIN" in Queries.TABLES_METADATA


17 changes: 9 additions & 8 deletions tests/test_mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@ def test_fetch_docs_permitted_cratedb_com():
assert "initcap" in response


def test_query_sql_forbidden():
def test_query_sql_permitted():
assert query_sql("SELECT 42")["rows"] == [[42]]


def test_query_sql_forbidden_easy():
with pytest.raises(ValueError) as ex:
assert "RelationUnknown" in str(query_sql("INSERT INTO foobar (id) VALUES (42) RETURNING id"))
assert ex.match("Only queries that have a SELECT statement are allowed")


def test_query_sql_permitted():
assert query_sql("SELECT 42")["rows"] == [[42]]


def test_query_sql_permitted_exploit():
# FIXME: Read-only protection must become stronger.
query_sql("INSERT INTO foobar (operation) VALUES ('select')")
def test_query_sql_forbidden_sneak_value():
with pytest.raises(ValueError) as ex:
query_sql("INSERT INTO foobar (operation) VALUES ('select')")
assert ex.match("Only queries that have a SELECT statement are allowed")


def test_get_table_metadata():
Expand Down
107 changes: 107 additions & 0 deletions tests/test_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import cratedb_mcp
from cratedb_mcp.util.sql import sql_is_permitted


def test_sql_select_permitted():
"""Regular SQL SELECT statements are permitted"""
assert sql_is_permitted("SELECT 42") is True
assert sql_is_permitted(" SELECT 42") is True
assert sql_is_permitted("select 42") is True


def test_sql_select_rejected():
"""Bogus SQL SELECT statements are rejected"""
assert sql_is_permitted(r"--\; select 42") is False


def test_sql_insert_allowed(mocker):
"""When explicitly allowed, permit any kind of statement"""
mocker.patch.object(cratedb_mcp.util.sql, "PERMIT_ALL_STATEMENTS", True)
assert sql_is_permitted("INSERT INTO foobar") is True


def test_sql_select_multiple_rejected():
"""Multiple SQL statements are rejected"""
assert sql_is_permitted("SELECT 42; SELECT 42;") is False


def test_sql_create_rejected():
"""DDL statements are rejected"""
assert sql_is_permitted("CREATE TABLE foobar AS SELECT 42") is False


def test_sql_insert_rejected():
"""DML statements are rejected"""
assert sql_is_permitted("INSERT INTO foobar") is False


def test_sql_select_into_rejected():
"""SELECT+DML statements are rejected"""
assert sql_is_permitted("SELECT * INTO foobar FROM bazqux") is False
assert sql_is_permitted("SELECT * FROM bazqux INTO foobar") is False
assert sql_is_permitted("WITH FOO AS (SELECT * FROM (SELECT * bazqux INTO foobar))") is False


def test_sql_select_into_permitted():
"""Forbidden keywords do not contribute to classification when used as labels"""
assert sql_is_permitted('SELECT * FROM "into"') is True
assert sql_is_permitted('SELECT * FROM "INTO"') is True


def test_sql_empty_rejected():
"""Empty statements are rejected"""
assert sql_is_permitted("") is False


def test_sql_almost_empty_rejected():
"""Quasi-empty statements are rejected"""
assert sql_is_permitted(" ") is False


def test_sql_none_rejected():
"""Void statements are rejected"""
assert sql_is_permitted(None) is False


def test_sql_multiple_statements_rejected():
assert sql_is_permitted("SELECT 42; INSERT INTO foo VALUES (1)") is False


def test_sql_with_comments_rejected():
assert sql_is_permitted(
"/* Sneaky comment */ INSERT /* another comment */ INTO foo VALUES (1)") is False


def test_sql_update_rejected():
"""UPDATE statements are rejected"""
assert sql_is_permitted("UPDATE foobar SET column = 'value'") is False


def test_sql_delete_rejected():
"""DELETE statements are rejected"""
assert sql_is_permitted("DELETE FROM foobar") is False


def test_sql_truncate_rejected():
"""TRUNCATE statements are rejected"""
assert sql_is_permitted("TRUNCATE TABLE foobar") is False


def test_sql_drop_rejected():
"""DROP statements are rejected"""
assert sql_is_permitted("DROP TABLE foobar") is False


def test_sql_alter_rejected():
"""ALTER statements are rejected"""
assert sql_is_permitted("ALTER TABLE foobar ADD COLUMN newcol INTEGER") is False


def test_sql_case_manipulation_rejected():
"""Statements with case manipulation to hide intent are rejected"""
assert sql_is_permitted("SeLeCt * FrOm users; DrOp TaBlE users") is False


def test_sql_unicode_evasion_rejected():
"""Statements with unicode characters to evade filters are rejected"""
assert sql_is_permitted("SELECT * FROM users; \uFF1B DROP TABLE users") is False