Skip to content

Commit c1ff18c

Browse files
authored
refactor(py/aio): don't require loop.run_forever, run until completion as necessary (#2483)
1 parent b7cf080 commit c1ff18c

File tree

4 files changed

+47
-30
lines changed

4 files changed

+47
-30
lines changed

py/packages/genkit-ai/src/genkit/ai/veneer.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ def __init__(
163163
self.thread = threading.Thread(
164164
target=self.start_server,
165165
args=[reflection_server_spec, self.loop],
166+
daemon=True,
166167
)
167168
self.thread.start()
168169
else:
@@ -193,7 +194,6 @@ def resolver(kind, name, plugin=plugin):
193194

194195
def join(self):
195196
if self.thread and self.loop:
196-
self.loop.run_forever()
197197
self.thread.join()
198198

199199
def start_server(

py/packages/genkit-ai/src/genkit/aio/loop.py

+30-26
Original file line numberDiff line numberDiff line change
@@ -29,33 +29,37 @@ def create_loop():
2929

3030
def run_async(loop: asyncio.AbstractEventLoop, fn: Callable):
3131
"""Schedules the callable on the even loop and blocks until it completes."""
32-
output = None
33-
error = None
34-
lock = threading.Lock()
35-
lock.acquire()
36-
37-
async def run_fn():
38-
nonlocal lock
39-
nonlocal output
40-
nonlocal error
41-
try:
42-
output = await fn()
43-
except Exception as e:
44-
error = e
45-
finally:
46-
lock.release()
47-
48-
asyncio.run_coroutine_threadsafe(run_fn(), loop=loop)
49-
50-
def wait_for_done():
51-
nonlocal lock
32+
if loop.is_running():
33+
output = None
34+
error = None
35+
lock = threading.Lock()
5236
lock.acquire()
5337

54-
thread = threading.Thread(target=wait_for_done)
55-
thread.start()
56-
thread.join()
38+
async def run_fn():
39+
nonlocal lock
40+
nonlocal output
41+
nonlocal error
42+
try:
43+
output = await fn()
44+
return output
45+
except Exception as e:
46+
error = e
47+
finally:
48+
lock.release()
5749

58-
if error:
59-
raise error
50+
asyncio.run_coroutine_threadsafe(run_fn(), loop=loop)
6051

61-
return output
52+
def wait_for_done():
53+
nonlocal lock
54+
lock.acquire()
55+
56+
thread = threading.Thread(target=wait_for_done)
57+
thread.start()
58+
thread.join()
59+
60+
if error:
61+
raise error
62+
63+
return output
64+
else:
65+
return loop.run_until_complete(fn())

py/packages/genkit-ai/src/genkit/core/reflection.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,10 @@
7474

7575

7676
def make_reflection_server(
77-
registry: Registry, loop: asyncio.AbstractEventLoop, encoding='utf-8'
77+
registry: Registry,
78+
loop: asyncio.AbstractEventLoop,
79+
encoding='utf-8',
80+
quiet=True,
7881
):
7982
"""Create and return a ReflectionServer class with the given registry.
8083
@@ -93,6 +96,18 @@ class ReflectionServer(BaseHTTPRequestHandler):
9396
registered Genkit actions during development.
9497
"""
9598

99+
def log_message(self, format, *args):
100+
if not quiet:
101+
message = format % args
102+
logger.debug(
103+
'%s - - [%s] %s'
104+
% (
105+
self.address_string(),
106+
self.log_date_time_string(),
107+
message.translate(self._control_char_table),
108+
)
109+
)
110+
96111
def do_GET(self) -> None: # noqa: N802
97112
"""Handle GET requests to the reflection API.
98113

py/samples/google_genai/src/hello.py

-2
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
# SPDX-License-Identifier: Apache-2.0
1616

1717
import asyncio
18-
import os
1918

20-
import anyio
2119
from pydantic import BaseModel, Field
2220

2321
from genkit.ai import Document, Genkit

0 commit comments

Comments
 (0)