diff --git a/llama.cpp b/llama.cpp index 15fa07a..00ba2ff 160000 --- a/llama.cpp +++ b/llama.cpp @@ -1 +1 @@ -Subproject commit 15fa07a5c564d3ed7e7eb64b73272cedb27e73ec +Subproject commit 00ba2ff78100e187ae17987bacd1c916211718b2 diff --git a/skynet/modules/ttt/summaries/jobs.py b/skynet/modules/ttt/summaries/jobs.py index 4c5a56c..1f21ca6 100644 --- a/skynet/modules/ttt/summaries/jobs.py +++ b/skynet/modules/ttt/summaries/jobs.py @@ -137,6 +137,9 @@ async def _run_job(job: Job) -> None: await update_job(job_id=job.id, start=start, status=JobStatus.RUNNING, worker_id=worker_id) + customer_id = job.metadata.customer_id + processor = get_job_processor(customer_id) # may have changed since job was created + # add to running jobs list if not already there (which may occur on multiple worker disconnects while running the same job) if job.id not in await db.lrange(RUNNING_JOBS_KEY, 0, -1): await db.rpush(RUNNING_JOBS_KEY, job.id) @@ -147,10 +150,8 @@ async def _run_job(job: Job) -> None: result = job.payload.text else: try: - customer_id = job.metadata.customer_id options = get_credentials(customer_id) secret = options.get('secret') - processor = get_job_processor(customer_id) # may have changed since job was created if processor == Processors.OPENAI: log.info(f"Forwarding inference to OpenAI for customer {customer_id}") @@ -176,15 +177,17 @@ async def _run_job(job: Job) -> None: has_failed = True result = str(e) + should_expire = not has_failed or processor != Processors.LOCAL + updated_job = await update_job( - expires=redis_exp_seconds if not has_failed else None, + expires=redis_exp_seconds if should_expire else None, job_id=job.id, end=time.time(), status=JobStatus.ERROR if has_failed else JobStatus.SUCCESS, result=result, ) - if has_failed: + if not should_expire: await db.rpush(ERROR_JOBS_KEY, job.id) SUMMARY_ERROR_COUNTER.inc()