Skip to content

Commit

Permalink
Merge pull request #143 from stealthrocket/py308
Browse files Browse the repository at this point in the history
Python 3.8 support
  • Loading branch information
chriso authored Mar 27, 2024
2 parents bc89165 + 3634765 commit df098fd
Show file tree
Hide file tree
Showing 15 changed files with 242 additions and 65 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python: ['3.9', '3.10', '3.11', '3.12']
python: ['3.8', '3.9', '3.10', '3.11', '3.12']
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python }}
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "dispatch-py"
description = "Develop reliable distributed systems on the Dispatch platform."
readme = "README.md"
dynamic = ["version"]
requires-python = ">= 3.9"
requires-python = ">= 3.8"
dependencies = [
"grpcio >= 1.60.0",
"protobuf >= 4.24.0",
Expand Down
18 changes: 9 additions & 9 deletions src/dispatch/coroutine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from types import coroutine
from typing import Any, Awaitable
from typing import Any, Awaitable, List, Tuple

from dispatch.experimental.durable import durable
from dispatch.proto import Call
Expand All @@ -16,14 +16,14 @@ def call(call: Call) -> Any:

@coroutine
@durable
def gather(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]
def gather(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc]
"""Alias for all."""
return all(*awaitables)


@coroutine
@durable
def all(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]
def all(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc]
"""Concurrently run a set of coroutines, blocking until all coroutines
return or any coroutine raises an error. If any coroutine fails with an
uncaught exception, the exception will be re-raised here."""
Expand All @@ -32,7 +32,7 @@ def all(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]

@coroutine
@durable
def any(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]
def any(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc]
"""Concurrently run a set of coroutines, blocking until any coroutine
returns or all coroutines raises an error. If all coroutines fail with
uncaught exceptions, the exception(s) will be re-raised here."""
Expand All @@ -41,7 +41,7 @@ def any(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]

@coroutine
@durable
def race(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]
def race(*awaitables: Awaitable[Any]) -> List[Any]: # type: ignore[misc]
"""Concurrently run a set of coroutines, blocking until any coroutine
returns or raises an error. If any coroutine fails with an uncaught
exception, the exception will be re-raised here."""
Expand All @@ -50,17 +50,17 @@ def race(*awaitables: Awaitable[Any]) -> list[Any]: # type: ignore[misc]

@dataclass
class AllDirective:
awaitables: tuple[Awaitable[Any], ...]
awaitables: Tuple[Awaitable[Any], ...]


@dataclass
class AnyDirective:
awaitables: tuple[Awaitable[Any], ...]
awaitables: Tuple[Awaitable[Any], ...]


@dataclass
class RaceDirective:
awaitables: tuple[Awaitable[Any], ...]
awaitables: Tuple[Awaitable[Any], ...]


class AnyException(RuntimeError):
Expand All @@ -69,7 +69,7 @@ class AnyException(RuntimeError):

__slots__ = ("exceptions",)

def __init__(self, exceptions: list[Exception]):
def __init__(self, exceptions: List[Exception]):
self.exceptions = exceptions

def __str__(self):
Expand Down
12 changes: 8 additions & 4 deletions src/dispatch/experimental/durable/frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>

#if PY_MAJOR_VERSION != 3 || (PY_MINOR_VERSION < 9 || PY_MINOR_VERSION > 13)
# error Python 3.9-3.13 is required
#if PY_MAJOR_VERSION != 3 || (PY_MINOR_VERSION < 8 || PY_MINOR_VERSION > 13)
# error Python 3.8-3.13 is required
#endif

// This is a redefinition of the private PyTryBlock from <= 3.10.
// https://github.com/python/cpython/blob/3.8/Include/frameobject.h#L10
// https://github.com/python/cpython/blob/3.9/Include/cpython/frameobject.h#L11
// https://github.com/python/cpython/blob/3.10/Include/cpython/frameobject.h#L22
typedef struct {
Expand All @@ -19,7 +20,8 @@ typedef struct {
int b_level;
} PyTryBlock;

// This is a redefinition of the private PyCoroWrapper from 3.9-3.13.
// This is a redefinition of the private PyCoroWrapper from 3.8-3.13.
// https://github.com/python/cpython/blob/3.8/Objects/genobject.c#L840
// https://github.com/python/cpython/blob/3.9/Objects/genobject.c#L830
// https://github.com/python/cpython/blob/3.10/Objects/genobject.c#L884
// https://github.com/python/cpython/blob/3.11/Objects/genobject.c#L1016
Expand Down Expand Up @@ -53,7 +55,9 @@ static int get_frame_iblock(Frame *frame);
static void set_frame_iblock(Frame *frame, int iblock);
static PyTryBlock *get_frame_blockstack(Frame *frame);

#if PY_MINOR_VERSION == 9
#if PY_MINOR_VERSION == 8
#include "frame308.h"
#elif PY_MINOR_VERSION == 9
#include "frame309.h"
#elif PY_MINOR_VERSION == 10
#include "frame310.h"
Expand Down
145 changes: 145 additions & 0 deletions src/dispatch/experimental/durable/frame308.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// This is a redefinition of the private/opaque frame object.
//
// https://github.com/python/cpython/blob/3.8/Include/frameobject.h#L16
//
// In Python <= 3.10, `struct _frame` is both the PyFrameObject and
// PyInterpreterFrame. From Python 3.11 onwards, the two were split with the
// PyFrameObject (struct _frame) pointing to struct _PyInterpreterFrame.
struct Frame {
PyObject_VAR_HEAD
struct Frame *f_back; // struct _frame
PyCodeObject *f_code;
PyObject *f_builtins;
PyObject *f_globals;
PyObject *f_locals;
PyObject **f_valuestack;
PyObject **f_stacktop;
PyObject *f_trace;
char f_trace_lines;
char f_trace_opcodes;
PyObject *f_gen;
int f_lasti;
int f_lineno;
int f_iblock;
char f_executing;
PyTryBlock f_blockstack[CO_MAXBLOCKS];
PyObject *f_localsplus[1];
};

// Python 3.9 and prior didn't have an explicit enum of frame states,
// but we can derive them based on the presence of a frame, and other
// information found on the frame, for compatibility with later versions.
typedef enum _framestate {
FRAME_CREATED = -2,
FRAME_EXECUTING = 0,
FRAME_CLEARED = 4
} FrameState;

/*
// This is the definition of PyGenObject for reference to developers
// working on the extension.
//
// Note that PyCoroObject and PyAsyncGenObject have the same layout as
// PyGenObject, however the struct fields have a cr_ and ag_ prefix
// (respectively) rather than a gi_ prefix. In Python <= 3.10, PyCoroObject
// and PyAsyncGenObject have extra fields compared to PyGenObject. In Python
// 3.11 onwards, the three objects are identical (except for field name
// prefixes). The extra fields in Python <= 3.10 are not applicable to the
// extension at this time.
//
// https://github.com/python/cpython/blob/3.8/Include/genobject.h#L17
typedef struct {
PyObject_HEAD
PyFrameObject *gi_frame;
char gi_running;
PyObject *gi_code;
PyObject *gi_weakreflist;
PyObject *gi_name;
PyObject *gi_qualname;
_PyErr_StackItem gi_exc_state;
} PyGenObject;
*/

static Frame *get_frame(PyGenObject *gen_like) {
Frame *frame = (Frame *)(gen_like->gi_frame);
assert(frame);
return frame;
}

static PyCodeObject *get_frame_code(Frame *frame) {
PyCodeObject *code = frame->f_code;
assert(code);
return code;
}

static int get_frame_lasti(Frame *frame) {
return frame->f_lasti;
}

static void set_frame_lasti(Frame *frame, int lasti) {
frame->f_lasti = lasti;
}

static int get_frame_state(PyGenObject *gen_like) {
// Python 3.8 doesn't have frame states, but we can derive
// some for compatibility with later versions and to simplify
// the extension.
Frame *frame = (Frame *)(gen_like->gi_frame);
if (!frame) {
return FRAME_CLEARED;
}
return frame->f_executing ? FRAME_EXECUTING : FRAME_CREATED;
}

static void set_frame_state(PyGenObject *gen_like, int fs) {
Frame *frame = get_frame(gen_like);
frame->f_executing = (fs == FRAME_EXECUTING);
}

static int valid_frame_state(int fs) {
return fs == FRAME_CREATED || fs == FRAME_EXECUTING || fs == FRAME_CLEARED;
}

static int get_frame_stacktop_limit(Frame *frame) {
PyCodeObject *code = get_frame_code(frame);
return code->co_stacksize + code->co_nlocals;
}

static int get_frame_stacktop(Frame *frame) {
assert(frame->f_localsplus);
int stacktop = (int)(frame->f_stacktop - frame->f_localsplus);
assert(stacktop >= 0 && stacktop < get_frame_stacktop_limit(frame));
return stacktop;
}

static void set_frame_stacktop(Frame *frame, int stacktop) {
assert(stacktop >= 0 && stacktop < get_frame_stacktop_limit(frame));
assert(frame->f_localsplus);
frame->f_stacktop = frame->f_localsplus + stacktop;
}

static PyObject **get_frame_localsplus(Frame *frame) {
PyObject **localsplus = frame->f_localsplus;
assert(localsplus);
return localsplus;
}

static int get_frame_iblock_limit(Frame *frame) {
return CO_MAXBLOCKS;
}

static int get_frame_iblock(Frame *frame) {
return frame->f_iblock;
}

static void set_frame_iblock(Frame *frame, int iblock) {
assert(iblock >= 0 && iblock < get_frame_iblock_limit(frame));
frame->f_iblock = iblock;
}

static PyTryBlock *get_frame_blockstack(Frame *frame) {
PyTryBlock *blockstack = frame->f_blockstack;
assert(blockstack);
return blockstack;
}

19 changes: 15 additions & 4 deletions src/dispatch/experimental/durable/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,18 @@
MethodType,
TracebackType,
)
from typing import Any, Callable, Coroutine, Generator, Optional, TypeVar, Union, cast
from typing import (
Any,
Callable,
Coroutine,
Dict,
Generator,
Optional,
Tuple,
TypeVar,
Union,
cast,
)

from . import frame as ext
from .registry import RegisteredFunction, lookup_function, register_function
Expand Down Expand Up @@ -78,8 +89,8 @@ class Serializable:
g: Union[GeneratorType, CoroutineType]
registered_fn: RegisteredFunction
wrapped_coroutine: Union["DurableCoroutine", None]
args: tuple[Any, ...]
kwargs: dict[str, Any]
args: Tuple[Any, ...]
kwargs: Dict[str, Any]

def __init__(
self,
Expand Down Expand Up @@ -274,7 +285,7 @@ def cr_await(self) -> Any:
return self.coroutine.cr_await

@property
def cr_origin(self) -> Optional[tuple[tuple[str, int, str], ...]]:
def cr_origin(self) -> Optional[Tuple[Tuple[str, int, str], ...]]:
return self.coroutine.cr_origin

def __repr__(self) -> str:
Expand Down
3 changes: 2 additions & 1 deletion src/dispatch/experimental/durable/registry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import hashlib
from dataclasses import dataclass
from types import FunctionType
from typing import Dict


@dataclass
Expand Down Expand Up @@ -46,7 +47,7 @@ def __setstate__(self, state):
self.hash = code_hash


_REGISTRY: dict[str, RegisteredFunction] = {}
_REGISTRY: Dict[str, RegisteredFunction] = {}


def register_function(fn: FunctionType) -> RegisteredFunction:
Expand Down
7 changes: 4 additions & 3 deletions src/dispatch/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Dict,
Generic,
Iterable,
List,
Optional,
TypeVar,
overload,
Expand Down Expand Up @@ -329,7 +330,7 @@ def batch(self) -> Batch:
a set of calls to dispatch."""
return Batch(self)

def dispatch(self, calls: Iterable[Call]) -> list[DispatchID]:
def dispatch(self, calls: Iterable[Call]) -> List[DispatchID]:
"""Dispatch function calls.
Args:
Expand Down Expand Up @@ -369,7 +370,7 @@ class Batch:

def __init__(self, client: Client):
self.client = client
self.calls: list[Call] = []
self.calls: List[Call] = []

def add(self, func: Function[P, T], *args: P.args, **kwargs: P.kwargs) -> Batch:
"""Add a call to the specified function to the batch."""
Expand All @@ -380,7 +381,7 @@ def add_call(self, call: Call) -> Batch:
self.calls.append(call)
return self

def dispatch(self) -> list[DispatchID]:
def dispatch(self) -> List[DispatchID]:
"""Dispatch dispatches the calls asynchronously.
The batch is reset when the calls are dispatched successfully.
Expand Down
Loading

0 comments on commit df098fd

Please sign in to comment.