
Commit

feat: restart server on job timeout (#104)
Co-authored-by: Avram Tudor <tudor.avram@8x8.com>
quitrk and Avram Tudor authored Oct 8, 2024
1 parent 94397c2 commit 760aa16
Showing 3 changed files with 34 additions and 24 deletions.
3 changes: 3 additions & 0 deletions Dockerfile
@@ -64,6 +64,9 @@ ENV \
     # https://docs.python.org/3/using/cmdline.html#envvar-PYTHONDONTWRITEBYTECODE
     PYTHONDONTWRITEBYTECODE=1 \
     PYTHONPATH=/app \
+    OUTLINES_CACHE_DIR=/app/vllm/outlines \
+    VLLM_CONFIG_ROOT=/app/vllm/config \
+    HF_HOME=/app/hf \
     LLAMA_PATH="/models/Llama-3.1-8B-Instruct-Q8_0.gguf"
 
 VOLUME [ "/models" ]

7 changes: 4 additions & 3 deletions skynet/modules/ttt/openai_api/app.py
@@ -1,3 +1,4 @@
+import os
 import subprocess
 
 from skynet import http_client

@@ -71,12 +72,12 @@ def destroy():
 
 
 def restart():
-    log.info('Restarting OpenAI API server...')
+    log.info('Restarting Skynet...')
 
     OPENAI_API_RESTART_COUNTER.inc()
 
-    destroy()
-    initialize()
+    # rely on the supervisor to restart the process
+    os._exit(1)
 
 
 __all__ = ['destroy', 'initialize', 'restart']
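
Note on the new restart behavior: instead of tearing the OpenAI API server down and re-initializing it in-process, restart() now exits the whole process with a nonzero status. os._exit(1) terminates immediately without running cleanup handlers, so it cannot hang on a wedged inference server; an external supervisor (for example the container runtime's restart policy) is expected to start the process again. A minimal sketch of such a supervisor loop, with a hypothetical entry point name:

    import subprocess
    import time

    # Restart the server whenever it exits with a nonzero status; a clean exit stops the loop.
    while True:
        completed = subprocess.run(["python", "-m", "skynet.main"])  # entry point name is a guess
        if completed.returncode == 0:
            break
        time.sleep(1)  # small backoff before restarting after a crash or timeout exit
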
48 changes: 27 additions & 21 deletions skynet/modules/ttt/summaries/jobs.py
@@ -36,7 +36,7 @@ def can_run_next_job() -> bool:
     return 'summaries:executor' in modules and (current_task is None or current_task.done())
 
 
-def get_job_processor(customer_id: str) -> str:
+def get_job_processor(customer_id: str) -> Processors:
     options = get_credentials(customer_id)
     secret = options.get('secret')
     api_type = options.get('type')

@@ -125,6 +125,29 @@ async def run_job(job: Job) -> None:
         exit_task.cancel()
 
 
+async def update_done_job(job: Job, result: str, processor: Processors, has_failed: bool = False) -> None:
+    should_expire = not has_failed or processor != Processors.LOCAL
+
+    updated_job = await update_job(
+        expires=redis_exp_seconds if should_expire else None,
+        job_id=job.id,
+        end=time.time(),
+        status=JobStatus.ERROR if has_failed else JobStatus.SUCCESS,
+        result=result,
+    )
+
+    if not should_expire:
+        await db.rpush(ERROR_JOBS_KEY, job.id)
+        SUMMARY_ERROR_COUNTER.inc()
+
+    await db.lrem(RUNNING_JOBS_KEY, 0, job.id)
+
+    SUMMARY_DURATION_METRIC.observe(updated_job.computed_duration)
+    SUMMARY_INPUT_LENGTH_METRIC.observe(len(updated_job.payload.text))
+
+    log.info(f"Job {updated_job.id} duration: {updated_job.computed_duration} seconds")
+
+
 async def _run_job(job: Job) -> None:
     has_failed = False
     result = None
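
A quick illustration (not part of the commit) of the expiry rule introduced in update_done_job: only jobs that fail on the local processor are kept without a Redis TTL and pushed onto ERROR_JOBS_KEY for later inspection; successes and failures from remote processors expire after redis_exp_seconds as before. The enum members below are stand-ins for skynet's Processors:

    from enum import Enum

    class Processors(Enum):  # stand-in; member names other than LOCAL are assumed
        LOCAL = 'local'
        OPENAI = 'openai'

    def should_expire(has_failed: bool, processor: Processors) -> bool:
        return not has_failed or processor != Processors.LOCAL

    assert should_expire(False, Processors.LOCAL)      # local success: result expires normally
    assert should_expire(True, Processors.OPENAI)      # remote failure: still expires
    assert not should_expire(True, Processors.LOCAL)   # local failure: kept and listed as an error
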
@@ -177,26 +200,7 @@ async def _run_job(job: Job) -> None:
         has_failed = True
         result = str(e)
 
-    should_expire = not has_failed or processor != Processors.LOCAL
-
-    updated_job = await update_job(
-        expires=redis_exp_seconds if should_expire else None,
-        job_id=job.id,
-        end=time.time(),
-        status=JobStatus.ERROR if has_failed else JobStatus.SUCCESS,
-        result=result,
-    )
-
-    if not should_expire:
-        await db.rpush(ERROR_JOBS_KEY, job.id)
-        SUMMARY_ERROR_COUNTER.inc()
-
-    await db.lrem(RUNNING_JOBS_KEY, 0, job.id)
-
-    SUMMARY_DURATION_METRIC.observe(updated_job.computed_duration)
-    SUMMARY_INPUT_LENGTH_METRIC.observe(len(updated_job.payload.text))
-
-    log.info(f"Job {updated_job.id} duration: {updated_job.computed_duration} seconds")
+    await update_done_job(job, result, processor, has_failed)
 
 
 def create_run_job_task(job: Job) -> asyncio.Task:

@@ -239,6 +243,8 @@ async def restart_on_timeout(job: Job) -> None:
 
     log.warning(f"Job {job.id} timed out after {job_timeout} seconds, restarting...")
 
+    await update_done_job(job, "Job timed out", Processors.LOCAL, has_failed=True)
+
     restart_openai_api()
 
 
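How the timeout path fits together, as a rough self-contained sketch rather than the actual skynet code (the full bodies of run_job and restart_on_timeout are not shown in this diff): the job task and a watchdog task are started together; if the job finishes in time the watchdog is cancelled, otherwise the watchdog marks the job as failed via update_done_job and calls restart_openai_api(), which now exits the process so the supervisor restarts it.

    import asyncio
    from dataclasses import dataclass

    @dataclass
    class Job:                                  # stand-in for skynet's Job model
        id: str

    async def fake_run_job(job: Job) -> None:   # stand-in for _run_job
        await asyncio.sleep(5)                  # pretend the summary takes 5 seconds

    async def restart_on_timeout(job: Job, job_timeout: float) -> None:
        await asyncio.sleep(job_timeout)
        # Only reached if the watchdog was not cancelled, i.e. the job overran its budget.
        print(f"Job {job.id} timed out after {job_timeout} seconds, restarting...")
        # In skynet this is where update_done_job(..., has_failed=True) and
        # restart_openai_api() (which calls os._exit(1)) run.

    async def run_job(job: Job, job_timeout: float = 2.0) -> None:
        watchdog = asyncio.create_task(restart_on_timeout(job, job_timeout))
        try:
            await fake_run_job(job)
        finally:
            watchdog.cancel()                   # finished in time: disarm the watchdog

    asyncio.run(run_job(Job(id="demo")))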
