Skip to content

Commit

Permalink
remove immediate functions from esqueue worker cycles
Browse files Browse the repository at this point in the history
  • Loading branch information
tsullivan committed May 6, 2020
1 parent 2ccc397 commit ac34f92
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 32 deletions.
43 changes: 13 additions & 30 deletions x-pack/legacy/plugins/reporting/server/lib/create_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,13 @@ import {
ESQueueInstance,
ESQueueWorkerExecuteFn,
ExportTypeDefinition,
ImmediateExecuteFn,
JobDocPayload,
JobSource,
Logger,
RequestFacade,
} from '../../types';
// @ts-ignore untyped dependency
import { events as esqueueEvents } from './esqueue';

export function createWorkerFactory<JobParamsType>(reporting: ReportingCore, logger: Logger) {
type JobDocPayloadType = JobDocPayload<JobParamsType>;

const config = reporting.getConfig();
const queueConfig = config.get('queue');
const kibanaName = config.kbnConfig.get('server', 'name');
Expand All @@ -31,48 +26,36 @@ export function createWorkerFactory<JobParamsType>(reporting: ReportingCore, log
// Once more document types are added, this will need to be passed in
return async function createWorker(queue: ESQueueInstance) {
// export type / execute job map
const jobExecutors: Map<
string,
ImmediateExecuteFn<JobParamsType> | ESQueueWorkerExecuteFn<JobDocPayloadType>
> = new Map();
const jobExecutors: Map<string, ESQueueWorkerExecuteFn<unknown>> = new Map();

for (const exportType of reporting.getExportTypesRegistry().getAll() as Array<
ExportTypeDefinition<
JobParamsType,
unknown,
unknown,
ImmediateExecuteFn<JobParamsType> | ESQueueWorkerExecuteFn<JobDocPayloadType>
>
ExportTypeDefinition<JobParamsType, unknown, unknown, ESQueueWorkerExecuteFn<unknown>>
>) {
const jobExecutor = await exportType.executeJobFactory(reporting, logger); // FIXME: does not "need" to be async
jobExecutors.set(exportType.jobType, jobExecutor);
}

const workerFn = (jobSource: JobSource<JobParamsType>, ...workerRestArgs: any[]) => {
const workerFn = <ScheduledTaskParamsType>(
jobSource: JobSource<ScheduledTaskParamsType>,
jobParams: ScheduledTaskParamsType,
cancellationToken: CancellationToken
) => {
const {
_id: jobId,
_source: { jobtype: jobType },
} = jobSource;

if (!jobId) {
throw new Error(`Claimed job is missing an ID!: ${JSON.stringify(jobSource)}`);
}

const jobTypeExecutor = jobExecutors.get(jobType);
// pass the work to the jobExecutor
if (!jobTypeExecutor) {
throw new Error(`Unable to find a job executor for the claimed job: [${jobId}]`);
}

if (jobId) {
const jobExecutorWorker = jobTypeExecutor as ESQueueWorkerExecuteFn<JobDocPayloadType>;
return jobExecutorWorker(
jobId,
...(workerRestArgs as [JobDocPayloadType, CancellationToken])
);
} else {
const jobExecutorImmediate = jobExecutors.get(jobType) as ImmediateExecuteFn<JobParamsType>;
return jobExecutorImmediate(
null,
...(workerRestArgs as [JobDocPayload<JobParamsType>, RequestFacade])
);
}
// pass the work to the jobExecutor
return jobTypeExecutor(jobId, jobParams, cancellationToken);
};

const workerOptions = {
Expand Down
3 changes: 1 addition & 2 deletions x-pack/legacy/plugins/reporting/server/lib/enqueue_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
ConditionalHeaders,
EnqueueJobFn,
ESQueueCreateJobFn,
ImmediateCreateJobFn,
Job,
Logger,
RequestFacade,
Expand Down Expand Up @@ -40,7 +39,7 @@ export function enqueueJobFactory(reporting: ReportingCore, parentLogger: Logger
headers: ConditionalHeaders['headers'],
request: RequestFacade
): Promise<Job> {
type CreateJobFn = ESQueueCreateJobFn<JobParamsType> | ImmediateCreateJobFn<JobParamsType>;
type CreateJobFn = ESQueueCreateJobFn<JobParamsType>;

const esqueue = await reporting.getEsqueue();
const exportType = reporting.getExportTypesRegistry().getById(exportTypeId);
Expand Down

0 comments on commit ac34f92

Please sign in to comment.