Otel exporter #529

Merged on Dec 2, 2024 (63 commits)
745b78f
feat(session): integrate OpenTelemetry for event tracing
teocns Nov 21, 2024
0f16593
remove deprecated `header` params from HttpClient
teocns Nov 22, 2024
8e3696a
refactor(http_client): use `header`
teocns Nov 22, 2024
7b1f23c
Move SessionExporter to `session`
teocns Nov 22, 2024
1156d93
draft
teocns Nov 22, 2024
96c72f3
refactor(session): update session tracer provider management
teocns Nov 22, 2024
543dc05
rmv sess. tests
teocns Nov 22, 2024
cb52558
deactivate console exporter
teocns Nov 23, 2024
ea92035
tests: mock processor to use linear simple span processor (no batch)
teocns Nov 23, 2024
de888fe
revert setup fixture and extra tests
teocns Nov 23, 2024
9ee5bb5
refactor(session): restore otel_exporter for span processing
teocns Nov 23, 2024
70e10f4
test: Enhance assertions in session tests
teocns Nov 23, 2024
e888b71
refactor(session): reorder tracer initialization after session start
teocns Nov 23, 2024
78041ca
revert TestSingleSession.test_add_tags
teocns Nov 23, 2024
b713e51
revert TestSingleSession.test_session
teocns Nov 23, 2024
2db1c5c
fix: add missing api_key to endpoint calls
teocns Nov 23, 2024
bf7593d
revert: Session._start_session
teocns Nov 23, 2024
1e10dcf
match order of execution with original / ease of comparison
teocns Nov 23, 2024
19d96f3
remove queue attr
teocns Nov 23, 2024
9b54cdb
refactor(session): persisted span processor attr;
teocns Nov 23, 2024
31b6ced
flush_now flag
teocns Nov 23, 2024
c82de5c
small improves
teocns Nov 23, 2024
b794ad9
Improve general lifecycle management of OTEL
teocns Nov 23, 2024
b68c1dc
Removed class-level state from SessionExporter. Each Session now gets i…
teocns Nov 23, 2024
d0b217a
14 errors
teocns Nov 23, 2024
ff776b3
13 errors
teocns Nov 23, 2024
65f4b1c
1 error
teocns Nov 23, 2024
a4e363b
0 error
teocns Nov 23, 2024
59fb0e2
Cleanup deps
teocns Nov 23, 2024
e834aef
refactored code for `get_analytics` method merged in `main`
the-praxs Nov 23, 2024
a71807b
tests for the `get_analytics` method
the-praxs Nov 23, 2024
7198696
Merge branch 'main' into otel-exporter
the-praxs Nov 23, 2024
10aa8db
linting
the-praxs Nov 23, 2024
f549fef
oops
the-praxs Nov 23, 2024
feaa3ab
add tests targeting SessionExporter
teocns Nov 25, 2024
2333dbe
SessionExporter: Added default value using current UTC time when end_…
teocns Nov 25, 2024
8880c12
Moved timestamp handling earlier in the process, before OpenTelemetry…
teocns Nov 25, 2024
62bf845
test: add test for exporting LLM event handling
teocns Nov 25, 2024
a828b90
test: add test for handling missing event ID in export
teocns Nov 25, 2024
2934e56
add session url
areibman Nov 25, 2024
ec26373
feat(HttpClient): add session management and header preparation
teocns Nov 26, 2024
07b9985
feat(HttpClient): Cache host env
teocns Nov 26, 2024
3877595
refactor(HttpClient): improve session management and headers
teocns Nov 26, 2024
259c045
feat(session): add static resource management in exporter
teocns Nov 26, 2024
894c846
feat(session): force flush pending spans on session end
teocns Nov 26, 2024
9589e73
replace core manual test
areibman Nov 26, 2024
8f712b3
remove log flag
areibman Nov 26, 2024
4f8f1eb
refactor(client): simplify host_env retrieval logic
teocns Nov 26, 2024
793b196
refactor(http_client): convert _prepare_headers to classmethod
teocns Nov 26, 2024
d7f862f
ruff
teocns Nov 29, 2024
07f8d79
added cost param to LLMEvent (how was this not here before??)
areibman Nov 30, 2024
23a7352
fix autogen. Added costs and completions
areibman Nov 30, 2024
918ecd7
remove prints
areibman Nov 30, 2024
3cb5563
better completion grabber
areibman Nov 30, 2024
dc9312b
revert autogen line
areibman Dec 1, 2024
8d542c0
updated autogen completions
areibman Dec 1, 2024
4c53ab3
black fixes
areibman Dec 2, 2024
2c65f0c
Revert "updated autogen completions"
areibman Dec 2, 2024
1d7e7ab
ruff format
areibman Dec 2, 2024
7ac5733
revert notebook
areibman Dec 2, 2024
515db5f
revert math notebook
areibman Dec 2, 2024
dd46614
revert ollama notebook
areibman Dec 2, 2024
f96bd65
revert anthropic notebook
areibman Dec 2, 2024
27 changes: 17 additions & 10 deletions agentops/client.py
@@ -5,29 +5,29 @@
     Client: Provides methods to interact with the AgentOps service.
 """

-import inspect
 import atexit
+import inspect
 import logging
 import os
 import signal
 import sys
 import threading
 import traceback
 from decimal import Decimal
+from functools import cached_property
+from typing import List, Optional, Tuple, Union
 from uuid import UUID, uuid4
-from typing import Optional, List, Union, Tuple
+
 from termcolor import colored

-from .event import Event, ErrorEvent
-from .singleton import (
-    conditional_singleton,
-)
-from .session import Session, active_sessions
+from .config import Configuration
+from .event import ErrorEvent, Event
 from .host_env import get_host_env
+from .llms import LlmTracker
 from .log_config import logger
 from .meta_client import MetaClient
-from .config import Configuration
-from .llms import LlmTracker
+from .session import Session, active_sessions
+from .singleton import conditional_singleton


 @conditional_singleton
@@ -39,6 +39,7 @@ def __init__(self):
         self._sessions: List[Session] = active_sessions
         self._config = Configuration()
         self._pre_init_queue = {"agents": []}
+        self._host_env = None  # Cache host env data

         self.configure(
             api_key=os.environ.get("AGENTOPS_API_KEY"),
@@ -111,6 +112,7 @@ def initialize(self) -> Union[Session, None]:
     def _initialize_autogen_logger(self) -> None:
         try:
             import autogen
+
             from .partners.autogen_logger import AutogenLogger

             autogen.runtime_logging.start(logger=AutogenLogger())
@@ -224,7 +226,7 @@ def start_session(
         session = Session(
             session_id=session_id,
             tags=list(session_tags),
-            host_env=get_host_env(self._config.env_data_opt_out),
+            host_env=self.host_env,
             config=self._config,
         )

@@ -430,3 +432,8 @@ def api_key(self):
     @property
     def parent_key(self):
         return self._config.parent_key
+
+    @cached_property
+    def host_env(self):
+        """Cache and reuse host environment data"""
+        return get_host_env(self._config.env_data_opt_out)
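
The `host_env` property in the diff above relies on `functools.cached_property`, which computes the value on first access and then stores it on the instance. The following is a minimal sketch of that behaviour only. `collect_host_env` and `DemoClient` are hypothetical stand-ins for `get_host_env` and the real `Client`, not the project's code.

```python
from functools import cached_property

# Counts how many times the (hypothetical) collector actually runs.
CALLS = {"count": 0}


def collect_host_env(opt_out: bool) -> dict:
    """Hypothetical stand-in for get_host_env."""
    CALLS["count"] += 1
    return {} if opt_out else {"os": "example-os"}


class DemoClient:
    """Simplified illustration, not the real agentops Client."""

    def __init__(self, env_data_opt_out: bool = False):
        self.env_data_opt_out = env_data_opt_out

    @cached_property
    def host_env(self) -> dict:
        # Runs once per instance; later reads come from the instance __dict__.
        return collect_host_env(self.env_data_opt_out)


client = DemoClient()
first = client.host_env
second = client.host_env  # served from the cache, collector is not called again
```

One consequence of this pattern is that a later change to the opt-out setting is not reflected until the cached attribute is cleared, for example with `del client.host_env`.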
1 change: 1 addition & 0 deletions agentops/event.py
@@ -82,6 +82,7 @@ class LLMEvent(Event):
     prompt_tokens: Optional[int] = None
     completion: Union[str, object] = None
     completion_tokens: Optional[int] = None
+    cost: Optional[float] = None
     model: Optional[str] = None


108 changes: 75 additions & 33 deletions agentops/http_client.py
@@ -1,7 +1,9 @@
 from enum import Enum
-from typing import Optional
-from requests.adapters import Retry, HTTPAdapter
+from typing import Optional, Dict, Any
+
 import requests
+from requests.adapters import HTTPAdapter, Retry
+import json

 from .exceptions import ApiServerException

@@ -54,33 +56,79 @@


 class HttpClient:
-    @staticmethod
+    _session: Optional[requests.Session] = None
+
+    @classmethod
+    def get_session(cls) -> requests.Session:
+        """Get or create the global session with optimized connection pooling"""
+        if cls._session is None:
+            cls._session = requests.Session()
+
+            # Configure connection pooling
+            adapter = requests.adapters.HTTPAdapter(
+                pool_connections=15,  # Number of connection pools
+                pool_maxsize=256,  # Connections per pool
+                max_retries=Retry(total=3, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]),
+            )
+
+            # Mount adapter for both HTTP and HTTPS
+            cls._session.mount("http://", adapter)
+            cls._session.mount("https://", adapter)
+
+            # Set default headers
+            cls._session.headers.update(
+                {
+                    "Connection": "keep-alive",
+                    "Keep-Alive": "timeout=10, max=1000",
+                    "Content-Type": "application/json",
+                }
+            )
+
+        return cls._session
+
+    @classmethod
+    def _prepare_headers(
+        cls,
+        api_key: Optional[str] = None,
+        parent_key: Optional[str] = None,
+        jwt: Optional[str] = None,
+        custom_headers: Optional[dict] = None,
+    ) -> dict:
+        """Prepare headers for the request"""
+        headers = JSON_HEADER.copy()
+
+        if api_key is not None:
+            headers["X-Agentops-Api-Key"] = api_key
+
+        if parent_key is not None:
+            headers["X-Agentops-Parent-Key"] = parent_key
+
+        if jwt is not None:
+            headers["Authorization"] = f"Bearer {jwt}"
+
+        if custom_headers is not None:
+            headers.update(custom_headers)
+
+        return headers
+
+    @classmethod
     def post(
+        cls,
         url: str,
         payload: bytes,
         api_key: Optional[str] = None,
         parent_key: Optional[str] = None,
         jwt: Optional[str] = None,
-        header=None,
+        header: Optional[Dict[str, str]] = None,
     ) -> Response:
+        """Make HTTP POST request using connection pooling"""
         result = Response()
         try:
-            # Create request session with retries configured
-            request_session = requests.Session()
-            request_session.mount(url, HTTPAdapter(max_retries=retry_config))
-
-            if api_key is not None:
-                JSON_HEADER["X-Agentops-Api-Key"] = api_key
-
-            if parent_key is not None:
-                JSON_HEADER["X-Agentops-Parent-Key"] = parent_key
-
-            if jwt is not None:
-                JSON_HEADER["Authorization"] = f"Bearer {jwt}"
-
-            res = request_session.post(url, data=payload, headers=JSON_HEADER, timeout=20)
-
+            headers = cls._prepare_headers(api_key, parent_key, jwt, header)
+            session = cls.get_session()
+            res = session.post(url, data=payload, headers=headers, timeout=20)
             result.parse(res)
+
         except requests.exceptions.Timeout:
             result.code = 408
             result.status = HttpStatus.TIMEOUT
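
The `get_session` change above replaces a per-request `requests.Session` with one shared, lazily created session, so connections can be reused across calls. The sketch below illustrates that pattern in isolation and assumes the `requests` package is installed. `PooledHttp` is a hypothetical name, and the lock is an addition made for the example; the PR's `get_session` does not use one.

```python
import threading
from typing import Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class PooledHttp:
    """Illustrative holder for one process-wide requests.Session."""

    _session: Optional[requests.Session] = None
    _lock = threading.Lock()  # assumption: added here, not present in the PR

    @classmethod
    def get_session(cls) -> requests.Session:
        if cls._session is None:
            with cls._lock:
                if cls._session is None:  # re-check after acquiring the lock
                    session = requests.Session()
                    adapter = HTTPAdapter(
                        pool_connections=15,  # number of per-host pools to cache
                        pool_maxsize=256,  # connections kept per pool
                        max_retries=Retry(
                            total=3,
                            backoff_factor=0.1,
                            status_forcelist=[500, 502, 503, 504],
                        ),
                    )
                    # One adapter serves both schemes.
                    session.mount("http://", adapter)
                    session.mount("https://", adapter)
                    cls._session = session
        return cls._session


shared = PooledHttp.get_session()
adapter = shared.get_adapter("https://example.invalid/path")
```

A point worth checking when adopting this configuration: urllib3's `Retry` does not include POST in its default `allowed_methods`, so status-based retries from `status_forcelist` would not apply to POST requests unless that setting is changed.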
@@ -112,28 +160,22 @@

         return result

-    @staticmethod
+    @classmethod
     def get(
+        cls,
         url: str,
         api_key: Optional[str] = None,
         jwt: Optional[str] = None,
-        header=None,
+        header: Optional[Dict[str, str]] = None,
     ) -> Response:
+        """Make HTTP GET request using connection pooling"""
         result = Response()
         try:
-            # Create request session with retries configured
-            request_session = requests.Session()
-            request_session.mount(url, HTTPAdapter(max_retries=retry_config))
-
-            if api_key is not None:
-                JSON_HEADER["X-Agentops-Api-Key"] = api_key
-
-            if jwt is not None:
-                JSON_HEADER["Authorization"] = f"Bearer {jwt}"
-
-            res = request_session.get(url, headers=JSON_HEADER, timeout=20)
-
+            headers = cls._prepare_headers(api_key, None, jwt, header)
+            session = cls.get_session()
+            res = session.get(url, headers=headers, timeout=20)
             result.parse(res)
+
         except requests.exceptions.Timeout:
             result.code = 408
             result.status = HttpStatus.TIMEOUT
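
The removed code wrote credentials directly into the module-level `JSON_HEADER` dict, so a key or token set by one call stayed in place for every later call. `_prepare_headers` avoids this by copying the dict first. A minimal standalone sketch of that difference follows; the `JSON_HEADER` value shown is an assumption, and `prepare_headers` is a simplified free-function version of the classmethod.

```python
from typing import Dict, Optional

# Assumed contents; the real constant lives in agentops/http_client.py.
JSON_HEADER: Dict[str, str] = {
    "Content-Type": "application/json; charset=UTF-8",
    "Accept": "*/*",
}


def prepare_headers(
    api_key: Optional[str] = None,
    parent_key: Optional[str] = None,
    jwt: Optional[str] = None,
    custom_headers: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Build per-request headers without touching the shared default."""
    headers = JSON_HEADER.copy()  # shallow copy is enough for str -> str
    if api_key is not None:
        headers["X-Agentops-Api-Key"] = api_key
    if parent_key is not None:
        headers["X-Agentops-Parent-Key"] = parent_key
    if jwt is not None:
        headers["Authorization"] = f"Bearer {jwt}"
    if custom_headers is not None:
        headers.update(custom_headers)  # caller-supplied values win
    return headers


first = prepare_headers(api_key="key-a", jwt="token-a")
second = prepare_headers()  # no credentials leak over from the first call
```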
6 changes: 3 additions & 3 deletions agentops/llms/anthropic.py
@@ -137,7 +137,7 @@ async def async_generator():

         The raw response has the following structure:
         {
-            'id': str, # Message ID (e.g. 'msg_018Gk9N2pcWaYLS7mxXbPD5i')
+            'id': str, # Message ID (e.g. 'msg_018Gk9N2pcWaYLS7mxXbPD5i')
             'type': str, # Type of response (e.g. 'message')
             'role': str, # Role of responder (e.g. 'assistant')
             'model': str, # Model used (e.g. 'claude-3-5-sonnet-20241022')
@@ -151,7 +151,7 @@ async def async_generator():
         }

         Note: We import Anthropic types here since the package must be installed
-        for raw responses to be available; doing so in the global scope would
+        for raw responses to be available; doing so in the global scope would
         result in dependencies error since this provider is not lazily imported (tests fail)
         """
         from anthropic import APIResponse
@@ -167,7 +167,7 @@ async def async_generator():
             llm_event.model = response_data["model"]
             llm_event.completion = {
                 "role": response_data.get("role"),
-                "content": response_data.get("content")[0].get("text") if response_data.get("content") else "",
+                "content": (response_data.get("content")[0].get("text") if response_data.get("content") else ""),
             }
             if usage := response_data.get("usage"):
                 llm_event.prompt_tokens = usage.get("input_tokens")
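
The completion-building expression in the hunk above guards against a missing or empty `content` list before indexing into it. The sketch below isolates that logic on plain dicts; `extract_completion` and the sample payloads are hypothetical and only loosely follow the response structure described in the docstring.

```python
from typing import Any, Dict


def extract_completion(response_data: Dict[str, Any]) -> Dict[str, Any]:
    """Return role and first text block, or an empty string when there is no content."""
    content = response_data.get("content")
    # An empty list or None both fall through to "".
    # If the first block has no "text" key, .get() yields None rather than raising.
    text = content[0].get("text") if content else ""
    return {"role": response_data.get("role"), "content": text}


# Hypothetical payloads for illustration only.
with_text = extract_completion(
    {"role": "assistant", "content": [{"type": "text", "text": "Hello"}]}
)
without_content = extract_completion({"role": "assistant", "content": []})
missing_key = extract_completion({"role": "assistant"})
```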
8 changes: 5 additions & 3 deletions agentops/partners/autogen_logger.py
@@ -10,9 +10,9 @@
 from openai.types.chat import ChatCompletion

 from autogen.logger.base_logger import BaseLogger, LLMConfig
-from autogen.logger.logger_utils import get_current_ts, to_dict

 from agentops.enums import EndState
+from agentops.helpers import get_ISO_time

 from agentops import LLMEvent, ToolEvent, ActionEvent
 from uuid import uuid4
@@ -55,17 +55,19 @@ def log_chat_completion(
         start_time: str,
     ) -> None:
         """Records an LLMEvent to AgentOps session"""
-        end_time = get_current_ts()

         completion = response.choices[len(response.choices) - 1]

+        # Note: Autogen tokens are not included in the request and function call tokens are not counted in the completion
         llm_event = LLMEvent(
             prompt=request["messages"],
             completion=completion.message,
             model=response.model,
+            cost=cost,
+            returns=completion.message.to_json(),
         )
         llm_event.init_timestamp = start_time
-        llm_event.end_timestamp = end_time
+        llm_event.end_timestamp = get_ISO_time()
         llm_event.agent_id = self._get_agentops_id_from_agent(str(id(agent)))
         agentops.record(llm_event)
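
The `log_chat_completion` change above takes the last choice from the response, passes the cost through to the event, and stamps the end time when the event is built. A rough standalone sketch of those three steps follows. `DemoLLMEvent`, `iso_time_now`, and `build_event` are hypothetical stand-ins, and `iso_time_now` only assumes that `get_ISO_time` returns a UTC ISO 8601 string.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, List, Optional


def iso_time_now() -> str:
    """Hypothetical stand-in for agentops.helpers.get_ISO_time."""
    return datetime.now(timezone.utc).isoformat()


@dataclass
class DemoLLMEvent:
    """Simplified illustration, not the real agentops LLMEvent."""

    prompt: Any
    completion: Any
    model: Optional[str] = None
    cost: Optional[float] = None
    init_timestamp: Optional[str] = None
    end_timestamp: Optional[str] = None


def build_event(
    messages: List[dict],
    choices: List[str],
    model: str,
    cost: float,
    start_time: str,
) -> DemoLLMEvent:
    completion = choices[-1]  # same element as choices[len(choices) - 1]
    event = DemoLLMEvent(prompt=messages, completion=completion, model=model, cost=cost)
    event.init_timestamp = start_time
    event.end_timestamp = iso_time_now()  # taken when the event is built
    return event


event = build_event(
    messages=[{"role": "user", "content": "hi"}],
    choices=["first", "second"],
    model="example-model",
    cost=0.0021,
    start_time=iso_time_now(),
)
```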
