Count, latency, error metrics for all Executor operations
eabatalov committed Feb 11, 2025
1 parent c608f41 commit 7fd9671
Showing 25 changed files with 1,101 additions and 167 deletions.
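
This commit moves metric definitions out of executor.py into dedicated metrics modules (metrics/downloader.py, metrics/executor.py, and others) that each operation imports, adding count, latency, in-progress, and error metrics around every Executor operation. Those modules are among the 25 changed files but are not shown in this excerpt; the following is a hypothetical sketch of how the downloader metrics could be declared with standard prometheus_client types, using only the names imported in downloader.py below.

# Hypothetical sketch of indexify/src/indexify/executor/metrics/downloader.py.
# The real module is added by this commit but not shown in this excerpt; only the
# metric variable names are taken from the imports in downloader.py below.
from prometheus_client import Counter, Gauge, Histogram

metric_graph_downloads = Counter(
    "graph_downloads", "Number of graph downloads started"
)
metric_graphs_from_cache = Counter(
    "graphs_from_cache", "Number of graphs served from the local cache"
)
metric_graph_download_errors = Counter(
    "graph_download_errors", "Number of graph downloads that raised an exception"
)
metric_tasks_downloading_graphs = Gauge(
    "tasks_downloading_graphs", "Number of tasks currently downloading their graphs"
)
metric_graph_download_latency = Histogram(
    "graph_download_latency_seconds", "Graph download latency in seconds"
)
# The task-input and reducer-init-value metrics imported below follow the same
# Counter/Gauge/Histogram pattern and are omitted here for brevity.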
2 changes: 1 addition & 1 deletion indexify/src/indexify/executor/api_objects.py
@@ -48,4 +48,4 @@ class TaskResult(BaseModel):


TASK_OUTCOME_SUCCESS = "success"
TASK_OUCOME_FAILURE = "failure"
TASK_OUTCOME_FAILURE = "failure"
50 changes: 45 additions & 5 deletions indexify/src/indexify/executor/downloader.py
@@ -8,6 +8,21 @@
from tensorlake.utils.http_client import get_httpx_client

from .api_objects import Task
from .metrics.downloader import (
metric_graph_download_errors,
metric_graph_download_latency,
metric_graph_downloads,
metric_graphs_from_cache,
metric_reducer_init_value_download_errors,
metric_reducer_init_value_download_latency,
metric_reducer_init_value_downloads,
metric_task_input_download_errors,
metric_task_input_download_latency,
metric_task_input_downloads,
metric_tasks_downloading_graphs,
metric_tasks_downloading_inputs,
metric_tasks_downloading_reducer_init_value,
)


class Downloader:
@@ -19,6 +34,33 @@ def __init__(
self._client = get_httpx_client(config_path, make_async=True)

async def download_graph(self, task: Task) -> SerializedObject:
with (
metric_graph_download_errors.count_exceptions(),
metric_tasks_downloading_graphs.track_inprogress(),
metric_graph_download_latency.time(),
):
metric_graph_downloads.inc()
return await self._download_graph(task)

async def download_input(self, task: Task) -> SerializedObject:
with (
metric_task_input_download_errors.count_exceptions(),
metric_tasks_downloading_inputs.track_inprogress(),
metric_task_input_download_latency.time(),
):
metric_task_input_downloads.inc()
return await self._download_input(task)

async def download_init_value(self, task: Task) -> SerializedObject:
with (
metric_reducer_init_value_download_errors.count_exceptions(),
metric_tasks_downloading_reducer_init_value.track_inprogress(),
metric_reducer_init_value_download_latency.time(),
):
metric_reducer_init_value_downloads.inc()
return await self._download_init_value(task)

async def _download_graph(self, task: Task) -> SerializedObject:
# Cache graph to reduce load on the server.
graph_path = os.path.join(
self.code_path,
@@ -33,6 +75,7 @@ async def download_graph(self, task: Task) -> SerializedObject:
self._read_cached_graph, graph_path
)
if graph is not None:
metric_graphs_from_cache.inc()
return graph

logger = self._task_logger(task)
@@ -71,7 +114,7 @@ def _write_cached_graph(
# This also allows to share the same cache between multiple Executors.
os.replace(tmp_path, path)

async def download_input(self, task: Task) -> SerializedObject:
async def _download_input(self, task: Task) -> SerializedObject:
logger = self._task_logger(task)

first_function_in_graph = task.invocation_id == task.input_key.split("|")[-1]
@@ -81,10 +124,7 @@ async def download_input(self, task: Task) -> SerializedObject:
else:
return await self._fetch_function_input(task, logger)

async def download_init_value(self, task: Task) -> Optional[SerializedObject]:
if task.reducer_output_id is None:
return None

async def _download_init_value(self, task: Task) -> SerializedObject:
logger = self._task_logger(task)
return await self._fetch_function_init_value(task, logger)

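Each new public Downloader method (download_graph, download_input, download_init_value) wraps the corresponding private _download_* implementation in a stack of prometheus_client context managers: Counter.count_exceptions() increments the error counter if the body raises, Gauge.track_inprogress() tracks concurrency, Histogram.time() observes latency, and a plain Counter.inc() records the attempt. A minimal, self-contained illustration of that pattern (an assumed example, not code from the commit):

import asyncio

from prometheus_client import Counter, Gauge, Histogram

attempts = Counter("op_attempts", "Operations started")
errors = Counter("op_errors", "Operations that raised an exception")
in_flight = Gauge("ops_in_flight", "Operations currently running")
latency = Histogram("op_latency_seconds", "Operation latency in seconds")

async def instrumented_op() -> str:
    with (
        errors.count_exceptions(),  # increments `errors` only if the body raises
        in_flight.track_inprogress(),  # +1 on enter, -1 on exit
        latency.time(),  # observes wall-clock duration on exit
    ):
        attempts.inc()
        await asyncio.sleep(0.01)  # stand-in for the real network download
        return "ok"

asyncio.run(instrumented_op())

The parenthesized multi-context with statement requires Python 3.10+, matching the syntax used in the diff above.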
99 changes: 47 additions & 52 deletions indexify/src/indexify/executor/executor.py
@@ -4,19 +4,32 @@
from socket import gethostname
from typing import Any, Dict, List, Optional

import prometheus_client
import structlog
from tensorlake.function_executor.proto.function_executor_pb2 import SerializedObject
from tensorlake.utils.logging import suppress as suppress_logging

from .api_objects import TASK_OUCOME_FAILURE, TASK_OUTCOME_SUCCESS, FunctionURI, Task
from .api_objects import FunctionURI, Task
from .downloader import Downloader
from .function_executor.function_executor_states_container import (
FunctionExecutorStatesContainer,
)
from .function_executor.server.function_executor_server_factory import (
FunctionExecutorServerFactory,
)
from .metrics.executor import (
METRIC_TASKS_COMPLETED_OUTCOME_ALL,
METRIC_TASKS_COMPLETED_OUTCOME_ERROR_CUSTOMER_CODE,
METRIC_TASKS_COMPLETED_OUTCOME_ERROR_PLATFORM,
METRIC_TASKS_COMPLETED_OUTCOME_SUCCESS,
metric_executor_info,
metric_executor_state,
metric_task_outcome_report_latency,
metric_task_outcome_report_retries,
metric_task_outcome_reports,
metric_tasks_completed,
metric_tasks_fetched,
metric_tasks_reporting_outcome,
)
from .monitoring.function_allowlist import function_allowlist_to_info_dict
from .monitoring.health_check_handler import HealthCheckHandler
from .monitoring.health_checker.health_checker import HealthChecker
@@ -27,38 +40,7 @@
from .task_reporter import TaskReporter
from .task_runner import TaskInput, TaskOutput, TaskRunner

# Executor metrics
metric_executor_info: prometheus_client.Info = prometheus_client.Info(
"executor", "Executor information"
)
metric_executor_state: prometheus_client.Enum = prometheus_client.Enum(
"executor_state",
"Current Executor state",
states=["starting", "running", "shutting_down"],
)
metric_executor_state.state("starting")
# Task metrics
metric_tasks_started: prometheus_client.Counter = prometheus_client.Counter(
"tasks_started", "Number of tasks that were started"
)
metric_tasks_completed: prometheus_client.Counter = prometheus_client.Counter(
"tasks_completed", "Number of tasks that were completed", ["outcome"]
)
metric_tasks_completed.labels(outcome=TASK_OUTCOME_SUCCESS)
metric_tasks_completed.labels(outcome=TASK_OUCOME_FAILURE)
# Lifecycle metrics
metric_tasks_downloading_graph: prometheus_client.Gauge = prometheus_client.Gauge(
"tasks_downloading_graph", "Number of tasks currently downloading their graphs"
)
metric_tasks_downloading_inputs: prometheus_client.Gauge = prometheus_client.Gauge(
"tasks_downloading_inputs", "Number of tasks currently downloading their inputs"
)
metric_tasks_reporting_outcome: prometheus_client.Gauge = prometheus_client.Gauge(
"tasks_reporting_outcome",
"Number of tasks currently reporting their outcomes to the Server",
)
# TODO: Add duration distribution metrics
# TODO: Add Platform errors metric


class Executor:
@@ -163,6 +145,7 @@ async def _run_tasks_loop(self):
while not self._is_shutdown:
try:
async for task in self._task_fetcher.run():
metric_tasks_fetched.inc()
asyncio.create_task(self._run_task(task))
except Exception as e:
self._logger.error(
@@ -176,19 +159,15 @@ async def _run_task(self, task: Task) -> None:
Doesn't raise any Exceptions. All errors are reported to the server."""
logger = self._task_logger(task)
output: Optional[TaskOutput] = None
metric_tasks_started.inc()

try:
with metric_tasks_downloading_graph.track_inprogress():
graph: SerializedObject = await self._downloader.download_graph(task)

with metric_tasks_downloading_inputs.track_inprogress():
input: SerializedObject = await self._downloader.download_input(task)
init_value: Optional[SerializedObject] = (
await self._downloader.download_init_value(task)
)

logger.info("task_is_runnable")
graph: SerializedObject = await self._downloader.download_graph(task)
input: SerializedObject = await self._downloader.download_input(task)
init_value: Optional[SerializedObject] = (
None
if task.reducer_output_id is None
else (await self._downloader.download_init_value(task))
)
output: TaskOutput = await self._task_runner.run(
TaskInput(
task=task,
@@ -198,17 +177,18 @@ async def _run_task(self, task: Task) -> None:
),
logger=logger,
)
logger.info("task_execution_finished", success=output.success)
logger.info("task execution finished", success=output.success)
except Exception as e:
output = TaskOutput.internal_error(task)
logger.error("task_execution_failed", exc_info=e)
logger.error("task execution failed", exc_info=e)

with metric_tasks_reporting_outcome.track_inprogress():
with (
metric_tasks_reporting_outcome.track_inprogress(),
metric_task_outcome_report_latency.time(),
):
metric_task_outcome_reports.inc()
await self._report_task_outcome(output=output, logger=logger)

outcome: str = TASK_OUTCOME_SUCCESS if output.success else TASK_OUCOME_FAILURE
metric_tasks_completed.labels(outcome=outcome).inc()

async def _report_task_outcome(self, output: TaskOutput, logger: Any) -> None:
"""Reports the task with the given output to the server.
@@ -222,14 +202,29 @@ async def _report_task_outcome(self, output: TaskOutput, logger: Any) -> None:
break
except Exception as e:
logger.error(
"failed_to_report_task",
"failed to report task",
exc_info=e,
)
reporting_retries += 1
metric_task_outcome_report_retries.inc()
await asyncio.sleep(5)

metric_tasks_completed.labels(outcome=METRIC_TASKS_COMPLETED_OUTCOME_ALL).inc()
if output.is_internal_error:
metric_tasks_completed.labels(
outcome=METRIC_TASKS_COMPLETED_OUTCOME_ERROR_PLATFORM
).inc()
elif output.success:
metric_tasks_completed.labels(
outcome=METRIC_TASKS_COMPLETED_OUTCOME_SUCCESS
).inc()
else:
metric_tasks_completed.labels(
outcome=METRIC_TASKS_COMPLETED_OUTCOME_ERROR_CUSTOMER_CODE
).inc()

async def _shutdown(self, loop):
self._logger.info("shutting_down")
self._logger.info("shutting down")
metric_executor_state.state("shutting_down")
# There will be lots of task cancellation exceptions and "X is shutting down"
# exceptions logged during Executor shutdown. Suppress their logs as they are
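
The outcome constants imported from metrics/executor.py are applied exactly once per task at the end of _report_task_outcome: the "all" label is always incremented, plus exactly one of success, platform error, or customer-code error. That module is not shown in this excerpt; below is a hypothetical sketch of how the labeled counter and outcome constants could be declared, with only the imported names taken from the diff above.

# Hypothetical sketch of indexify/src/indexify/executor/metrics/executor.py; the real
# module is changed by this commit but not shown here. Label values are assumptions.
from prometheus_client import Counter

METRIC_TASKS_COMPLETED_OUTCOME_ALL = "all"
METRIC_TASKS_COMPLETED_OUTCOME_SUCCESS = "success"
METRIC_TASKS_COMPLETED_OUTCOME_ERROR_CUSTOMER_CODE = "error_customer_code"
METRIC_TASKS_COMPLETED_OUTCOME_ERROR_PLATFORM = "error_platform"

metric_tasks_fetched = Counter(
    "tasks_fetched", "Number of tasks fetched from the Server"
)
metric_tasks_completed = Counter(
    "tasks_completed", "Number of tasks that were completed", ["outcome"]
)
# Pre-create every label series so each time series is exported at 0 before the
# first task completes.
for _outcome in (
    METRIC_TASKS_COMPLETED_OUTCOME_ALL,
    METRIC_TASKS_COMPLETED_OUTCOME_SUCCESS,
    METRIC_TASKS_COMPLETED_OUTCOME_ERROR_CUSTOMER_CODE,
    METRIC_TASKS_COMPLETED_OUTCOME_ERROR_PLATFORM,
):
    metric_tasks_completed.labels(outcome=_outcome)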