Skip to content

Commit

Permalink
Fixed alerting health check behavior when alerting cannot find its he…
Browse files Browse the repository at this point in the history
…alth task in Task Manager.
  • Loading branch information
YulNaumenko committed May 7, 2021
1 parent 238fc3a commit 0e448e8
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 27 deletions.
14 changes: 13 additions & 1 deletion x-pack/plugins/alerting/server/health/get_health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { ISavedObjectsRepository } from 'src/core/server';
import { ISavedObjectsRepository, SavedObjectsServiceStart } from 'src/core/server';
import { AlertsHealth, HealthStatus, RawAlert, AlertExecutionStatusErrorReasons } from '../types';

export const getHealth = async (
Expand Down Expand Up @@ -97,3 +97,15 @@ export const getHealth = async (

return healthStatuses;
};

export const getAlertingHealthStatus = async (savedObjects: SavedObjectsServiceStart, stateRuns?: number) => {
const alertingHealthStatus = await getHealth(
savedObjects.createInternalRepository(['alert'])
);
return {
state: {
runs: (stateRuns || 0) + 1,
health_status: alertingHealthStatus.decryptionHealth.status,
},
};
}
5 changes: 4 additions & 1 deletion x-pack/plugins/alerting/server/health/get_state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
} from './get_state';
import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server';
import { HealthStatus } from '../types';
import { loggingSystemMock } from 'src/core/server/mocks';

const tick = () => new Promise((resolve) => setImmediate(resolve));

Expand All @@ -38,6 +39,8 @@ const getHealthCheckTask = (overrides = {}): ConcreteTaskInstance => ({
...overrides,
});

const logger = loggingSystemMock.create().get();

describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
beforeEach(() => jest.useFakeTimers());

Expand All @@ -47,7 +50,7 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);

getHealthStatusStream(mockTaskManager, pollInterval).subscribe();
getHealthStatusStream(mockTaskManager, logger, pollInterval).subscribe();

// shouldn't fire before poll interval passes
// should fire once each poll interval
Expand Down
42 changes: 28 additions & 14 deletions x-pack/plugins/alerting/server/health/get_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,29 @@
import { i18n } from '@kbn/i18n';
import { defer, of, interval, Observable, throwError, timer } from 'rxjs';
import { catchError, mergeMap, retryWhen, switchMap } from 'rxjs/operators';
import { ServiceStatus, ServiceStatusLevels } from '../../../../../src/core/server';
import { Logger, SavedObjectsServiceStart, ServiceStatus, ServiceStatusLevels } from '../../../../../src/core/server';
import { TaskManagerStartContract } from '../../../task_manager/server';
import { HEALTH_TASK_ID } from './task';
import { HEALTH_TASK_ID, scheduleAlertingHealthCheck } from './task';
import { HealthStatus } from '../types';
import { getAlertingHealthStatus } from './get_health';
import { AlertsConfig } from '../config';

export const MAX_RETRY_ATTEMPTS = 3;
const HEALTH_STATUS_INTERVAL = 60000 * 5; // Five minutes
const RETRY_DELAY = 5000; // Wait 5 seconds before retrying on errors

async function getLatestTaskState(taskManager: TaskManagerStartContract) {
async function getLatestTaskState(taskManager: TaskManagerStartContract, logger: Logger, savedObjects: SavedObjectsServiceStart, config: Promise<AlertsConfig>) {
try {
const result = await taskManager.get(HEALTH_TASK_ID);
return result;
} catch (err) {
const errMessage = err && err.message ? err.message : err.toString();
if (!errMessage.includes('NotInitialized')) {
throw err;
// if task is not found
if (err?.output?.statusCode === 404) {
scheduleAlertingHealthCheck(logger, config, taskManager);
return getAlertingHealthStatus(savedObjects);
}
throw err;
}

return null;
}

const LEVEL_SUMMARY = {
Expand All @@ -53,13 +55,16 @@ const LEVEL_SUMMARY = {
};

const getHealthServiceStatus = async (
taskManager: TaskManagerStartContract
taskManager: TaskManagerStartContract,
logger: Logger,
savedObjects: SavedObjectsServiceStart,
config: Promise<AlertsConfig>
): Promise<ServiceStatus<unknown>> => {
const doc = await getLatestTaskState(taskManager);
const doc = await getLatestTaskState(taskManager, logger, savedObjects, config);
const level =
doc?.state?.health_status === HealthStatus.OK
doc.state?.health_status === HealthStatus.OK
? ServiceStatusLevels.available
: doc?.state?.health_status === HealthStatus.Warning
: doc.state?.health_status === HealthStatus.Warning
? ServiceStatusLevels.degraded
: ServiceStatusLevels.unavailable;
return {
Expand All @@ -70,9 +75,12 @@ const getHealthServiceStatus = async (

export const getHealthServiceStatusWithRetryAndErrorHandling = (
taskManager: TaskManagerStartContract,
logger: Logger,
savedObjects: SavedObjectsServiceStart,
config: Promise<AlertsConfig>,
retryDelay?: number
): Observable<ServiceStatus<unknown>> => {
return defer(() => getHealthServiceStatus(taskManager)).pipe(
return defer(() => getHealthServiceStatus(taskManager, logger, savedObjects, config)).pipe(
retryWhen((errors) => {
return errors.pipe(
mergeMap((error, i) => {
Expand All @@ -85,6 +93,9 @@ export const getHealthServiceStatusWithRetryAndErrorHandling = (
);
}),
catchError((error) => {
logger.warn(
`Alerting framework is unavailable due to the error: ${error}`
);
return of({
level: ServiceStatusLevels.unavailable,
summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()],
Expand All @@ -96,9 +107,12 @@ export const getHealthServiceStatusWithRetryAndErrorHandling = (

export const getHealthStatusStream = (
taskManager: TaskManagerStartContract,
logger: Logger,
savedObjects: SavedObjectsServiceStart,
config: Promise<AlertsConfig>,
healthStatusInterval?: number,
retryDelay?: number
): Observable<ServiceStatus<unknown>> =>
interval(healthStatusInterval ?? HEALTH_STATUS_INTERVAL).pipe(
switchMap(() => getHealthServiceStatusWithRetryAndErrorHandling(taskManager, retryDelay))
switchMap(() => getHealthServiceStatusWithRetryAndErrorHandling(taskManager, logger, savedObjects, config, retryDelay))
);
13 changes: 4 additions & 9 deletions x-pack/plugins/alerting/server/health/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
import { AlertsConfig } from '../config';
import { AlertingPluginsStart } from '../plugin';
import { HealthStatus } from '../types';
import { getHealth } from './get_health';
import { getAlertingHealthStatus } from './get_health';

export const HEALTH_TASK_TYPE = 'alerting_health_check';

Expand Down Expand Up @@ -71,15 +71,10 @@ export function healthCheckTaskRunner(
return {
async run() {
try {
const alertingHealthStatus = await getHealth(
(await coreStartServices)[0].savedObjects.createInternalRepository(['alert'])
return await getAlertingHealthStatus(
(await coreStartServices)[0].savedObjects,
state.runs
);
return {
state: {
runs: (state.runs || 0) + 1,
health_status: alertingHealthStatus.decryptionHealth.status,
},
};
} catch (errMsg) {
logger.warn(`Error executing alerting health check task: ${errMsg}`);
return {
Expand Down
9 changes: 7 additions & 2 deletions x-pack/plugins/alerting/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,16 @@ export class AlertingPlugin {
this.config
);

core.getStartServices().then(async ([, startPlugins]) => {
core.getStartServices().then(async ([coreStart, startPlugins]) => {
core.status.set(
combineLatest([
core.status.derivedStatus$,
getHealthStatusStream(startPlugins.taskManager),
getHealthStatusStream(
startPlugins.taskManager,
this.logger,
coreStart.savedObjects,
this.config
),
]).pipe(
map(([derivedStatus, healthStatus]) => {
if (healthStatus.level > derivedStatus.level) {
Expand Down

0 comments on commit 0e448e8

Please sign in to comment.