Skip to content

Commit

Permalink
Fixed alerting health check behavior when alerting cannot find its he…
Browse files Browse the repository at this point in the history
…alth task in Task Manager. (#99564)

* Fixed alerting health check behavior when alerting cannot find its health task in Task Manager.

* fixed test

* added unit tests
YulNaumenko authored May 10, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 6d269c5 commit fcc2ac5
Showing 6 changed files with 290 additions and 48 deletions.
74 changes: 72 additions & 2 deletions x-pack/plugins/alerting/server/health/get_health.test.ts
Original file line number Diff line number Diff line change
@@ -5,9 +5,12 @@
* 2.0.
*/

import { savedObjectsRepositoryMock } from '../../../../../src/core/server/mocks';
import {
savedObjectsRepositoryMock,
savedObjectsServiceMock,
} from '../../../../../src/core/server/mocks';
import { AlertExecutionStatusErrorReasons, HealthStatus } from '../types';
import { getHealth } from './get_health';
import { getAlertingHealthStatus, getHealth } from './get_health';

const savedObjectsRepository = savedObjectsRepositoryMock.create();

@@ -221,3 +224,70 @@ describe('getHealth()', () => {
});
});
});

describe('getAlertingHealthStatus()', () => {
test('return the proper framework state if some of alerts has a decryption error', async () => {
const savedObjects = savedObjectsServiceMock.createStartContract();
const lastExecutionDateError = new Date().toISOString();
savedObjectsRepository.find.mockResolvedValueOnce({
total: 1,
per_page: 1,
page: 1,
saved_objects: [
{
id: '1',
type: 'alert',
attributes: {
alertTypeId: 'myType',
schedule: { interval: '10s' },
params: {
bar: true,
},
createdAt: new Date().toISOString(),
actions: [
{
group: 'default',
actionRef: 'action_0',
params: {
foo: true,
},
},
],
executionStatus: {
status: 'error',
lastExecutionDate: lastExecutionDateError,
error: {
reason: AlertExecutionStatusErrorReasons.Decrypt,
message: 'Failed decrypt',
},
},
},
score: 1,
references: [
{
name: 'action_0',
type: 'action',
id: '1',
},
],
},
],
});
savedObjectsRepository.find.mockResolvedValue({
total: 0,
per_page: 10,
page: 1,
saved_objects: [],
});
const result = await getAlertingHealthStatus(
{ ...savedObjects, createInternalRepository: () => savedObjectsRepository },
1
);
expect(result).toStrictEqual({
state: {
runs: 2,
health_status: HealthStatus.Warning,
},
});
});
});
15 changes: 14 additions & 1 deletion x-pack/plugins/alerting/server/health/get_health.ts
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
* 2.0.
*/

import { ISavedObjectsRepository } from 'src/core/server';
import { ISavedObjectsRepository, SavedObjectsServiceStart } from 'src/core/server';
import { AlertsHealth, HealthStatus, RawAlert, AlertExecutionStatusErrorReasons } from '../types';

export const getHealth = async (
@@ -97,3 +97,16 @@ export const getHealth = async (

return healthStatuses;
};

export const getAlertingHealthStatus = async (
savedObjects: SavedObjectsServiceStart,
stateRuns?: number
) => {
const alertingHealthStatus = await getHealth(savedObjects.createInternalRepository(['alert']));
return {
state: {
runs: (stateRuns || 0) + 1,
health_status: alertingHealthStatus.decryptionHealth.status,
},
};
};
166 changes: 148 additions & 18 deletions x-pack/plugins/alerting/server/health/get_state.test.ts
Original file line number Diff line number Diff line change
@@ -14,6 +14,16 @@ import {
} from './get_state';
import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server';
import { HealthStatus } from '../types';
import { loggingSystemMock, savedObjectsServiceMock } from 'src/core/server/mocks';

jest.mock('./get_health', () => ({
getAlertingHealthStatus: jest.fn().mockReturnValue({
state: {
runs: 0,
health_status: 'warn',
},
}),
}));

const tick = () => new Promise((resolve) => setImmediate(resolve));

@@ -38,6 +48,9 @@ const getHealthCheckTask = (overrides = {}): ConcreteTaskInstance => ({
...overrides,
});

const logger = loggingSystemMock.create().get();
const savedObjects = savedObjectsServiceMock.createStartContract();

describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
beforeEach(() => jest.useFakeTimers());

@@ -47,7 +60,21 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);

getHealthStatusStream(mockTaskManager, pollInterval).subscribe();
getHealthStatusStream(
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
}),
pollInterval
).subscribe();

// shouldn't fire before poll interval passes
// should fire once each poll interval
@@ -68,7 +95,22 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);

getHealthStatusStream(mockTaskManager, pollInterval, retryDelay).subscribe();
getHealthStatusStream(
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
}),
pollInterval,
retryDelay
).subscribe();

jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(0);
@@ -99,7 +141,18 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
mockTaskManager.get.mockResolvedValue(getHealthCheckTask());

const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
})
).toPromise();

expect(status.level).toEqual(ServiceStatusLevels.available);
@@ -118,7 +171,18 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
);

const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
})
).toPromise();

expect(status.level).toEqual(ServiceStatusLevels.degraded);
@@ -137,7 +201,18 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
);

const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
})
).toPromise();

expect(status.level).toEqual(ServiceStatusLevels.unavailable);
@@ -152,12 +227,25 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
.mockRejectedValueOnce(new Error('Failure'))
.mockResolvedValue(getHealthCheckTask());

getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe(
(status) => {
expect(status.level).toEqual(ServiceStatusLevels.available);
expect(status.summary).toEqual('Alerting framework is available');
}
);
getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
}),
retryDelay
).subscribe((status) => {
expect(status.level).toEqual(ServiceStatusLevels.available);
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(status.summary).toEqual('Alerting framework is available');
});

await tick();
jest.advanceTimersByTime(retryDelay * 2);
@@ -169,18 +257,60 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockRejectedValue(err);

getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe(
(status) => {
expect(status.level).toEqual(ServiceStatusLevels.unavailable);
expect(status.summary).toEqual('Alerting framework is unavailable');
expect(status.meta).toEqual({ error: err });
}
);
getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
}),
retryDelay
).subscribe((status) => {
expect(status.level).toEqual(ServiceStatusLevels.unavailable);
expect(status.summary).toEqual('Alerting framework is unavailable');
expect(status.meta).toEqual({ error: err });
});

for (let i = 0; i < MAX_RETRY_ATTEMPTS + 1; i++) {
await tick();
jest.advanceTimersByTime(retryDelay);
}
expect(mockTaskManager.get).toHaveBeenCalledTimes(MAX_RETRY_ATTEMPTS + 1);
});

it('should schedule a new health check task if it does not exist without throwing an error', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockRejectedValue({
output: {
statusCode: 404,
message: 'Not Found',
},
});

const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
})
).toPromise();

expect(mockTaskManager.ensureScheduled).toHaveBeenCalledTimes(1);
expect(status.level).toEqual(ServiceStatusLevels.degraded);
expect(status.summary).toEqual('Alerting framework is degraded');
expect(status.meta).toBeUndefined();
});
});
61 changes: 45 additions & 16 deletions x-pack/plugins/alerting/server/health/get_state.ts
Original file line number Diff line number Diff line change
@@ -8,27 +8,38 @@
import { i18n } from '@kbn/i18n';
import { defer, of, interval, Observable, throwError, timer } from 'rxjs';
import { catchError, mergeMap, retryWhen, switchMap } from 'rxjs/operators';
import { ServiceStatus, ServiceStatusLevels } from '../../../../../src/core/server';
import {
Logger,
SavedObjectsServiceStart,
ServiceStatus,
ServiceStatusLevels,
} from '../../../../../src/core/server';
import { TaskManagerStartContract } from '../../../task_manager/server';
import { HEALTH_TASK_ID } from './task';
import { HEALTH_TASK_ID, scheduleAlertingHealthCheck } from './task';
import { HealthStatus } from '../types';
import { getAlertingHealthStatus } from './get_health';
import { AlertsConfig } from '../config';

export const MAX_RETRY_ATTEMPTS = 3;
const HEALTH_STATUS_INTERVAL = 60000 * 5; // Five minutes
const RETRY_DELAY = 5000; // Wait 5 seconds before retrying on errors

async function getLatestTaskState(taskManager: TaskManagerStartContract) {
async function getLatestTaskState(
taskManager: TaskManagerStartContract,
logger: Logger,
savedObjects: SavedObjectsServiceStart,
config: Promise<AlertsConfig>
) {
try {
const result = await taskManager.get(HEALTH_TASK_ID);
return result;
return await taskManager.get(HEALTH_TASK_ID);
} catch (err) {
const errMessage = err && err.message ? err.message : err.toString();
if (!errMessage.includes('NotInitialized')) {
throw err;
// if task is not found
if (err?.output?.statusCode === 404) {
await scheduleAlertingHealthCheck(logger, config, taskManager);
return await getAlertingHealthStatus(savedObjects);
}
throw err;
}

return null;
}

const LEVEL_SUMMARY = {
@@ -53,13 +64,16 @@ const LEVEL_SUMMARY = {
};

const getHealthServiceStatus = async (
taskManager: TaskManagerStartContract
taskManager: TaskManagerStartContract,
logger: Logger,
savedObjects: SavedObjectsServiceStart,
config: Promise<AlertsConfig>
): Promise<ServiceStatus<unknown>> => {
const doc = await getLatestTaskState(taskManager);
const doc = await getLatestTaskState(taskManager, logger, savedObjects, config);
const level =
doc?.state?.health_status === HealthStatus.OK
doc.state?.health_status === HealthStatus.OK
? ServiceStatusLevels.available
: doc?.state?.health_status === HealthStatus.Warning
: doc.state?.health_status === HealthStatus.Warning
? ServiceStatusLevels.degraded
: ServiceStatusLevels.unavailable;
return {
@@ -70,9 +84,12 @@ const getHealthServiceStatus = async (

export const getHealthServiceStatusWithRetryAndErrorHandling = (
taskManager: TaskManagerStartContract,
logger: Logger,
savedObjects: SavedObjectsServiceStart,
config: Promise<AlertsConfig>,
retryDelay?: number
): Observable<ServiceStatus<unknown>> => {
return defer(() => getHealthServiceStatus(taskManager)).pipe(
return defer(() => getHealthServiceStatus(taskManager, logger, savedObjects, config)).pipe(
retryWhen((errors) => {
return errors.pipe(
mergeMap((error, i) => {
@@ -85,6 +102,7 @@ export const getHealthServiceStatusWithRetryAndErrorHandling = (
);
}),
catchError((error) => {
logger.warn(`Alerting framework is unavailable due to the error: ${error}`);
return of({
level: ServiceStatusLevels.unavailable,
summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()],
@@ -96,9 +114,20 @@ export const getHealthServiceStatusWithRetryAndErrorHandling = (

export const getHealthStatusStream = (
taskManager: TaskManagerStartContract,
logger: Logger,
savedObjects: SavedObjectsServiceStart,
config: Promise<AlertsConfig>,
healthStatusInterval?: number,
retryDelay?: number
): Observable<ServiceStatus<unknown>> =>
interval(healthStatusInterval ?? HEALTH_STATUS_INTERVAL).pipe(
switchMap(() => getHealthServiceStatusWithRetryAndErrorHandling(taskManager, retryDelay))
switchMap(() =>
getHealthServiceStatusWithRetryAndErrorHandling(
taskManager,
logger,
savedObjects,
config,
retryDelay
)
)
);
13 changes: 4 additions & 9 deletions x-pack/plugins/alerting/server/health/task.ts
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ import {
import { AlertsConfig } from '../config';
import { AlertingPluginsStart } from '../plugin';
import { HealthStatus } from '../types';
import { getHealth } from './get_health';
import { getAlertingHealthStatus } from './get_health';

export const HEALTH_TASK_TYPE = 'alerting_health_check';

@@ -71,15 +71,10 @@ export function healthCheckTaskRunner(
return {
async run() {
try {
const alertingHealthStatus = await getHealth(
(await coreStartServices)[0].savedObjects.createInternalRepository(['alert'])
return await getAlertingHealthStatus(
(await coreStartServices)[0].savedObjects,
state.runs
);
return {
state: {
runs: (state.runs || 0) + 1,
health_status: alertingHealthStatus.decryptionHealth.status,
},
};
} catch (errMsg) {
logger.warn(`Error executing alerting health check task: ${errMsg}`);
return {
9 changes: 7 additions & 2 deletions x-pack/plugins/alerting/server/plugin.ts
Original file line number Diff line number Diff line change
@@ -220,11 +220,16 @@ export class AlertingPlugin {
this.config
);

core.getStartServices().then(async ([, startPlugins]) => {
core.getStartServices().then(async ([coreStart, startPlugins]) => {
core.status.set(
combineLatest([
core.status.derivedStatus$,
getHealthStatusStream(startPlugins.taskManager),
getHealthStatusStream(
startPlugins.taskManager,
this.logger,
coreStart.savedObjects,
this.config
),
]).pipe(
map(([derivedStatus, healthStatus]) => {
if (healthStatus.level > derivedStatus.level) {

0 comments on commit fcc2ac5

Please sign in to comment.