python[minor]: Release 0.3.0 #1439

Merged · 22 commits · Jan 22, 2025
22 changes: 4 additions & 18 deletions python/docs/create_api_rst.py
@@ -1,4 +1,4 @@
"""Script for auto-generating api_reference.rst."""

Check notice on line 1 in python/docs/create_api_rst.py — GitHub Actions / benchmark

Benchmark results:

| Benchmark | Mean ± std dev |
|---|---|
| create_5_000_run_trees | 679 ms ± 58 ms |
| create_10_000_run_trees | 1.35 sec ± 0.08 sec |
| create_20_000_run_trees | 2.60 sec ± 0.21 sec |
| dumps_class_nested_py_branch_and_leaf_200x400 | 718 us ± 12 us |
| dumps_class_nested_py_leaf_50x100 | 24.8 ms ± 0.3 ms |
| dumps_class_nested_py_leaf_100x200 | 104 ms ± 2 ms |
| dumps_dataclass_nested_50x100 | 25.2 ms ± 0.3 ms |
| dumps_pydantic_nested_50x100 | 69.4 ms ± 16.1 ms |
| dumps_pydanticv1_nested_50x100 | 193 ms ± 2 ms |

WARNING: the dumps_pydantic_nested_50x100 result may be unstable — the standard deviation (16.1 ms) is 23% of the mean (69.4 ms). Try rerunning the benchmark with more runs, values and/or loops, and run `python -m pyperf system tune` to reduce system jitter. Use pyperf stats, pyperf dump, and pyperf hist to analyze results; use --quiet to hide these warnings.

Check notice on line 1 in python/docs/create_api_rst.py — GitHub Actions / benchmark

Comparison against main:

| Benchmark | main | changes |
|---|---|---|
| dumps_pydanticv1_nested_50x100 | 214 ms | 193 ms: 1.11x faster |
| dumps_class_nested_py_leaf_50x100 | 25.3 ms | 24.8 ms: 1.02x faster |
| dumps_dataclass_nested_50x100 | 25.4 ms | 25.2 ms: 1.01x faster |
| dumps_class_nested_py_leaf_100x200 | 104 ms | 104 ms: 1.00x faster |
| create_20_000_run_trees | 2.59 sec | 2.60 sec: 1.00x slower |
| dumps_class_nested_py_branch_and_leaf_200x400 | 707 us | 718 us: 1.02x slower |
| create_5_000_run_trees | 665 ms | 679 ms: 1.02x slower |
| dumps_pydantic_nested_50x100 | 67.6 ms | 69.4 ms: 1.03x slower |
| create_10_000_run_trees | 1.29 sec | 1.35 sec: 1.05x slower |
| Geometric mean | (ref) | 1.00x faster |

import importlib
import inspect
@@ -108,18 +108,6 @@
else "Pydantic" if issubclass(type_, BaseModel) else "Regular"
)
)
# if hasattr(type_, "__slots__"):
# for func_name, func_type in inspect.getmembers(type_):
# if inspect.isfunction(func_type):
# functions.append(
# FunctionInfo(
# name=func_name,
# qualified_name=f"{namespace}.{name}.{func_name}",
# is_public=not func_name.startswith("_"),
# is_deprecated=".. deprecated::"
# in (func_type.__doc__ or ""),
# )
# )
classes_.append(
ClassInfo(
name=name,
@@ -156,7 +144,7 @@
if file_path.name not in {
"_runner.py",
"_arunner.py",
"_testing.py",
"_internal.py",
"_expect.py",
"_openai.py",
}:
@@ -200,6 +188,7 @@
"utils",
"anonymizer",
"wrappers",
"testing",
]


@@ -387,20 +376,17 @@
| [AsyncClient](async_client/langsmith.async_client.AsyncClient) | Asynchronous client for interacting with the LangSmith API. |
| [traceable](run_helpers/langsmith.run_helpers.traceable) | Wrapper/decorator for tracing any function. |
| [wrap_openai](wrappers/langsmith.wrappers._openai.wrap_openai) | Wrapper for OpenAI client, adds LangSmith tracing to all OpenAI calls. |
| [evaluate](evaluation/langsmith.evaluation._runner.evaluate) | Evaluate an application on a dataset. |
| [aevaluate](evaluation/langsmith.evaluation._arunner.aevaluate) | Asynchronously evaluate an application on a dataset. |
| [unit](_testing/langsmith._testing.unit) | Create a LangSmith unit test. |
| [@pytest.mark.langsmith](/testing/langsmith.testing._internal.test) | LangSmith pytest integration. |

```{{toctree}}
:maxdepth: 2
:hidden:

client<client>
async_client<async_client>
evaluation<evaluation>
run_helpers<run_helpers>
wrappers<wrappers>
_testing<_testing>
testing<testing>
```

"""
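The docs now route through the public `testing` namespace and advertise the pytest marker in place of the old `unit` helper. As a rough sketch of what that integration looks like in a test suite (the test body is illustrative, and it assumes the LangSmith pytest plugin is installed so the marker is registered):

```python
import pytest


@pytest.mark.langsmith  # runs this test as a LangSmith test case
def test_addition() -> None:
    assert 1 + 1 == 2
```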
6 changes: 3 additions & 3 deletions python/langsmith/__init__.py
@@ -5,7 +5,6 @@

if TYPE_CHECKING:
from langsmith._expect import expect
from langsmith._testing import test, unit
from langsmith.async_client import AsyncClient
from langsmith.client import Client
from langsmith.evaluation import aevaluate, evaluate
@@ -18,6 +17,7 @@
tracing_context,
)
from langsmith.run_trees import RunTree
from langsmith.testing._internal import test, unit
from langsmith.utils import (
ContextThreadPoolExecutor,
)
@@ -63,7 +63,7 @@ def __getattr__(name: str) -> Any:
return traceable

elif name == "test":
from langsmith._testing import test
from langsmith.testing._internal import test

return test

@@ -104,7 +104,7 @@ def __getattr__(name: str) -> Any:
return get_current_run_tree

elif name == "unit":
from langsmith._testing import unit
from langsmith.testing._internal import unit

return unit
elif name == "ContextThreadPoolExecutor":
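The lazy imports above rely on module-level `__getattr__` (PEP 562): `langsmith.test` and `langsmith.unit` only trigger an import of `langsmith.testing._internal` on first access. A minimal sketch of the pattern, with hypothetical names:

```python
# mypkg/__init__.py -- lazy attribute resolution via PEP 562
from typing import Any


def __getattr__(name: str) -> Any:
    if name == "heavy_thing":  # hypothetical attribute
        from mypkg._impl import heavy_thing  # imported only on first access

        return heavy_thing
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```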
2 changes: 1 addition & 1 deletion python/langsmith/_expect.py
@@ -379,7 +379,7 @@ def value(self, value: Any) -> _Matcher:

def score(
self,
score: Union[float, int],
score: Union[float, int, bool],
*,
key: str = "score",
source_run_id: Optional[ls_client.ID_TYPE] = None,
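With the widened annotation, boolean feedback scores now type-check alongside floats and ints. A short usage sketch (the keys are illustrative, and this assumes it runs inside a LangSmith-traced test):

```python
from langsmith import expect

expect.score(0.87, key="relevance")    # float, as before
expect.score(True, key="exact_match")  # bool, newly allowed by this change
```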
100 changes: 81 additions & 19 deletions python/langsmith/_internal/_background_thread.py
@@ -6,6 +6,7 @@
import logging
import sys
import threading
import time
import weakref
from multiprocessing import cpu_count
from queue import Empty, Queue
@@ -20,6 +21,7 @@

from langsmith import schemas as ls_schemas
from langsmith import utils as ls_utils
from langsmith._internal._compressed_runs import CompressedRuns
from langsmith._internal._constants import (
_AUTO_SCALE_DOWN_NEMPTY_TRIGGER,
_AUTO_SCALE_UP_NTHREADS_LIMIT,
@@ -100,7 +102,8 @@ def _tracing_thread_drain_queue(
def _tracing_thread_drain_compressed_buffer(
client: Client, size_limit: int = 100, size_limit_bytes: int | None = 20_971_520
) -> Tuple[Optional[io.BytesIO], Optional[Tuple[int, int]]]:
assert client.compressed_runs is not None
if client.compressed_runs is None:
return None, None
with client.compressed_runs.lock:
client.compressed_runs.compressor_writer.flush()
current_size = client.compressed_runs.buffer.tell()
@@ -214,6 +217,24 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"]
use_multipart = batch_ingest_config.get("use_multipart_endpoint", False)

disable_compression = ls_utils.get_env_var("DISABLE_RUN_COMPRESSION")
if not ls_utils.is_truish(disable_compression) and use_multipart:
if not (client.info.instance_flags or {}).get(
"zstd_compression_enabled", False
):
logger.warning(
"Run compression is not enabled. Please update to the latest "
"version of LangSmith. Falling back to regular multipart ingestion."
)
else:
client._futures = set()
client.compressed_runs = CompressedRuns()
client._data_available_event = threading.Event()
threading.Thread(
target=tracing_control_thread_func_compress_parallel,
args=(weakref.ref(client),),
).start()

sub_threads: List[threading.Thread] = []
# 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached
num_known_refs = 3
@@ -256,6 +277,7 @@ def keep_thread_active() -> bool:
_tracing_thread_handle_batch(
client, tracing_queue, next_batch, use_multipart
)

# drain the queue on exit
while next_batch := _tracing_thread_drain_queue(
tracing_queue, limit=size_limit, block=False
@@ -264,12 +286,20 @@


def tracing_control_thread_func_compress_parallel(
client_ref: weakref.ref[Client],
client_ref: weakref.ref[Client], flush_interval: float = 0.5
) -> None:
client = client_ref()
if client is None:
return

if (
client.compressed_runs is None
or client._data_available_event is None
or client._futures is None
):
logger.error("Required compression attributes not initialized")
return

batch_ingest_config = _ensure_ingest_config(client.info)
size_limit: int = batch_ingest_config["size_limit"]
size_limit_bytes = batch_ingest_config.get("size_limit_bytes", 20_971_520)
@@ -300,35 +330,67 @@ def keep_thread_active() -> bool:
# for now, keep thread alive
return True

last_flush_time = time.monotonic()

while True:
triggered = client._data_available_event.wait(timeout=0.05)
if not keep_thread_active():
break
if not triggered:
continue
client._data_available_event.clear()

data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes
)
# If data arrived, clear the event and attempt a drain
if triggered:
client._data_available_event.clear()

if data_stream is not None:
try:
future = HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes
)
# If we have data, submit the send request
if data_stream is not None:
try:
future = HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
)
client._futures.add(future)
except RuntimeError:
client._send_compressed_multipart_req(
data_stream,
compressed_runs_info,
)
last_flush_time = time.monotonic()

else:
if (time.monotonic() - last_flush_time) >= flush_interval:
data_stream, compressed_runs_info = (
_tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1
)
)
client._futures.add(future)
except RuntimeError:
client._send_compressed_multipart_req(data_stream, compressed_runs_info)
if data_stream is not None:
try:
cf.wait(
[
HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
)
]
)
except RuntimeError:
client._send_compressed_multipart_req(
data_stream,
compressed_runs_info,
)
last_flush_time = time.monotonic()

# Drain the buffer on exit
# Drain the buffer on exit (final flush)
try:
final_data_stream, compressed_runs_info = (
_tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1
) # Force final drain
)
)
if final_data_stream is not None:
try:
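The reworked loop adds a time-based flush on top of the existing size-based one: if no data event fires for `flush_interval` seconds, the buffer is force-drained (`size_limit=1`) so small traces are not stranded. Roughly the same control flow in isolation, with hypothetical `drain`/`send` stand-ins for the client internals:

```python
import threading
import time
from typing import Callable, Optional

data_available = threading.Event()
FLUSH_INTERVAL = 0.5  # seconds, mirroring the new flush_interval default


def drain(force: bool = False) -> Optional[bytes]:
    """Hypothetical stand-in for _tracing_thread_drain_compressed_buffer."""
    return None


def send(payload: bytes) -> None:
    """Hypothetical stand-in for _send_compressed_multipart_req."""


def flush_loop(keep_running: Callable[[], bool]) -> None:
    last_flush = time.monotonic()
    while keep_running():
        if data_available.wait(timeout=0.05):  # size-based path
            data_available.clear()
            payload = drain()
            if payload is not None:
                send(payload)
                last_flush = time.monotonic()
        elif time.monotonic() - last_flush >= FLUSH_INTERVAL:  # time-based path
            payload = drain(force=True)
            if payload is not None:
                send(payload)
            last_flush = time.monotonic()
```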
6 changes: 3 additions & 3 deletions python/langsmith/_internal/_beta_decorator.py
@@ -8,14 +8,14 @@ class LangSmithBetaWarning(UserWarning):


@functools.lru_cache(maxsize=100)
def _warn_once(message: str) -> None:
warnings.warn(message, LangSmithBetaWarning, stacklevel=2)
def _warn_once(message: str, stacklevel: int = 2) -> None:
warnings.warn(message, LangSmithBetaWarning, stacklevel=stacklevel)


def warn_beta(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
_warn_once(f"Function {func.__name__} is in beta.")
_warn_once(f"Function {func.__name__} is in beta.", stacklevel=3)
return func(*args, **kwargs)

return wrapper
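Bumping `stacklevel` from 2 to 3 re-attributes the warning: one frame for `warnings.warn` inside `_warn_once`, one for the decorator's `wrapper`, and one for the user code that called the beta function, which is where the warning should point. A standalone illustration of the frame counting (names are illustrative):

```python
import warnings


def _warn_once(message: str, stacklevel: int = 2) -> None:
    warnings.warn(message, UserWarning, stacklevel=stacklevel)


def wrapper() -> None:
    # stacklevel=3 skips _warn_once (1) and wrapper (2), blaming our caller (3)
    _warn_once("Function f is in beta.", stacklevel=3)


wrapper()  # the warning is reported at this line, not inside wrapper()
```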
17 changes: 1 addition & 16 deletions python/langsmith/_internal/_compressed_runs.py
@@ -1,12 +1,7 @@
import io
import threading

try:
from zstandard import ZstdCompressor # type: ignore[import]

HAVE_ZSTD = True
except ImportError:
HAVE_ZSTD = False
from zstandard import ZstdCompressor # type: ignore[import]

from langsmith import utils as ls_utils

@@ -20,11 +15,6 @@ def __init__(self):
self.lock = threading.Lock()
self.uncompressed_size = 0

if not HAVE_ZSTD:
raise ImportError(
"zstandard package required for compression. "
"Install with 'pip install langsmith[compression]'"
)
self.compressor_writer = ZstdCompressor(
level=compression_level, threads=-1
).stream_writer(self.buffer, closefd=False)
@@ -34,11 +24,6 @@ def reset(self):
self.run_count = 0
self.uncompressed_size = 0

if not HAVE_ZSTD:
raise ImportError(
"zstandard package required for compression. "
"Install with 'pip install langsmith[compression]'"
)
self.compressor_writer = ZstdCompressor(
level=compression_level, threads=-1
).stream_writer(self.buffer, closefd=False)
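
With the try/except removed, `zstandard` becomes an unconditional import here — a hard dependency of this code path rather than the optional `langsmith[compression]` extra. The core pattern `CompressedRuns` builds on, a streaming compressor writing into a shared in-memory buffer, looks roughly like this (the level and payload are illustrative):

```python
import io

from zstandard import ZstdCompressor

buffer = io.BytesIO()
# closefd=False keeps the buffer usable after the writer is closed or reset
writer = ZstdCompressor(level=3, threads=-1).stream_writer(buffer, closefd=False)

writer.write(b'{"name": "my-run"}')  # append a serialized run
writer.flush()                       # make compressed bytes visible in buffer
print(buffer.tell())                 # compressed size so far
```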
7 changes: 5 additions & 2 deletions python/langsmith/_internal/_multipart.py
@@ -1,8 +1,11 @@
from __future__ import annotations

from typing import Dict, Iterable, Tuple
from io import BufferedReader
from typing import Dict, Iterable, Tuple, Union

MultipartPart = Tuple[str, Tuple[None, bytes, str, Dict[str, str]]]
MultipartPart = Tuple[
str, Tuple[None, Union[bytes, BufferedReader], str, Dict[str, str]]
]


class MultipartPartsAndContext:
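Widening `MultipartPart` to accept a `BufferedReader` lets a part stream its payload from disk instead of holding it in memory as `bytes`. A sketch of building one part each way (field names, content types, and the file path are illustrative):

```python
from io import BufferedReader
from typing import Dict, Tuple, Union

MultipartPart = Tuple[
    str, Tuple[None, Union[bytes, BufferedReader], str, Dict[str, str]]
]

# In-memory part
inline: MultipartPart = ("run.inputs", (None, b'{"q": "hi"}', "application/json", {}))

# Streamed part: an open binary file object is read on demand when the
# multipart body is sent
fh: BufferedReader = open("payload.json", "rb")
streamed: MultipartPart = ("run.outputs", (None, fh, "application/json", {}))
```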