Skip to content

Commit

Permalink
prevent failure of calculatings results for a single workload from fa…
Browse files Browse the repository at this point in the history
…iling entire scan (#392)
  • Loading branch information
aantn authored Jan 16, 2025
1 parent 89d1b9f commit 7274380
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions robusta_krr/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,39 +176,43 @@ def _format_result(self, result: RunResult) -> RunResult:
}

async def _calculate_object_recommendations(self, object: K8sObjectData) -> Optional[RunResult]:
prometheus_loader = self._get_prometheus_loader(object.cluster)

if prometheus_loader is None:
return None

object.pods = await prometheus_loader.load_pods(object, self._strategy.settings.history_timedelta)
if object.pods == []:
# Fallback to Kubernetes API
object.pods = await self._k8s_loader.load_pods(object)

# NOTE: Kubernetes API returned pods, but Prometheus did not
# This might happen with fast executing jobs
if object.pods != []:
object.add_warning("NoPrometheusPods")
logger.warning(
f"Was not able to load any pods for {object} from Prometheus. "
"Loaded pods from Kubernetes API instead."
)

metrics = await prometheus_loader.gather_data(
object,
self._strategy,
self._strategy.settings.history_timedelta,
step=self._strategy.settings.timeframe_timedelta,
)
try:
prometheus_loader = self._get_prometheus_loader(object.cluster)

if prometheus_loader is None:
return None

object.pods = await prometheus_loader.load_pods(object, self._strategy.settings.history_timedelta)
if object.pods == []:
# Fallback to Kubernetes API
object.pods = await self._k8s_loader.load_pods(object)

# NOTE: Kubernetes API returned pods, but Prometheus did not
# This might happen with fast executing jobs
if object.pods != []:
object.add_warning("NoPrometheusPods")
logger.warning(
f"Was not able to load any pods for {object} from Prometheus. "
"Loaded pods from Kubernetes API instead."
)

metrics = await prometheus_loader.gather_data(
object,
self._strategy,
self._strategy.settings.history_timedelta,
step=self._strategy.settings.timeframe_timedelta,
)

# NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive
# But keep in mind that numpy calcluations will not block the GIL
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self._executor, self._strategy.run, metrics, object)
# NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive
# But keep in mind that numpy calcluations will not block the GIL
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self._executor, self._strategy.run, metrics, object)

logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)")
return self._format_result(result)
logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)")
return self._format_result(result)
except Exception as e:
logger.error(f"An error occurred while calculating recommendations for {object}: {e}")
return None

async def _check_data_availability(self, cluster: Optional[str]) -> None:
prometheus_loader = self._get_prometheus_loader(cluster)
Expand Down Expand Up @@ -308,7 +312,7 @@ async def _collect_result(self) -> Result:
raise CriticalRunnerException("No successful scans were made. Check the logs for more information.")

return Result(
scans=scans,
scans=successful_scans,
description=f"[b]{self._strategy.display_name.title()} Strategy[/b]\n\n{self._strategy.description}",
strategy=StrategyData(
name=str(self._strategy).lower(),
Expand Down

0 comments on commit 7274380

Please sign in to comment.