Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add guard watch and guard logger. #868

Merged
merged 27 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
43d05aa
Sketch out guard log.
JosephCatrambone Jun 24, 2024
35df11c
Add some experimental code sketches.
JosephCatrambone Jun 25, 2024
9598ab3
Add docstrings. Small improvements.
JosephCatrambone Jun 25, 2024
dd02436
Update: this is no longer the approach we're going to use. We're swit…
JosephCatrambone Jun 25, 2024
2689ddc
Merge branch '0.5.0-dev' of github.com:guardrails-ai/guardrails into …
JosephCatrambone Jun 26, 2024
fda35ad
Start integration with spans.
JosephCatrambone Jun 26, 2024
fdf89ec
Integrate logging with trace.
JosephCatrambone Jun 26, 2024
2805426
Merge branch '0.5.0-dev' of github.com:guardrails-ai/guardrails into …
JosephCatrambone Jun 26, 2024
fceb95c
Output as table.
JosephCatrambone Jun 26, 2024
6c9348e
Output as table.
JosephCatrambone Jun 26, 2024
c44b573
Make sure logging works across async, multiple threads, and multiple …
JosephCatrambone Jun 27, 2024
c44799f
Move test for guard logger to the right place.
JosephCatrambone Jun 27, 2024
c7f8f79
Write to tempfile rather than local directory. Add method to truncat…
JosephCatrambone Jun 27, 2024
cd5f7c4
Reformat.
JosephCatrambone Jun 27, 2024
f49e8fc
Default to follow (by request). Remove unused log level.
JosephCatrambone Jun 27, 2024
9b1b512
Fix doctest. Update docstring.
JosephCatrambone Jun 27, 2024
f10a6d5
Format.
JosephCatrambone Jun 27, 2024
3d12e66
Relint.
JosephCatrambone Jun 27, 2024
6f1a379
Accidentally only returned Noop handler.
JosephCatrambone Jun 27, 2024
6b66bda
Remove error level and reformat.
JosephCatrambone Jun 27, 2024
5da2e5d
Linting.
JosephCatrambone Jun 27, 2024
8e2ac75
Fix lint and pyrite issues.
JosephCatrambone Jun 27, 2024
6e38053
PR Feedback: Move guard_call_logging to tracing.
JosephCatrambone Jun 27, 2024
46bc48a
PR Feeback: Move things to individual namespaced files.
JosephCatrambone Jun 27, 2024
62ae667
Move around and clean up based on PR feedback.
JosephCatrambone Jun 27, 2024
bf5c8d8
Remove the macarena from unit tests.
JosephCatrambone Jun 27, 2024
36f5177
Linting.
JosephCatrambone Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions guardrails/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import guardrails.cli.validate # noqa
from guardrails.cli.guardrails import guardrails as cli
from guardrails.cli.hub import hub_command
from guardrails.cli.watch import watch_command # noqa: F401


cli.add_typer(
hub_command, name="hub", help="Manage validators installed from the Guardrails Hub."
Expand Down
63 changes: 63 additions & 0 deletions guardrails/cli/watch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import json
import sqlite3
import time
from dataclasses import asdict
from typing import Optional

import rich
import typer

from guardrails.cli.guardrails import guardrails as gr_cli
from guardrails.guard_call_logging import (
GuardLogEntry,
TraceHandler,
)


@gr_cli.command(name="watch")
def watch_command(
plain: bool = typer.Option(
default=False,
is_flag=True,
help="Do not use any rich formatting, instead printing each entry on a line.",
),
num_lines: int = typer.Option(
default=0,
help="Print the last n most recent lines. If omitted, will print all history.",
),
follow: bool = typer.Option(
default=True,
help="Continuously read the last output commands",
),
log_path_override: Optional[str] = typer.Option(
default=None, help="Specify a path to the log output file."
),
):
# Open a reader for the log path:
log_reader = None
while log_reader is None:
try:
if log_path_override is not None:
log_reader = TraceHandler.get_reader(log_path_override) # type: ignore
else:
log_reader = TraceHandler.get_reader()
except sqlite3.OperationalError:
print("Logfile not found. Retrying.")
time.sleep(1)

# If we are using fancy outputs, grab a console ref and prep a table.
output_fn = _print_and_format_plain
if not plain:
output_fn = _print_fancy

# Spin while tailing, breaking if we aren't continuously tailing.
for log_msg in log_reader.tail_logs(-num_lines, follow):
output_fn(log_msg)


def _print_fancy(log_msg: GuardLogEntry):
rich.print(log_msg)


def _print_and_format_plain(log_msg: GuardLogEntry) -> None:
print(json.dumps(asdict(log_msg)))
305 changes: 305 additions & 0 deletions guardrails/guard_call_logging.py
CalebCourier marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
"""
guard_call_logging.py

A set of tools to track the behavior of guards, specifically with the intent of
collating the pre/post validation text and timing of guard calls. Uses a singleton to
share write access to a SQLite database across threads.

By default, logs will be created in a temporary directory. This can be overridden by
setting GUARDRAILS_LOG_FILE_PATH in the environment. tracehandler.log_path will give
the full path of the current log file.

# Reading logs (basic):
>>> reader = TraceHandler.get_reader()
>>> for t in reader.tail_logs():
>>> print(t)

# Reading logs (advanced):
>>> reader = TraceHandler.get_reader()
>>> reader.db.execute("SELECT * FROM guard_logs;") # Arbitrary SQL support.

# Saving logs
>>> writer = TraceHandler()
>>> writer.log(
>>> "my_guard_name", 0.0, 1.0, "Raw LLM Output Text", "Sanitized", "exception?"
>>> )
"""

import datetime
import os
import sqlite3
import tempfile
import threading
from dataclasses import dataclass, asdict
from typing import Iterator

from guardrails.utils.casting_utils import to_string
from guardrails.classes.validation.validator_logs import ValidatorLogs


# TODO: We should read this from guardrailsrc.
LOG_RETENTION_LIMIT = 100000
LOG_FILENAME = "guardrails_calls.db"
LOGFILE_PATH = os.environ.get(
"GUARDRAILS_LOG_FILE_PATH", # Document this environment variable.
os.path.join(tempfile.gettempdir(), LOG_FILENAME),
)


# These adapters make it more convenient to add data into our log DB:
# Handle timestamp -> sqlite map:
def adapt_datetime(val):
"""Adapt datetime.datetime to Unix timestamp."""
# return val.isoformat() # If we want to go to datetime/isoformat...
return int(val.timestamp())


sqlite3.register_adapter(datetime.datetime, adapt_datetime)


def convert_timestamp(val):
"""Convert Unix epoch timestamp to datetime.datetime object."""
# To go to datetime.datetime:
# return datetime.datetime.fromisoformat(val.decode())
return datetime.datetime.fromtimestamp(int(val))


sqlite3.register_converter("timestamp", convert_timestamp)


# This class makes it slightly easier to be selective about how we pull data.
# While it's not the ultimate contract/DB schema, it helps with typing and improves dx.
@dataclass
class GuardLogEntry:
guard_name: str
start_time: float
end_time: float
id: int = -1
prevalidate_text: str = ""
postvalidate_text: str = ""
exception_message: str = ""

@property
def timedelta(self):
return self.end_time - self.start_time


class _BaseTraceHandler:
"""The base TraceHandler only pads out the methods. It's effectively a Noop"""

def __init__(self, log_path: os.PathLike, read_mode: bool):
self.db = None

def log(self, *args, **kwargs):
pass

def log_entry(self, guard_log_entry: GuardLogEntry):
pass

def log_validator(self, vlog: ValidatorLogs):
pass

def tail_logs(
self,
start_offset_idx: int = 0,
follow: bool = False,
) -> Iterator[GuardLogEntry]:
yield from []


# This structured handler shouldn't be used directly, since it's touching a SQLite db.
# Instead, use the singleton or the async singleton.
class _SQLiteTraceHandler(_BaseTraceHandler):
CREATE_COMMAND = """
CREATE TABLE IF NOT EXISTS guard_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
guard_name TEXT,
start_time REAL,
end_time REAL,
prevalidate_text TEXT,
postvalidate_text TEXT,
exception_message TEXT
);
"""
INSERT_COMMAND = """
INSERT INTO guard_logs (
guard_name, start_time, end_time, prevalidate_text, postvalidate_text,
exception_message
) VALUES (
:guard_name, :start_time, :end_time, :prevalidate_text, :postvalidate_text,
:exception_message
);
"""

def __init__(self, log_path: os.PathLike, read_mode: bool):
self._log_path = log_path # Read-only value.
self.readonly = read_mode
if read_mode:
self.db = _SQLiteTraceHandler._get_read_connection(log_path)
else:
self.db = _SQLiteTraceHandler._get_write_connection(log_path)

@property
def log_path(self):
return self._log_path

@classmethod
def _get_write_connection(cls, log_path: os.PathLike) -> sqlite3.Connection:
try:
db = sqlite3.connect(
log_path,
isolation_level=None,
check_same_thread=False,
)
db.execute("PRAGMA journal_mode = wal")
db.execute("PRAGMA synchronous = OFF")
# isolation_level = None and pragma WAL means we can READ from the DB
# while threads using it are writing. Synchronous off puts us on the
# highway to the danger zone, depending on how willing we are to lose log
# messages in the event of a guard crash.
except sqlite3.OperationalError as e:
# logging.exception("Unable to connect to guard log handler.")
raise e
with db:
db.execute(_SQLiteTraceHandler.CREATE_COMMAND)
return db

@classmethod
def _get_read_connection(cls, log_path: os.PathLike) -> sqlite3.Connection:
# A bit of a hack to open in read-only mode...
db = sqlite3.connect(
"file:" + str(log_path) + "?mode=ro", isolation_level=None, uri=True
)
db.row_factory = sqlite3.Row
return db

def _truncate(self, keep_n: int = LOG_RETENTION_LIMIT):
assert not self.readonly
self.db.execute(
"""
DELETE FROM guard_logs
WHERE id < (
SELECT id FROM guard_logs ORDER BY id DESC LIMIT 1 OFFSET ?
);
""",
(keep_n,),
)

def log(
self,
guard_name: str,
start_time: float,
end_time: float,
prevalidate_text: str,
postvalidate_text: str,
exception_text: str,
):
assert not self.readonly
with self.db:
self.db.execute(
_SQLiteTraceHandler.INSERT_COMMAND,
dict(
guard_name=guard_name,
start_time=start_time,
end_time=end_time,
prevalidate_text=prevalidate_text,
postvalidate_text=postvalidate_text,
exception_message=exception_text,
),
)

def log_entry(self, guard_log_entry: GuardLogEntry):
assert not self.readonly
with self.db:
self.db.execute(_SQLiteTraceHandler.INSERT_COMMAND, asdict(guard_log_entry))

def log_validator(self, vlog: ValidatorLogs):
assert not self.readonly
maybe_outcome = (
str(vlog.validation_result.outcome)
if (
vlog.validation_result is not None
and hasattr(vlog.validation_result, "outcome")
)
else ""
)
with self.db:
self.db.execute(
_SQLiteTraceHandler.INSERT_COMMAND,
dict(
guard_name=vlog.validator_name,
start_time=vlog.start_time if vlog.start_time else None,
end_time=vlog.end_time if vlog.end_time else 0.0,
prevalidate_text=to_string(vlog.value_before_validation),
postvalidate_text=to_string(vlog.value_after_validation),
exception_message=maybe_outcome,
),
)

def tail_logs(
self, start_offset_idx: int = 0, follow: bool = False
) -> Iterator[GuardLogEntry]:
"""Returns an iterator to generate GuardLogEntries.
@param start_offset_idx : Start printing entries after this IDX. If
negative, this will instead start printing the LAST start_offset_idx entries.
@param follow : If follow is True, will re-check the database for new entries
after the first batch is complete. If False (default), will return when entries
are exhausted.
"""
last_idx = start_offset_idx
cursor = self.db.cursor()
if last_idx < 0:
# We're indexing from the end, so do a quick check.
cursor.execute(
"SELECT id FROM guard_logs ORDER BY id DESC LIMIT 1 OFFSET ?;",
(-last_idx,),
)
for row in cursor:
last_idx = row["id"]
sql = """
SELECT
id, guard_name, start_time, end_time, prevalidate_text,
postvalidate_text, exception_message
FROM guard_logs
WHERE id > ?
ORDER BY start_time;
"""
cursor.execute(sql, (last_idx,))
while True:
for row in cursor:
last_entry = GuardLogEntry(**row)
last_idx = last_entry.id
yield last_entry
if not follow:
return
# If we're here we've run out of entries to tail. Fetch more:
cursor.execute(sql, (last_idx,))


class TraceHandler(_SQLiteTraceHandler):
"""TraceHandler wraps the internal _SQLiteTraceHandler to make it multi-thread
safe. Coupled with some write ahead journaling in the _SyncTrace internal, we have
a faux-multi-write multi-read interface for SQLite."""

_instance = None
_lock = threading.Lock()

def __new__(cls):
if cls._instance is None:
# We run two 'if None' checks so we don't have to call the mutex check for
# the cases where there's obviously no handler. Only do a check if there
# MIGHT not be a handler instantiated.
with cls._lock:
if cls._instance is None:
cls._instance = cls._create()
return cls._instance

@classmethod
def _create(cls, path: os.PathLike = LOGFILE_PATH) -> _BaseTraceHandler: # type: ignore
return _SQLiteTraceHandler(path, read_mode=False)
# To disable logging:
# return _BaseTraceHandler(path, read_mode=False)

@classmethod
def get_reader(cls, path: os.PathLike = LOGFILE_PATH) -> _BaseTraceHandler: # type: ignore
return _SQLiteTraceHandler(path, read_mode=True)
4 changes: 4 additions & 0 deletions guardrails/utils/telemetry_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from opentelemetry.context import Context
from opentelemetry.trace import StatusCode, Tracer

from guardrails.guard_call_logging import TraceHandler
from guardrails.stores.context import get_tracer as get_context_tracer
from guardrails.stores.context import get_tracer_context
from guardrails.utils.casting_utils import to_string
Expand Down Expand Up @@ -100,6 +101,9 @@ def trace_validator_result(
"instance_id": instance_id,
**kwargs,
}

TraceHandler().log_validator(validator_log)
CalebCourier marked this conversation as resolved.
Show resolved Hide resolved

current_span.add_event(
f"{validator_name}_result",
{k: v for k, v in event.items() if v is not None},
Expand Down
Loading
Loading