Merge pull request #15 from luca-stamatescu/add_open_requests_count
Add counts of requests currently processing
technicianted authored Jan 5, 2024
2 parents 83f3717 + 892de81 commit 84c05e1
Showing 4 changed files with 43 additions and 23 deletions.
4 changes: 3 additions & 1 deletion README.md
@@ -120,9 +120,11 @@ The tool supports four different shape profiles via command line option `--shape
 |-|-|-|-|
 |`time`|Time offset in seconds since the start of the test.|no|`120`|
 |`rpm`|Successful Requests Per Minute. Note that it may be less than `--rate` as it counts completed requests.|yes|`12`|
-|`requests`|Total number of requests made.|no|`1233`|
+|`processing`|Total number of requests currently being processed by the endpoint.|no|`100`|
+|`completed`|Total number of completed requests.|no|`100`|
 |`failures`|Total number of failed requests out of `requests`.|no|`100`|
 |`throttled`|Total number of throttled requests out of `requests`.|no|`100`|
+|`requests`|Deprecated in favor of `completed` field (output values of both fields are the same)|no|`1233`|
 |`ctx_tpm`|Number of context Tokens Per Minute.|yes|`1200`|
 |`gen_tpm`|Number of generated Tokens Per Minute.|yes|`156`|
 |`ttft_avg`|Average time in seconds from the beginning of the request until the first token was received.|yes|`0.122`|
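
For orientation, a single periodic stats line in JSON mode, combining the fields above, might look like this (illustrative values only; the latency and utilization fields documented further down the table are elided):

    {"run_seconds": 120, "timestamp": "...", "rpm": 12.0, "processing": 8, "completed": 1233, "failures": 3, "throttled": 10, "requests": 1233, "tpm": {"context": 1200, "gen": 156}}
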
20 changes: 12 additions & 8 deletions benchmark/loadcmd.py
@@ -1,20 +1,22 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT License.
 
-import time
+import logging
+import math
 import os
+import sys
+import time
+from typing import Iterable, Iterator
 
 import aiohttp
 import wonderwords
-import math
-import logging
-from typing import Iterable, Iterator
-from .statsaggregator import _StatsAggregator
-from .oairequester import OAIRequester
 
 from .asynchttpexecuter import AsyncHTTPExecuter
-from .ratelimiting import RateLimiter, NoRateLimiter
+from .oairequester import OAIRequester
 from .oaitokenizer import num_tokens_from_messages
+from .ratelimiting import NoRateLimiter, RateLimiter
+from .statsaggregator import _StatsAggregator
 
-import sys
 
 class _RequestBuilder:
     """
@@ -122,13 +124,15 @@ def _run_load(request_builder: Iterable[dict],
     aggregator = _StatsAggregator(
         window_duration=aggregation_duration,
         dump_duration=1,
+        clients=max_concurrency,
         json_output=json_output)
     requester = OAIRequester(api_key, url, backoff=backoff)
 
     async def request_func(session:aiohttp.ClientSession):
         nonlocal aggregator
         nonlocal requester
         request_body, messages_tokens = request_builder.__next__()
+        aggregator.record_new_request()
         stats = await requester.call(session, request_body)
         stats.context_tokens = messages_tokens
         try:
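
The change above brackets each request with the new accounting: record_new_request() marks a request as in flight just before it is issued, and the matching decrement happens in aggregate_request() once the request completes (see the statsaggregator.py diff below). A minimal, runnable sketch of this in-flight gauge pattern — using hypothetical stand-ins rather than the benchmark's actual classes — behaves like this:

    import asyncio
    import threading

    class InFlightGauge:
        """Hypothetical stand-in for the aggregator's new counters."""
        def __init__(self):
            self.lock = threading.Lock()
            self.processing_requests_count = 0
            self.total_requests_count = 0

        def record_new_request(self):
            # Called just before the request is issued.
            with self.lock:
                self.processing_requests_count += 1

        def aggregate_request(self):
            # Called once the request completes, successfully or not.
            with self.lock:
                self.processing_requests_count -= 1
                self.total_requests_count += 1

    async def request_func(gauge: InFlightGauge):
        gauge.record_new_request()
        try:
            await asyncio.sleep(0.01)  # stands in for the actual HTTP call
        finally:
            gauge.aggregate_request()

    async def main():
        gauge = InFlightGauge()
        await asyncio.gather(*(request_func(gauge) for _ in range(10)))
        print(gauge.processing_requests_count, gauge.total_requests_count)  # -> 0 10

    asyncio.run(main())
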
12 changes: 7 additions & 5 deletions benchmark/oairequester.py
@@ -1,11 +1,12 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT License.
 
-import time
-import aiohttp
-import logging
 import asyncio
+import logging
+import time
 from typing import Optional
+
+import aiohttp
 import backoff
 
 # TODO: switch to using OpenAI client library once new headers are exposed.
@@ -119,8 +120,8 @@ async def _call(self, session:aiohttp.ClientSession, body: dict, stats: RequestStats):
     async def _handle_response(self, response: aiohttp.ClientResponse, stats: RequestStats):
         async with response:
             stats.response_time = time.time()
-            async for l in response.content:
-                if not l.startswith(b'data:'):
+            async for line in response.content:
+                if not line.startswith(b'data:'):
                     continue
                 if stats.first_token_time is None:
                     stats.first_token_time = time.time()
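
The renamed loop above is aiohttp's streaming idiom: response.content is a StreamReader which, iterated with async for, yields the raw response body one bytes line at a time; the requester keeps only server-sent-event payload lines (those starting with b'data:') and timestamps the first one. A self-contained sketch of the same idiom, pointed at a hypothetical streaming URL:

    import asyncio
    import time

    import aiohttp

    async def read_sse(url: str):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                first_token_time = None
                async for line in response.content:    # raw bytes, one line per iteration
                    if not line.startswith(b"data:"):  # skip keep-alives and blank lines
                        continue
                    if first_token_time is None:
                        first_token_time = time.time() # time to first streamed chunk
                    print(line[len(b"data:"):].strip()[:60])

    # asyncio.run(read_sse("https://example.com/stream"))  # hypothetical endpoint
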
Expand All @@ -142,3 +143,4 @@ def _read_utilization(self, response: aiohttp.ClientResponse, stats: RequestStat
except ValueError as e:
logging.warning(f"unable to parse utilization header value: {UTILIZATION_HEADER}={util_str}: {e}")


30 changes: 21 additions & 9 deletions benchmark/statsaggregator.py
@@ -4,7 +4,6 @@
 import datetime
 import json
 import logging
-import math
 import threading
 import time
 
@@ -42,10 +41,9 @@ class _StatsAggregator(threading.Thread):
     terminate: threading.Event
 
     start_time: float = 0
-    requests_count: int = 0
-    failed_count: int = 0
+    processing_requests_count: int = 0
+    total_requests_count: int = 0
+    total_failed_count: int = 0
     throttled_count: int = 0
 
     request_timestamps = _Samples()
Expand All @@ -58,12 +56,14 @@ class _StatsAggregator(threading.Thread):
generated_tokens = _Samples()
utilizations = _Samples()

def __init__(self, dump_duration:float=5, window_duration:float=60, json_output=False, *args,**kwargs):
def __init__(self, clients:int, dump_duration:float=5, window_duration:float=60, json_output=False, *args,**kwargs):
"""
:param clients: number of clients used in testing
:param dump_duration: duration in seconds to dump current aggregates.
:param window_duration: duration of sliding window in second to consider for aggregation.
:param json_output: whether to dump periodic stats as json or human readable.
"""
self.clients = clients
self.dump_duration = dump_duration
self.json_output = json_output
self.window_duration = window_duration
Expand All @@ -82,18 +82,26 @@ def run(self):

def stop(self):
self.terminate.set()
# Dump one more time to ensure we include the final request
self._dump()

def record_new_request(self):
"""
Records a new request, so that the number of processing requests is known.
"""
with self.lock:
self.processing_requests_count += 1

def aggregate_request(self, stats: RequestStats):
"""
Aggregates request stat within the sliding window.
:param stats: request stats object.
"""
with self.lock:
self.requests_count += 1
self.processing_requests_count -= 1
self.total_requests_count += 1
self.call_tries._append(stats.request_start_time, stats.calls)
if stats.response_status_code != 200:
self.failed_count += 1
self.total_failed_count += 1
if stats.response_status_code == 429:
self.throttled_count += 1
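
Both new methods take self.lock before touching the counters. That matters because _StatsAggregator runs as its own threading.Thread — its run() loop wakes periodically to dump aggregates — while record_new_request() and aggregate_request() are invoked from the load-generating side, so unsynchronized reads and writes would race. A stripped-down sketch of that arrangement (a hypothetical class, not the real aggregator):

    import threading

    class PeriodicDumper(threading.Thread):
        def __init__(self):
            super().__init__(daemon=True)
            self.lock = threading.Lock()
            self.terminate = threading.Event()
            self.processing_requests_count = 0

        def run(self):
            # Wake once per second until stop() is called.
            while not self.terminate.wait(1.0):
                with self.lock:  # consistent snapshot while writers pause
                    print(f"processing: {self.processing_requests_count}")

        def stop(self):
            self.terminate.set()

        def record_new_request(self):
            with self.lock:
                self.processing_requests_count += 1
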
@@ -135,14 +143,18 @@ def _dump(self):
         util_avg = f"{round(np.average(self.utilizations._values()), 1)}%" if self.utilizations._len() > 0 else "n/a"
         util_95th = f"{round(np.percentile(self.utilizations._values(), 95), 1)}%" if self.utilizations._len() > 1 else "n/a"
         rpm = round(60.0 * self.request_timestamps._len() / self.window_duration, 1) if self.request_timestamps._len() > 0 else "n/a"
+        # Handle the 1x extra processing_request due to next request being queued
+        processing_requests_count = min(self.clients, self.processing_requests_count)
         if self.json_output:
             j = {
                 "run_seconds": run_seconds,
                 "timestamp": timestamp,
                 "rpm": rpm,
-                "requests": self.requests_count,
-                "failures": self.failed_count,
+                "processing": processing_requests_count,
+                "completed": self.total_requests_count,
+                "failures": self.total_failed_count,
                 "throttled": self.throttled_count,
+                "requests": self.total_requests_count,
                 "tpm": {
                     "context": context_per_minute,
                     "gen": gen_per_minute,
Expand All @@ -167,7 +179,7 @@ def _dump(self):
}
print(json.dumps(j), flush=True)
else:
print(f"{timestamp} rpm: {rpm:<5} requests: {self.requests_count:<5} failures: {self.failed_count:<4} throttled: {self.throttled_count:<4} tpm: {tokens_per_minute:<6} ttft_avg: {ttft_avg:<6} ttft_95th: {ttft_95th:<6} tbt_avg: {tbt_avg:<6} tbt_95th: {tbt_95th:<6} e2e_avg: {e2e_latency_avg:<6} e2e_95th: {e2e_latency_95th:<6} util_avg: {util_avg:<6} util_95th: {util_95th:<6}", flush=True)
print(f"{timestamp} rpm: {rpm:<5} processing: {processing_requests_count:<4} completed: {self.total_requests_count:<5} failures: {self.total_failed_count:<4} throttled: {self.throttled_count:<4} requests: {self.total_requests_count:<5} tpm: {tokens_per_minute:<6} ttft_avg: {ttft_avg:<6} ttft_95th: {ttft_95th:<6} tbt_avg: {tbt_avg:<6} tbt_95th: {tbt_95th:<6} e2e_avg: {e2e_latency_avg:<6} e2e_95th: {e2e_latency_95th:<6} util_avg: {util_avg:<6} util_95th: {util_95th:<6}", flush=True)

def _slide_window(self):
with self.lock:
Expand Down
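
A note on the min() clamp in _dump() above: a client may call record_new_request() for its next request before aggregate_request() has retired the previous one, so the raw gauge can transiently read one above the number of clients; the clamp keeps that artifact out of the report. In miniature (hypothetical numbers):

    clients = 4                      # max_concurrency handed to the aggregator
    processing_requests_count = 5    # transient over-count from a queued next request
    print(min(clients, processing_requests_count))  # -> 4, the value actually reported
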
