Make a clean interface for jobs #939

Open
wants to merge 13 commits into base: main
1 change: 1 addition & 0 deletions docker-compose.yaml
@@ -173,6 +173,7 @@ services:
- RAY_WORKER_GPUS_FRACTION
- LUMIGATOR_API_CORS_ALLOWED_ORIGINS
- MLFLOW_TRACKING_URI
- PYTHONPATH=/mzai/lumigator/backend:/mzai/lumigator/jobs
Contributor
Seems like this should go into the dockerfile? This is required for the thing to work

Contributor Author
I don't like hard coding these paths into the docker file. We may need more flexibility. Let's talk about it on Monday.

# NOTE: to keep AWS_ENDPOINT_URL as http://localhost:9000 both on the host system
# and inside containers, we map localhost to the host gateway IP.
# This currently works properly, but might be the cause of networking
69 changes: 69 additions & 0 deletions lumigator/backend/backend/services/job_interface.py
@@ -0,0 +1,69 @@
from abc import ABC, abstractmethod
from uuid import UUID

from lumigator_schemas.jobs import JobCreate
from pydantic import BaseModel


class JobDefinition(ABC):
"""Abstract base class for jobs.

Attributes
----------
command : str
The command to execute the job.
pip_reqs : str, optional
Path to a requirements file specifying dependencies (default: None).
work_dir : str, optional
Working directory for the job (default: None).
config_model : BaseModel
Pydantic model representing job-specific configuration.
type : str
Identifier for the job type; used as the registration key in the backend.

"""

command: str
pip_reqs: str | None
work_dir: str | None
config_model: BaseModel
type: str

# This should end up not being necessary, since we'd expose the whole job config
@abstractmethod
def generate_config(
self, request: JobCreate, record_id: UUID, dataset_path: str, storage_path: str, type: str
) -> BaseModel:
"""Generates the job-specific configuration that will be passed to the Ray job.

Parameters
----------
request : JobCreate
Non-job-specific parameters for job creation
record_id : UUID
Job ID assigned to the job
dataset_path : str
S3 path for the input dataset
storage_path : str
S3 path where the backend will store the results of the job
type : str
Job type identifier

Returns
-------
BaseModel
A job-specific pydantic model that will be sent to the Ray job instance
"""
pass

@abstractmethod
def store_as_dataset(self) -> bool:
"""Indicates whether the job results should also be stored as a dataset.

Returns
-------
bool
Whether the results should be stored in an S3 dataset path
"""
pass

def __init__(self, command, pip_reqs, work_dir, config_model, type):
self.command = command
self.pip_reqs = pip_reqs
self.work_dir = work_dir
self.config_model = config_model
self.type = type
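
For illustration, here is a minimal sketch (not part of this PR) of how a job package could implement this interface and expose it to the backend. The class name, the inference.schemas.InferenceJobConfig import, the command, and the paths are hypothetical placeholders; the only contract assumed is the JobDefinition ABC above plus a module-level JOB_INTERFACE attribute, which is what jobs.py looks up via job_module.interface.JOB_INTERFACE.

from uuid import UUID

from lumigator_schemas.jobs import JobCreate
from pydantic import BaseModel

from backend.services.job_interface import JobDefinition  # assumed import path for the ABC added in this PR
from inference.schemas import InferenceJobConfig  # hypothetical job-specific config model


class InferenceJobDefinition(JobDefinition):
    """Illustrative JobDefinition implementation for a hypothetical inference job package."""

    def generate_config(
        self, request: JobCreate, record_id: UUID, dataset_path: str, storage_path: str, type: str = "inference"
    ) -> BaseModel:
        # Map the generic JobCreate request onto the job-specific pydantic model
        return InferenceJobConfig(
            name=f"{request.name}/{record_id}",
            dataset={"path": dataset_path},
            job={"max_samples": request.max_samples, "storage_path": storage_path},
        )

    def store_as_dataset(self) -> bool:
        # Inference results are also written back as a dataset
        return True


# Module-level instance the backend discovers as <job_module>.interface.JOB_INTERFACE
JOB_INTERFACE = InferenceJobDefinition(
    command="python inference.py",
    pip_reqs="requirements.txt",
    work_dir=".",
    config_model=InferenceJobConfig,
    type="inference",
)

With an interface module like this, registering the job in the backend reduces to adding the package to the job_modules list in jobs.py (see the "# ADD YOUR JOB IMPORT HERE #" block below).
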
151 changes: 36 additions & 115 deletions lumigator/backend/backend/services/jobs.py
@@ -8,19 +8,13 @@
from urllib.parse import urljoin
from uuid import UUID

import evaluator
import evaluator.interface
import inference
import inference.interface
import loguru
import requests
from evaluator.schemas import DatasetConfig as ELDatasetConfig
from evaluator.schemas import EvalJobConfig, EvaluationConfig
from fastapi import BackgroundTasks, UploadFile
from inference.schemas import DatasetConfig as IDatasetConfig
from inference.schemas import (
HfPipelineConfig,
InferenceJobConfig,
InferenceServerConfig,
SamplingParameters,
)
from inference.schemas import JobConfig as InferJobConfig
from lumigator_schemas.datasets import DatasetFormat
from lumigator_schemas.extras import ListingResponse
from lumigator_schemas.jobs import (
@@ -34,7 +28,6 @@
JobResultObject,
JobResultResponse,
JobStatus,
JobType,
)
from ray.job_submission import JobSubmissionClient
from s3fs import S3FileSystem
@@ -47,37 +40,34 @@
from backend.services.exceptions.dataset_exceptions import DatasetMissingFieldsError
from backend.services.exceptions.job_exceptions import (
JobNotFoundError,
JobTypeUnsupportedError,
JobUpstreamError,
JobValidationError,
)
from backend.settings import settings

# ADD YOUR JOB IMPORT HERE #
############################
job_modules = [evaluator, inference]
############################
job_settings_map = {
job_module.interface.JOB_INTERFACE.type: job_module.interface.JOB_INTERFACE for job_module in job_modules
}

DEFAULT_SKIP = 0
DEFAULT_LIMIT = 100
JobSpecificRestrictedConfig = type[JobEvalConfig | JobInferenceConfig]


# The end result should be that InferenceJobConfig is actually JobInferenceConfig
# (resp. Eval). For the moment, something will convert one into the other, and we'll
# decide where to put this. The jobs should ideally have no dependency on the backend.


class JobService:
# set storage path
storage_path = f"s3://{Path(settings.S3_BUCKET) / settings.S3_JOB_RESULTS_PREFIX}/"

job_settings = {
JobType.INFERENCE: {
"command": settings.INFERENCE_COMMAND,
"pip": settings.INFERENCE_PIP_REQS,
"work_dir": settings.INFERENCE_WORK_DIR,
"ray_worker_gpus_fraction": settings.RAY_WORKER_GPUS_FRACTION,
"ray_worker_gpus": settings.RAY_WORKER_GPUS,
},
JobType.EVALUATION: {
"command": settings.EVALUATOR_COMMAND,
"pip": settings.EVALUATOR_PIP_REQS,
"work_dir": settings.EVALUATOR_WORK_DIR,
"ray_worker_gpus_fraction": settings.RAY_WORKER_GPUS_FRACTION,
"ray_worker_gpus": settings.RAY_WORKER_GPUS,
},
}

NON_TERMINAL_STATUS = [
JobStatus.CREATED.value,
JobStatus.PENDING.value,
@@ -266,7 +256,7 @@ def get_upstream_job_status(self, job_id: UUID) -> str:
def get_job_logs(self, job_id: UUID) -> JobLogsResponse:
db_logs = self.job_repo.get(job_id)
if not db_logs:
raise JobNotFoundError(job_id, "Failed to find the job record holding the logs")
raise JobNotFoundError(job_id, "Failed to find the job record holding the logs") from None
elif not db_logs.logs:
ray_db_logs = self.retrieve_job_logs(job_id)
self._update_job_record(job_id, logs=ray_db_logs.logs)
@@ -338,64 +328,6 @@ def add_background_task(self, background_tasks: BackgroundTasks, task: callable,
"""Adds a background task to the background tasks queue."""
background_tasks.add_task(task, *args)

def generate_inference_job_config(self, request: JobCreate, record_id: UUID, dataset_path: str, storage_path: str):
# TODO Move to a custom validator in the schema
if request.job_config.task == "text-generation" and not request.job_config.system_prompt:
raise JobValidationError("System prompt is required for text generation tasks.") from None
job_config = InferenceJobConfig(
name=f"{request.name}/{record_id}",
dataset=IDatasetConfig(path=dataset_path),
job=InferJobConfig(
max_samples=request.max_samples,
storage_path=storage_path,
# TODO Should be unnecessary, check
output_field=request.job_config.output_field or "predictions",
),
)
if request.job_config.provider == "hf":
# Custom logic: if provider is hf, we run the hf model inside the ray job
job_config.hf_pipeline = HfPipelineConfig(
model_name_or_path=request.job_config.model,
task=request.job_config.task,
accelerator=request.job_config.accelerator,
revision=request.job_config.revision,
use_fast=request.job_config.use_fast,
trust_remote_code=request.job_config.trust_remote_code,
torch_dtype=request.job_config.torch_dtype,
max_new_tokens=500,
)
else:
# It will be a pass through to LiteLLM
job_config.inference_server = InferenceServerConfig(
base_url=request.job_config.base_url if request.job_config.base_url else None,
model=request.job_config.model,
provider=request.job_config.provider,
system_prompt=request.job_config.system_prompt or settings.DEFAULT_SUMMARIZER_PROMPT,
max_retries=3,
)
job_config.params = SamplingParameters(
max_tokens=request.job_config.max_tokens,
frequency_penalty=request.job_config.frequency_penalty,
temperature=request.job_config.temperature,
top_p=request.job_config.top_p,
)

return job_config

def generate_evaluation_job_config(self, request: JobCreate, record_id: UUID, dataset_path: str, storage_path: str):
job_config = EvalJobConfig(
name=f"{request.name}/{record_id}",
dataset=ELDatasetConfig(path=dataset_path),
evaluation=EvaluationConfig(
metrics=request.job_config.metrics,
max_samples=request.max_samples,
return_input_data=True,
return_predictions=True,
storage_path=storage_path,
),
)
return job_config

def create_job(
self,
request: JobCreate,
@@ -404,24 +336,21 @@ def create_job(
"""Creates a new evaluation workload to run on Ray and returns the response status."""
# Typing won't allow other job_type's
job_type = request.job_config.job_type
# Prepare the job configuration that will be sent to submit the ray job.
# This includes both the command that is going to be executed and its
# arguments defined in job_config_args
try:
job_settings = job_settings_map[job_type]
except KeyError:
raise JobTypeUnsupportedError("Unknown job type") from None

# Create a db record for the job
# To find the experiment that a job belongs to,
# we'd use https://mlflow.org/docs/latest/python_api/mlflow.client.html#mlflow.client.MlflowClient.search_runs
record = self.job_repo.create(name=request.name, description=request.description, job_type=job_type)

# TODO defer to specific job
if job_type == JobType.INFERENCE and not request.job_config.output_field:
request.job_config.output_field = "predictions"

dataset_s3_path = self._dataset_service.get_dataset_s3_path(request.dataset)
if job_type == JobType.INFERENCE:
job_config = self.generate_inference_job_config(request, record.id, dataset_s3_path, self.storage_path)
elif job_type == JobType.EVALUATION:
job_config = self.generate_evaluation_job_config(request, record.id, dataset_s3_path, self.storage_path)
else:
# This should not happen since the job_type's are type checked
raise Exception("Unknown job type")
job_config = job_settings.generate_config(request, record.id, dataset_s3_path, self.storage_path)

# job_config_args is used to map input configuration parameters with
# command parameters provided via command line to the ray job.
@@ -433,31 +362,20 @@
"--config": job_config.model_dump_json(exclude_unset=True, exclude_none=True),
}

# Prepare the job configuration that will be sent to submit the ray job.
# This includes both the command that is going to be executed and its
# arguments defined in eval_config_args
job_settings = self.job_settings[job_type]

ray_config = JobConfig(
job_id=record.id,
job_type=job_type,
command=job_settings["command"],
command=job_settings.command,
args=job_config_args,
)

# build runtime ENV for workers
runtime_env_vars = {"MZAI_JOB_ID": str(record.id)}
settings.inherit_ray_env(runtime_env_vars)

# set num_gpus per worker (zero if we are just hitting a service)
if job_type == JobType.INFERENCE and not request.job_config.provider == "hf":
worker_gpus = job_settings["ray_worker_gpus_fraction"]
else:
worker_gpus = job_settings["ray_worker_gpus"]

runtime_env = {
"pip": job_settings["pip"],
"working_dir": job_settings["work_dir"],
"pip": job_settings.pip_reqs,
"working_dir": job_settings.work_dir,
"env_vars": runtime_env_vars,
}

@@ -467,7 +385,10 @@
loguru.logger.info(f"{runtime_env}")

entrypoint = RayJobEntrypoint(
config=ray_config, metadata=metadata, runtime_env=runtime_env, num_gpus=worker_gpus
config=ray_config,
metadata=metadata,
runtime_env=runtime_env,
num_gpus=settings.RAY_WORKER_GPUS,
)
loguru.logger.info(f"Submitting {job_type} Ray job...")
submit_ray_job(self.ray_client, entrypoint)
@@ -478,7 +399,7 @@
# - annotation jobs do not run in workflows => they trigger dataset saving here at job level
# As JobType.ANNOTATION is not used uniformly throughout our code yet, we rely on the already
# existing `store_to_dataset` parameter to explicitly trigger this in the annotation case
if job_type == JobType.INFERENCE and request.job_config.store_to_dataset:
if job_settings.store_as_dataset():
self.add_background_task(self._background_tasks, self.handle_inference_job, record.id, request)

loguru.logger.info("Getting response...")
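
A note on the hand-off above: create_job serializes the job-specific pydantic model with model_dump_json and passes it to the Ray entrypoint as the --config argument. The job-side parsing is not part of this diff; a minimal sketch of what the consuming script might look like, assuming a hypothetical InferenceJobConfig as the declared config_model, is:

import argparse

from inference.schemas import InferenceJobConfig  # hypothetical: whichever config_model the job declares

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=str, required=True)
    args = parser.parse_args()

    # Re-hydrate the pydantic model that the backend serialized with model_dump_json()
    config = InferenceJobConfig.model_validate_json(args.config)
    print(f"Running job {config.name}")
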
10 changes: 0 additions & 10 deletions lumigator/backend/backend/settings.py
@@ -60,16 +60,6 @@ def TRACKING_BACKEND_URI(self) -> str: # noqa: N802
DEEPSEEK_API_URL: str = "https://api.deepseek.com/v1"
DEFAULT_SUMMARIZER_PROMPT: str = "You are a helpful assistant, expert in text summarization. For every prompt you receive, provide a summary of its contents in at most two sentences." # noqa: E501

# Eval job details
EVALUATOR_WORK_DIR: str | None = None
EVALUATOR_PIP_REQS: str | None = None
EVALUATOR_COMMAND: str = "python evaluator.py"

# Inference job details
INFERENCE_WORK_DIR: str | None = None
INFERENCE_PIP_REQS: str | None = None
INFERENCE_COMMAND: str = "python inference.py"

def inherit_ray_env(self, runtime_env_vars: Mapping[str, str]):
for env_var_name in self.RAY_WORKER_ENV_VARS:
env_var = os.environ.get(env_var_name, None)
6 changes: 4 additions & 2 deletions lumigator/backend/backend/tests/conftest.py
@@ -8,6 +8,8 @@
from uuid import UUID

import boto3
import evaluator
import evaluator.interface
import fsspec
import pytest
import requests_mock
@@ -410,8 +412,8 @@ def create_job_config() -> JobConfig:

conf = JobConfig(
job_id=uuid.uuid4(),
job_type=JobType.EVALUATION,
command=settings.EVALUATOR_COMMAND,
job_type=evaluator.interface.JOB_INTERFACE.type,
command=evaluator.interface.JOB_INTERFACE.command,
args=conf_args,
)

@@ -313,7 +313,6 @@ def check_artifacts_times(artifacts_url):
artifacts_url,
timeout=5, # 5 seconds
).json()
logger.critical(artifacts)
assert "evaluation_time" in artifacts["artifacts"]
assert "inference_time" in artifacts["artifacts"]

@@ -351,24 +350,25 @@ def test_full_experiment_launch(local_client: TestClient, dialog_dataset, depend


def test_experiment_non_existing(local_client: TestClient, dependency_overrides_services):
non_existing_id = "71aaf905-4bea-4d19-ad06-214202165812"
non_existing_id = "d34dbeef-4bea-4d19-ad06-214202165812"
response = local_client.get(f"/experiments/{non_existing_id}")
assert response.status_code == 404
assert response.json()["detail"] == f"Experiment with ID {non_existing_id} not found"


def test_job_non_existing(local_client: TestClient, dependency_overrides_services):
non_existing_id = "71aaf905-4bea-4d19-ad06-214202165812"
non_existing_id = "d34dbeef-4bea-4d19-ad06-214202165812"
response = local_client.get(f"/jobs/{non_existing_id}")
assert response.status_code == 404
assert response.json()["detail"] == f"Job with ID {non_existing_id} not found"


def wait_for_workflow_complete(local_client: TestClient, workflow_id: UUID):
workflow_status = WorkflowStatus.CREATED
for _ in range(1, 300):
time.sleep(1)
workflow_details = WorkflowDetailsResponse.model_validate(local_client.get(f"/workflows/{workflow_id}").json())
workflow_status = workflow_details.status
workflow_status = WorkflowStatus(workflow_details.status)
if workflow_status in [WorkflowStatus.SUCCEEDED, WorkflowStatus.FAILED]:
logger.info(f"Workflow status: {workflow_status}")
break