Skip to content

Commit

Permalink
Merge pull request #176 from Maplemx/dev
Browse files Browse the repository at this point in the history
v3.4.0.3 updates
  • Loading branch information
Maplemx authored Oct 24, 2024
2 parents 1a8149b + 81b345b commit e81dd37
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 15 deletions.
8 changes: 6 additions & 2 deletions Agently/plugins/agent_component/EventListener.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ def __init__(self, agent: object):

def add(self, event:str, listener: callable, *, is_await:bool=False, is_agent_event:bool=False):
event = event.replace(".", "->")
if event == "realtime":
self.agent.settings.set("use_realtime", True)
if event.startswith("realtime:"):
self.agent.settings.set("use_realtime", True)
event_data = event.replace(" ", "").split(":")
hooks = event_data[1].replace("->", ".").split("&")
hook_list = []
Expand Down Expand Up @@ -55,8 +58,8 @@ async def realtime_hook_handler(data):
await listener(data)
else:
listener(data)
if event not in (self.listeners.get(trace_back=False) or {}):
self.listeners.update(event, [])
if "realtime" not in (self.listeners.get(trace_back=False) or {}):
self.listeners.update("realtime", [])
self.listeners.append("realtime", { "listener": realtime_hook_handler, "is_await": is_await })
else:
if is_agent_event:
Expand All @@ -83,6 +86,7 @@ def on_finally(self, listener: callable, *, is_await:bool=False, is_agent_event:
return self.agent

def on_realtime(self, listener: callable, *, is_await:bool=False, is_agent_event:bool=False):
self.agent.settings.set("use_realtime", True)
self.add("realtime", listener, is_await=is_await, is_agent_event=is_agent_event)
return self.agent

Expand Down
26 changes: 13 additions & 13 deletions Agently/plugins/agent_component/Realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
class Realtime(ComponentABC):
def __init__(self, agent: object):
self.agent = agent
self.__is_enable = False
self.__get_enable = self.agent.settings.get_trace_back("use_realtime")
self.__is_init = False
self.__on_going_key_id = None
self.__cached_value = {}
Expand All @@ -16,7 +16,7 @@ def __init__(self, agent: object):
self.__realtime_value = None

def use_realtime(self):
self.__is_enable = True
self.agent.settings.set("use_realtime", True)
return self.agent

def __scan_possible_keys(self, prompt_output_pointer, *, prefix:str=None):
Expand All @@ -32,16 +32,16 @@ def __scan_possible_keys(self, prompt_output_pointer, *, prefix:str=None):
return

async def __emit_realtime(self, key, indexes, delta, value):
await self.agent.call_event_listeners(
"realtime",
{
"key": key[1:],
"indexes": indexes,
"delta": delta,
"value": value,
"complete_value": self.__realtime_value,
}
)
event = "realtime"
data = {
"key": key[1:],
"indexes": indexes,
"delta": delta,
"value": value,
"complete_value": self.__realtime_value,
}
self.agent.put_data_to_generator(event, data)
await self.agent.call_event_listeners(event, data)

async def __scan_realtime_value(self, key: str, indexes:list, value:any):
indexes = indexes[:]
Expand Down Expand Up @@ -115,7 +115,7 @@ async def __emit_waiting(self, *, is_done:bool=False):

async def _suffix(self, event: str, data: any):
if (
not self.__is_enable
not self.agent.settings.get("use_realtime")
or "type" not in self.agent.request.response_cache
or self.agent.request.response_cache["type"] != "JSON"
):
Expand Down
77 changes: 77 additions & 0 deletions Agently/plugins/agent_component/ResponseGenerator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import threading
import asyncio
import queue
from .utils import ComponentABC

class ResponseGenerator(ComponentABC):
def __init__(self, agent):
self.agent = agent
self.data_queue = queue.Queue()

def put_data_to_generator(self, event, data):
self.data_queue.put((event, data))

def get_complete_generator(self):
thread = threading.Thread(target=self.agent.start)
thread.daemon = True
thread.start()
while True:
try:
item = self.data_queue.get_nowait()
if item == (None, None):
break
yield item
except:
continue
thread.join()

def get_realtime_generator(self):
self.agent.settings.set("use_realtime", True)
thread = threading.Thread(target=self.agent.start)
thread.daemon = True
thread.start()
while True:
try:
item = self.data_queue.get_nowait()
if item == (None, None):
break
if item[0] == "realtime":
yield item[1]
except:
continue
thread.join()

def get_generator(self):
thread = threading.Thread(target=self.agent.start)
thread.daemon = True
thread.start()
while True:
try:
item = self.data_queue.get_nowait()
if item == (None, None):
break
if not item[0].endswith(("_origin")):
yield item
except:
continue
thread.join()

def _suffix(self, event, data):
if event != "response:finally":
self.put_data_to_generator(event, data)
else:
self.put_data_to_generator(None, None)

def export(self):
return {
"suffix": self._suffix,
"alias": {
"put_data_to_generator": { "func": self.put_data_to_generator },
"get_generator": { "func": self.get_generator, "return_value": True },
"get_realtime_generator": { "func": self.get_realtime_generator, "return_value": True },
"get_complete_generator": { "func": self.get_complete_generator, "return_value": True },
},
}

def export():
return ("ResponseGenerator", ResponseGenerator)

0 comments on commit e81dd37

Please sign in to comment.