diff --git a/src/core/server/status/log_plugins_status.test.ts b/src/core/server/status/log_plugins_status.test.ts new file mode 100644 index 00000000000000..e1be88fe1b1886 --- /dev/null +++ b/src/core/server/status/log_plugins_status.test.ts @@ -0,0 +1,332 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { TestScheduler } from 'rxjs/testing'; +import { PluginName } from '../plugins'; +import { ServiceStatus, ServiceStatusLevel, ServiceStatusLevels } from './types'; +import { + getPluginsStatusChanges, + getPluginsStatusDiff, + getServiceLevelChangeMessage, +} from './log_plugins_status'; + +type ObsInputType = Record; + +const getTestScheduler = () => + new TestScheduler((actual, expected) => { + expect(actual).toEqual(expected); + }); + +const createServiceStatus = (level: ServiceStatusLevel): ServiceStatus => ({ + level, + summary: 'summary', +}); + +const createPluginsStatuses = ( + input: Record +): Record => { + return Object.entries(input).reduce((output, [name, level]) => { + output[name] = createServiceStatus(level); + return output; + }, {} as Record); +}; + +describe('getPluginsStatusChanges', () => { + it('does not emit on first plugins$ emission', () => { + getTestScheduler().run(({ expectObservable, hot }) => { + const statuses = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.degraded, + }); + + const overall$ = hot('-a', { + a: statuses, + }); + const stop$ = hot(''); + const expected = '--'; + + expectObservable(getPluginsStatusChanges(overall$, stop$, 1)).toBe(expected); + }); + }); + + it('does not emit if statuses do not change', () => { + getTestScheduler().run(({ expectObservable, hot }) => { + const statuses = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.degraded, + }); + + const overall$ = hot('-a-b', { + a: statuses, + b: statuses, + }); + const stop$ = hot(''); + const expected = '----'; + + expectObservable(getPluginsStatusChanges(overall$, stop$, 1)).toBe(expected); + }); + }); + + it('emits if any plugin status changes', () => { + getTestScheduler().run(({ expectObservable, hot }) => { + const statusesA = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.degraded, + }); + const statusesB = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.available, + }); + + const overall$ = hot('-a-b', { + a: statusesA, + b: statusesB, + }); + const stop$ = hot(''); + const expected = '---a'; + + expectObservable(getPluginsStatusChanges(overall$, stop$, 1)).toBe(expected, { + a: [ + { + previousLevel: 'degraded', + nextLevel: 'available', + impactedServices: ['pluginB'], + }, + ], + }); + }); + }); + + it('emits everytime any plugin status changes', () => { + getTestScheduler().run(({ expectObservable, hot }) => { + const availableStatus = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + }); + const degradedStatus = createPluginsStatuses({ + pluginA: ServiceStatusLevels.degraded, + }); + + const overall$ = hot('-a-b-c-d', { + a: availableStatus, + b: degradedStatus, + c: degradedStatus, + d: availableStatus, + }); + const stop$ = hot(''); + const expected = '---a---b'; + + expectObservable(getPluginsStatusChanges(overall$, stop$, 1)).toBe(expected, { + a: [ + { + previousLevel: 'available', + nextLevel: 'degraded', + impactedServices: ['pluginA'], + }, + ], + b: [ + { + previousLevel: 'degraded', + nextLevel: 'available', + impactedServices: ['pluginA'], + }, + ], + }); + }); + }); + + it('throttle events', () => { + getTestScheduler().run(({ expectObservable, hot }) => { + const statusesA = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.degraded, + }); + const statusesB = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.available, + }); + const statusesC = createPluginsStatuses({ + pluginA: ServiceStatusLevels.degraded, + pluginB: ServiceStatusLevels.available, + }); + + const overall$ = hot('-a-b--c', { + a: statusesA, + b: statusesB, + c: statusesC, + }); + const stop$ = hot(''); + const expected = '------a'; + + expectObservable(getPluginsStatusChanges(overall$, stop$, 5)).toBe(expected, { + a: [ + { + previousLevel: 'available', + nextLevel: 'degraded', + impactedServices: ['pluginA'], + }, + { + previousLevel: 'degraded', + nextLevel: 'available', + impactedServices: ['pluginB'], + }, + ], + }); + }); + }); + + it('stops emitting once `stop$` emits', () => { + getTestScheduler().run(({ expectObservable, hot }) => { + const statusesA = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.degraded, + }); + const statusesB = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.available, + }); + const statusesC = createPluginsStatuses({ + pluginA: ServiceStatusLevels.degraded, + pluginB: ServiceStatusLevels.available, + }); + + const overall$ = hot('-a-b-c', { + a: statusesA, + b: statusesB, + c: statusesC, + }); + const stop$ = hot('----(s|)'); + const expected = '---a|'; + + expectObservable(getPluginsStatusChanges(overall$, stop$, 1)).toBe(expected, { + a: [ + { + previousLevel: 'degraded', + nextLevel: 'available', + impactedServices: ['pluginB'], + }, + ], + }); + }); + }); +}); + +describe('getPluginsStatusDiff', () => { + it('returns an empty list if level is the same for all plugins', () => { + const previousStatus = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.degraded, + pluginC: ServiceStatusLevels.unavailable, + }); + + const nextStatus = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.degraded, + pluginC: ServiceStatusLevels.unavailable, + }); + + const result = getPluginsStatusDiff(previousStatus, nextStatus); + + expect(result).toEqual([]); + }); + + it('returns an single entry if only one status changed', () => { + const previousStatus = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.degraded, + pluginC: ServiceStatusLevels.unavailable, + }); + + const nextStatus = createPluginsStatuses({ + pluginA: ServiceStatusLevels.degraded, + pluginB: ServiceStatusLevels.degraded, + pluginC: ServiceStatusLevels.unavailable, + }); + + const result = getPluginsStatusDiff(previousStatus, nextStatus); + + expect(result).toEqual([ + { + previousLevel: 'available', + nextLevel: 'degraded', + impactedServices: ['pluginA'], + }, + ]); + }); + + it('groups plugins by previous and next level tuples', () => { + const previousStatus = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.available, + pluginC: ServiceStatusLevels.unavailable, + }); + + const nextStatus = createPluginsStatuses({ + pluginA: ServiceStatusLevels.degraded, + pluginB: ServiceStatusLevels.degraded, + pluginC: ServiceStatusLevels.unavailable, + }); + + const result = getPluginsStatusDiff(previousStatus, nextStatus); + + expect(result).toEqual([ + { + previousLevel: 'available', + nextLevel: 'degraded', + impactedServices: ['pluginA', 'pluginB'], + }, + ]); + }); + + it('returns one entry per previous and next level tuples', () => { + const previousStatus = createPluginsStatuses({ + pluginA: ServiceStatusLevels.available, + pluginB: ServiceStatusLevels.degraded, + pluginC: ServiceStatusLevels.unavailable, + }); + + const nextStatus = createPluginsStatuses({ + pluginA: ServiceStatusLevels.degraded, + pluginB: ServiceStatusLevels.unavailable, + pluginC: ServiceStatusLevels.available, + }); + + const result = getPluginsStatusDiff(previousStatus, nextStatus); + + expect(result).toEqual([ + { + previousLevel: 'available', + nextLevel: 'degraded', + impactedServices: ['pluginA'], + }, + { + previousLevel: 'degraded', + nextLevel: 'unavailable', + impactedServices: ['pluginB'], + }, + { + previousLevel: 'unavailable', + nextLevel: 'available', + impactedServices: ['pluginC'], + }, + ]); + }); +}); + +describe('getServiceLevelChangeMessage', () => { + it('returns a human readable message about the change', () => { + expect( + getServiceLevelChangeMessage({ + previousLevel: 'available', + nextLevel: 'degraded', + impactedServices: ['pluginA', 'pluginB'], + }) + ).toMatchInlineSnapshot( + `"2 plugins changed status from 'available' to 'degraded': pluginA, pluginB"` + ); + }); +}); diff --git a/src/core/server/status/log_plugins_status.ts b/src/core/server/status/log_plugins_status.ts new file mode 100644 index 00000000000000..5b5d0d84efc4d9 --- /dev/null +++ b/src/core/server/status/log_plugins_status.ts @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { isDeepStrictEqual } from 'util'; +import { Observable, asyncScheduler } from 'rxjs'; +import { + distinctUntilChanged, + pairwise, + takeUntil, + map, + filter, + throttleTime, +} from 'rxjs/operators'; +import { PluginName } from '../plugins'; +import { ServiceStatus } from './types'; + +export type ServiceStatusWithName = ServiceStatus & { + name: PluginName; +}; + +export interface ServiceLevelChange { + previousLevel: string; + nextLevel: string; + impactedServices: string[]; +} + +export const getPluginsStatusChanges = ( + plugins$: Observable>, + stop$: Observable, + throttleDuration: number = 250 +): Observable => { + return plugins$.pipe( + takeUntil(stop$), + distinctUntilChanged((previous, next) => + isDeepStrictEqual(getStatusLevelMap(previous), getStatusLevelMap(next)) + ), + throttleTime(throttleDuration, asyncScheduler, { leading: true, trailing: true }), + pairwise(), + map(([oldStatus, newStatus]) => { + return getPluginsStatusDiff(oldStatus, newStatus); + }), + filter((statusChanges) => statusChanges.length > 0) + ); +}; + +const getStatusLevelMap = ( + plugins: Record +): Record => { + return Object.entries(plugins).reduce((levelMap, [key, value]) => { + levelMap[key] = value.level.toString(); + return levelMap; + }, {} as Record); +}; + +export const getPluginsStatusDiff = ( + previous: Record, + next: Record +): ServiceLevelChange[] => { + const statusChanges: Map = new Map(); + + Object.entries(next).forEach(([pluginName, nextStatus]) => { + const previousStatus = previous[pluginName]; + if (!previousStatus) { + return; + } + const previousLevel = statusLevel(previousStatus); + const nextLevel = statusLevel(nextStatus); + if (previousLevel === nextLevel) { + return; + } + const changeKey = statusChangeKey(previousLevel, nextLevel); + let statusChange = statusChanges.get(changeKey); + if (!statusChange) { + statusChange = { + previousLevel, + nextLevel, + impactedServices: [], + }; + statusChanges.set(changeKey, statusChange); + } + statusChange.impactedServices.push(pluginName); + }); + + return [...statusChanges.values()]; +}; + +export const getServiceLevelChangeMessage = ({ + impactedServices: services, + nextLevel: next, + previousLevel: previous, +}: ServiceLevelChange): string => { + return `${ + services.length + } plugins changed status from '${previous}' to '${next}': ${services.join(', ')}`; +}; + +const statusLevel = (status: ServiceStatus) => status.level.toString(); + +const statusChangeKey = (previous: string, next: string) => `${previous}:${next}`; diff --git a/src/core/server/status/status_service.test.mocks.ts b/src/core/server/status/status_service.test.mocks.ts new file mode 100644 index 00000000000000..8b860d8355fc68 --- /dev/null +++ b/src/core/server/status/status_service.test.mocks.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +export const getOverallStatusChangesMock = jest.fn(); +jest.doMock('./log_overall_status', () => ({ + getOverallStatusChanges: getOverallStatusChangesMock, +})); + +export const getPluginsStatusChangesMock = jest.fn(); +export const getServiceLevelChangeMessageMock = jest.fn(); +jest.doMock('./log_plugins_status', () => ({ + getPluginsStatusChanges: getPluginsStatusChangesMock, + getServiceLevelChangeMessage: getServiceLevelChangeMessageMock, +})); diff --git a/src/core/server/status/status_service.test.ts b/src/core/server/status/status_service.test.ts index dfd0ff9a7e1034..0b6fbb601c40f0 100644 --- a/src/core/server/status/status_service.test.ts +++ b/src/core/server/status/status_service.test.ts @@ -6,7 +6,13 @@ * Side Public License, v 1. */ -import { of, BehaviorSubject } from 'rxjs'; +import { + getOverallStatusChangesMock, + getPluginsStatusChangesMock, + getServiceLevelChangeMessageMock, +} from './status_service.test.mocks'; + +import { of, BehaviorSubject, Subject } from 'rxjs'; import { ServiceStatus, ServiceStatusLevels, CoreStatus } from './types'; import { StatusService } from './status_service'; @@ -19,14 +25,27 @@ import { mockRouter, RouterMock } from '../http/router/router.mock'; import { metricsServiceMock } from '../metrics/metrics_service.mock'; import { configServiceMock } from '../config/mocks'; import { coreUsageDataServiceMock } from '../core_usage_data/core_usage_data_service.mock'; +import { loggingSystemMock } from '../logging/logging_system.mock'; +import type { ServiceLevelChange } from './log_plugins_status'; expect.addSnapshotSerializer(ServiceStatusLevelSnapshotSerializer); describe('StatusService', () => { let service: StatusService; + let logger: ReturnType; beforeEach(() => { - service = new StatusService(mockCoreContext.create()); + logger = loggingSystemMock.create(); + service = new StatusService(mockCoreContext.create({ logger })); + + getOverallStatusChangesMock.mockReturnValue({ subscribe: jest.fn() }); + getPluginsStatusChangesMock.mockReturnValue({ subscribe: jest.fn() }); + }); + + afterEach(() => { + getOverallStatusChangesMock.mockReset(); + getPluginsStatusChangesMock.mockReset(); + getServiceLevelChangeMessageMock.mockReset(); }); const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); @@ -45,7 +64,7 @@ describe('StatusService', () => { }; type SetupDeps = Parameters[0]; - const setupDeps = (overrides: Partial): SetupDeps => { + const setupDeps = (overrides: Partial = {}): SetupDeps => { return { elasticsearch: { status$: of(available), @@ -536,4 +555,88 @@ describe('StatusService', () => { }); }); }); + + describe('start', () => { + it('calls getOverallStatusChanges and subscribe to the returned observable', async () => { + const mockSubscribe = jest.fn(); + getOverallStatusChangesMock.mockReturnValue({ + subscribe: mockSubscribe, + }); + + await service.setup(setupDeps()); + await service.start(); + + expect(getOverallStatusChangesMock).toHaveBeenCalledTimes(1); + expect(mockSubscribe).toHaveBeenCalledTimes(1); + }); + + it('logs a message everytime the getOverallStatusChangesMock observable emits', async () => { + const subject = new Subject(); + getOverallStatusChangesMock.mockReturnValue(subject); + + await service.setup(setupDeps()); + await service.start(); + + subject.next('some message'); + subject.next('another message'); + + const log = logger.get(); + + expect(log.info).toHaveBeenCalledTimes(2); + expect(log.info).toHaveBeenCalledWith('some message'); + expect(log.info).toHaveBeenCalledWith('another message'); + }); + + it('calls getPluginsStatusChanges and subscribe to the returned observable', async () => { + const mockSubscribe = jest.fn(); + getPluginsStatusChangesMock.mockReturnValue({ + subscribe: mockSubscribe, + }); + + await service.setup(setupDeps()); + await service.start(); + + expect(getPluginsStatusChangesMock).toHaveBeenCalledTimes(1); + expect(mockSubscribe).toHaveBeenCalledTimes(1); + }); + + it('logs messages everytime the getPluginsStatusChangesMock observable emits', async () => { + const subject = new Subject(); + getPluginsStatusChangesMock.mockReturnValue(subject); + + getServiceLevelChangeMessageMock.mockImplementation( + ({ + impactedServices: services, + nextLevel: next, + previousLevel: previous, + }: ServiceLevelChange) => { + return `${previous}-${next}-${services[0]}`; + } + ); + + await service.setup(setupDeps()); + await service.start(); + + subject.next([ + { + previousLevel: 'available', + nextLevel: 'degraded', + impactedServices: ['pluginA'], + }, + ]); + subject.next([ + { + previousLevel: 'degraded', + nextLevel: 'available', + impactedServices: ['pluginB'], + }, + ]); + + const log = logger.get(); + + expect(log.info).toHaveBeenCalledTimes(2); + expect(log.info).toHaveBeenCalledWith('available-degraded-pluginA'); + expect(log.info).toHaveBeenCalledWith('degraded-available-pluginB'); + }); + }); }); diff --git a/src/core/server/status/status_service.ts b/src/core/server/status/status_service.ts index 63a1b02d5b2e7c..be64b2558acd27 100644 --- a/src/core/server/status/status_service.ts +++ b/src/core/server/status/status_service.ts @@ -27,6 +27,7 @@ import { ServiceStatus, CoreStatus, InternalStatusServiceSetup } from './types'; import { getSummaryStatus } from './get_summary_status'; import { PluginsStatusService } from './plugins_status'; import { getOverallStatusChanges } from './log_overall_status'; +import { getPluginsStatusChanges, getServiceLevelChangeMessage } from './log_plugins_status'; interface StatusLogMeta extends LogMeta { kibana: { status: ServiceStatus }; @@ -165,6 +166,12 @@ export class StatusService implements CoreService { getOverallStatusChanges(this.overall$, this.stop$).subscribe((message) => { this.logger.info(message); }); + + getPluginsStatusChanges(this.pluginsStatus.getAll$(), this.stop$).subscribe((statusChanges) => { + statusChanges.forEach((statusChange) => { + this.logger.info(getServiceLevelChangeMessage(statusChange)); + }); + }); } public stop() {