Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reporting: Task Manager integration for 7.x #101339

Merged
merged 13 commits into from
Jun 24, 2021
Merged
16 changes: 16 additions & 0 deletions x-pack/plugins/reporting/common/schema_utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import moment from 'moment';
import { numberToDuration } from './schema_utils';

describe('Schema Utils', () => {
it('numberToDuration converts a number/Duration into a Duration object', () => {
expect(numberToDuration(500)).toMatchInlineSnapshot(`"PT0.5S"`);
expect(numberToDuration(moment.duration(1, 'hour'))).toMatchInlineSnapshot(`"PT1H"`);
});
});
7 changes: 7 additions & 0 deletions x-pack/plugins/reporting/common/schema_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ export const durationToNumber = (value: number | moment.Duration): number => {
return value.asMilliseconds();
};

export const numberToDuration = (value: number | moment.Duration): moment.Duration => {
if (typeof value === 'number') {
return moment.duration(value, 'milliseconds');
}
return value;
};

export const byteSizeValueToNumber = (value: number | ByteSizeValue) => {
if (typeof value === 'number') {
return value;
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/reporting/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export interface ReportSource {
};
meta: { objectType: string; layout?: string };
browser_type: string;
migration_version: string;
max_attempts: number;
timeout: number;

Expand All @@ -77,7 +78,7 @@ export interface ReportSource {
started_at?: string;
completed_at?: string;
created_at: string;
process_expiration?: string;
process_expiration?: string | null; // must be set to null to clear the expiration
}

/*
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/reporting/kibana.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"management",
"licensing",
"uiActions",
"taskManager",
"embeddable",
"screenshotMode",
"share",
Expand Down
44 changes: 38 additions & 6 deletions x-pack/plugins/reporting/server/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import { LicensingPluginSetup } from '../../licensing/server';
import { SecurityPluginSetup } from '../../security/server';
import { DEFAULT_SPACE_ID } from '../../spaces/common/constants';
import { SpacesPluginSetup } from '../../spaces/server';
import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server';
import { ReportingConfig, ReportingSetup } from './';
import { HeadlessChromiumDriverFactory } from './browsers/chromium/driver_factory';
import { ReportingConfigType } from './config';
import { checkLicense, getExportTypesRegistry, LevelLogger } from './lib';
import { ESQueueInstance } from './lib/create_queue';
import { screenshotsObservableFactory, ScreenshotsObservableFn } from './lib/screenshots';
import { ReportingStore } from './lib/store';
import { ExecuteReportTask, MonitorReportsTask, ReportTaskParams } from './lib/tasks';
import { ReportingPluginRouter } from './types';

export interface ReportingInternalSetup {
Expand All @@ -40,6 +41,7 @@ export interface ReportingInternalSetup {
licensing: LicensingPluginSetup;
security?: SecurityPluginSetup;
spaces?: SpacesPluginSetup;
taskManager: TaskManagerSetupContract;
screenshotMode: ScreenshotModePluginSetup;
logger: LevelLogger;
}
Expand All @@ -51,7 +53,7 @@ export interface ReportingInternalStart {
uiSettings: UiSettingsServiceStart;
esClient: IClusterClient;
data: DataPluginStart;
esqueue: ESQueueInstance;
taskManager: TaskManagerStartContract;
logger: LevelLogger;
}

Expand All @@ -62,17 +64,24 @@ export class ReportingCore {
private readonly pluginStart$ = new Rx.ReplaySubject<ReportingInternalStart>(); // observe async background startDeps
private deprecatedAllowedRoles: string[] | false = false; // DEPRECATED. If `false`, the deprecated features have been disableed
private exportTypesRegistry = getExportTypesRegistry();
private config?: ReportingConfig;
private executeTask: ExecuteReportTask;
private monitorTask: MonitorReportsTask;
private config?: ReportingConfig; // final config, includes dynamic values based on OS type
private executing: Set<string>;

public getContract: () => ReportingSetup;

constructor(private logger: LevelLogger, context: PluginInitializerContext<ReportingConfigType>) {
const syncConfig = context.config.get<ReportingConfigType>();
this.deprecatedAllowedRoles = syncConfig.roles.enabled ? syncConfig.roles.allow : false;
this.executeTask = new ExecuteReportTask(this, syncConfig, this.logger);
this.monitorTask = new MonitorReportsTask(this, syncConfig, this.logger);

this.getContract = () => ({
usesUiCapabilities: () => syncConfig.roles.enabled === false,
});

this.executing = new Set();
}

/*
Expand All @@ -81,14 +90,25 @@ export class ReportingCore {
public pluginSetup(setupDeps: ReportingInternalSetup) {
this.pluginSetup$.next(true); // trigger the observer
this.pluginSetupDeps = setupDeps; // cache

const { executeTask, monitorTask } = this;
setupDeps.taskManager.registerTaskDefinitions({
[executeTask.TYPE]: executeTask.getTaskDefinition(),
[monitorTask.TYPE]: monitorTask.getTaskDefinition(),
});
}

/*
* Register startDeps
*/
public pluginStart(startDeps: ReportingInternalStart) {
public async pluginStart(startDeps: ReportingInternalStart) {
this.pluginStart$.next(startDeps); // trigger the observer
this.pluginStartDeps = startDeps; // cache

const { taskManager } = startDeps;
const { executeTask, monitorTask } = this;
// enable this instance to generate reports and to monitor for pending reports
await Promise.all([executeTask.init(taskManager), monitorTask.init(taskManager)]);
}

/*
Expand Down Expand Up @@ -193,8 +213,8 @@ export class ReportingCore {
return this.exportTypesRegistry;
}

public async getEsqueue() {
return (await this.getPluginStartDeps()).esqueue;
public async scheduleTask(report: ReportTaskParams) {
return await this.executeTask.scheduleTask(report);
}

public async getStore() {
Expand Down Expand Up @@ -296,4 +316,16 @@ export class ReportingCore {
const startDeps = await this.getPluginStartDeps();
return startDeps.esClient;
}

public trackReport(reportId: string) {
this.executing.add(reportId);
}

public untrackReport(reportId: string) {
this.executing.delete(reportId);
}

public countConcurrentReports(): number {
return this.executing.size;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
* 2.0.
*/

import { CSV_JOB_TYPE_DEPRECATED } from '../../../common/constants';
import { cryptoFactory } from '../../lib';
import { CreateJobFn, CreateJobFnFactory } from '../../types';
import {
Expand All @@ -16,9 +15,7 @@ import {

export const createJobFnFactory: CreateJobFnFactory<
CreateJobFn<JobParamsDeprecatedCSV, TaskPayloadDeprecatedCSV>
> = function createJobFactoryFn(reporting, parentLogger) {
const logger = parentLogger.clone([CSV_JOB_TYPE_DEPRECATED, 'create-job']);

> = function createJobFactoryFn(reporting, logger) {
const config = reporting.getConfig();
const crypto = cryptoFactory(config.get('encryptionKey'));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,7 @@ describe('CSV Execute Job', function () {
});
});

// FLAKY: https://github.com/elastic/kibana/issues/43069
describe.skip('cancellation', function () {
describe('cancellation', function () {
const scrollId = getRandomScrollId();

beforeEach(function () {
Expand Down Expand Up @@ -709,7 +708,7 @@ describe('CSV Execute Job', function () {
cancellationToken
);

await delay(100);
await delay(250);

expect(mockEsClient.search).toHaveBeenCalled();
expect(mockEsClient.scroll).toHaveBeenCalled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { CONTENT_TYPE_CSV, CSV_JOB_TYPE_DEPRECATED } from '../../../common/constants';
import { CONTENT_TYPE_CSV } from '../../../common/constants';
import { RunTaskFn, RunTaskFnFactory } from '../../types';
import { decryptJobHeaders } from '../common';
import { createGenerateCsv } from './generate_csv';
Expand All @@ -18,7 +18,7 @@ export const runTaskFnFactory: RunTaskFnFactory<

return async function runTask(jobId, job, cancellationToken) {
const elasticsearch = await reporting.getEsClient();
const logger = parentLogger.clone([CSV_JOB_TYPE_DEPRECATED, 'execute-job', jobId]);
const logger = parentLogger.clone([jobId]);
const generateCsv = createGenerateCsv(logger);

const encryptionKey = config.get('encryptionKey');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@
* 2.0.
*/

import { PNG_JOB_TYPE } from '../../../../common/constants';
import { cryptoFactory } from '../../../lib';
import { CreateJobFn, CreateJobFnFactory } from '../../../types';
import { validateUrls } from '../../common';
import { JobParamsPNG, TaskPayloadPNG } from '../types';

export const createJobFnFactory: CreateJobFnFactory<
CreateJobFn<JobParamsPNG, TaskPayloadPNG>
> = function createJobFactoryFn(reporting, parentLogger) {
const logger = parentLogger.clone([PNG_JOB_TYPE, 'execute-job']);
> = function createJobFactoryFn(reporting, logger) {
const config = reporting.getConfig();
const crypto = cryptoFactory(config.get('encryptionKey'));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/

import { KibanaRequest, RequestHandlerContext } from 'src/core/server';
import { PDF_JOB_TYPE } from '../../../../common/constants';
import { cryptoFactory } from '../../../lib';
import { CreateJobFn, CreateJobFnFactory } from '../../../types';
import { validateUrls } from '../../common';
Expand All @@ -16,10 +15,9 @@ import { compatibilityShimFactory } from './compatibility_shim';

export const createJobFnFactory: CreateJobFnFactory<
CreateJobFn<JobParamsPDF, TaskPayloadPDF>
> = function createJobFactoryFn(reporting, parentLogger) {
> = function createJobFactoryFn(reporting, logger) {
const config = reporting.getConfig();
const crypto = cryptoFactory(config.get('encryptionKey'));
const logger = parentLogger.clone([PDF_JOB_TYPE, 'create-job']);
const compatibilityShim = compatibilityShimFactory(logger);

// 7.x and below only
Expand Down
75 changes: 0 additions & 75 deletions x-pack/plugins/reporting/server/lib/create_queue.ts

This file was deleted.

Loading