Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Fix HealthManager (#3819)
Browse files Browse the repository at this point in the history
Bug fixes in the HealthManager endpoint.
  • Loading branch information
thinker0 authored Apr 27, 2022
1 parent 7db7c24 commit 3fdf1f8
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.heron.api.utils.Slf4jUtils;
import org.apache.heron.classification.InterfaceStability.Evolving;
import org.apache.heron.classification.InterfaceStability.Unstable;
import org.apache.heron.common.basics.SingletonRegistry;
Expand Down Expand Up @@ -153,7 +152,6 @@ public HealthManager(Config config, AbstractModule baseModule) {
}

public static void main(String[] args) throws Exception {
Slf4jUtils.installSLF4JBridge();
CommandLineParser parser = new DefaultParser();
Options slaManagerCliOptions = constructCliOptions();

Expand Down
2 changes: 1 addition & 1 deletion heron/tools/tracker/src/python/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pex_library(
"click==7.1.2",
"fastapi==0.75.0",
"httpx==0.16.1",
"javaobj-py3==0.4.1",
"javaobj-py3==0.4.3",
"networkx==2.5",
"protobuf==3.16.0",
"uvicorn==0.11.7",
Expand Down
6 changes: 5 additions & 1 deletion heron/tools/tracker/src/python/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from fastapi import FastAPI, Query, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, PlainTextResponse
from starlette.exceptions import HTTPException as StarletteHTTPException

from heron.tools.tracker.src.python import constants, state, query
Expand Down Expand Up @@ -128,3 +128,7 @@ async def get_machines(
] = topology.get_machines()

return response

@app.get("/health", response_class=PlainTextResponse)
def healthcheck():
return "ok"
11 changes: 11 additions & 0 deletions heron/tools/tracker/src/python/metricstimeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ class MetricsTimeline(BaseModel):
description="map of (metric name, instance, start) to metric value",
)


class LegacyMetricsTimeline(BaseModel):
component: str
starttime: int
endtime: int
timeline: Dict[str, Dict[str, Dict[int, str]]] = Field(
...,
description="map of (metric name, instance, start) to metric value",
)


# pylint: disable=too-many-locals, too-many-branches, unused-argument
async def get_metrics_timeline(
tmanager: tmanager_pb2.TManagerLocation,
Expand Down
7 changes: 5 additions & 2 deletions heron/tools/tracker/src/python/query_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ async def execute(self, tracker, tmanager: TManagerLocation, start: int, end: in
for key, metric in metrics2.items():
# Initialize with first metrics timeline, but second metric's instance
# because that is multivariate
if metrics:
if "" in metrics:
met = Metrics(None, None, metric.instance, start, end, metrics[""].timeline.copy())
for timestamp in list(met.timeline.keys()):
v = self._f(met.timeline[timestamp], metric.timeline.get(timestamp))
Expand All @@ -477,7 +477,10 @@ async def execute(self, tracker, tmanager: TManagerLocation, start: int, end: in
# Initialize with first metrics timeline and its instance
met = Metrics(None, None, metric.instance, start, end, metric.timeline.copy())
for timestamp in list(met.timeline.keys()):
v = self._f(met.timeline[timestamp], metrics2[""].timeline.get(timestamp))
met2_value = None
if "" in metrics2:
met2_value = metrics2[""].timeline.get(timestamp)
v = self._f(met.timeline[timestamp], met2_value)
if v is None:
met.timeline.pop(timestamp, None)
else:
Expand Down
33 changes: 26 additions & 7 deletions heron/tools/tracker/src/python/routers/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def get_component_metrics(
if metric_response.status.status == common_pb2.NOTOK:
if metric_response.status.HasField("message"):
Log.warn(
"Recieved response from Tmanager: %s", metric_response.status.message
"Received response from Tmanager: %s", metric_response.status.message
)

metrics = {}
Expand Down Expand Up @@ -119,8 +119,6 @@ async def get_metrics( # pylint: disable=too-many-arguments
)


@router.get("/metricstimeline", response_model=metricstimeline.MetricsTimeline,
deprecated=True)
@router.get("/metrics/timeline", response_model=metricstimeline.MetricsTimeline)
async def get_metrics_timeline( # pylint: disable=too-many-arguments
cluster: str,
Expand All @@ -134,7 +132,6 @@ async def get_metrics_timeline( # pylint: disable=too-many-arguments
instances: Optional[List[str]] = Query(None, alias="instance"),
):
"""
'/metricstimeline' 0.20.5 below.
'/metrics/timeline' 0.20.5 above.
Return metrics over the given interval.
"""
Expand All @@ -146,6 +143,31 @@ async def get_metrics_timeline( # pylint: disable=too-many-arguments
)


@router.get("/metricstimeline", response_model=metricstimeline.LegacyMetricsTimeline,
deprecated=True)
async def get_legacy_metrics_timeline( # pylint: disable=too-many-arguments
cluster: str,
environ: str,
component: str,
start_time: int = Query(..., alias="starttime"),
end_time: int = Query(..., alias="endtime"),
role: Optional[str] = None,
topology_name: str = Query(..., alias="topology"),
metric_names: Optional[List[str]] = Query(None, alias="metricname"),
instances: Optional[List[str]] = Query(None, alias="instance"),
):
"""
'/metricstimeline' 0.20.5 below.
Return metrics over the given interval.
"""
if start_time > end_time:
raise BadRequest("start_time > end_time")
topology = state.tracker.get_topology(cluster, role, environ, topology_name)
return await metricstimeline.get_metrics_timeline(
topology.tmanager, component, metric_names, instances, start_time, end_time
)


class TimelinePoint(BaseModel): # pylint: disable=too-few-public-methods
"""A metric at discrete points in time."""
instance: Optional[str] = Field(
Expand All @@ -163,8 +185,6 @@ class MetricsQueryResponse(BaseModel): # pylint: disable=too-few-public-methods
..., description="list of timeline point objects",
)

@router.get("/metricsquery", response_model=MetricsQueryResponse,
deprecated=True)
@router.get("/metrics/query", response_model=MetricsQueryResponse)
async def get_metrics_query( # pylint: disable=too-many-arguments
cluster: str,
Expand All @@ -176,7 +196,6 @@ async def get_metrics_query( # pylint: disable=too-many-arguments
topology_name: str = Query(..., alias="topology"),
) -> MetricsQueryResponse:
"""
'/metricsquery' 0.20.5 below.
'/metrics/query' 0.20.5 above.
Run a metrics query against a particular topology.
"""
Expand Down
8 changes: 3 additions & 5 deletions heron/tools/tracker/src/python/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class TopologyInfoExecutionState(TopologyInfoMetadata):
has_packing_plan: bool
has_tmanager_location: bool
has_scheduler_location: bool
status: str

class RuntimeStateStatemanager(BaseModel):
is_registered: bool
Expand Down Expand Up @@ -551,11 +552,8 @@ def _build_scheduler_location(scheduler_location) -> TopologyInfoSchedulerLocati
return TopologyInfoSchedulerLocation(
name=scheduler_location.topology_name,
http_endpoint=scheduler_location.http_endpoint,
job_page_link=(
scheduler_location.job_page_link[0]
if scheduler_location.job_page_link
else ""
),
job_page_link=scheduler_location.job_page_link \
if scheduler_location.job_page_link else "",
)

@staticmethod
Expand Down
4 changes: 2 additions & 2 deletions heron/tools/tracker/src/python/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ def convert_pb_kvs(kvs, include_non_primitives=True) -> dict:
def _convert_java_value(kv, include_non_primitives=True):
try:
pobj = javaobj.loads(kv.serialized_value)
if isinstance(pobj, str):
if isinstance(pobj, (str, int, float, bool)):
return pobj

if isinstance(pobj, javaobj.transformers.DefaultObjectTransformer.JavaPrimitiveClass):
if hasattr(pobj, 'value'):
return pobj.value

if include_non_primitives:
Expand Down
2 changes: 1 addition & 1 deletion heron/tools/ui/resources/static/js/topologies.js
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ var BoltRunningInfo = React.createClass({
var latencyMetrics = metrics[boltName][time][executeLatencyMetricName][stream].metrics;
// For each intance
for (var m in countMetrics) {
if (countMetrics.hasOwnProperty(m) && latencyMetrics.hasOwnProperty(m)) {
if (countMetrics.hasOwnProperty(m) && latencyMetrics && latencyMetrics.hasOwnProperty(m)) {
var count = Number(countMetrics[m]) / (metrics[boltName][time][executeCountMetricName][stream].scaleDevisor || 1);
var latency = Number(latencyMetrics[m]) / (metrics[boltName][time][executeLatencyMetricName][stream].scaleDevisor || 1);
var utilization = count * latency;
Expand Down
7 changes: 6 additions & 1 deletion heron/tools/ui/src/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import uvicorn

from fastapi import APIRouter, FastAPI, Query, Request
from fastapi.responses import HTMLResponse
from fastapi.responses import HTMLResponse, PlainTextResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.responses import RedirectResponse, Response
Expand Down Expand Up @@ -614,6 +614,11 @@ def histogram_snippet(
)


@app.get("/health", response_class=PlainTextResponse)
def healthcheck():
return "ok"


app.include_router(topologies_router, prefix="/topologies")
app.mount(
"/static",
Expand Down

0 comments on commit 3fdf1f8

Please sign in to comment.