Skip to content

Commit

Permalink
Merge pull request #1307 from GaloisInc/rpc/quoted-cryptol
Browse files Browse the repository at this point in the history
[RPC] f-string-like Cryptol quasiquotation
  • Loading branch information
m-yac authored Nov 29, 2021
2 parents 4ad8961 + aebde83 commit abb41f5
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 28 deletions.
5 changes: 5 additions & 0 deletions cryptol-remote-api/python/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,8 @@ dmypy.json

# Pyre type checker
.pyre/

# Files generated from running tests/cryptol/test_cryptol_api.py
*.crt
*.csr
*.key
3 changes: 2 additions & 1 deletion cryptol-remote-api/python/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

## 2.12.2 -- YYYY-MM-DD

* NEW CHANGELOG ENTRIES SINCE 2.12.0 GO HERE
* Add an interface for Cryptol quasiquotation using an f-string-like syntax,
see `tests/cryptol/test_quoting` for some examples.

## 2.12.0 -- 2021-11-19

Expand Down
13 changes: 3 additions & 10 deletions cryptol-remote-api/python/cryptol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from . import cryptoltypes
from . import solver
from .bitvector import BV
from .cryptoltypes import fail_with
from .quoting import cry, cry_f
from .commands import *
from .connection import *
# import everything from `.synchronous` except `connect` and `connect_stdio`
Expand All @@ -25,13 +27,4 @@
from . import synchronous
sync = synchronous

__all__ = ['bitvector', 'commands', 'connection', 'cryptoltypes', 'opaque', 'single_connection', 'solver', 'synchronous']


def fail_with(x : Exception) -> NoReturn:
"Raise an exception. This is valid in expression positions."
raise x


def cry(string : str) -> cryptoltypes.CryptolCode:
return cryptoltypes.CryptolLiteral(string)
__all__ = ['bitvector', 'commands', 'connection', 'cryptoltypes', 'opaque', 'quoting', 'single_connection', 'solver', 'synchronous']
26 changes: 21 additions & 5 deletions cryptol-remote-api/python/cryptol/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import argo_client.interaction as argo
from argo_client.connection import DynamicSocketProcess, ServerConnection, ServerProcess, StdIOProcess, HttpProcess
from .custom_fstring import *
from .quoting import *
from . import cryptoltypes
from . import solver
from .commands import *
Expand Down Expand Up @@ -217,21 +219,35 @@ def eval_raw(self, expression : Any, *, timeout:Optional[float] = None) -> argo.
:param timeout: Optional timeout for this request (in seconds).
"""
if hasattr(expression, '__to_cryptol__'):
expression = cryptoltypes.to_cryptol(expression)
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolEvalExprRaw(self, expression, timeout)
return self.most_recent_result

def eval(self, expression : Any, *, timeout:Optional[float] = None) -> argo.Command:
"""Evaluate a Cryptol expression, represented according to
:ref:`cryptol-json-expression`, with Python datatypes standing
for their JSON equivalents.
"""Evaluate a Cryptol expression, with the result represented
according to :ref:`cryptol-json-expression`, with Python datatypes
standing for their JSON equivalents.
:param timeout: Optional timeout for this request (in seconds).
"""
if hasattr(expression, '__to_cryptol__'):
expression = cryptoltypes.to_cryptol(expression)
timeout = timeout if timeout is not None else self.timeout
self.most_recent_result = CryptolEvalExpr(self, expression, timeout)
return self.most_recent_result

def eval_f(self, s : str, *, timeout:Optional[float] = None) -> argo.Command:
"""Parses the given string like ``cry_f``, then evalues it, with the
result represented according to :ref:`cryptol-json-expression`, with
Python datatypes standing for their JSON equivalents.
:param timeout: Optional timeout for this request (in seconds).
"""
expression = to_cryptol_str_customf(s, frames=1, filename="<eval_f>")
return self.eval(expression, timeout=timeout)

def evaluate_expression(self, expression : Any, *, timeout:Optional[float] = None) -> argo.Command:
"""Synonym for member method ``eval``.
Expand All @@ -251,7 +267,7 @@ def call_raw(self, fun : str, *args : List[Any], timeout:Optional[float] = None)
``from_cryptol_arg`` on the ``.result()``.
"""
timeout = timeout if timeout is not None else self.timeout
encoded_args = [cryptoltypes.CryptolType().from_python(a) for a in args]
encoded_args = [cryptoltypes.to_cryptol(a) for a in args]
self.most_recent_result = CryptolCallRaw(self, fun, encoded_args, timeout)
return self.most_recent_result

Expand All @@ -260,7 +276,7 @@ def call(self, fun : str, *args : List[Any], timeout:Optional[float] = None) ->
:param timeout: Optional timeout for this request (in seconds)."""
timeout = timeout if timeout is not None else self.timeout
encoded_args = [cryptoltypes.CryptolType().from_python(a) for a in args]
encoded_args = [cryptoltypes.to_cryptol(a) for a in args]
self.most_recent_result = CryptolCall(self, fun, encoded_args, timeout)
return self.most_recent_result

Expand Down
95 changes: 83 additions & 12 deletions cryptol-remote-api/python/cryptol/cryptoltypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,111 @@
from .bitvector import BV
from .opaque import OpaqueValue

from typing import Any, Dict, Iterable, List, NoReturn, Optional, TypeVar, Union

import typing
from typing import cast, Any, Dict, Iterable, List, NoReturn, Optional, TypeVar, Union
from typing_extensions import Literal, Protocol

A = TypeVar('A')

def is_parenthesized(s : str) -> bool:
"""Returns ``True`` iff the given string has balanced parentheses and is
enclosed in a matching pair of parentheses.
:examples:
>>> is_parenthesized(' ((a) b )')
True
>>> is_parenthesized('(a) (b)')
False
>>> is_parenthesized('(a')
False
"""
seen_one, depth = False, 0
for c in s:
if depth > 0:
if c == '(': depth += 1
if c == ')': depth -= 1
else: # depth == 0
if c == '(':
if not seen_one: seen_one, depth = True, 1
# A new left paren after all parens have been closed means our
# string is not enclosed in a matching pair of parentheses
else: return False
if c == ')':
# A right paren with no matching left means our string does
# not have balanced parentheses
return False
# Return True if in the end all parentheses are balanced and we've seen at
# least one matching pair
return seen_one and depth == 0

def parenthesize(s : str) -> str:
"""Encloses the given string ``s`` in parentheses if
``is_parenthesized(s)`` is ``False``"""
return s if is_parenthesized(s) else f'({s})'


JSON = Union[bool, int, float, str, Dict, typing.Tuple, List]

class CryptolJSON(Protocol):
def __to_cryptol__(self, ty : CryptolType) -> Any: ...
def __to_cryptol__(self, ty : CryptolType) -> JSON: ...
def __to_cryptol_str__(self) -> str: ...

class CryptolCode(metaclass=ABCMeta):
def __call__(self, other : CryptolJSON) -> CryptolCode:
return CryptolApplication(self, other)
def __call__(self, *others : CryptolJSON) -> CryptolCode:
if all(hasattr(other, '__to_cryptol__') for other in others):
return CryptolApplication(self, *others)
else:
raise ValueError("Argument to __call__ on CryptolCode is not CryptolJSON")

@abstractmethod
def __to_cryptol__(self, ty : CryptolType) -> Any: ...
def __to_cryptol__(self, ty : CryptolType) -> JSON: ...

@abstractmethod
def __to_cryptol_str__(self) -> str: ...

def __str__(self) -> str:
return self.__to_cryptol_str__()

class CryptolLiteral(CryptolCode):
def __init__(self, code : str) -> None:
self._code = code

def __to_cryptol__(self, ty : CryptolType) -> Any:
def __to_cryptol__(self, ty : CryptolType) -> JSON:
return self._code

def __to_cryptol_str__(self) -> str:
return self._code

def __eq__(self, other : Any) -> bool:
return isinstance(other, CryptolLiteral) and self._code == other._code

def __repr__(self) -> str:
return f'CryptolLiteral({self._code!r})'

class CryptolApplication(CryptolCode):
def __init__(self, rator : CryptolJSON, *rands : CryptolJSON) -> None:
self._rator = rator
self._rands = rands

def __to_cryptol__(self, ty : CryptolType) -> Any:
def __to_cryptol__(self, ty : CryptolType) -> JSON:
return {'expression': 'call',
'function': to_cryptol(self._rator),
'arguments': [to_cryptol(arg) for arg in self._rands]}

def __to_cryptol_str__(self) -> str:
if len(self._rands) == 0:
return self._rator.__to_cryptol_str__()
else:
return ' '.join(parenthesize(x.__to_cryptol_str__()) for x in [self._rator, *self._rands])

def __eq__(self, other : Any) -> bool:
return isinstance(other, CryptolApplication) and self._rator == other._rator and self._rands == other._rands

def __repr__(self) -> str:
return f'CryptolApplication({", ".join(repr(x) for x in [self._rator, *self._rands])})'


class CryptolArrowKind:
def __init__(self, dom : CryptolKind, ran : CryptolKind):
self.domain = dom
Expand Down Expand Up @@ -93,7 +163,7 @@ def __repr__(self) -> str:
return f"Logic({self.subject!r})"


def to_cryptol(val : Any, cryptol_type : Optional[CryptolType] = None) -> Any:
def to_cryptol(val : Any, cryptol_type : Optional[CryptolType] = None) -> JSON:
if cryptol_type is not None:
return cryptol_type.from_python(val)
else:
Expand All @@ -115,11 +185,12 @@ def is_plausible_json(val : Any) -> bool:
return False

class CryptolType:
def from_python(self, val : Any) -> Any:
def from_python(self, val : Any) -> JSON:
if hasattr(val, '__to_cryptol__'):
code = val.__to_cryptol__(self)
if is_plausible_json(code):
return code
# the call to is_plausible_json ensures this cast is OK
return cast(JSON, code)
else:
raise ValueError(f"Improbable JSON from __to_cryptol__: {val!r} gave {code!r}")
# if isinstance(code, CryptolCode):
Expand All @@ -129,7 +200,7 @@ def from_python(self, val : Any) -> Any:
else:
return self.convert(val)

def convert(self, val : Any) -> Any:
def convert(self, val : Any) -> JSON:
if isinstance(val, bool):
return val
elif isinstance(val, tuple) and val == ():
Expand Down
79 changes: 79 additions & 0 deletions cryptol-remote-api/python/cryptol/custom_fstring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""An interface for defining custom f-string wrappers"""

from typing import Any, Callable, Dict, List
import builtins
import ast
import sys

def customf(body : str, onAST : Callable[[ast.FormattedValue], List[ast.expr]],
globals : Dict[str, Any] = {}, locals : Dict[str, Any] = {},
*, frames : int = 0, filename : str = "<custom f-string>") -> str:
"""This function parses the given string as if it were an f-string,
applies the given function to the AST of each of the formatting fields in
the string, then evaluates the result to get the resulting string.
By default, the global and local variables used in the call to ``eval``
are the value of ``sys.__getframe(1+frames).f_globals`` and the value of
``sys.__getframe(1+frames).f_locals``, respectively. This is meant to
ensure that the all the variables which were in scope when the custom
f-string is formed are also in scope when it is evaluated. Thus, the value
of ``frames`` should be incremented for each wrapper function defined
around this function (e.g. see the definition of ``func_customf``).
To add additional global or local variable values (which are combined with,
but take precedence over, the values mentioned in the previous paragraph)
use the ``globals`` and ``locals`` keyword arguments.
"""
# Get the global/local variable context of the previous frame so the local
# names 'body', 'onAST', etc. aren't shadowed our the call to ``eval``
previous_frame = sys._getframe(1 + frames)
all_globals = {**previous_frame.f_globals, **globals}
all_locals = {**previous_frame.f_locals, **locals}
# The below line should be where any f-string syntax errors are raised
tree = ast.parse('f' + str(repr(body)), filename=filename, mode='eval')
if not isinstance(tree, ast.Expression) or not isinstance(tree.body, ast.JoinedStr):
raise ValueError(f'Invalid custom f-string: {str(repr(body))}')
joined_values : List[ast.expr] = []
for node in tree.body.values:
if isinstance(node, ast.FormattedValue):
joined_values += onAST(node)
else:
joined_values += [node]
tree.body.values = joined_values
try:
return str(eval(compile(tree, filename=filename, mode='eval'), all_globals, all_locals))
except SyntaxError as e:
# I can't think of a case where we would raise an error here, but if
# we do it's worth telling the user that the column numbers are all
# messed up
msg = '\nNB: Column numbers refer to positions in the original string'
raise type(e)(str(e) + msg).with_traceback(sys.exc_info()[2])

def func_customf(body : str, func : Callable,
globals : Dict[str, Any] = {}, locals : Dict[str, Any] = {},
*, frames : int = 0, filename : str = "<custom f-string>",
doConvFmtAfter : bool = False,
func_id : str = "_func_customf__func_id") -> str:
"""Like ``customf``, but can be provided a function to apply to the values
of each of the formatting fields before they are formatted as strings,
instead of a function applied to their ASTs.
Unless the parameter ``doConvFmtAfter`` is set to ``True``, any conversions
(i.e. ``{...!s}``, ``{...!r}``, or ``{...!a}``) or format specifiers
(e.g. ``{...:>30}`` or ``{...:+f}``) in the input string will be applied
before the given function is applied. For example,
``func_customf('{5!r}', f)`` is the same as ``f'{f(repr(5))}'``, but
``func_customf('{5!r}', f, doConvFmtAfter=True)`` is ``f'{repr(f(5))}'``.
"""
def onAST(node : ast.FormattedValue) -> List[ast.expr]:
kwargs = {'lineno': node.lineno, 'col_offset': node.col_offset}
func = ast.Name(id=func_id, ctx=ast.Load(), **kwargs)
if doConvFmtAfter or (node.conversion == -1 and node.format_spec is None):
node.value = ast.Call(func=func, args=[node.value], keywords=[], **kwargs)
else:
node_str = ast.JoinedStr(values=[node], **kwargs)
node_val = ast.Call(func=func, args=[node_str], keywords=[], **kwargs)
node = ast.FormattedValue(value=node_val, conversion=-1, format_spec=None, **kwargs)
return [node]
return customf(body, onAST, globals, {**locals, func_id:func}, frames=1+frames,
filename=filename)
Loading

0 comments on commit abb41f5

Please sign in to comment.