From f83726c77f2cc1428a2b0411190882f6ad03e33f Mon Sep 17 00:00:00 2001 From: shahzad31 Date: Tue, 4 Oct 2022 18:50:43 +0200 Subject: [PATCH] update --- .../synthetics_service/test_now_monitor.ts | 2 +- .../synthetics_service.test.ts | 19 +- .../synthetics_service/synthetics_service.ts | 197 +++++++++--------- 3 files changed, 96 insertions(+), 122 deletions(-) diff --git a/x-pack/plugins/synthetics/server/routes/synthetics_service/test_now_monitor.ts b/x-pack/plugins/synthetics/server/routes/synthetics_service/test_now_monitor.ts index 82ce2ffdc5cce..0a2c3f599ff62 100644 --- a/x-pack/plugins/synthetics/server/routes/synthetics_service/test_now_monitor.ts +++ b/x-pack/plugins/synthetics/server/routes/synthetics_service/test_now_monitor.ts @@ -56,7 +56,7 @@ export const testNowMonitorRoute: SyntheticsRestApiRouteFactory = () => ({ const testRunId = uuidv4(); - const errors = await syntheticsService.triggerConfigs(request, [ + const errors = await syntheticsService.runOnceConfigs([ formatHeartbeatRequest({ // making it enabled, even if it's disabled in the UI monitor: { ...normalizedMonitor.attributes, enabled: true }, diff --git a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts index 5c935aba57dd1..472d9792a37bf 100644 --- a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts +++ b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.test.ts @@ -157,23 +157,6 @@ describe('SyntheticsService', () => { }); describe('pushConfigs', () => { - it('does not include the isEdit flag on normal push requests', async () => { - const { service, locations } = getMockedService(); - - (axios as jest.MockedFunction).mockResolvedValue({} as AxiosResponse); - - const payload = getFakePayload([locations[0]]); - - await service.pushConfigs([payload] as HeartbeatConfig[]); - - expect(axios).toHaveBeenCalledTimes(1); - expect(axios).toHaveBeenCalledWith( - expect.objectContaining({ - data: expect.objectContaining({ is_edit: false }), - }) - ); - }); - it('includes the isEdit flag on edit requests', async () => { const { service, locations } = getMockedService(); @@ -181,7 +164,7 @@ describe('SyntheticsService', () => { const payload = getFakePayload([locations[0]]); - await service.pushConfigs([payload] as HeartbeatConfig[], true); + await service.editConfig([payload] as HeartbeatConfig[]); expect(axios).toHaveBeenCalledTimes(1); expect(axios).toHaveBeenCalledWith( diff --git a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts index c0f977758f6da..457d4e79e418e 100644 --- a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts +++ b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts @@ -8,13 +8,14 @@ /* eslint-disable max-classes-per-file */ import { SavedObject } from '@kbn/core/server'; -import { KibanaRequest, Logger } from '@kbn/core/server'; +import { Logger } from '@kbn/core/server'; import { ConcreteTaskInstance, TaskManagerSetupContract, TaskManagerStartContract, TaskInstance, } from '@kbn/task-manager-plugin/server'; +import { Subject } from 'rxjs'; import { sendErrorTelemetryEvents } from '../routes/telemetry/monitor_upgrade_sender'; import { UptimeServerSetup } from '../legacy_uptime/lib/adapters'; import { installSyntheticsIndexTemplates } from '../routes/synthetics_service/install_index_templates'; @@ -170,7 +171,7 @@ export class SyntheticsService { if (service.isAllowed) { service.setupIndexTemplates(); - service.syncErrors = await service.pushConfigs(); + await service.pushConfigs(); } } catch (e) { sendErrorTelemetryEvents(service.logger, service.server.telemetry, { @@ -306,65 +307,44 @@ export class SyntheticsService { } } - async pushConfigs(configs?: HeartbeatConfig[], isEdit?: boolean) { - const monitorConfigs = configs ?? (await this.getMonitorConfigs()); - const monitors = this.formatConfigs(monitorConfigs); - - if (monitors.length === 0) { - this.logger.debug('No monitor found which can be pushed to service.'); - return null; - } - - this.apiKey = await this.getApiKey(); - - if (!this.apiKey) { - return null; - } + async pushConfigs() { + const service = this; + const subject = new Subject(); - const data = { - monitors, - output: await this.getOutput(this.apiKey), - isEdit: !!isEdit, - }; + subject.subscribe(async (monitorConfigs) => { + const monitors = this.formatConfigs(monitorConfigs); - this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`); + if (monitors.length === 0) { + this.logger.debug('No monitor found which can be pushed to service.'); + return null; + } - try { - this.syncErrors = await this.apiClient.put(data); - return this.syncErrors; - } catch (e) { - this.logger.error(e); - throw e; - } - } + this.apiKey = await this.getApiKey(); - async runOnceConfigs(configs?: HeartbeatConfig[]) { - const monitors = this.formatConfigs(configs || (await this.getMonitorConfigs())); - if (monitors.length === 0) { - return; - } + if (!this.apiKey) { + return null; + } - this.apiKey = await this.getApiKey(); + const data = { + monitors, + output: await this.getOutput(this.apiKey), + }; - if (!this.apiKey) { - return null; - } + this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`); - const data = { - monitors, - output: await this.getOutput(this.apiKey), - }; + try { + service.syncErrors = await this.apiClient.put(data); + } catch (e) { + this.logger.error(e); + throw e; + } + }); - try { - return await this.apiClient.runOnce(data); - } catch (e) { - this.logger.error(e); - throw e; - } + await this.getMonitorConfigs(subject); } - async triggerConfigs(request?: KibanaRequest, configs?: HeartbeatConfig[]) { - const monitors = this.formatConfigs(configs || (await this.getMonitorConfigs())); + async runOnceConfigs(configs: HeartbeatConfig[]) { + const monitors = this.formatConfigs(configs); if (monitors.length === 0) { return; } @@ -401,81 +381,92 @@ export class SyntheticsService { }; const result = await this.apiClient.delete(data); if (this.syncErrors && this.syncErrors?.length > 0) { - this.syncErrors = await this.pushConfigs(); + await this.pushConfigs(); } return result; } async deleteAllConfigs() { - const configs = await this.getMonitorConfigs(); - return await this.deleteConfigs(configs); + const subject = new Subject(); + + subject.subscribe(async (monitors) => { + await this.deleteConfigs(monitors); + }); + + await this.getMonitorConfigs(subject); } - async getMonitorConfigs() { - const savedObjectsClient = this.server.savedObjectsClient; + async getMonitorConfigs(subject: Subject) { + const soClient = this.server.savedObjectsClient; const encryptedClient = this.server.encryptedSavedObjects.getClient(); - if (!savedObjectsClient?.find) { + if (!soClient?.find) { return [] as SyntheticsMonitorWithId[]; } - const { saved_objects: encryptedMonitors } = await savedObjectsClient.find({ + const finder = soClient.createPointInTimeFinder({ type: syntheticsMonitorType, + perPage: 500, namespaces: ['*'], - perPage: 10000, }); const start = performance.now(); - const monitors: Array> = ( - await Promise.all( - encryptedMonitors.map( - (monitor) => - new Promise((resolve) => { - encryptedClient - .getDecryptedAsInternalUser( - syntheticsMonitor.name, - monitor.id, - { - namespace: monitor.namespaces?.[0], - } - ) - .then((decryptedMonitor) => resolve(decryptedMonitor)) - .catch((e) => { - this.logger.error(e); - sendErrorTelemetryEvents(this.logger, this.server.telemetry, { - reason: 'Failed to decrypt monitor', - message: e?.message, - type: 'runTaskError', - code: e?.code, - status: e.status, - kibanaVersion: this.server.kibanaVersion, + for await (const result of finder.find()) { + const encryptedMonitors = result.saved_objects; + + const monitors: Array> = ( + await Promise.all( + encryptedMonitors.map( + (monitor) => + new Promise((resolve) => { + encryptedClient + .getDecryptedAsInternalUser( + syntheticsMonitor.name, + monitor.id, + { + namespace: monitor.namespaces?.[0], + } + ) + .then((decryptedMonitor) => resolve(decryptedMonitor)) + .catch((e) => { + this.logger.error(e); + sendErrorTelemetryEvents(this.logger, this.server.telemetry, { + reason: 'Failed to decrypt monitor', + message: e?.message, + type: 'runTaskError', + code: e?.code, + status: e.status, + kibanaVersion: this.server.kibanaVersion, + }); + resolve(null); }); - resolve(null); - }); - }) + }) + ) ) - ) - ).filter((monitor) => monitor !== null) as Array>; - - const end = performance.now(); - const duration = end - start; + ).filter((monitor) => monitor !== null) as Array>; - this.logger.debug(`Decrypted ${monitors.length} monitors. Took ${duration} milliseconds`, { - event: { - duration, - }, - monitors: monitors.length, - }); + const end = performance.now(); + const duration = end - start; - return (monitors ?? []).map((monitor) => { - const attributes = monitor.attributes as unknown as MonitorFields; - return formatHeartbeatRequest({ - monitor: normalizeSecrets(monitor).attributes, - monitorId: monitor.id, - customHeartbeatId: attributes[ConfigKey.CUSTOM_HEARTBEAT_ID], + this.logger.debug(`Decrypted ${monitors.length} monitors. Took ${duration} milliseconds`, { + event: { + duration, + }, + monitors: monitors.length, }); - }); + + subject.next( + (monitors ?? []).map((monitor) => { + const attributes = monitor.attributes as unknown as MonitorFields; + return formatHeartbeatRequest({ + monitor: normalizeSecrets(monitor).attributes, + monitorId: monitor.id, + customHeartbeatId: attributes[ConfigKey.CUSTOM_HEARTBEAT_ID], + }); + }) + ); + } } formatConfigs(configs: SyntheticsMonitorWithId[]) {