Skip to content

Commit

Permalink
[7.x] [Reporting] ReportingStore module (#69426) (#70016)
Browse files Browse the repository at this point in the history
* [Reporting] ReportingStore module (#69426)

* Add store class

* fix tests

* fix the createIndex bug

* add reportingstore test

* change function args

* nits

* add test for automatic index creation failure recovery
# Conflicts:
#	x-pack/plugins/reporting/server/lib/esqueue/job.js

* fix ts

* fix bugs
  • Loading branch information
tsullivan authored Jul 1, 2020
1 parent c89be2b commit f0babb4
Show file tree
Hide file tree
Showing 21 changed files with 665 additions and 841 deletions.
2 changes: 2 additions & 0 deletions x-pack/plugins/reporting/server/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { screenshotsObservableFactory } from './export_types/common/lib/screensh
import { checkLicense, getExportTypesRegistry } from './lib';
import { ESQueueInstance } from './lib/create_queue';
import { EnqueueJobFn } from './lib/enqueue_job';
import { ReportingStore } from './lib/store';

export interface ReportingInternalSetup {
elasticsearch: ElasticsearchServiceSetup;
Expand All @@ -37,6 +38,7 @@ export interface ReportingInternalStart {
browserDriverFactory: HeadlessChromiumDriverFactory;
enqueueJob: EnqueueJobFn;
esqueue: ESQueueInstance;
store: ReportingStore;
savedObjects: SavedObjectsServiceStart;
uiSettings: UiSettingsServiceStart;
}
Expand Down
16 changes: 7 additions & 9 deletions x-pack/plugins/reporting/server/lib/create_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@ import { ReportingCore } from '../core';
import { JobSource, TaskRunResult } from '../types';
import { createTaggedLogger } from './create_tagged_logger'; // TODO remove createTaggedLogger once esqueue is removed
import { createWorkerFactory } from './create_worker';
import { Job } from './enqueue_job';
// @ts-ignore
import { Esqueue } from './esqueue';
import { LevelLogger } from './level_logger';
import { ReportingStore } from './store';

interface ESQueueWorker {
on: (event: string, handler: any) => void;
}

export interface ESQueueInstance {
addJob: (type: string, payload: unknown, options: object) => Job;
registerWorker: <JobParamsType>(
pluginId: string,
workerFn: GenericWorkerFn<JobParamsType>,
Expand All @@ -37,26 +36,25 @@ type GenericWorkerFn<JobParamsType> = (
...workerRestArgs: any[]
) => void | Promise<TaskRunResult>;

export async function createQueueFactory<JobParamsType, JobPayloadType>(
export async function createQueueFactory(
reporting: ReportingCore,
store: ReportingStore,
logger: LevelLogger
): Promise<ESQueueInstance> {
const config = reporting.getConfig();
const queueIndexInterval = config.get('queue', 'indexInterval');

// esqueue-related
const queueTimeout = config.get('queue', 'timeout');
const queueIndex = config.get('index');
const isPollingEnabled = config.get('queue', 'pollEnabled');

const elasticsearch = await reporting.getElasticsearchService();
const elasticsearch = reporting.getElasticsearchService();
const queueOptions = {
interval: queueIndexInterval,
timeout: queueTimeout,
dateSeparator: '.',
client: elasticsearch.legacy.client,
logger: createTaggedLogger(logger, ['esqueue', 'queue-worker']),
};

const queue: ESQueueInstance = new Esqueue(queueIndex, queueOptions);
const queue: ESQueueInstance = new Esqueue(store, queueOptions);

if (isPollingEnabled) {
// create workers to poll the index for idle jobs waiting to be claimed and executed
Expand Down
49 changes: 12 additions & 37 deletions x-pack/plugins/reporting/server/lib/enqueue_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,24 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { EventEmitter } from 'events';
import { KibanaRequest, RequestHandlerContext } from 'src/core/server';
import { AuthenticatedUser } from '../../../security/server';
import { ESQueueCreateJobFn } from '../../server/types';
import { ReportingCore } from '../core';
// @ts-ignore
import { events as esqueueEvents } from './esqueue';
import { LevelLogger } from './level_logger';
import { LevelLogger } from './';
import { ReportingStore, Report } from './store';

interface ConfirmedJob {
id: string;
index: string;
_seq_no: number;
_primary_term: number;
}

export type Job = EventEmitter & {
id: string;
toJSON: () => {
id: string;
};
};

export type EnqueueJobFn = <JobParamsType>(
export type EnqueueJobFn = (
exportTypeId: string,
jobParams: JobParamsType,
jobParams: unknown,
user: AuthenticatedUser | null,
context: RequestHandlerContext,
request: KibanaRequest
) => Promise<Job>;
) => Promise<Report>;

export function enqueueJobFactory(
reporting: ReportingCore,
store: ReportingStore,
parentLogger: LevelLogger
): EnqueueJobFn {
const config = reporting.getConfig();
Expand All @@ -45,16 +30,16 @@ export function enqueueJobFactory(
const maxAttempts = config.get('capture', 'maxAttempts');
const logger = parentLogger.clone(['queue-job']);

return async function enqueueJob<JobParamsType>(
return async function enqueueJob(
exportTypeId: string,
jobParams: JobParamsType,
jobParams: unknown,
user: AuthenticatedUser | null,
context: RequestHandlerContext,
request: KibanaRequest
): Promise<Job> {
type ScheduleTaskFnType = ESQueueCreateJobFn<JobParamsType>;
) {
type ScheduleTaskFnType = ESQueueCreateJobFn<unknown>;

const username = user ? user.username : false;
const esqueue = await reporting.getEsqueue();
const exportType = reporting.getExportTypesRegistry().getById(exportTypeId);

if (exportType == null) {
Expand All @@ -71,16 +56,6 @@ export function enqueueJobFactory(
max_attempts: maxAttempts,
};

return new Promise((resolve, reject) => {
const job = esqueue.addJob(exportType.jobType, payload, options);

job.on(esqueueEvents.EVENT_JOB_CREATED, (createdJob: ConfirmedJob) => {
if (createdJob.id === job.id) {
logger.info(`Successfully queued job: ${createdJob.id}`);
resolve(job);
}
});
job.on(esqueueEvents.EVENT_JOB_CREATE_ERROR, reject);
});
return await store.addReport(exportType.jobType, payload, options);
};
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit f0babb4

Please sign in to comment.