diff --git a/x-pack/plugins/reporting/server/core.ts b/x-pack/plugins/reporting/server/core.ts index 9acd359fa0db..eccd6c7db169 100644 --- a/x-pack/plugins/reporting/server/core.ts +++ b/x-pack/plugins/reporting/server/core.ts @@ -24,6 +24,7 @@ import { screenshotsObservableFactory } from './export_types/common/lib/screensh import { checkLicense, getExportTypesRegistry } from './lib'; import { ESQueueInstance } from './lib/create_queue'; import { EnqueueJobFn } from './lib/enqueue_job'; +import { ReportingStore } from './lib/store'; export interface ReportingInternalSetup { elasticsearch: ElasticsearchServiceSetup; @@ -37,6 +38,7 @@ export interface ReportingInternalStart { browserDriverFactory: HeadlessChromiumDriverFactory; enqueueJob: EnqueueJobFn; esqueue: ESQueueInstance; + store: ReportingStore; savedObjects: SavedObjectsServiceStart; uiSettings: UiSettingsServiceStart; } diff --git a/x-pack/plugins/reporting/server/lib/create_queue.ts b/x-pack/plugins/reporting/server/lib/create_queue.ts index 5d09af312a41..a8dcb92c55b2 100644 --- a/x-pack/plugins/reporting/server/lib/create_queue.ts +++ b/x-pack/plugins/reporting/server/lib/create_queue.ts @@ -8,17 +8,16 @@ import { ReportingCore } from '../core'; import { JobSource, TaskRunResult } from '../types'; import { createTaggedLogger } from './create_tagged_logger'; // TODO remove createTaggedLogger once esqueue is removed import { createWorkerFactory } from './create_worker'; -import { Job } from './enqueue_job'; // @ts-ignore import { Esqueue } from './esqueue'; import { LevelLogger } from './level_logger'; +import { ReportingStore } from './store'; interface ESQueueWorker { on: (event: string, handler: any) => void; } export interface ESQueueInstance { - addJob: (type: string, payload: unknown, options: object) => Job; registerWorker: ( pluginId: string, workerFn: GenericWorkerFn, @@ -37,26 +36,25 @@ type GenericWorkerFn = ( ...workerRestArgs: any[] ) => void | Promise; -export async function createQueueFactory( +export async function createQueueFactory( reporting: ReportingCore, + store: ReportingStore, logger: LevelLogger ): Promise { const config = reporting.getConfig(); - const queueIndexInterval = config.get('queue', 'indexInterval'); + + // esqueue-related const queueTimeout = config.get('queue', 'timeout'); - const queueIndex = config.get('index'); const isPollingEnabled = config.get('queue', 'pollEnabled'); - const elasticsearch = await reporting.getElasticsearchService(); + const elasticsearch = reporting.getElasticsearchService(); const queueOptions = { - interval: queueIndexInterval, timeout: queueTimeout, - dateSeparator: '.', client: elasticsearch.legacy.client, logger: createTaggedLogger(logger, ['esqueue', 'queue-worker']), }; - const queue: ESQueueInstance = new Esqueue(queueIndex, queueOptions); + const queue: ESQueueInstance = new Esqueue(store, queueOptions); if (isPollingEnabled) { // create workers to poll the index for idle jobs waiting to be claimed and executed diff --git a/x-pack/plugins/reporting/server/lib/enqueue_job.ts b/x-pack/plugins/reporting/server/lib/enqueue_job.ts index 625da90f3b4f..d1554a03b938 100644 --- a/x-pack/plugins/reporting/server/lib/enqueue_job.ts +++ b/x-pack/plugins/reporting/server/lib/enqueue_job.ts @@ -4,39 +4,24 @@ * you may not use this file except in compliance with the Elastic License. */ -import { EventEmitter } from 'events'; import { KibanaRequest, RequestHandlerContext } from 'src/core/server'; import { AuthenticatedUser } from '../../../security/server'; import { ESQueueCreateJobFn } from '../../server/types'; import { ReportingCore } from '../core'; -// @ts-ignore -import { events as esqueueEvents } from './esqueue'; -import { LevelLogger } from './level_logger'; +import { LevelLogger } from './'; +import { ReportingStore, Report } from './store'; -interface ConfirmedJob { - id: string; - index: string; - _seq_no: number; - _primary_term: number; -} - -export type Job = EventEmitter & { - id: string; - toJSON: () => { - id: string; - }; -}; - -export type EnqueueJobFn = ( +export type EnqueueJobFn = ( exportTypeId: string, - jobParams: JobParamsType, + jobParams: unknown, user: AuthenticatedUser | null, context: RequestHandlerContext, request: KibanaRequest -) => Promise; +) => Promise; export function enqueueJobFactory( reporting: ReportingCore, + store: ReportingStore, parentLogger: LevelLogger ): EnqueueJobFn { const config = reporting.getConfig(); @@ -45,16 +30,16 @@ export function enqueueJobFactory( const maxAttempts = config.get('capture', 'maxAttempts'); const logger = parentLogger.clone(['queue-job']); - return async function enqueueJob( + return async function enqueueJob( exportTypeId: string, - jobParams: JobParamsType, + jobParams: unknown, user: AuthenticatedUser | null, context: RequestHandlerContext, request: KibanaRequest - ): Promise { - type ScheduleTaskFnType = ESQueueCreateJobFn; + ) { + type ScheduleTaskFnType = ESQueueCreateJobFn; + const username = user ? user.username : false; - const esqueue = await reporting.getEsqueue(); const exportType = reporting.getExportTypesRegistry().getById(exportTypeId); if (exportType == null) { @@ -71,16 +56,6 @@ export function enqueueJobFactory( max_attempts: maxAttempts, }; - return new Promise((resolve, reject) => { - const job = esqueue.addJob(exportType.jobType, payload, options); - - job.on(esqueueEvents.EVENT_JOB_CREATED, (createdJob: ConfirmedJob) => { - if (createdJob.id === job.id) { - logger.info(`Successfully queued job: ${createdJob.id}`); - resolve(job); - } - }); - job.on(esqueueEvents.EVENT_JOB_CREATE_ERROR, reject); - }); + return await store.addReport(exportType.jobType, payload, options); }; } diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/create_index.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/create_index.js deleted file mode 100644 index 691bd4f618a1..000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/create_index.js +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import expect from '@kbn/expect'; -import sinon from 'sinon'; -import { createIndex } from '../../helpers/create_index'; -import { ClientMock } from '../fixtures/legacy_elasticsearch'; -import { constants } from '../../constants'; - -describe('Create Index', function () { - describe('Does not exist', function () { - let client; - let createSpy; - - beforeEach(function () { - client = new ClientMock(); - createSpy = sinon.spy(client, 'callAsInternalUser').withArgs('indices.create'); - }); - - it('should return true', function () { - const indexName = 'test-index'; - const result = createIndex(client, indexName); - - return result.then((exists) => expect(exists).to.be(true)); - }); - - it('should create the index with mappings and default settings', function () { - const indexName = 'test-index'; - const settings = constants.DEFAULT_SETTING_INDEX_SETTINGS; - const result = createIndex(client, indexName); - - return result.then(function () { - const payload = createSpy.getCall(0).args[1]; - sinon.assert.callCount(createSpy, 1); - expect(payload).to.have.property('index', indexName); - expect(payload).to.have.property('body'); - expect(payload.body).to.have.property('settings'); - expect(payload.body.settings).to.eql(settings); - expect(payload.body).to.have.property('mappings'); - expect(payload.body.mappings).to.have.property('properties'); - }); - }); - - it('should create the index with custom settings', function () { - const indexName = 'test-index'; - const settings = { - ...constants.DEFAULT_SETTING_INDEX_SETTINGS, - auto_expand_replicas: false, - number_of_shards: 3000, - number_of_replicas: 1, - format: '3000', - }; - const result = createIndex(client, indexName, settings); - - return result.then(function () { - const payload = createSpy.getCall(0).args[1]; - sinon.assert.callCount(createSpy, 1); - expect(payload).to.have.property('index', indexName); - expect(payload).to.have.property('body'); - expect(payload.body).to.have.property('settings'); - expect(payload.body.settings).to.eql(settings); - expect(payload.body).to.have.property('mappings'); - expect(payload.body.mappings).to.have.property('properties'); - }); - }); - }); - - describe('Does exist', function () { - let client; - let createSpy; - - beforeEach(function () { - client = new ClientMock(); - sinon - .stub(client, 'callAsInternalUser') - .withArgs('indices.exists') - .callsFake(() => Promise.resolve(true)); - createSpy = client.callAsInternalUser.withArgs('indices.create'); - }); - - it('should return true', function () { - const indexName = 'test-index'; - const result = createIndex(client, indexName); - - return result.then((exists) => expect(exists).to.be(true)); - }); - - it('should not create the index', function () { - const indexName = 'test-index'; - const result = createIndex(client, indexName); - - return result.then(function () { - sinon.assert.callCount(createSpy, 0); - }); - }); - }); -}); diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/index_timestamp.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/index_timestamp.js deleted file mode 100644 index 71dc8a363e42..000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/index_timestamp.js +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import expect from '@kbn/expect'; -import sinon from 'sinon'; -import moment from 'moment'; -import { constants } from '../../constants'; -import { indexTimestamp } from '../../helpers/index_timestamp'; - -const anchor = '2016-04-02T01:02:03.456'; // saturday - -describe('Index timestamp interval', function () { - describe('construction', function () { - it('should throw given an invalid interval', function () { - const init = () => indexTimestamp('bananas'); - expect(init).to.throwException(/invalid.+interval/i); - }); - }); - - describe('timestamps', function () { - let clock; - let separator; - - beforeEach(function () { - separator = constants.DEFAULT_SETTING_DATE_SEPARATOR; - clock = sinon.useFakeTimers(moment(anchor).valueOf()); - }); - - afterEach(function () { - clock.restore(); - }); - - describe('formats', function () { - it('should return the year', function () { - const timestamp = indexTimestamp('year'); - const str = `2016`; - expect(timestamp).to.equal(str); - }); - - it('should return the year and month', function () { - const timestamp = indexTimestamp('month'); - const str = `2016${separator}04`; - expect(timestamp).to.equal(str); - }); - - it('should return the year, month, and first day of the week', function () { - const timestamp = indexTimestamp('week'); - const str = `2016${separator}03${separator}27`; - expect(timestamp).to.equal(str); - }); - - it('should return the year, month, and day of the week', function () { - const timestamp = indexTimestamp('day'); - const str = `2016${separator}04${separator}02`; - expect(timestamp).to.equal(str); - }); - - it('should return the year, month, day and hour', function () { - const timestamp = indexTimestamp('hour'); - const str = `2016${separator}04${separator}02${separator}01`; - expect(timestamp).to.equal(str); - }); - - it('should return the year, month, day, hour and minute', function () { - const timestamp = indexTimestamp('minute'); - const str = `2016${separator}04${separator}02${separator}01${separator}02`; - expect(timestamp).to.equal(str); - }); - }); - - describe('date separator', function () { - it('should be customizable', function () { - const separators = ['-', '.', '_']; - separators.forEach((customSep) => { - const str = `2016${customSep}04${customSep}02${customSep}01${customSep}02`; - const timestamp = indexTimestamp('minute', customSep); - expect(timestamp).to.equal(str); - }); - }); - - it('should throw if a letter is used', function () { - const separators = ['a', 'B', 'YYYY']; - separators.forEach((customSep) => { - const fn = () => indexTimestamp('minute', customSep); - expect(fn).to.throwException(); - }); - }); - }); - }); -}); diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/job.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/job.js deleted file mode 100644 index 955eed8d6572..000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/job.js +++ /dev/null @@ -1,420 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import events from 'events'; -import expect from '@kbn/expect'; -import sinon from 'sinon'; -import proxyquire from 'proxyquire'; -import { QueueMock } from './fixtures/queue'; -import { ClientMock } from './fixtures/legacy_elasticsearch'; -import { constants } from '../constants'; - -const createIndexMock = sinon.stub(); -const { Job } = proxyquire.noPreserveCache()('../job', { - './helpers/create_index': { createIndex: createIndexMock }, -}); - -const maxPriority = 20; -const minPriority = -20; -const defaultPriority = 10; -const defaultCreatedBy = false; - -function validateDoc(spy) { - sinon.assert.callCount(spy, 1); - const spyCall = spy.getCall(0); - return spyCall.args[1]; -} - -describe('Job Class', function () { - let mockQueue; - let client; - let index; - - let type; - let payload; - let options; - - beforeEach(function () { - createIndexMock.resetHistory(); - createIndexMock.returns(Promise.resolve('mock')); - index = 'test'; - - client = new ClientMock(); - mockQueue = new QueueMock(); - mockQueue.setClient(client); - }); - - it('should be an event emitter', function () { - const job = new Job(mockQueue, index, 'test', {}); - expect(job).to.be.an(events.EventEmitter); - }); - - describe('invalid construction', function () { - it('should throw with a missing type', function () { - const init = () => new Job(mockQueue, index); - expect(init).to.throwException(/type.+string/i); - }); - - it('should throw with an invalid type', function () { - const init = () => new Job(mockQueue, index, { 'not a string': true }); - expect(init).to.throwException(/type.+string/i); - }); - - it('should throw with an invalid payload', function () { - const init = () => new Job(mockQueue, index, 'type1', [1, 2, 3]); - expect(init).to.throwException(/plain.+object/i); - }); - - it(`should throw error if invalid maxAttempts`, function () { - const init = () => new Job(mockQueue, index, 'type1', { id: '123' }, { max_attempts: -1 }); - expect(init).to.throwException(/invalid.+max_attempts/i); - }); - }); - - describe('construction', function () { - let indexSpy; - beforeEach(function () { - type = 'type1'; - payload = { id: '123' }; - indexSpy = sinon.spy(client, 'callAsInternalUser').withArgs('index'); - }); - - it('should create the target index', function () { - const job = new Job(mockQueue, index, type, payload, options); - return job.ready.then(() => { - sinon.assert.calledOnce(createIndexMock); - const args = createIndexMock.getCall(0).args; - expect(args[0]).to.equal(client); - expect(args[1]).to.equal(index); - }); - }); - - it('should index the payload', function () { - const job = new Job(mockQueue, index, type, payload); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs).to.have.property('index', index); - expect(indexArgs).to.have.property('body'); - expect(indexArgs.body).to.have.property('payload', payload); - }); - }); - - it('should index the job type', function () { - const job = new Job(mockQueue, index, type, payload); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs).to.have.property('index', index); - expect(indexArgs).to.have.property('body'); - expect(indexArgs.body).to.have.property('jobtype', type); - }); - }); - - it('should set event creation time', function () { - const job = new Job(mockQueue, index, type, payload); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('created_at'); - }); - }); - - it('should refresh the index', function () { - const refreshSpy = client.callAsInternalUser.withArgs('indices.refresh'); - - const job = new Job(mockQueue, index, type, payload); - return job.ready.then(() => { - sinon.assert.calledOnce(refreshSpy); - const spyCall = refreshSpy.getCall(0); - expect(spyCall.args[1]).to.have.property('index', index); - }); - }); - - it('should emit the job information on success', function (done) { - const job = new Job(mockQueue, index, type, payload); - job.once(constants.EVENT_JOB_CREATED, (jobDoc) => { - try { - expect(jobDoc).to.have.property('id'); - expect(jobDoc).to.have.property('index'); - expect(jobDoc).to.have.property('_seq_no'); - expect(jobDoc).to.have.property('_primary_term'); - done(); - } catch (e) { - done(e); - } - }); - }); - - it('should emit error on index creation failure', function (done) { - const errMsg = 'test index creation failure'; - - createIndexMock.returns(Promise.reject(new Error(errMsg))); - const job = new Job(mockQueue, index, type, payload); - - job.once(constants.EVENT_JOB_CREATE_ERROR, (err) => { - try { - expect(err.message).to.equal(errMsg); - done(); - } catch (e) { - done(e); - } - }); - }); - - it('should emit error on client index failure', function (done) { - const errMsg = 'test document index failure'; - - client.callAsInternalUser.restore(); - sinon - .stub(client, 'callAsInternalUser') - .withArgs('index') - .callsFake(() => Promise.reject(new Error(errMsg))); - const job = new Job(mockQueue, index, type, payload); - - job.once(constants.EVENT_JOB_CREATE_ERROR, (err) => { - try { - expect(err.message).to.equal(errMsg); - done(); - } catch (e) { - done(e); - } - }); - }); - }); - - describe('event emitting', function () { - it('should trigger events on the queue instance', function (done) { - const eventName = 'test event'; - const payload1 = { - test: true, - deep: { object: 'ok' }, - }; - const payload2 = 'two'; - const payload3 = new Error('test error'); - - const job = new Job(mockQueue, index, type, payload, options); - - mockQueue.on(eventName, (...args) => { - try { - expect(args[0]).to.equal(payload1); - expect(args[1]).to.equal(payload2); - expect(args[2]).to.equal(payload3); - done(); - } catch (e) { - done(e); - } - }); - - job.emit(eventName, payload1, payload2, payload3); - }); - }); - - describe('default values', function () { - let indexSpy; - beforeEach(function () { - type = 'type1'; - payload = { id: '123' }; - indexSpy = sinon.spy(client, 'callAsInternalUser').withArgs('index'); - }); - - it('should set attempt count to 0', function () { - const job = new Job(mockQueue, index, type, payload); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('attempts', 0); - }); - }); - - it('should index default created_by value', function () { - const job = new Job(mockQueue, index, type, payload); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('created_by', defaultCreatedBy); - }); - }); - - it('should set an expired process_expiration time', function () { - const now = new Date().getTime(); - const job = new Job(mockQueue, index, type, payload); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('process_expiration'); - expect(indexArgs.body.process_expiration.getTime()).to.be.lessThan(now); - }); - }); - - it('should set status as pending', function () { - const job = new Job(mockQueue, index, type, payload); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('status', constants.JOB_STATUS_PENDING); - }); - }); - - it('should have a default priority of 10', function () { - const job = new Job(mockQueue, index, type, payload, options); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('priority', defaultPriority); - }); - }); - - it('should set a browser type', function () { - const job = new Job(mockQueue, index, type, payload); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('browser_type'); - }); - }); - }); - - describe('option passing', function () { - let indexSpy; - beforeEach(function () { - type = 'type1'; - payload = { id: '123' }; - options = { - timeout: 4567, - max_attempts: 9, - headers: { - authorization: 'Basic cXdlcnR5', - }, - }; - indexSpy = sinon.spy(client, 'callAsInternalUser').withArgs('index'); - }); - - it('should index the created_by value', function () { - const createdBy = 'user_identifier'; - const job = new Job(mockQueue, index, type, payload, { - created_by: createdBy, - ...options, - }); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('created_by', createdBy); - }); - }); - - it('should index timeout value from options', function () { - const job = new Job(mockQueue, index, type, payload, options); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('timeout', options.timeout); - }); - }); - - it('should set max attempt count', function () { - const job = new Job(mockQueue, index, type, payload, options); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('max_attempts', options.max_attempts); - }); - }); - - it('should add headers to the request params', function () { - const job = new Job(mockQueue, index, type, payload, options); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs).to.have.property('headers', options.headers); - }); - }); - - it(`should use upper priority of ${maxPriority}`, function () { - const job = new Job(mockQueue, index, type, payload, { priority: maxPriority * 2 }); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('priority', maxPriority); - }); - }); - - it(`should use lower priority of ${minPriority}`, function () { - const job = new Job(mockQueue, index, type, payload, { priority: minPriority * 2 }); - return job.ready.then(() => { - const indexArgs = validateDoc(indexSpy); - expect(indexArgs.body).to.have.property('priority', minPriority); - }); - }); - }); - - describe('get method', function () { - beforeEach(function () { - type = 'type2'; - payload = { id: '123' }; - }); - - it('should return the job document', function () { - const job = new Job(mockQueue, index, type, payload); - - return job.get().then((doc) => { - const jobDoc = job.document; // document should be resolved - expect(doc).to.have.property('index', index); - expect(doc).to.have.property('id', jobDoc.id); - expect(doc).to.have.property('_seq_no', jobDoc._seq_no); - expect(doc).to.have.property('_primary_term', jobDoc._primary_term); - expect(doc).to.have.property('created_by', defaultCreatedBy); - - expect(doc).to.have.property('payload'); - expect(doc).to.have.property('jobtype'); - expect(doc).to.have.property('priority'); - expect(doc).to.have.property('timeout'); - }); - }); - - it('should contain optional data', function () { - const optionals = { - created_by: 'some_ident', - }; - - const job = new Job(mockQueue, index, type, payload, optionals); - return Promise.resolve(client.callAsInternalUser('get', {}, optionals)) - .then((doc) => { - sinon.stub(client, 'callAsInternalUser').withArgs('get').returns(Promise.resolve(doc)); - }) - .then(() => { - return job.get().then((doc) => { - expect(doc).to.have.property('created_by', optionals.created_by); - }); - }); - }); - }); - - describe('toJSON method', function () { - beforeEach(function () { - type = 'type2'; - payload = { id: '123' }; - options = { - timeout: 4567, - max_attempts: 9, - priority: 8, - }; - }); - - it('should return the static information about the job', function () { - const job = new Job(mockQueue, index, type, payload, options); - - // toJSON is sync, should work before doc is written to elasticsearch - expect(job.document).to.be(undefined); - - const doc = job.toJSON(); - expect(doc).to.have.property('index', index); - expect(doc).to.have.property('jobtype', type); - expect(doc).to.have.property('created_by', defaultCreatedBy); - expect(doc).to.have.property('timeout', options.timeout); - expect(doc).to.have.property('max_attempts', options.max_attempts); - expect(doc).to.have.property('priority', options.priority); - expect(doc).to.have.property('id'); - expect(doc).to.not.have.property('version'); - }); - - it('should contain optional data', function () { - const optionals = { - created_by: 'some_ident', - }; - - const job = new Job(mockQueue, index, type, payload, optionals); - const doc = job.toJSON(); - expect(doc).to.have.property('created_by', optionals.created_by); - }); - }); -}); diff --git a/x-pack/plugins/reporting/server/lib/esqueue/index.js b/x-pack/plugins/reporting/server/lib/esqueue/index.js index 735d19f8f6c4..0fbcb54c673d 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/index.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/index.js @@ -5,20 +5,17 @@ */ import { EventEmitter } from 'events'; -import { Job } from './job'; import { Worker } from './worker'; import { constants } from './constants'; -import { indexTimestamp } from './helpers/index_timestamp'; import { omit } from 'lodash'; export { events } from './constants/events'; export class Esqueue extends EventEmitter { - constructor(index, options = {}) { - if (!index) throw new Error('Must specify an index to write to'); - + constructor(store, options = {}) { super(); - this.index = index; + this.store = store; // for updating jobs in ES + this.index = this.store.indexPrefix; // for polling for pending jobs this.settings = { interval: constants.DEFAULT_SETTING_INTERVAL, timeout: constants.DEFAULT_SETTING_TIMEOUT, @@ -40,21 +37,6 @@ export class Esqueue extends EventEmitter { }); } - addJob(jobtype, payload, opts = {}) { - const timestamp = indexTimestamp(this.settings.interval, this.settings.dateSeparator); - const index = `${this.index}-${timestamp}`; - const defaults = { - timeout: this.settings.timeout, - }; - - const options = Object.assign(defaults, opts, { - indexSettings: this.settings.indexSettings, - logger: this._logger, - }); - - return new Job(this, index, jobtype, payload, options); - } - registerWorker(type, workerFn, opts) { const worker = new Worker(this, type, workerFn, { ...opts, logger: this._logger }); this._workers.push(worker); diff --git a/x-pack/plugins/reporting/server/lib/esqueue/job.js b/x-pack/plugins/reporting/server/lib/esqueue/job.js deleted file mode 100644 index 6ab78eeb1b86..000000000000 --- a/x-pack/plugins/reporting/server/lib/esqueue/job.js +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import events from 'events'; -import Puid from 'puid'; -import { constants } from './constants'; -import { createIndex } from './helpers/create_index'; -import { isPlainObject } from 'lodash'; - -const puid = new Puid(); - -export class Job extends events.EventEmitter { - constructor(queue, index, jobtype, payload, options = {}) { - if (typeof jobtype !== 'string') throw new Error('Jobtype must be a string'); - if (!isPlainObject(payload)) throw new Error('Payload must be a plain object'); - - super(); - - this.queue = queue; - this._client = this.queue.client; - this.id = puid.generate(); - this.index = index; - this.jobtype = jobtype; - this.payload = payload; - this.created_by = options.created_by || false; - this.timeout = options.timeout || 10000; - this.maxAttempts = options.max_attempts || 3; - this.priority = Math.max(Math.min(options.priority || 10, 20), -20); - this.indexSettings = options.indexSettings || {}; - this.browser_type = options.browser_type; - - if (typeof this.maxAttempts !== 'number' || this.maxAttempts < 1) { - throw new Error(`Invalid max_attempts: ${this.maxAttempts}`); - } - - this.debug = (msg, err) => { - const logger = options.logger || function () {}; - const message = `${this.id} - ${msg}`; - const tags = ['debug']; - - if (err) { - logger(`${message}: ${err}`, tags); - return; - } - - logger(message, tags); - }; - - const indexParams = { - index: this.index, - id: this.id, - body: { - jobtype: this.jobtype, - meta: { - // We are copying these values out of payload because these fields are indexed and can be aggregated on - // for tracking stats, while payload contents are not. - objectType: payload.objectType, - layout: payload.layout ? payload.layout.id : 'none', - }, - payload: this.payload, - priority: this.priority, - created_by: this.created_by, - timeout: this.timeout, - process_expiration: new Date(0), // use epoch so the job query works - created_at: new Date(), - attempts: 0, - max_attempts: this.maxAttempts, - status: constants.JOB_STATUS_PENDING, - browser_type: this.browser_type, - }, - }; - - if (options.headers) { - indexParams.headers = options.headers; - } - - this.ready = createIndex(this._client, this.index, this.indexSettings) - .then(() => this._client.callAsInternalUser('index', indexParams)) - .then((doc) => { - this.document = { - id: doc._id, - index: doc._index, - _seq_no: doc._seq_no, - _primary_term: doc._primary_term, - }; - this.debug(`Job created in index ${this.index}`); - - return this._client - .callAsInternalUser('indices.refresh', { - index: this.index, - }) - .then(() => { - this.debug(`Job index refreshed ${this.index}`); - this.emit(constants.EVENT_JOB_CREATED, this.document); - }); - }) - .catch((err) => { - this.debug('Job creation failed', err); - this.emit(constants.EVENT_JOB_CREATE_ERROR, err); - }); - } - - emit(name, ...args) { - super.emit(name, ...args); - this.queue.emit(name, ...args); - } - - get() { - return this.ready - .then(() => { - return this._client.callAsInternalUser('get', { - index: this.index, - id: this.id, - }); - }) - .then((doc) => { - return Object.assign(doc._source, { - index: doc._index, - id: doc._id, - _seq_no: doc._seq_no, - _primary_term: doc._primary_term, - }); - }); - } - - toJSON() { - return { - id: this.id, - index: this.index, - jobtype: this.jobtype, - created_by: this.created_by, - payload: this.payload, - timeout: this.timeout, - max_attempts: this.maxAttempts, - priority: this.priority, - browser_type: this.browser_type, - }; - } -} diff --git a/x-pack/plugins/reporting/server/lib/esqueue/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/worker.js index b26ed731c683..469bafd69461 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/worker.js @@ -158,8 +158,8 @@ export class Worker extends events.EventEmitter { kibana_name: this.kibanaName, }; - return this._client - .callAsInternalUser('update', { + return this.queue.store + .updateReport({ index: job._index, id: job._id, if_seq_no: job._seq_no, @@ -197,8 +197,8 @@ export class Worker extends events.EventEmitter { output: docOutput, }); - return this._client - .callAsInternalUser('update', { + return this.queue.store + .updateReport({ index: job._index, id: job._id, if_seq_no: job._seq_no, @@ -294,8 +294,8 @@ export class Worker extends events.EventEmitter { output: docOutput, }; - return this._client - .callAsInternalUser('update', { + return this.queue.store + .updateReport({ index: job._index, id: job._id, if_seq_no: job._seq_no, diff --git a/x-pack/plugins/reporting/server/lib/index.ts b/x-pack/plugins/reporting/server/lib/index.ts index 0e9c49b17088..f5a50fca28b7 100644 --- a/x-pack/plugins/reporting/server/lib/index.ts +++ b/x-pack/plugins/reporting/server/lib/index.ts @@ -12,3 +12,4 @@ export { enqueueJobFactory } from './enqueue_job'; export { getExportTypesRegistry } from './export_types_registry'; export { runValidations } from './validate'; export { startTrace } from './trace'; +export { ReportingStore } from './store'; diff --git a/x-pack/plugins/reporting/server/lib/store/index.ts b/x-pack/plugins/reporting/server/lib/store/index.ts new file mode 100644 index 000000000000..a88d36d3fdf9 --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/store/index.ts @@ -0,0 +1,8 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export { Report } from './report'; +export { ReportingStore } from './store'; diff --git a/x-pack/plugins/reporting/server/lib/esqueue/helpers/index_timestamp.js b/x-pack/plugins/reporting/server/lib/store/index_timestamp.ts similarity index 80% rename from x-pack/plugins/reporting/server/lib/esqueue/helpers/index_timestamp.js rename to x-pack/plugins/reporting/server/lib/store/index_timestamp.ts index ceb4ef43b2d9..71ce0b1e572f 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/helpers/index_timestamp.js +++ b/x-pack/plugins/reporting/server/lib/store/index_timestamp.ts @@ -4,19 +4,20 @@ * you may not use this file except in compliance with the Elastic License. */ -import moment from 'moment'; +import moment, { unitOfTime } from 'moment'; export const intervals = ['year', 'month', 'week', 'day', 'hour', 'minute']; // TODO: This helper function can be removed by using `schema.duration` objects in the reporting config schema -export function indexTimestamp(intervalStr, separator = '-') { +export function indexTimestamp(intervalStr: string, separator = '-') { + const startOf = intervalStr as unitOfTime.StartOf; if (separator.match(/[a-z]/i)) throw new Error('Interval separator can not be a letter'); const index = intervals.indexOf(intervalStr); - if (index === -1) throw new Error('Invalid index interval: ', intervalStr); + if (index === -1) throw new Error('Invalid index interval: ' + intervalStr); const m = moment(); - m.startOf(intervalStr); + m.startOf(startOf); let dateString; switch (intervalStr) { diff --git a/x-pack/plugins/reporting/server/lib/store/mapping.ts b/x-pack/plugins/reporting/server/lib/store/mapping.ts new file mode 100644 index 000000000000..a819923e2f10 --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/store/mapping.ts @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export const mapping = { + meta: { + // We are indexing these properties with both text and keyword fields because that's what will be auto generated + // when an index already exists. This schema is only used when a reporting index doesn't exist. This way existing + // reporting indexes and new reporting indexes will look the same and the data can be queried in the same + // manner. + properties: { + /** + * Type of object that is triggering this report. Should be either search, visualization or dashboard. + * Used for job listing and telemetry stats only. + */ + objectType: { + type: 'text', + fields: { + keyword: { + type: 'keyword', + ignore_above: 256, + }, + }, + }, + /** + * Can be either preserve_layout, print or none (in the case of csv export). + * Used for phone home stats only. + */ + layout: { + type: 'text', + fields: { + keyword: { + type: 'keyword', + ignore_above: 256, + }, + }, + }, + }, + }, + browser_type: { type: 'keyword' }, + jobtype: { type: 'keyword' }, + payload: { type: 'object', enabled: false }, + priority: { type: 'byte' }, + timeout: { type: 'long' }, + process_expiration: { type: 'date' }, + created_by: { type: 'keyword' }, + created_at: { type: 'date' }, + started_at: { type: 'date' }, + completed_at: { type: 'date' }, + attempts: { type: 'short' }, + max_attempts: { type: 'short' }, + kibana_name: { type: 'keyword' }, + kibana_id: { type: 'keyword' }, + status: { type: 'keyword' }, + output: { + type: 'object', + properties: { + content_type: { type: 'keyword' }, + size: { type: 'long' }, + content: { type: 'object', enabled: false }, + }, + }, +}; diff --git a/x-pack/plugins/reporting/server/lib/store/report.test.ts b/x-pack/plugins/reporting/server/lib/store/report.test.ts new file mode 100644 index 000000000000..83444494e61d --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/store/report.test.ts @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { Report } from './report'; + +describe('Class Report', () => { + it('constructs Report instance', () => { + const opts = { + index: '.reporting-test-index-12345', + jobtype: 'test-report', + created_by: 'created_by_test_string', + browser_type: 'browser_type_test_string', + max_attempts: 50, + payload: { payload_test_field: 1 }, + timeout: 30000, + priority: 1, + }; + const report = new Report(opts); + expect(report.toJSON()).toMatchObject({ + _primary_term: undefined, + _seq_no: undefined, + browser_type: 'browser_type_test_string', + created_by: 'created_by_test_string', + jobtype: 'test-report', + max_attempts: 50, + payload: { + payload_test_field: 1, + }, + priority: 1, + timeout: 30000, + }); + + expect(report.id).toBeDefined(); + }); + + it('updateWithDoc method syncs takes fields to sync ES metadata', () => { + const opts = { + index: '.reporting-test-index-12345', + jobtype: 'test-report', + created_by: 'created_by_test_string', + browser_type: 'browser_type_test_string', + max_attempts: 50, + payload: { payload_test_field: 1 }, + timeout: 30000, + priority: 1, + }; + const report = new Report(opts); + + const metadata = { + _index: '.reporting-test-update', + _id: '12342p9o387549o2345', + _primary_term: 77, + _seq_no: 99, + }; + report.updateWithDoc(metadata); + + expect(report.toJSON()).toMatchObject({ + index: '.reporting-test-update', + _primary_term: 77, + _seq_no: 99, + browser_type: 'browser_type_test_string', + created_by: 'created_by_test_string', + jobtype: 'test-report', + max_attempts: 50, + payload: { + payload_test_field: 1, + }, + priority: 1, + timeout: 30000, + }); + + expect(report._id).toBe('12342p9o387549o2345'); + }); +}); diff --git a/x-pack/plugins/reporting/server/lib/store/report.ts b/x-pack/plugins/reporting/server/lib/store/report.ts new file mode 100644 index 000000000000..cc9967e64b6e --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/store/report.ts @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +// @ts-ignore no module definition +import Puid from 'puid'; + +interface Payload { + id?: string; + index: string; + jobtype: string; + created_by: string | boolean; + payload: unknown; + browser_type: string; + priority: number; + max_attempts: number; + timeout: number; +} + +const puid = new Puid(); + +export class Report { + public readonly jobtype: string; + public readonly created_by: string | boolean; + public readonly payload: unknown; + public readonly browser_type: string; + public readonly id: string; + + public readonly priority: number; + // queue stuff, to be removed with Task Manager integration + public readonly max_attempts: number; + public readonly timeout: number; + + public _index: string; + public _id?: string; // set by ES + public _primary_term?: unknown; // set by ES + public _seq_no: unknown; // set by ES + + /* + * Create an unsaved report + */ + constructor(opts: Payload) { + this.jobtype = opts.jobtype; + this.created_by = opts.created_by; + this.payload = opts.payload; + this.browser_type = opts.browser_type; + this.priority = opts.priority; + this.max_attempts = opts.max_attempts; + this.timeout = opts.timeout; + this.id = puid.generate(); + + this._index = opts.index; + } + + /* + * Update the report with "live" storage metadata + */ + updateWithDoc(doc: Partial) { + if (doc._index) { + this._index = doc._index; // can not be undefined + } + + this._id = doc._id; + this._primary_term = doc._primary_term; + this._seq_no = doc._seq_no; + } + + toJSON() { + return { + id: this.id, + index: this._index, + _seq_no: this._seq_no, + _primary_term: this._primary_term, + jobtype: this.jobtype, + created_by: this.created_by, + payload: this.payload, + timeout: this.timeout, + max_attempts: this.max_attempts, + priority: this.priority, + browser_type: this.browser_type, + }; + } +} diff --git a/x-pack/plugins/reporting/server/lib/store/store.test.ts b/x-pack/plugins/reporting/server/lib/store/store.test.ts new file mode 100644 index 000000000000..4868a1dfdd8f --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/store/store.test.ts @@ -0,0 +1,166 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import sinon from 'sinon'; +import { ReportingConfig, ReportingCore } from '../..'; +import { createMockReportingCore } from '../../test_helpers'; +import { createMockLevelLogger } from '../../test_helpers/create_mock_levellogger'; +import { ReportingStore } from './store'; +import { ElasticsearchServiceSetup } from 'src/core/server'; + +const getMockConfig = (mockConfigGet: sinon.SinonStub) => ({ + get: mockConfigGet, + kbnConfig: { get: mockConfigGet }, +}); + +describe('ReportingStore', () => { + const mockLogger = createMockLevelLogger(); + let mockConfig: ReportingConfig; + let mockCore: ReportingCore; + + const callClusterStub = sinon.stub(); + const mockElasticsearch = { legacy: { client: { callAsInternalUser: callClusterStub } } }; + + beforeEach(async () => { + const mockConfigGet = sinon.stub(); + mockConfigGet.withArgs('index').returns('.reporting-test'); + mockConfigGet.withArgs('queue', 'indexInterval').returns('week'); + mockConfig = getMockConfig(mockConfigGet); + mockCore = await createMockReportingCore(mockConfig); + + callClusterStub.withArgs('indices.exists').resolves({}); + callClusterStub.withArgs('indices.create').resolves({}); + callClusterStub.withArgs('index').resolves({}); + callClusterStub.withArgs('indices.refresh').resolves({}); + callClusterStub.withArgs('update').resolves({}); + + mockCore.getElasticsearchService = () => + (mockElasticsearch as unknown) as ElasticsearchServiceSetup; + }); + + describe('addReport', () => { + it('returns Report object', async () => { + const store = new ReportingStore(mockCore, mockLogger); + const reportType = 'unknowntype'; + const reportPayload = {}; + const reportOptions = { + timeout: 10000, + created_by: 'created_by_string', + browser_type: 'browser_type_string', + max_attempts: 1, + }; + await expect( + store.addReport(reportType, reportPayload, reportOptions) + ).resolves.toMatchObject({ + _primary_term: undefined, + _seq_no: undefined, + browser_type: 'browser_type_string', + created_by: 'created_by_string', + jobtype: 'unknowntype', + max_attempts: 1, + payload: {}, + priority: 10, + timeout: 10000, + }); + }); + + it('throws if options has invalid indexInterval', async () => { + const mockConfigGet = sinon.stub(); + mockConfigGet.withArgs('index').returns('.reporting-test'); + mockConfigGet.withArgs('queue', 'indexInterval').returns('centurially'); + mockConfig = getMockConfig(mockConfigGet); + mockCore = await createMockReportingCore(mockConfig); + + const store = new ReportingStore(mockCore, mockLogger); + const reportType = 'unknowntype'; + const reportPayload = {}; + const reportOptions = { + timeout: 10000, + created_by: 'created_by_string', + browser_type: 'browser_type_string', + max_attempts: 1, + }; + expect( + store.addReport(reportType, reportPayload, reportOptions) + ).rejects.toMatchInlineSnapshot(`[Error: Invalid index interval: centurially]`); + }); + + it('handles error creating the index', async () => { + // setup + callClusterStub.withArgs('indices.exists').resolves(false); + callClusterStub.withArgs('indices.create').rejects(new Error('error')); + + const store = new ReportingStore(mockCore, mockLogger); + const reportType = 'unknowntype'; + const reportPayload = {}; + const reportOptions = { + timeout: 10000, + created_by: 'created_by_string', + browser_type: 'browser_type_string', + max_attempts: 1, + }; + await expect( + store.addReport(reportType, reportPayload, reportOptions) + ).rejects.toMatchInlineSnapshot(`[Error: error]`); + }); + + /* Creating the index will fail, if there were multiple jobs staged in + * parallel and creation completed from another Kibana instance. Only the + * first request in line can successfully create it. + * In spite of that race condition, adding the new job in Elasticsearch is + * fine. + */ + it('ignores index creation error if the index already exists and continues adding the report', async () => { + // setup + callClusterStub.withArgs('indices.exists').resolves(false); + callClusterStub.withArgs('indices.create').rejects(new Error('error')); + + const store = new ReportingStore(mockCore, mockLogger); + const reportType = 'unknowntype'; + const reportPayload = {}; + const reportOptions = { + timeout: 10000, + created_by: 'created_by_string', + browser_type: 'browser_type_string', + max_attempts: 1, + }; + await expect( + store.addReport(reportType, reportPayload, reportOptions) + ).rejects.toMatchInlineSnapshot(`[Error: error]`); + }); + + it('skips creating the index if already exists', async () => { + // setup + callClusterStub.withArgs('indices.exists').resolves(false); + callClusterStub + .withArgs('indices.create') + .rejects(new Error('resource_already_exists_exception')); // will be triggered but ignored + + const store = new ReportingStore(mockCore, mockLogger); + const reportType = 'unknowntype'; + const reportPayload = {}; + const reportOptions = { + timeout: 10000, + created_by: 'created_by_string', + browser_type: 'browser_type_string', + max_attempts: 1, + }; + await expect( + store.addReport(reportType, reportPayload, reportOptions) + ).resolves.toMatchObject({ + _primary_term: undefined, + _seq_no: undefined, + browser_type: 'browser_type_string', + created_by: 'created_by_string', + jobtype: 'unknowntype', + max_attempts: 1, + payload: {}, + priority: 10, + timeout: 10000, + }); + }); + }); +}); diff --git a/x-pack/plugins/reporting/server/lib/store/store.ts b/x-pack/plugins/reporting/server/lib/store/store.ts new file mode 100644 index 000000000000..1cb964a7bbfa --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/store/store.ts @@ -0,0 +1,169 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { ElasticsearchServiceSetup } from 'src/core/server'; +import { LevelLogger } from '../'; +import { ReportingCore } from '../../'; +import { LayoutInstance } from '../../export_types/common/layouts'; +import { indexTimestamp } from './index_timestamp'; +import { mapping } from './mapping'; +import { Report } from './report'; + +export const statuses = { + JOB_STATUS_PENDING: 'pending', + JOB_STATUS_PROCESSING: 'processing', + JOB_STATUS_COMPLETED: 'completed', + JOB_STATUS_WARNINGS: 'completed_with_warnings', + JOB_STATUS_FAILED: 'failed', + JOB_STATUS_CANCELLED: 'cancelled', +}; + +interface AddReportOpts { + timeout: number; + created_by: string | boolean; + browser_type: string; + max_attempts: number; +} + +interface UpdateQuery { + index: string; + id: string; + if_seq_no: unknown; + if_primary_term: unknown; + body: { doc: Partial }; +} + +/* + * A class to give an interface to historical reports in the reporting.index + * - track the state: pending, processing, completed, etc + * - handle updates and deletes to the reporting document + * - interface for downloading the report + */ +export class ReportingStore { + public readonly indexPrefix: string; + public readonly indexInterval: string; + + private client: ElasticsearchServiceSetup['legacy']['client']; + private logger: LevelLogger; + + constructor(reporting: ReportingCore, logger: LevelLogger) { + const config = reporting.getConfig(); + const elasticsearch = reporting.getElasticsearchService(); + + this.client = elasticsearch.legacy.client; + this.indexPrefix = config.get('index'); + this.indexInterval = config.get('queue', 'indexInterval'); + + this.logger = logger; + } + + private async createIndex(indexName: string) { + return this.client + .callAsInternalUser('indices.exists', { + index: indexName, + }) + .then((exists) => { + if (exists) { + return exists; + } + + const indexSettings = { + number_of_shards: 1, + auto_expand_replicas: '0-1', + }; + const body = { + settings: indexSettings, + mappings: { + properties: mapping, + }, + }; + + return this.client + .callAsInternalUser('indices.create', { + index: indexName, + body, + }) + .then(() => true) + .catch((err: Error) => { + const isIndexExistsError = err.message.match(/resource_already_exists_exception/); + if (isIndexExistsError) { + // Do not fail a job if the job runner hits the race condition. + this.logger.warn(`Automatic index creation failed: index already exists: ${err}`); + return; + } + + throw err; + }); + }); + } + + private async saveReport(report: Report) { + const payload = report.payload as { objectType: string; layout: LayoutInstance }; + + const indexParams = { + index: report._index, + id: report.id, + body: { + jobtype: report.jobtype, + meta: { + // We are copying these values out of payload because these fields are indexed and can be aggregated on + // for tracking stats, while payload contents are not. + objectType: payload.objectType, + layout: payload.layout ? payload.layout.id : 'none', + }, + payload: report.payload, + created_by: report.created_by, + timeout: report.timeout, + process_expiration: new Date(0), // use epoch so the job query works + created_at: new Date(), + attempts: 0, + max_attempts: report.max_attempts, + status: statuses.JOB_STATUS_PENDING, + browser_type: report.browser_type, + }, + }; + return this.client.callAsInternalUser('index', indexParams); + } + + private async refreshIndex(index: string) { + return this.client.callAsInternalUser('indices.refresh', { index }); + } + + public async addReport(type: string, payload: unknown, options: AddReportOpts): Promise { + const timestamp = indexTimestamp(this.indexInterval); + const index = `${this.indexPrefix}-${timestamp}`; + await this.createIndex(index); + + const report = new Report({ + index, + payload, + jobtype: type, + created_by: options.created_by, + browser_type: options.browser_type, + max_attempts: options.max_attempts, + timeout: options.timeout, + priority: 10, // unused + }); + + const doc = await this.saveReport(report); + report.updateWithDoc(doc); + + await this.refreshIndex(index); + this.logger.info(`Successfully queued pending job: ${report._index}/${report.id}`); + + return report; + } + + public async updateReport(query: UpdateQuery): Promise { + return this.client.callAsInternalUser('update', { + index: query.index, + id: query.id, + if_seq_no: query.if_seq_no, + if_primary_term: query.if_primary_term, + body: { doc: query.body.doc }, + }); + } +} diff --git a/x-pack/plugins/reporting/server/plugin.ts b/x-pack/plugins/reporting/server/plugin.ts index 693b0917603f..cedc9dc14a23 100644 --- a/x-pack/plugins/reporting/server/plugin.ts +++ b/x-pack/plugins/reporting/server/plugin.ts @@ -8,7 +8,13 @@ import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from 'src/core import { ReportingCore } from './'; import { initializeBrowserDriverFactory } from './browsers'; import { buildConfig, ReportingConfigType } from './config'; -import { createQueueFactory, enqueueJobFactory, LevelLogger, runValidations } from './lib'; +import { + createQueueFactory, + enqueueJobFactory, + LevelLogger, + runValidations, + ReportingStore, +} from './lib'; import { registerRoutes } from './routes'; import { setFieldFormats } from './services'; import { ReportingSetup, ReportingSetupDeps, ReportingStart, ReportingStartDeps } from './types'; @@ -86,9 +92,9 @@ export class ReportingPlugin const config = reportingCore.getConfig(); const browserDriverFactory = await initializeBrowserDriverFactory(config, logger); - - const esqueue = await createQueueFactory(reportingCore, logger); // starts polling for pending jobs - const enqueueJob = enqueueJobFactory(reportingCore, logger); // called from generation routes + const store = new ReportingStore(reportingCore, logger); + const esqueue = await createQueueFactory(reportingCore, store, logger); // starts polling for pending jobs + const enqueueJob = enqueueJobFactory(reportingCore, store, logger); // called from generation routes reportingCore.pluginStart({ browserDriverFactory, @@ -96,6 +102,7 @@ export class ReportingPlugin uiSettings: core.uiSettings, esqueue, enqueueJob, + store, }); // run self-check validations diff --git a/x-pack/plugins/reporting/server/test_helpers/create_mock_levellogger.ts b/x-pack/plugins/reporting/server/test_helpers/create_mock_levellogger.ts new file mode 100644 index 000000000000..f5e9a44281cb --- /dev/null +++ b/x-pack/plugins/reporting/server/test_helpers/create_mock_levellogger.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { LevelLogger } from '../lib'; + +export function createMockLevelLogger() { + // eslint-disable-next-line no-console + const consoleLogger = (tag: string) => (message: unknown) => console.log(tag, message); + const innerLogger = { + get: () => innerLogger, + debug: consoleLogger('debug'), + info: consoleLogger('info'), + warn: consoleLogger('warn'), + trace: consoleLogger('trace'), + error: consoleLogger('error'), + fatal: consoleLogger('fatal'), + log: consoleLogger('log'), + }; + return new LevelLogger(innerLogger); +} diff --git a/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts b/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts index 579035a46f61..427a6362a725 100644 --- a/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts +++ b/x-pack/plugins/reporting/server/test_helpers/create_mock_reportingplugin.ts @@ -20,6 +20,8 @@ import { } from '../browsers'; import { ReportingInternalSetup, ReportingInternalStart } from '../core'; import { ReportingStartDeps } from '../types'; +import { ReportingStore } from '../lib'; +import { createMockLevelLogger } from './create_mock_levellogger'; (initializeBrowserDriverFactory as jest.Mock< Promise @@ -37,13 +39,19 @@ const createMockPluginSetup = (setupMock?: any): ReportingInternalSetup => { }; }; -const createMockPluginStart = (startMock?: any): ReportingInternalStart => { +const createMockPluginStart = ( + mockReportingCore: ReportingCore, + startMock?: any +): ReportingInternalStart => { + const logger = createMockLevelLogger(); + const store = new ReportingStore(mockReportingCore, logger); return { browserDriverFactory: startMock.browserDriverFactory, enqueueJob: startMock.enqueueJob, esqueue: startMock.esqueue, savedObjects: startMock.savedObjects || { getScopedClient: jest.fn() }, uiSettings: startMock.uiSettings || { asScopedToClient: () => ({ get: jest.fn() }) }, + store, }; }; @@ -60,9 +68,22 @@ export const createMockStartDeps = (startMock?: any): ReportingStartDeps => ({ export const createMockReportingCore = async ( config: ReportingConfig, - setupDepsMock: ReportingInternalSetup | undefined = createMockPluginSetup({}), - startDepsMock: ReportingInternalStart | undefined = createMockPluginStart({}) + setupDepsMock: ReportingInternalSetup | undefined = undefined, + startDepsMock: ReportingInternalStart | undefined = undefined ) => { + if (!setupDepsMock) { + setupDepsMock = createMockPluginSetup({}); + } + + const mockReportingCore = { + getConfig: () => config, + getElasticsearchService: () => setupDepsMock?.elasticsearch, + } as ReportingCore; + + if (!startDepsMock) { + startDepsMock = createMockPluginStart(mockReportingCore, {}); + } + config = config || {}; const core = new ReportingCore(); diff --git a/x-pack/test/reporting_api_integration/services.ts b/x-pack/test/reporting_api_integration/services.ts index dadb466d4598..85f5a98c69b2 100644 --- a/x-pack/test/reporting_api_integration/services.ts +++ b/x-pack/test/reporting_api_integration/services.ts @@ -7,8 +7,7 @@ import expect from '@kbn/expect'; import * as Rx from 'rxjs'; import { filter, first, mapTo, switchMap, timeout } from 'rxjs/operators'; -// @ts-ignore no module definition -import { indexTimestamp } from '../../plugins/reporting/server/lib/esqueue/helpers/index_timestamp'; +import { indexTimestamp } from '../../plugins/reporting/server/lib/store/index_timestamp'; import { services as xpackServices } from '../functional/services'; import { FtrProviderContext } from './ftr_provider_context';