Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement memory and disk utilization check #657

Merged
merged 9 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/marqo/api/models/health_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import Optional

from marqo.base_model import StrictBaseModel


class InferenceHealthResponse(StrictBaseModel):
status: str


class BackendHealthResponse(StrictBaseModel):
status: str
memoryIsAvailable: Optional[bool]
storageIsAvailable: Optional[bool]


class HealthResponse(StrictBaseModel):
status: str
inference: InferenceHealthResponse
backend: BackendHealthResponse

@classmethod
def from_marqo_health_status(cls, marqo_health_status):
return cls(
status=marqo_health_status.status.value,
inference=InferenceHealthResponse(
status=marqo_health_status.inference.status.value
),
backend=BackendHealthResponse(
status=marqo_health_status.backend.status.value,
memoryIsAvailable=marqo_health_status.backend.memory_is_available,
storageIsAvailable=marqo_health_status.backend.storage_is_available
)
)
22 changes: 22 additions & 0 deletions src/marqo/base_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from pydantic import BaseModel


class MarqoBaseModel(BaseModel):
class Config:
allow_population_by_field_name = True # accept both real name and alias (if present)
validate_assignment = True


class StrictBaseModel(MarqoBaseModel):
class Config(MarqoBaseModel.Config):
extra = "forbid"


class ImmutableBaseModel(MarqoBaseModel):
class Config(MarqoBaseModel.Config):
allow_mutation = False


class ImmutableStrictBaseModel(StrictBaseModel, ImmutableBaseModel):
class Config(StrictBaseModel.Config, ImmutableBaseModel.Config):
pass
4 changes: 3 additions & 1 deletion src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def _remove_schema(self, app: str, name: str) -> None:
def _add_schema_to_services(self, app: str, name: str) -> None:
services_path = os.path.join(app, 'services.xml')

tree = ET.parse(services_path) # Replace 'path_to_file.xml' with the path to your XML file
tree = ET.parse(services_path)
root = tree.getroot()

documents_section = root.find(".//documents")
Expand All @@ -273,6 +273,8 @@ def _remove_schema_from_services(self, app: str, name: str) -> None:
tree = ET.parse(services_path)
root = tree.getroot()

# TODO - Verify there is only one documents section (one content cluster)
# Error out otherwise as we don't know which one to use
documents_section = root.find(".//documents")

deleted = False
Expand Down
14 changes: 7 additions & 7 deletions src/marqo/core/models/marqo_index.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import re
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Optional, Dict, Any, Set, Type, Union
from typing import List, Optional, Dict, Any, Set, Union

import pydantic
from pydantic import PrivateAttr, root_validator
Expand All @@ -10,7 +10,7 @@
from pydantic.utils import ROOT_KEY

from marqo.core import constants
from marqo.core.models.strict_base_model import ImmutableStrictBaseModel, StrictBaseModel
from marqo.base_model import ImmutableStrictBaseModel, StrictBaseModel
from marqo.exceptions import InvalidArgumentError
from marqo.logging import get_logger
from marqo.s2_inference import s2_inference
Expand Down Expand Up @@ -95,18 +95,18 @@ class TensorField(ImmutableStrictBaseModel):


class HnswConfig(ImmutableStrictBaseModel):
ef_construction: int = pydantic.Field(gt=0)
ef_construction: int = pydantic.Field(gt=0, alias='efConstruction')
m: int = pydantic.Field(gt=0)


class TextPreProcessing(ImmutableStrictBaseModel):
split_length: int = pydantic.Field(gt=0)
split_overlap: int = pydantic.Field(ge=0)
split_method: TextSplitMethod
split_length: int = pydantic.Field(gt=0, alias='splitLength')
split_overlap: int = pydantic.Field(ge=0, alias='splitOverlap')
split_method: TextSplitMethod = pydantic.Field(alias='splitMethod')


class ImagePreProcessing(ImmutableStrictBaseModel):
patch_method: Optional[PatchMethod]
patch_method: Optional[PatchMethod] = pydantic.Field(alias='patchMethod')


class Model(StrictBaseModel):
Expand Down
10 changes: 9 additions & 1 deletion src/marqo/core/models/marqo_index_health.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from enum import Enum
from types import DynamicClassAttribute
from typing import Optional

from marqo.core.models.strict_base_model import StrictBaseModel
from marqo.base_model import StrictBaseModel


class HealthStatus(Enum):
Expand All @@ -27,10 +28,17 @@ def __eq__(self, other):
return self.priority == other.priority


class InferenceHealthStatus(StrictBaseModel):
status: HealthStatus


class VespaHealthStatus(StrictBaseModel):
status: HealthStatus
memory_is_available: Optional[bool]
storage_is_available: Optional[bool]


class MarqoHealthStatus(StrictBaseModel):
status: HealthStatus
inference: InferenceHealthStatus
backend: VespaHealthStatus
5 changes: 3 additions & 2 deletions src/marqo/core/models/marqo_index_request.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from abc import ABC
from typing import List, Dict, Optional

import pydantic
from pydantic import root_validator, validator

import marqo.core.models.marqo_index as marqo_index
from marqo.core.models.strict_base_model import StrictBaseModel, ImmutableStrictBaseModel
from marqo.base_model import StrictBaseModel, ImmutableStrictBaseModel


class MarqoIndexRequest(ImmutableStrictBaseModel, ABC):
Expand Down Expand Up @@ -42,7 +43,7 @@ class FieldRequest(StrictBaseModel):
name: str
type: marqo_index.FieldType
features: List[marqo_index.FieldFeature] = []
dependent_fields: Optional[Dict[str, float]]
dependent_fields: Optional[Dict[str, float]] = pydantic.Field(alias='dependentFields')

@root_validator
def check_all_fields(cls, values):
Expand Down
10 changes: 9 additions & 1 deletion src/marqo/core/models/marqo_index_stats.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
from marqo.core.models.strict_base_model import StrictBaseModel
from typing import Optional

from marqo.base_model import StrictBaseModel


class VespaStats(StrictBaseModel):
memory_used_percentage: Optional[float]
storage_used_percentage: Optional[float]


class MarqoIndexStats(StrictBaseModel):
number_of_documents: int
number_of_vectors: int
backend: VespaStats
2 changes: 1 addition & 1 deletion src/marqo/core/models/marqo_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pydantic import validator

from marqo.core.models.strict_base_model import StrictBaseModel
from marqo.base_model import StrictBaseModel
from marqo.core.search.search_filter import SearchFilter, MarqoFilterStringParser


Expand Down
17 changes: 0 additions & 17 deletions src/marqo/core/models/strict_base_model.py

This file was deleted.

79 changes: 58 additions & 21 deletions src/marqo/core/monitoring/monitoring.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Optional

import marqo.logging
from marqo.core.exceptions import IndexNotFoundError
from marqo.core.index_management.index_management import IndexManagement
from marqo.core.models import MarqoIndex
from marqo.core.models.marqo_index_health import MarqoHealthStatus, HealthStatus, VespaHealthStatus
from marqo.core.models.marqo_index_stats import MarqoIndexStats
from marqo.core.models.marqo_index_health import MarqoHealthStatus, HealthStatus, VespaHealthStatus, \
InferenceHealthStatus
from marqo.core.models.marqo_index_stats import MarqoIndexStats, VespaStats
from marqo.core.vespa_index import for_marqo_index as vespa_index_factory
from marqo.exceptions import InternalError
from marqo.vespa.exceptions import VespaError
Expand Down Expand Up @@ -49,9 +49,24 @@ def get_index_stats(self, marqo_index: MarqoIndex) -> MarqoIndexStats:
except (TypeError, AttributeError, IndexError) as e:
raise InternalError(f"Failed to get the number of vectors for index {marqo_index.name}: {e}") from e

metrics = self.vespa_client.get_metrics()

memory_utilization = metrics.clusterController_resourceUsage_maxMemoryUtilization_max
disk_utilization = metrics.clusterController_resourceUsage_maxDiskUtilization_max

# Occasionally Vespa returns empty metrics, often for the first call after a restart
if memory_utilization is None:
logger.warn(f'Vespa did not return a value for memory utilization metrics')
if disk_utilization is None:
logger.warn(f'Vespa did not return a value for disk utilization metrics')

return MarqoIndexStats(
number_of_documents=doc_count_query_result.total_count,
number_of_vectors=number_of_vectors
number_of_vectors=number_of_vectors,
backend=VespaStats(
memory_used_percentage=memory_utilization * 100 if memory_utilization is not None else None,
storage_used_percentage=disk_utilization * 100 if disk_utilization is not None else None
)
)

def get_index_stats_by_name(self, index_name: str) -> MarqoIndexStats:
Expand Down Expand Up @@ -80,34 +95,56 @@ def get_health(self, index_name: Optional[str] = None, hostname_filter: Optional
Marqo index health status
"""
# TODO - Check index specific metrics such as memory and disk usage
marqo_status = self._get_marqo_health()
try:
vespa_status = self._get_vespa_health(hostname_filter=hostname_filter)
except VespaError as e:
logger.warning(f"Failed to get Vespa health: {e}")
vespa_status = HealthStatus.Red
inference_status = self._get_inference_health()
vespa_status = self._get_vespa_health(hostname_filter=hostname_filter)

aggregated_status = max(marqo_status, vespa_status)
aggregated_status = max(inference_status.status, vespa_status.status)

return MarqoHealthStatus(
status=aggregated_status,
backend=VespaHealthStatus(status=vespa_status)
inference=inference_status,
backend=vespa_status
)

def _get_marqo_health(self) -> HealthStatus:
return HealthStatus.Green
def _get_inference_health(self) -> InferenceHealthStatus:
return InferenceHealthStatus(status=HealthStatus.Green)

def _get_vespa_health(self, hostname_filter: Optional[str]) -> HealthStatus:
metrics = self.vespa_client.get_metrics()
def _get_vespa_health(self, hostname_filter: Optional[str]) -> VespaHealthStatus:
try:
metrics = self.vespa_client.get_metrics()
except VespaError as e:
logger.error(f"Failed to get Vespa metrics: {e}")
return VespaHealthStatus(status=HealthStatus.Red)

status = HealthStatus.Green
# Check service status
service_status = HealthStatus.Green
for node in metrics.nodes:
if service_status == HealthStatus.Red:
break

if hostname_filter is not None and hostname_filter not in node.hostname:
continue

for service in node.services:
if service.status.code != 'up':
status = HealthStatus.Red
break

return status
service_status = HealthStatus.Red

# Check feed block
feed_status = HealthStatus.Green
nodes_above_limit = metrics.clusterController_resourceUsage_nodesAboveLimit_max
memory_utilization = metrics.clusterController_resourceUsage_maxMemoryUtilization_max
disk_utilization = metrics.clusterController_resourceUsage_maxDiskUtilization_max

if nodes_above_limit is None:
logger.warn(f'Vespa did not return a value for nodes_above_limit metric')
feed_status = HealthStatus.Yellow
elif nodes_above_limit > 0:
feed_status = HealthStatus.Yellow

status = max(service_status, feed_status)

return VespaHealthStatus(
status=status,
memory_is_available=memory_utilization is not None and memory_utilization < 1.0,
storage_is_available=disk_utilization is not None and disk_utilization < 1.0
)
24 changes: 15 additions & 9 deletions src/marqo/tensor_search/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from marqo import config, errors
from marqo import version
from marqo.api.models.health_response import HealthResponse
from marqo.core.exceptions import IndexExistsError, IndexNotFoundError
from marqo.core.index_management.index_management import IndexManagement
from marqo.errors import InvalidArgError, MarqoWebError, MarqoError
Expand Down Expand Up @@ -192,13 +193,14 @@ def get_documents_by_ids(
@app.get("/indexes/{index_name}/stats")
def get_index_stats(index_name: str, marqo_config: config.Config = Depends(get_config)):
stats = marqo_config.monitoring.get_index_stats_by_name(index_name)
return JSONResponse(
content={
'numberOfDocuments': stats.number_of_documents,
'numberOfVectors': stats.number_of_vectors
},
status_code=200
)
return {
'numberOfDocuments': stats.number_of_documents,
'numberOfVectors': stats.number_of_vectors,
'backend': {
'memoryUsedPercentage': stats.backend.memory_used_percentage,
'storageUsedPercentage': stats.backend.storage_used_percentage
}
}


@app.delete("/indexes/{index_name}")
Expand All @@ -218,12 +220,16 @@ def delete_docs(index_name: str, documentIds: List[str],

@app.get("/health")
def check_health(marqo_config: config.Config = Depends(get_config)):
return marqo_config.monitoring.get_health()
health_status = marqo_config.monitoring.get_health()

return HealthResponse.from_marqo_health_status(health_status)


@app.get("/indexes/{index_name}/health")
def check_index_health(index_name: str, marqo_config: config.Config = Depends(get_config)):
return marqo_config.monitoring.get_health(index_name=index_name)
health_status = marqo_config.monitoring.get_health(index_name=index_name)

return HealthResponse.from_marqo_health_status(health_status)


@app.get("/indexes")
Expand Down
Loading
Loading