Skip to content

Commit

Permalink
Add feedback API (#119)
Browse files Browse the repository at this point in the history
* Add feedback API

* Export feedback func

* Get feedback working
  • Loading branch information
steventkrawczyk authored Dec 31, 2023
1 parent 280f4e9 commit 326e8c4
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
3 changes: 2 additions & 1 deletion prompttools/logger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
# LICENSE file in the root directory of this source tree.


from .logger import Logger
from .logger import Logger, add_feedback


__all__ = [
"Logger",
"add_feedback",
]
43 changes: 40 additions & 3 deletions prompttools/logger/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# This source code's license can be found in the
# LICENSE file in the root directory of this source tree.
import json
import uuid

import requests
import threading
Expand All @@ -27,13 +28,21 @@ class Logger:
def __init__(self):
self.backend_url = f"{HEGEL_BACKEND_URL}/sdk/logger"
self.data_queue = queue.Queue()
self.feedback_queue = queue.Queue()
self.worker_thread = threading.Thread(target=self.worker)

# When the main thread is joining, put `None` into queue to signal worker thread to end
threading.Thread(target=lambda: threading.main_thread().join() or self.data_queue.put(None)).start()

self.worker_thread.start()

def add_feedback(self, log_id, metric_name, value):
self.feedback_queue.put({
"log_id": log_id,
"key": metric_name,
"value": value
})

def execute_and_add_to_queue(self, callable_func, **kwargs):
if "hegel_model" in kwargs:
hegel_model = kwargs["hegel_model"]
Expand All @@ -43,28 +52,40 @@ def execute_and_add_to_queue(self, callable_func, **kwargs):
start = perf_counter()
result = callable_func(**kwargs)
latency = perf_counter() - start
log_id = str(uuid.uuid4())
self.data_queue.put(
{
"hegel_model": hegel_model,
"result": result.model_dump_json(),
"input_parameters": json.dumps(kwargs),
"latency": latency,
"log_id": log_id,
}
)
result.log_id = log_id
return result

def wrap(self, callable_func):
return partial(self.execute_and_add_to_queue, callable_func)

def worker(self):
while True:
# Process logging data
if not self.data_queue.empty():
result = self.data_queue.get()
if result is None:
data = self.data_queue.get()
if data is None: # Shutdown signal
return
self.log_data_to_remote(result)
self.log_data_to_remote(data)
self.data_queue.task_done()

# Process feedback data
if not self.feedback_queue.empty():
feedback_data = self.feedback_queue.get()
if feedback_data is None: # Shutdown signal
return
self.send_feedback_to_remote(feedback_data)
self.feedback_queue.task_done()

def log_data_to_remote(self, data):
try:
headers = {
Expand All @@ -78,6 +99,19 @@ def log_data_to_remote(self, data):
except requests.exceptions.RequestException as e:
print(f"Error sending data to Flask API: {e}")

def send_feedback_to_remote(self, feedback_data):
feedback_url = f"{HEGEL_BACKEND_URL}/sdk/add_feedback/"
try:
headers = {
"Content-Type": "application/json",
"Authorization": os.environ["HEGELAI_API_KEY"],
}

response = requests.post(feedback_url, json=feedback_data, headers=headers)
if response.status_code != 200:
print(f"Failed to send feedback to Flask API. Status code: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"Error sending feedback to Flask API: {e}")

sender = Logger()
# Monkey-patching
Expand All @@ -86,3 +120,6 @@ def log_data_to_remote(self, data):
except Exception:
print("You may need to add `OPENAI_API_KEY=''` to your `.env` file.")
raise

def add_feedback(*args):
sender.add_feedback(*args)

0 comments on commit 326e8c4

Please sign in to comment.