Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Reporting] ReportingStore module #69426

Merged
merged 9 commits into from
Jun 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions x-pack/plugins/reporting/server/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { screenshotsObservableFactory } from './export_types/common/lib/screensh
import { checkLicense, getExportTypesRegistry } from './lib';
import { ESQueueInstance } from './lib/create_queue';
import { EnqueueJobFn } from './lib/enqueue_job';
import { ReportingStore } from './lib/store';

export interface ReportingInternalSetup {
elasticsearch: ElasticsearchServiceSetup;
Expand All @@ -37,6 +38,7 @@ export interface ReportingInternalStart {
browserDriverFactory: HeadlessChromiumDriverFactory;
enqueueJob: EnqueueJobFn;
esqueue: ESQueueInstance;
store: ReportingStore;
savedObjects: SavedObjectsServiceStart;
uiSettings: UiSettingsServiceStart;
}
Expand Down
16 changes: 7 additions & 9 deletions x-pack/plugins/reporting/server/lib/create_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@ import { ReportingCore } from '../core';
import { JobSource, TaskRunResult } from '../types';
import { createTaggedLogger } from './create_tagged_logger'; // TODO remove createTaggedLogger once esqueue is removed
import { createWorkerFactory } from './create_worker';
import { Job } from './enqueue_job';
// @ts-ignore
import { Esqueue } from './esqueue';
import { LevelLogger } from './level_logger';
import { ReportingStore } from './store';

interface ESQueueWorker {
on: (event: string, handler: any) => void;
}

export interface ESQueueInstance {
addJob: (type: string, payload: unknown, options: object) => Job;
registerWorker: <JobParamsType>(
pluginId: string,
workerFn: GenericWorkerFn<JobParamsType>,
Expand All @@ -37,26 +36,25 @@ type GenericWorkerFn<JobParamsType> = (
...workerRestArgs: any[]
) => void | Promise<TaskRunResult>;

export async function createQueueFactory<JobParamsType, JobPayloadType>(
Copy link
Member Author

@tsullivan tsullivan Jun 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These generics were not used, so I removed them

Copy link
Contributor

@joelgriffith joelgriffith Jun 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pray I don't alter it further

export async function createQueueFactory(
reporting: ReportingCore,
store: ReportingStore,
logger: LevelLogger
): Promise<ESQueueInstance> {
const config = reporting.getConfig();
const queueIndexInterval = config.get('queue', 'indexInterval');

// esqueue-related
const queueTimeout = config.get('queue', 'timeout');
const queueIndex = config.get('index');
const isPollingEnabled = config.get('queue', 'pollEnabled');

const elasticsearch = await reporting.getElasticsearchService();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not seeing if this changed from sync to async here. I'm assuming so...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method was never async, so this is a little cleanup. It's something that doesn't get warned about.

const elasticsearch = reporting.getElasticsearchService();
const queueOptions = {
interval: queueIndexInterval,
timeout: queueTimeout,
dateSeparator: '.',
client: elasticsearch.legacy.client,
logger: createTaggedLogger(logger, ['esqueue', 'queue-worker']),
};

const queue: ESQueueInstance = new Esqueue(queueIndex, queueOptions);
const queue: ESQueueInstance = new Esqueue(store, queueOptions);

if (isPollingEnabled) {
// create workers to poll the index for idle jobs waiting to be claimed and executed
Expand Down
49 changes: 12 additions & 37 deletions x-pack/plugins/reporting/server/lib/enqueue_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,24 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { EventEmitter } from 'events';
import { KibanaRequest, RequestHandlerContext } from 'src/core/server';
import { AuthenticatedUser } from '../../../security/server';
import { ESQueueCreateJobFn } from '../../server/types';
import { ReportingCore } from '../core';
// @ts-ignore
import { events as esqueueEvents } from './esqueue';
import { LevelLogger } from './level_logger';
import { LevelLogger } from './';
import { ReportingStore, Report } from './store';

interface ConfirmedJob {
id: string;
index: string;
_seq_no: number;
_primary_term: number;
}

export type Job = EventEmitter & {
Copy link
Member Author

@tsullivan tsullivan Jun 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type is no longer needed: replaced with Report class

id: string;
toJSON: () => {
id: string;
};
};

export type EnqueueJobFn = <JobParamsType>(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling JobParamsType isn't necessary since jobParams is just pass-through

export type EnqueueJobFn = (
exportTypeId: string,
jobParams: JobParamsType,
jobParams: unknown,
user: AuthenticatedUser | null,
context: RequestHandlerContext,
request: KibanaRequest
) => Promise<Job>;
) => Promise<Report>;

export function enqueueJobFactory(
reporting: ReportingCore,
store: ReportingStore,
parentLogger: LevelLogger
): EnqueueJobFn {
const config = reporting.getConfig();
Expand All @@ -45,16 +30,16 @@ export function enqueueJobFactory(
const maxAttempts = config.get('capture', 'maxAttempts');
const logger = parentLogger.clone(['queue-job']);

return async function enqueueJob<JobParamsType>(
return async function enqueueJob(
exportTypeId: string,
jobParams: JobParamsType,
jobParams: unknown,
user: AuthenticatedUser | null,
context: RequestHandlerContext,
request: KibanaRequest
): Promise<Job> {
type ScheduleTaskFnType = ESQueueCreateJobFn<JobParamsType>;
) {
type ScheduleTaskFnType = ESQueueCreateJobFn<unknown>;

const username = user ? user.username : false;
const esqueue = await reporting.getEsqueue();
const exportType = reporting.getExportTypesRegistry().getById(exportTypeId);

if (exportType == null) {
Expand All @@ -71,16 +56,6 @@ export function enqueueJobFactory(
max_attempts: maxAttempts,
};

return new Promise((resolve, reject) => {
const job = esqueue.addJob(exportType.jobType, payload, options);

job.on(esqueueEvents.EVENT_JOB_CREATED, (createdJob: ConfirmedJob) => {
if (createdJob.id === job.id) {
logger.info(`Successfully queued job: ${createdJob.id}`);
resolve(job);
}
});
job.on(esqueueEvents.EVENT_JOB_CREATE_ERROR, reject);
});
return await store.addReport(exportType.jobType, payload, options);
};
}

This file was deleted.

This file was deleted.

Loading