Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Synthetics] Use createPointInTimeFinder to fetch savedObjects in sync #142640

Merged
merged 2 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export const testNowMonitorRoute: SyntheticsRestApiRouteFactory = () => ({

const testRunId = uuidv4();

const errors = await syntheticsService.triggerConfigs(request, [
const errors = await syntheticsService.runOnceConfigs([
formatHeartbeatRequest({
// making it enabled, even if it's disabled in the UI
monitor: { ...normalizedMonitor.attributes, enabled: true },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,31 +157,14 @@ describe('SyntheticsService', () => {
});

describe('pushConfigs', () => {
it('does not include the isEdit flag on normal push requests', async () => {
const { service, locations } = getMockedService();

(axios as jest.MockedFunction<typeof axios>).mockResolvedValue({} as AxiosResponse);

const payload = getFakePayload([locations[0]]);

await service.pushConfigs([payload] as HeartbeatConfig[]);

expect(axios).toHaveBeenCalledTimes(1);
expect(axios).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({ is_edit: false }),
})
);
});

it('includes the isEdit flag on edit requests', async () => {
const { service, locations } = getMockedService();

(axios as jest.MockedFunction<typeof axios>).mockResolvedValue({} as AxiosResponse);

const payload = getFakePayload([locations[0]]);

await service.pushConfigs([payload] as HeartbeatConfig[], true);
await service.editConfig([payload] as HeartbeatConfig[]);

expect(axios).toHaveBeenCalledTimes(1);
expect(axios).toHaveBeenCalledWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
/* eslint-disable max-classes-per-file */

import { SavedObject } from '@kbn/core/server';
import { KibanaRequest, Logger } from '@kbn/core/server';
import { Logger } from '@kbn/core/server';
import {
ConcreteTaskInstance,
TaskManagerSetupContract,
TaskManagerStartContract,
TaskInstance,
} from '@kbn/task-manager-plugin/server';
import { Subject } from 'rxjs';
import { sendErrorTelemetryEvents } from '../routes/telemetry/monitor_upgrade_sender';
import { UptimeServerSetup } from '../legacy_uptime/lib/adapters';
import { installSyntheticsIndexTemplates } from '../routes/synthetics_service/install_index_templates';
Expand Down Expand Up @@ -170,7 +171,7 @@ export class SyntheticsService {

if (service.isAllowed) {
service.setupIndexTemplates();
service.syncErrors = await service.pushConfigs();
await service.pushConfigs();
}
} catch (e) {
sendErrorTelemetryEvents(service.logger, service.server.telemetry, {
Expand Down Expand Up @@ -306,65 +307,44 @@ export class SyntheticsService {
}
}

async pushConfigs(configs?: HeartbeatConfig[], isEdit?: boolean) {
const monitorConfigs = configs ?? (await this.getMonitorConfigs());
const monitors = this.formatConfigs(monitorConfigs);

if (monitors.length === 0) {
this.logger.debug('No monitor found which can be pushed to service.');
return null;
}

this.apiKey = await this.getApiKey();

if (!this.apiKey) {
return null;
}
async pushConfigs() {
const service = this;
const subject = new Subject<SyntheticsMonitorWithId[]>();

const data = {
monitors,
output: await this.getOutput(this.apiKey),
isEdit: !!isEdit,
};
subject.subscribe(async (monitorConfigs) => {
const monitors = this.formatConfigs(monitorConfigs);

this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`);
if (monitors.length === 0) {
this.logger.debug('No monitor found which can be pushed to service.');
return null;
}

try {
this.syncErrors = await this.apiClient.put(data);
return this.syncErrors;
} catch (e) {
this.logger.error(e);
throw e;
}
}
this.apiKey = await this.getApiKey();

async runOnceConfigs(configs?: HeartbeatConfig[]) {
const monitors = this.formatConfigs(configs || (await this.getMonitorConfigs()));
if (monitors.length === 0) {
return;
}
if (!this.apiKey) {
return null;
}

this.apiKey = await this.getApiKey();
const data = {
monitors,
output: await this.getOutput(this.apiKey),
};

if (!this.apiKey) {
return null;
}
this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`);

const data = {
monitors,
output: await this.getOutput(this.apiKey),
};
try {
service.syncErrors = await this.apiClient.put(data);
} catch (e) {
this.logger.error(e);
throw e;
}
});

try {
return await this.apiClient.runOnce(data);
} catch (e) {
this.logger.error(e);
throw e;
}
await this.getMonitorConfigs(subject);
}

async triggerConfigs(request?: KibanaRequest, configs?: HeartbeatConfig[]) {
const monitors = this.formatConfigs(configs || (await this.getMonitorConfigs()));
async runOnceConfigs(configs: HeartbeatConfig[]) {
const monitors = this.formatConfigs(configs);
if (monitors.length === 0) {
return;
}
Expand Down Expand Up @@ -401,81 +381,92 @@ export class SyntheticsService {
};
const result = await this.apiClient.delete(data);
if (this.syncErrors && this.syncErrors?.length > 0) {
this.syncErrors = await this.pushConfigs();
await this.pushConfigs();
}
return result;
}

async deleteAllConfigs() {
const configs = await this.getMonitorConfigs();
return await this.deleteConfigs(configs);
const subject = new Subject<SyntheticsMonitorWithId[]>();

subject.subscribe(async (monitors) => {
await this.deleteConfigs(monitors);
});

await this.getMonitorConfigs(subject);
}

async getMonitorConfigs() {
const savedObjectsClient = this.server.savedObjectsClient;
async getMonitorConfigs(subject: Subject<SyntheticsMonitorWithId[]>) {
const soClient = this.server.savedObjectsClient;
const encryptedClient = this.server.encryptedSavedObjects.getClient();

if (!savedObjectsClient?.find) {
if (!soClient?.find) {
return [] as SyntheticsMonitorWithId[];
}

const { saved_objects: encryptedMonitors } = await savedObjectsClient.find<SyntheticsMonitor>({
const finder = soClient.createPointInTimeFinder({
type: syntheticsMonitorType,
perPage: 500,
namespaces: ['*'],
perPage: 10000,
});

const start = performance.now();

const monitors: Array<SavedObject<SyntheticsMonitorWithSecrets>> = (
await Promise.all(
encryptedMonitors.map(
(monitor) =>
new Promise((resolve) => {
encryptedClient
.getDecryptedAsInternalUser<SyntheticsMonitorWithSecrets>(
syntheticsMonitor.name,
monitor.id,
{
namespace: monitor.namespaces?.[0],
}
)
.then((decryptedMonitor) => resolve(decryptedMonitor))
.catch((e) => {
this.logger.error(e);
sendErrorTelemetryEvents(this.logger, this.server.telemetry, {
reason: 'Failed to decrypt monitor',
message: e?.message,
type: 'runTaskError',
code: e?.code,
status: e.status,
kibanaVersion: this.server.kibanaVersion,
for await (const result of finder.find()) {
const encryptedMonitors = result.saved_objects;

const monitors: Array<SavedObject<SyntheticsMonitorWithSecrets>> = (
await Promise.all(
encryptedMonitors.map(
(monitor) =>
new Promise((resolve) => {
encryptedClient
.getDecryptedAsInternalUser<SyntheticsMonitorWithSecrets>(
syntheticsMonitor.name,
monitor.id,
{
namespace: monitor.namespaces?.[0],
}
)
.then((decryptedMonitor) => resolve(decryptedMonitor))
.catch((e) => {
this.logger.error(e);
sendErrorTelemetryEvents(this.logger, this.server.telemetry, {
reason: 'Failed to decrypt monitor',
message: e?.message,
type: 'runTaskError',
code: e?.code,
status: e.status,
kibanaVersion: this.server.kibanaVersion,
});
resolve(null);
});
resolve(null);
});
})
})
)
)
)
).filter((monitor) => monitor !== null) as Array<SavedObject<SyntheticsMonitorWithSecrets>>;

const end = performance.now();
const duration = end - start;
).filter((monitor) => monitor !== null) as Array<SavedObject<SyntheticsMonitorWithSecrets>>;

this.logger.debug(`Decrypted ${monitors.length} monitors. Took ${duration} milliseconds`, {
event: {
duration,
},
monitors: monitors.length,
});
const end = performance.now();
const duration = end - start;

return (monitors ?? []).map((monitor) => {
const attributes = monitor.attributes as unknown as MonitorFields;
return formatHeartbeatRequest({
monitor: normalizeSecrets(monitor).attributes,
monitorId: monitor.id,
customHeartbeatId: attributes[ConfigKey.CUSTOM_HEARTBEAT_ID],
this.logger.debug(`Decrypted ${monitors.length} monitors. Took ${duration} milliseconds`, {
event: {
duration,
},
monitors: monitors.length,
});
});

subject.next(
(monitors ?? []).map((monitor) => {
const attributes = monitor.attributes as unknown as MonitorFields;
return formatHeartbeatRequest({
monitor: normalizeSecrets(monitor).attributes,
monitorId: monitor.id,
customHeartbeatId: attributes[ConfigKey.CUSTOM_HEARTBEAT_ID],
});
})
);
}
}

formatConfigs(configs: SyntheticsMonitorWithId[]) {
Expand Down