Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tsullivan committed Jun 11, 2021
1 parent af09376 commit f505322
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 154 deletions.
3 changes: 2 additions & 1 deletion x-pack/plugins/reporting/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ export interface ReportSource {
migration_version: string;
max_attempts: number;
timeout: number;

status: JobStatus;
attempts: number;
output: TaskRunResult | null;
started_at?: string;
completed_at?: string;
created_at: string;
process_expiration?: string | null;
process_expiration?: string | null; // must be set to null to clear the expiration
}

/*
Expand Down
227 changes: 83 additions & 144 deletions x-pack/plugins/reporting/server/lib/store/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,24 @@
* 2.0.
*/

import {
GetRequest,
IndexRequest,
IndexResponse,
SearchRequest,
UpdateRequest,
UpdateResponse,
} from '@elastic/elasticsearch/api/types';
import { IndexResponse, UpdateResponse } from '@elastic/elasticsearch/api/types';
import { ElasticsearchClient } from 'src/core/server';
import { LevelLogger, statuses } from '../';
import { ReportingCore } from '../../';
import { numberToDuration } from '../../../common/schema_utils';
import { JobStatus } from '../../../common/types';
import { ReportTaskParams } from '../tasks';
import { indexTimestamp } from './index_timestamp';
import { mapping } from './mapping';
import { MIGRATION_VERSION, Report, ReportDocument, ReportSource } from './report';

/*
* When searching for long-pending reports, we get a subset of fields
*/
const sourceDoc = (doc: Partial<ReportSource>): Partial<ReportSource> => {
return {
...doc,
migration_version: MIGRATION_VERSION,
export interface ReportRecordTimeout {
_id: string;
_index: string;
_source: {
status: JobStatus;
process_expiration?: string;
};
};
}

const checkReportIsEditable = (report: Report) => {
if (!report._id || !report._index) {
Expand All @@ -41,23 +32,23 @@ const checkReportIsEditable = (report: Report) => {
throw new Error(`Missing seq_no and primary_term fields!`);
}
};
/*
* When searching for long-pending reports, we get a subset of fields
*/
const sourceDoc = (doc: Partial<ReportSource>): Partial<ReportSource> => {
return {
...doc,
migration_version: MIGRATION_VERSION,
};
};

const jobDebugMessage = (report: Report) =>
`${report._id} [_index: ${report._index}] [attempts: ${report.attempts}] [_seq_no: ${report._seq_no}] [_primary_term: ${report._primary_term}]`;

interface StaleReportsResponse {
pending: ReportRecordTimeout[];
processing: ReportRecordTimeout[];
}

export interface ReportRecordTimeout {
_id: string;
_index: string;
_source: {
status: JobStatus;
process_expiration?: string;
};
}
report._id +
`[_index: ${report._index}] ` +
`[_seq_no: ${report._seq_no}] ` +
`[_primary_term: ${report._primary_term}]` +
`[attempts: ${report.attempts}] ` +
`[process_expiration: ${report.process_expiration}]`;

/*
* A class to give an interface to historical reports in the reporting.index
Expand All @@ -68,7 +59,6 @@ export interface ReportRecordTimeout {
export class ReportingStore {
private readonly indexPrefix: string; // config setting of index prefix in system index name
private readonly indexInterval: string; // config setting of index prefix: how often to poll for pending work
private readonly queueTimeoutSecs: number; // config setting of queue timeout, rounded up to nearest minute
private client?: ElasticsearchClient;

constructor(private reportingCore: ReportingCore, private logger: LevelLogger) {
Expand All @@ -77,7 +67,6 @@ export class ReportingStore {
this.indexPrefix = config.get('index');
this.indexInterval = config.get('queue', 'indexInterval');
this.logger = logger.clone(['store']);
this.queueTimeoutSecs = Math.ceil(numberToDuration(config.get('queue', 'timeout')).asSeconds());
}

private async getClient() {
Expand Down Expand Up @@ -129,7 +118,8 @@ export class ReportingStore {
* Called from addReport, which handles any errors
*/
private async indexReport(report: Report): Promise<IndexResponse> {
const indexRequest: IndexRequest<Omit<ReportSource, 'output'>> = {
const client = await this.getClient();
const { body } = await client.index({
index: report._index!,
id: report._id,
refresh: true,
Expand All @@ -141,10 +131,7 @@ export class ReportingStore {
status: statuses.JOB_STATUS_PENDING,
}),
},
};

const client = await this.getClient();
const { body } = await client.index(indexRequest);
});

return body;
}
Expand All @@ -168,8 +155,7 @@ export class ReportingStore {
await this.createIndex(index);

try {
const doc: Partial<ReportDocument> = await this.indexReport(report);
report.updateWithEsDoc(doc);
report.updateWithEsDoc(await this.indexReport(report));

await this.refreshIndex(index);

Expand All @@ -193,11 +179,10 @@ export class ReportingStore {

try {
const client = await this.getClient();
const getRequest: GetRequest = {
const { body: document } = await client.get<ReportSource>({
index: taskJson.index,
id: taskJson.id,
};
const { body: document } = await client.get<ReportSource>(getRequest);
});

return new Report({
_id: document._id,
Expand Down Expand Up @@ -233,32 +218,26 @@ export class ReportingStore {
report: Report,
stats: Partial<Report>
): Promise<UpdateResponse<ReportDocument>> {
const updateRequest: UpdateRequest<ReportDocument> = {
id: report._id,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: {
doc: sourceDoc({
...stats,
status: statuses.JOB_STATUS_PROCESSING,
}),
},
};

try {
checkReportIsEditable(report);

const doc = sourceDoc({ ...stats, status: statuses.JOB_STATUS_PROCESSING });

const client = await this.getClient();
const { body } = await client.update<ReportDocument>(updateRequest);
const { body } = await client.update<ReportDocument>({
id: report._id,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: { doc },
});

return body;
} catch (err) {
this.logger.error(
`Error in updating the report document status to processing! Report: ${jobDebugMessage(
report
)}`
`Error in updating the report document status to processing! Report: ` +
jobDebugMessage(report)
);
this.logger.error(err);
throw err;
Expand All @@ -269,29 +248,24 @@ export class ReportingStore {
report: Report,
stats: Partial<Report>
): Promise<UpdateResponse<ReportDocument>> {
const updateRequest: UpdateRequest<ReportDocument> = {
id: report._id,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: {
doc: sourceDoc({
...stats,
status: statuses.JOB_STATUS_FAILED,
}),
},
};

try {
checkReportIsEditable(report);

const doc = sourceDoc({ ...stats, status: statuses.JOB_STATUS_FAILED });

const client = await this.getClient();
const { body } = await client.update<ReportDocument>(updateRequest);
const { body } = await client.update<ReportDocument>({
id: report._id,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: { doc },
});
return body;
} catch (err) {
this.logger.error(
`Error in updating the report document status to failed! Report: ${jobDebugMessage(report)}`
`Error in updating the report document status to failed! Report: ` + jobDebugMessage(report)
);
this.logger.error(err);
throw err;
Expand All @@ -307,61 +281,50 @@ export class ReportingStore {
output && output.warnings && output.warnings.length > 0
? statuses.JOB_STATUS_WARNINGS
: statuses.JOB_STATUS_COMPLETED;
const updateRequest: UpdateRequest<ReportDocument> = {
id: report._id,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: {
doc: sourceDoc({
...stats,
status,
}),
},
};

try {
checkReportIsEditable(report);

const doc = sourceDoc({ ...stats, status });

const client = await this.getClient();
const { body } = await client.update<ReportDocument>(updateRequest);
const { body } = await client.update<ReportDocument>({
id: report._id,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: { doc },
});
return body;
} catch (err) {
this.logger.error(
`Error in updating the report document status to complete! Report: ${jobDebugMessage(
report
)}`
`Error in updating the report document status to complete! Report: ` +
jobDebugMessage(report)
);
this.logger.error(err);
throw err;
}
}

public async prepareReportForRetry(report: Report): Promise<UpdateResponse<ReportDocument>> {
const updateRequest: UpdateRequest<ReportDocument> = {
id: report._id,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: {
doc: sourceDoc({
status: statuses.JOB_STATUS_PENDING,
process_expiration: null,
}),
},
};

try {
checkReportIsEditable(report);

const doc = sourceDoc({ status: statuses.JOB_STATUS_PENDING, process_expiration: null });

const client = await this.getClient();
const { body } = await client.update<ReportDocument>(updateRequest);
const { body } = await client.update<ReportDocument>({
id: report._id,
index: report._index!,
if_seq_no: report._seq_no,
if_primary_term: report._primary_term,
refresh: true,
body: { doc },
});
return body;
} catch (err) {
this.logger.error(
`Error in updating the report document for retry! Report: ${jobDebugMessage(report)}`
`Error in updating the report document for retry! Report: ` + jobDebugMessage(report)
);
this.logger.error(err);
throw err;
Expand All @@ -373,59 +336,35 @@ export class ReportingStore {
* 1. An older version of Kibana created jobs with ESQueue, and they have not yet started running. The job is stuck in "status: pending"
* 2. Kibana stopped while the job was executing. The job is stuck in "status: processing"
*/
public async findStaleReportJobs(): Promise<StaleReportsResponse> {
public async findStaleReportJob(): Promise<ReportRecordTimeout> {
const client = await this.getClient();

const processingFilter = {
const expiredFilter = {
bool: {
must: [
{ range: { process_expiration: { lt: `now-${this.queueTimeoutSecs}s` } } },
{ range: { process_expiration: { lt: `now` } } },
{ terms: { status: [statuses.JOB_STATUS_PROCESSING] } },
],
},
};
const pendingFilter = {
const oldVersionFilter = {
bool: {
must: [{ terms: { status: [statuses.JOB_STATUS_PENDING] } }],
must_not: [{ exists: { field: 'migration_version' } }],
},
};

const query: SearchRequest = {
const { body } = await client.search<ReportRecordTimeout['_source']>({
index: this.indexPrefix + '-*',
seq_no_primary_term: true,
// pull all the fields needed for rescheduling this report as a task
_source_excludes: ['output'],
body: {
sort: { created_at: { order: 'asc' as const } }, // find the oldest first
query: { bool: { filter: { bool: { should: [processingFilter, pendingFilter] } } } },
query: { bool: { filter: { bool: { should: [expiredFilter, oldVersionFilter] } } } },
},
};

const { body } = await client.search<ReportRecordTimeout['_source']>(query);

const initialSorted: Required<StaleReportsResponse> = { pending: [], processing: [] };
const sorted = body.hits?.hits.reduce((accum, hit) => {
const processing = [...accum.processing];
const pending = [...accum.pending];

if (hit._source?.status === statuses.JOB_STATUS_PROCESSING) {
return {
processing: processing.concat(hit as ReportRecordTimeout),
pending,
};
}

if (hit._source?.status === statuses.JOB_STATUS_PENDING) {
return {
processing,
pending: pending.concat(hit as ReportRecordTimeout),
};
}

throw new Error(`Invalid job status in the monitoring search result: ${hit._source?.status}`); // only pending or processing jobs should appear in the search result
}, initialSorted);
});

return sorted || initialSorted;
return body.hits?.hits[0] as ReportRecordTimeout;
}
}
Loading

0 comments on commit f505322

Please sign in to comment.