Skip to content

Commit

Permalink
[Synthetics] Use createPointInTimeFinder to fetch savedObjects in sync (
Browse files Browse the repository at this point in the history
  • Loading branch information
shahzad31 authored Oct 14, 2022
1 parent 195ce74 commit 161d8b9
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export const testNowMonitorRoute: SyntheticsRestApiRouteFactory = () => ({

const testRunId = uuidv4();

const errors = await syntheticsService.triggerConfigs(request, [
const errors = await syntheticsService.runOnceConfigs([
formatHeartbeatRequest({
// making it enabled, even if it's disabled in the UI
monitor: { ...normalizedMonitor.attributes, enabled: true },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,31 +157,14 @@ describe('SyntheticsService', () => {
});

describe('pushConfigs', () => {
it('does not include the isEdit flag on normal push requests', async () => {
const { service, locations } = getMockedService();

(axios as jest.MockedFunction<typeof axios>).mockResolvedValue({} as AxiosResponse);

const payload = getFakePayload([locations[0]]);

await service.pushConfigs([payload] as HeartbeatConfig[]);

expect(axios).toHaveBeenCalledTimes(1);
expect(axios).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({ is_edit: false }),
})
);
});

it('includes the isEdit flag on edit requests', async () => {
const { service, locations } = getMockedService();

(axios as jest.MockedFunction<typeof axios>).mockResolvedValue({} as AxiosResponse);

const payload = getFakePayload([locations[0]]);

await service.pushConfigs([payload] as HeartbeatConfig[], true);
await service.editConfig([payload] as HeartbeatConfig[]);

expect(axios).toHaveBeenCalledTimes(1);
expect(axios).toHaveBeenCalledWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
/* eslint-disable max-classes-per-file */

import { SavedObject } from '@kbn/core/server';
import { KibanaRequest, Logger } from '@kbn/core/server';
import { Logger } from '@kbn/core/server';
import {
ConcreteTaskInstance,
TaskManagerSetupContract,
TaskManagerStartContract,
TaskInstance,
} from '@kbn/task-manager-plugin/server';
import { Subject } from 'rxjs';
import { sendErrorTelemetryEvents } from '../routes/telemetry/monitor_upgrade_sender';
import { UptimeServerSetup } from '../legacy_uptime/lib/adapters';
import { installSyntheticsIndexTemplates } from '../routes/synthetics_service/install_index_templates';
Expand Down Expand Up @@ -170,7 +171,7 @@ export class SyntheticsService {

if (service.isAllowed) {
service.setupIndexTemplates();
service.syncErrors = await service.pushConfigs();
await service.pushConfigs();
}
} catch (e) {
sendErrorTelemetryEvents(service.logger, service.server.telemetry, {
Expand Down Expand Up @@ -306,65 +307,44 @@ export class SyntheticsService {
}
}

async pushConfigs(configs?: HeartbeatConfig[], isEdit?: boolean) {
const monitorConfigs = configs ?? (await this.getMonitorConfigs());
const monitors = this.formatConfigs(monitorConfigs);

if (monitors.length === 0) {
this.logger.debug('No monitor found which can be pushed to service.');
return null;
}

this.apiKey = await this.getApiKey();

if (!this.apiKey) {
return null;
}
async pushConfigs() {
const service = this;
const subject = new Subject<SyntheticsMonitorWithId[]>();

const data = {
monitors,
output: await this.getOutput(this.apiKey),
isEdit: !!isEdit,
};
subject.subscribe(async (monitorConfigs) => {
const monitors = this.formatConfigs(monitorConfigs);

this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`);
if (monitors.length === 0) {
this.logger.debug('No monitor found which can be pushed to service.');
return null;
}

try {
this.syncErrors = await this.apiClient.put(data);
return this.syncErrors;
} catch (e) {
this.logger.error(e);
throw e;
}
}
this.apiKey = await this.getApiKey();

async runOnceConfigs(configs?: HeartbeatConfig[]) {
const monitors = this.formatConfigs(configs || (await this.getMonitorConfigs()));
if (monitors.length === 0) {
return;
}
if (!this.apiKey) {
return null;
}

this.apiKey = await this.getApiKey();
const data = {
monitors,
output: await this.getOutput(this.apiKey),
};

if (!this.apiKey) {
return null;
}
this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`);

const data = {
monitors,
output: await this.getOutput(this.apiKey),
};
try {
service.syncErrors = await this.apiClient.put(data);
} catch (e) {
this.logger.error(e);
throw e;
}
});

try {
return await this.apiClient.runOnce(data);
} catch (e) {
this.logger.error(e);
throw e;
}
await this.getMonitorConfigs(subject);
}

async triggerConfigs(request?: KibanaRequest, configs?: HeartbeatConfig[]) {
const monitors = this.formatConfigs(configs || (await this.getMonitorConfigs()));
async runOnceConfigs(configs: HeartbeatConfig[]) {
const monitors = this.formatConfigs(configs);
if (monitors.length === 0) {
return;
}
Expand Down Expand Up @@ -401,81 +381,92 @@ export class SyntheticsService {
};
const result = await this.apiClient.delete(data);
if (this.syncErrors && this.syncErrors?.length > 0) {
this.syncErrors = await this.pushConfigs();
await this.pushConfigs();
}
return result;
}

async deleteAllConfigs() {
const configs = await this.getMonitorConfigs();
return await this.deleteConfigs(configs);
const subject = new Subject<SyntheticsMonitorWithId[]>();

subject.subscribe(async (monitors) => {
await this.deleteConfigs(monitors);
});

await this.getMonitorConfigs(subject);
}

async getMonitorConfigs() {
const savedObjectsClient = this.server.savedObjectsClient;
async getMonitorConfigs(subject: Subject<SyntheticsMonitorWithId[]>) {
const soClient = this.server.savedObjectsClient;
const encryptedClient = this.server.encryptedSavedObjects.getClient();

if (!savedObjectsClient?.find) {
if (!soClient?.find) {
return [] as SyntheticsMonitorWithId[];
}

const { saved_objects: encryptedMonitors } = await savedObjectsClient.find<SyntheticsMonitor>({
const finder = soClient.createPointInTimeFinder({
type: syntheticsMonitorType,
perPage: 500,
namespaces: ['*'],
perPage: 10000,
});

const start = performance.now();

const monitors: Array<SavedObject<SyntheticsMonitorWithSecrets>> = (
await Promise.all(
encryptedMonitors.map(
(monitor) =>
new Promise((resolve) => {
encryptedClient
.getDecryptedAsInternalUser<SyntheticsMonitorWithSecrets>(
syntheticsMonitor.name,
monitor.id,
{
namespace: monitor.namespaces?.[0],
}
)
.then((decryptedMonitor) => resolve(decryptedMonitor))
.catch((e) => {
this.logger.error(e);
sendErrorTelemetryEvents(this.logger, this.server.telemetry, {
reason: 'Failed to decrypt monitor',
message: e?.message,
type: 'runTaskError',
code: e?.code,
status: e.status,
kibanaVersion: this.server.kibanaVersion,
for await (const result of finder.find()) {
const encryptedMonitors = result.saved_objects;

const monitors: Array<SavedObject<SyntheticsMonitorWithSecrets>> = (
await Promise.all(
encryptedMonitors.map(
(monitor) =>
new Promise((resolve) => {
encryptedClient
.getDecryptedAsInternalUser<SyntheticsMonitorWithSecrets>(
syntheticsMonitor.name,
monitor.id,
{
namespace: monitor.namespaces?.[0],
}
)
.then((decryptedMonitor) => resolve(decryptedMonitor))
.catch((e) => {
this.logger.error(e);
sendErrorTelemetryEvents(this.logger, this.server.telemetry, {
reason: 'Failed to decrypt monitor',
message: e?.message,
type: 'runTaskError',
code: e?.code,
status: e.status,
kibanaVersion: this.server.kibanaVersion,
});
resolve(null);
});
resolve(null);
});
})
})
)
)
)
).filter((monitor) => monitor !== null) as Array<SavedObject<SyntheticsMonitorWithSecrets>>;

const end = performance.now();
const duration = end - start;
).filter((monitor) => monitor !== null) as Array<SavedObject<SyntheticsMonitorWithSecrets>>;

this.logger.debug(`Decrypted ${monitors.length} monitors. Took ${duration} milliseconds`, {
event: {
duration,
},
monitors: monitors.length,
});
const end = performance.now();
const duration = end - start;

return (monitors ?? []).map((monitor) => {
const attributes = monitor.attributes as unknown as MonitorFields;
return formatHeartbeatRequest({
monitor: normalizeSecrets(monitor).attributes,
monitorId: monitor.id,
customHeartbeatId: attributes[ConfigKey.CUSTOM_HEARTBEAT_ID],
this.logger.debug(`Decrypted ${monitors.length} monitors. Took ${duration} milliseconds`, {
event: {
duration,
},
monitors: monitors.length,
});
});

subject.next(
(monitors ?? []).map((monitor) => {
const attributes = monitor.attributes as unknown as MonitorFields;
return formatHeartbeatRequest({
monitor: normalizeSecrets(monitor).attributes,
monitorId: monitor.id,
customHeartbeatId: attributes[ConfigKey.CUSTOM_HEARTBEAT_ID],
});
})
);
}
}

formatConfigs(configs: SyntheticsMonitorWithId[]) {
Expand Down

0 comments on commit 161d8b9

Please sign in to comment.