Skip to content

Commit

Permalink
Reporting: Task Manager integration for 7.x
Browse files Browse the repository at this point in the history
  • Loading branch information
tsullivan committed Jun 15, 2021
1 parent b0700a8 commit 203b2ed
Show file tree
Hide file tree
Showing 48 changed files with 1,416 additions and 2,582 deletions.
16 changes: 16 additions & 0 deletions x-pack/plugins/reporting/common/schema_utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import moment from 'moment';
import { numberToDuration } from './schema_utils';

describe('Schema Utils', () => {
it('numberToDuration converts a number/Duration into a Duration object', () => {
expect(numberToDuration(500)).toMatchInlineSnapshot(`"PT0.5S"`);
expect(numberToDuration(moment.duration(1, 'hour'))).toMatchInlineSnapshot(`"PT1H"`);
});
});
7 changes: 7 additions & 0 deletions x-pack/plugins/reporting/common/schema_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ export const durationToNumber = (value: number | moment.Duration): number => {
return value.asMilliseconds();
};

export const numberToDuration = (value: number | moment.Duration): moment.Duration => {
if (typeof value === 'number') {
return moment.duration(value, 'milliseconds');
}
return value;
};

export const byteSizeValueToNumber = (value: number | ByteSizeValue) => {
if (typeof value === 'number') {
return value;
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/reporting/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export interface ReportSource {
};
meta: { objectType: string; layout?: string };
browser_type: string;
migration_version: string;
max_attempts: number;
timeout: number;

Expand All @@ -77,7 +78,7 @@ export interface ReportSource {
started_at?: string;
completed_at?: string;
created_at: string;
process_expiration?: string;
process_expiration?: string | null; // must be set to null to clear the expiration
}

/*
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/reporting/kibana.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"management",
"licensing",
"uiActions",
"taskManager",
"embeddable",
"screenshotMode",
"share",
Expand Down
44 changes: 38 additions & 6 deletions x-pack/plugins/reporting/server/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import { LicensingPluginSetup } from '../../licensing/server';
import { SecurityPluginSetup } from '../../security/server';
import { DEFAULT_SPACE_ID } from '../../spaces/common/constants';
import { SpacesPluginSetup } from '../../spaces/server';
import { TaskManagerSetupContract, TaskManagerStartContract } from '../../task_manager/server';
import { ReportingConfig, ReportingSetup } from './';
import { HeadlessChromiumDriverFactory } from './browsers/chromium/driver_factory';
import { ReportingConfigType } from './config';
import { checkLicense, getExportTypesRegistry, LevelLogger } from './lib';
import { ESQueueInstance } from './lib/create_queue';
import { screenshotsObservableFactory, ScreenshotsObservableFn } from './lib/screenshots';
import { ReportingStore } from './lib/store';
import { ExecuteReportTask, MonitorReportsTask, ReportTaskParams } from './lib/tasks';
import { ReportingPluginRouter } from './types';

export interface ReportingInternalSetup {
Expand All @@ -40,6 +41,7 @@ export interface ReportingInternalSetup {
licensing: LicensingPluginSetup;
security?: SecurityPluginSetup;
spaces?: SpacesPluginSetup;
taskManager: TaskManagerSetupContract;
screenshotMode: ScreenshotModePluginSetup;
logger: LevelLogger;
}
Expand All @@ -51,7 +53,7 @@ export interface ReportingInternalStart {
uiSettings: UiSettingsServiceStart;
esClient: IClusterClient;
data: DataPluginStart;
esqueue: ESQueueInstance;
taskManager: TaskManagerStartContract;
logger: LevelLogger;
}

Expand All @@ -62,17 +64,24 @@ export class ReportingCore {
private readonly pluginStart$ = new Rx.ReplaySubject<ReportingInternalStart>(); // observe async background startDeps
private deprecatedAllowedRoles: string[] | false = false; // DEPRECATED. If `false`, the deprecated features have been disableed
private exportTypesRegistry = getExportTypesRegistry();
private config?: ReportingConfig;
private executeTask: ExecuteReportTask;
private monitorTask: MonitorReportsTask;
private config?: ReportingConfig; // final config, includes dynamic values based on OS type
private executing: Set<string>;

public getContract: () => ReportingSetup;

constructor(private logger: LevelLogger, context: PluginInitializerContext<ReportingConfigType>) {
const syncConfig = context.config.get<ReportingConfigType>();
this.deprecatedAllowedRoles = syncConfig.roles.enabled ? syncConfig.roles.allow : false;
this.executeTask = new ExecuteReportTask(this, syncConfig, this.logger);
this.monitorTask = new MonitorReportsTask(this, syncConfig, this.logger);

this.getContract = () => ({
usesUiCapabilities: () => syncConfig.roles.enabled === false,
});

this.executing = new Set();
}

/*
Expand All @@ -81,14 +90,25 @@ export class ReportingCore {
public pluginSetup(setupDeps: ReportingInternalSetup) {
this.pluginSetup$.next(true); // trigger the observer
this.pluginSetupDeps = setupDeps; // cache

const { executeTask, monitorTask } = this;
setupDeps.taskManager.registerTaskDefinitions({
[executeTask.TYPE]: executeTask.getTaskDefinition(),
[monitorTask.TYPE]: monitorTask.getTaskDefinition(),
});
}

/*
* Register startDeps
*/
public pluginStart(startDeps: ReportingInternalStart) {
public async pluginStart(startDeps: ReportingInternalStart) {
this.pluginStart$.next(startDeps); // trigger the observer
this.pluginStartDeps = startDeps; // cache

const { taskManager } = startDeps;
const { executeTask, monitorTask } = this;
// enable this instance to generate reports and to monitor for pending reports
await Promise.all([executeTask.init(taskManager), monitorTask.init(taskManager)]);
}

/*
Expand Down Expand Up @@ -193,8 +213,8 @@ export class ReportingCore {
return this.exportTypesRegistry;
}

public async getEsqueue() {
return (await this.getPluginStartDeps()).esqueue;
public async scheduleTask(report: ReportTaskParams) {
return await this.executeTask.scheduleTask(report);
}

public async getStore() {
Expand Down Expand Up @@ -296,4 +316,16 @@ export class ReportingCore {
const startDeps = await this.getPluginStartDeps();
return startDeps.esClient;
}

public trackReport(reportId: string) {
this.executing.add(reportId);
}

public untrackReport(reportId: string) {
this.executing.delete(reportId);
}

public countConcurrentReports(): number {
return this.executing.size;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
* 2.0.
*/

import { CSV_JOB_TYPE_DEPRECATED } from '../../../common/constants';
import { cryptoFactory } from '../../lib';
import { CreateJobFn, CreateJobFnFactory } from '../../types';
import {
Expand All @@ -16,9 +15,7 @@ import {

export const createJobFnFactory: CreateJobFnFactory<
CreateJobFn<JobParamsDeprecatedCSV, TaskPayloadDeprecatedCSV>
> = function createJobFactoryFn(reporting, parentLogger) {
const logger = parentLogger.clone([CSV_JOB_TYPE_DEPRECATED, 'create-job']);

> = function createJobFactoryFn(reporting, logger) {
const config = reporting.getConfig();
const crypto = cryptoFactory(config.get('encryptionKey'));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,7 @@ describe('CSV Execute Job', function () {
});
});

// FLAKY: https://github.com/elastic/kibana/issues/43069
describe.skip('cancellation', function () {
describe('cancellation', function () {
const scrollId = getRandomScrollId();

beforeEach(function () {
Expand Down Expand Up @@ -709,7 +708,7 @@ describe('CSV Execute Job', function () {
cancellationToken
);

await delay(100);
await delay(250);

expect(mockEsClient.search).toHaveBeenCalled();
expect(mockEsClient.scroll).toHaveBeenCalled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { CONTENT_TYPE_CSV, CSV_JOB_TYPE_DEPRECATED } from '../../../common/constants';
import { CONTENT_TYPE_CSV } from '../../../common/constants';
import { RunTaskFn, RunTaskFnFactory } from '../../types';
import { decryptJobHeaders } from '../common';
import { createGenerateCsv } from './generate_csv';
Expand All @@ -18,7 +18,7 @@ export const runTaskFnFactory: RunTaskFnFactory<

return async function runTask(jobId, job, cancellationToken) {
const elasticsearch = await reporting.getEsClient();
const logger = parentLogger.clone([CSV_JOB_TYPE_DEPRECATED, 'execute-job', jobId]);
const logger = parentLogger.clone([jobId]);
const generateCsv = createGenerateCsv(logger);

const encryptionKey = config.get('encryptionKey');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@
* 2.0.
*/

import { PNG_JOB_TYPE } from '../../../../common/constants';
import { cryptoFactory } from '../../../lib';
import { CreateJobFn, CreateJobFnFactory } from '../../../types';
import { validateUrls } from '../../common';
import { JobParamsPNG, TaskPayloadPNG } from '../types';

export const createJobFnFactory: CreateJobFnFactory<
CreateJobFn<JobParamsPNG, TaskPayloadPNG>
> = function createJobFactoryFn(reporting, parentLogger) {
const logger = parentLogger.clone([PNG_JOB_TYPE, 'execute-job']);
> = function createJobFactoryFn(reporting, logger) {
const config = reporting.getConfig();
const crypto = cryptoFactory(config.get('encryptionKey'));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/

import { KibanaRequest, RequestHandlerContext } from 'src/core/server';
import { PDF_JOB_TYPE } from '../../../../common/constants';
import { cryptoFactory } from '../../../lib';
import { CreateJobFn, CreateJobFnFactory } from '../../../types';
import { validateUrls } from '../../common';
Expand All @@ -16,10 +15,9 @@ import { compatibilityShimFactory } from './compatibility_shim';

export const createJobFnFactory: CreateJobFnFactory<
CreateJobFn<JobParamsPDF, TaskPayloadPDF>
> = function createJobFactoryFn(reporting, parentLogger) {
> = function createJobFactoryFn(reporting, logger) {
const config = reporting.getConfig();
const crypto = cryptoFactory(config.get('encryptionKey'));
const logger = parentLogger.clone([PDF_JOB_TYPE, 'create-job']);
const compatibilityShim = compatibilityShimFactory(logger);

// 7.x and below only
Expand Down
75 changes: 0 additions & 75 deletions x-pack/plugins/reporting/server/lib/create_queue.ts

This file was deleted.

Loading

0 comments on commit 203b2ed

Please sign in to comment.