diff --git a/docs/management/action-types.asciidoc b/docs/management/action-types.asciidoc index 3d3d7aeb2d777..92adbaf97d8c5 100644 --- a/docs/management/action-types.asciidoc +++ b/docs/management/action-types.asciidoc @@ -135,5 +135,14 @@ image::images/connectors-with-missing-secrets.png[Connectors with missing secret For out-of-the-box and standardized connectors, you can <> before {kib} starts. +[float] +[[montoring-connectors]] +=== Monitoring connectors + +The <> helps you understand the performance of all tasks in your environment. +However, if connectors fail to execute, they will report as successful to Task Manager. The failure stats will not +accurately depict the performance of connectors. + +For more information on connector successes and failures, refer to the <>. include::connectors/index.asciidoc[] diff --git a/docs/user/production-considerations/task-manager-health-monitoring.asciidoc b/docs/user/production-considerations/task-manager-health-monitoring.asciidoc index 3321a9d0c02a1..b07a01906b895 100644 --- a/docs/user/production-considerations/task-manager-health-monitoring.asciidoc +++ b/docs/user/production-considerations/task-manager-health-monitoring.asciidoc @@ -111,6 +111,7 @@ a| Runtime | This section tracks excution performance of Task Manager, tracking task _drift_, worker _load_, and execution stats broken down by type, including duration and execution results. + a| Capacity Estimation | This section provides a rough estimate about the sufficiency of its capacity. As the name suggests, these are estimates based on historical data and should not be used as predictions. Use these estimations when following the Task Manager <>. @@ -123,6 +124,14 @@ The root `status` indicates the `status` of the system overall. The Runtime `status` indicates whether task executions have exceeded any of the <>. An `OK` status means none of the threshold have been exceeded. A `Warning` status means that at least one warning threshold has been exceeded. An `Error` status means that at least one error threshold has been exceeded. +[IMPORTANT] +============================================== +Some tasks (such as <>) will incorrectly report their status as successful even if the task failed. +The runtime and workload block will return data about success and failures and will not take this into consideration. + +To get a better sense of action failures, please refer to the <> for more accurate context into failures and successes. +============================================== + The Capacity Estimation `status` indicates the sufficiency of the observed capacity. An `OK` status means capacity is sufficient. A `Warning` status means that capacity is sufficient for the scheduled recurring tasks, but non-recurring tasks often cause the cluster to exceed capacity. An `Error` status means that there is insufficient capacity across all types of tasks. By monitoring the `status` of the system overall, and the `status` of specific task types of interest, you can evaluate the health of the {kib} Task Management system. diff --git a/x-pack/plugins/actions/server/action_type_registry.ts b/x-pack/plugins/actions/server/action_type_registry.ts index e5846560a6c98..76b360ce8b17f 100644 --- a/x-pack/plugins/actions/server/action_type_registry.ts +++ b/x-pack/plugins/actions/server/action_type_registry.ts @@ -134,7 +134,8 @@ export class ActionTypeRegistry { // Don't retry other kinds of errors return false; }, - createTaskRunner: (context: RunContext) => this.taskRunnerFactory.create(context), + createTaskRunner: (context: RunContext) => + this.taskRunnerFactory.create(context, actionType.maxAttempts), }, }); // No need to notify usage on basic action types diff --git a/x-pack/plugins/actions/server/lib/action_executor.test.ts b/x-pack/plugins/actions/server/lib/action_executor.test.ts index 440de161490aa..ba7f750859d40 100644 --- a/x-pack/plugins/actions/server/lib/action_executor.test.ts +++ b/x-pack/plugins/actions/server/lib/action_executor.test.ts @@ -187,10 +187,12 @@ test('successfully executes as a task', async () => { const scheduleDelay = 10000; // milliseconds const scheduled = new Date(Date.now() - scheduleDelay); + const attempts = 1; await actionExecutor.execute({ ...executeParams, taskInfo: { scheduled, + attempts, }, }); diff --git a/x-pack/plugins/actions/server/lib/action_executor.ts b/x-pack/plugins/actions/server/lib/action_executor.ts index 5dfe56cff5016..d265bca237c3b 100644 --- a/x-pack/plugins/actions/server/lib/action_executor.ts +++ b/x-pack/plugins/actions/server/lib/action_executor.ts @@ -44,6 +44,7 @@ export interface ActionExecutorContext { export interface TaskInfo { scheduled: Date; + attempts: number; } export interface ExecuteOptions { @@ -210,6 +211,7 @@ export class ActionExecutor { config: validatedConfig, secrets: validatedSecrets, isEphemeral, + taskInfo, }); } catch (err) { rawResult = { diff --git a/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts b/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts index cff92f874e0ef..85d819ba09b8a 100644 --- a/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts +++ b/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts @@ -136,6 +136,7 @@ test('executes the task by calling the executor with proper parameters, using gi }), taskInfo: { scheduled: new Date(), + attempts: 0, }, }); @@ -191,6 +192,7 @@ test('executes the task by calling the executor with proper parameters, using st }), taskInfo: { scheduled: new Date(), + attempts: 0, }, }); @@ -341,6 +343,7 @@ test('uses API key when provided', async () => { }), taskInfo: { scheduled: new Date(), + attempts: 0, }, }); @@ -401,6 +404,7 @@ test('uses relatedSavedObjects merged with references when provided', async () = }), taskInfo: { scheduled: new Date(), + attempts: 0, }, }); }); @@ -451,6 +455,7 @@ test('uses relatedSavedObjects as is when references are empty', async () => { }), taskInfo: { scheduled: new Date(), + attempts: 0, }, }); }); @@ -499,6 +504,7 @@ test('sanitizes invalid relatedSavedObjects when provided', async () => { relatedSavedObjects: [], taskInfo: { scheduled: new Date(), + attempts: 0, }, }); }); @@ -538,6 +544,7 @@ test(`doesn't use API key when not provided`, async () => { }), taskInfo: { scheduled: new Date(), + attempts: 0, }, }); @@ -549,9 +556,15 @@ test(`doesn't use API key when not provided`, async () => { }); test(`throws an error when license doesn't support the action type`, async () => { - const taskRunner = taskRunnerFactory.create({ - taskInstance: mockedTaskInstance, - }); + const taskRunner = taskRunnerFactory.create( + { + taskInstance: { + ...mockedTaskInstance, + attempts: 1, + }, + }, + 2 + ); mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ id: '3', @@ -579,6 +592,138 @@ test(`throws an error when license doesn't support the action type`, async () => } catch (e) { expect(e instanceof ExecutorError).toEqual(true); expect(e.data).toEqual({}); - expect(e.retry).toEqual(false); + expect(e.retry).toEqual(true); } }); + +test(`treats errors as errors if the task is retryable`, async () => { + const taskRunner = taskRunnerFactory.create({ + taskInstance: { + ...mockedTaskInstance, + attempts: 0, + }, + }); + + mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '3', + type: 'action_task_params', + attributes: { + actionId: '2', + params: { baz: true }, + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [ + { + id: '2', + name: 'actionRef', + type: 'action', + }, + ], + }); + mockedActionExecutor.execute.mockResolvedValueOnce({ + status: 'error', + actionId: '2', + message: 'Error message', + data: { foo: true }, + retry: false, + }); + + let err; + try { + await taskRunner.run(); + } catch (e) { + err = e; + } + expect(err).toBeDefined(); + expect(err instanceof ExecutorError).toEqual(true); + expect(err.data).toEqual({ foo: true }); + expect(err.retry).toEqual(false); + expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith( + `Action '2' failed and will not retry: Error message` + ); +}); + +test(`treats errors as successes if the task is not retryable`, async () => { + const taskRunner = taskRunnerFactory.create({ + taskInstance: { + ...mockedTaskInstance, + attempts: 1, + }, + }); + + mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '3', + type: 'action_task_params', + attributes: { + actionId: '2', + params: { baz: true }, + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [ + { + id: '2', + name: 'actionRef', + type: 'action', + }, + ], + }); + mockedActionExecutor.execute.mockResolvedValueOnce({ + status: 'error', + actionId: '2', + message: 'Error message', + data: { foo: true }, + retry: false, + }); + + let err; + try { + await taskRunner.run(); + } catch (e) { + err = e; + } + expect(err).toBeUndefined(); + expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith( + `Action '2' failed and will not retry: Error message` + ); +}); + +test('treats errors as errors if the error is thrown instead of returned', async () => { + const taskRunner = taskRunnerFactory.create({ + taskInstance: { + ...mockedTaskInstance, + attempts: 0, + }, + }); + + mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '3', + type: 'action_task_params', + attributes: { + actionId: '2', + params: { baz: true }, + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [ + { + id: '2', + name: 'actionRef', + type: 'action', + }, + ], + }); + mockedActionExecutor.execute.mockRejectedValueOnce({}); + + let err; + try { + await taskRunner.run(); + } catch (e) { + err = e; + } + expect(err).toBeDefined(); + expect(err instanceof ExecutorError).toEqual(true); + expect(err.data).toEqual({}); + expect(err.retry).toEqual(true); + expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith( + `Action '2' failed and will retry: undefined` + ); +}); diff --git a/x-pack/plugins/actions/server/lib/task_runner_factory.ts b/x-pack/plugins/actions/server/lib/task_runner_factory.ts index 45ae6c1d5fae9..9a3856bbf7cee 100644 --- a/x-pack/plugins/actions/server/lib/task_runner_factory.ts +++ b/x-pack/plugins/actions/server/lib/task_runner_factory.ts @@ -22,7 +22,6 @@ import { ActionExecutorContract } from './action_executor'; import { ExecutorError } from './executor_error'; import { RunContext } from '../../../task_manager/server'; import { EncryptedSavedObjectsClient } from '../../../encrypted_saved_objects/server'; -import { ActionTypeDisabledError } from './errors'; import { ActionTaskParams, ActionTypeRegistryContract, @@ -62,7 +61,7 @@ export class TaskRunnerFactory { this.taskRunnerContext = taskRunnerContext; } - public create({ taskInstance }: RunContext) { + public create({ taskInstance }: RunContext, maxAttempts: number = 1) { if (!this.isInitialized) { throw new Error('TaskRunnerFactory not initialized'); } @@ -78,6 +77,7 @@ export class TaskRunnerFactory { const taskInfo = { scheduled: taskInstance.runAt, + attempts: taskInstance.attempts, }; return { @@ -119,7 +119,14 @@ export class TaskRunnerFactory { basePathService.set(fakeRequest, path); - let executorResult: ActionTypeExecutorResult; + // Throwing an executor error means we will attempt to retry the task + // TM will treat a task as a failure if `attempts >= maxAttempts` + // so we need to handle that here to avoid TM persisting the failed task + const isRetryableBasedOnAttempts = taskInfo.attempts < (maxAttempts ?? 1); + const willRetryMessage = `and will retry`; + const willNotRetryMessage = `and will not retry`; + + let executorResult: ActionTypeExecutorResult | undefined; try { executorResult = await actionExecutor.execute({ params, @@ -131,20 +138,39 @@ export class TaskRunnerFactory { relatedSavedObjects: validatedRelatedSavedObjects(logger, relatedSavedObjects), }); } catch (e) { - if (e instanceof ActionTypeDisabledError) { - // We'll stop re-trying due to action being forbidden - throw new ExecutorError(e.message, {}, false); + logger.error( + `Action '${actionId}' failed ${ + isRetryableBasedOnAttempts ? willRetryMessage : willNotRetryMessage + }: ${e.message}` + ); + if (isRetryableBasedOnAttempts) { + // In order for retry to work, we need to indicate to task manager this task + // failed + throw new ExecutorError(e.message, {}, true); } - throw e; } - if (executorResult.status === 'error') { + if ( + executorResult && + executorResult?.status === 'error' && + executorResult?.retry !== undefined && + isRetryableBasedOnAttempts + ) { + logger.error( + `Action '${actionId}' failed ${ + !!executorResult.retry ? willRetryMessage : willNotRetryMessage + }: ${executorResult.message}` + ); // Task manager error handler only kicks in when an error thrown (at this time) // So what we have to do is throw when the return status is `error`. throw new ExecutorError( executorResult.message, executorResult.data, - executorResult.retry == null ? false : executorResult.retry + executorResult.retry as boolean | Date + ); + } else if (executorResult && executorResult?.status === 'error') { + logger.error( + `Action '${actionId}' failed ${willNotRetryMessage}: ${executorResult.message}` ); } diff --git a/x-pack/plugins/actions/server/types.ts b/x-pack/plugins/actions/server/types.ts index 14e9e120a853a..64250ca77fba4 100644 --- a/x-pack/plugins/actions/server/types.ts +++ b/x-pack/plugins/actions/server/types.ts @@ -19,6 +19,7 @@ import { SavedObjectReference, } from '../../../../src/core/server'; import { ActionTypeExecutorResult } from '../common'; +import { TaskInfo } from './lib/action_executor'; export { ActionTypeExecutorResult } from '../common'; export { GetFieldsByIssueTypeResponse as JiraGetFieldsResponse } from './builtin_action_types/jira/types'; export { GetCommonFieldsResponse as ServiceNowGetFieldsResponse } from './builtin_action_types/servicenow/types'; @@ -59,6 +60,7 @@ export interface ActionTypeExecutorOptions { secrets: Secrets; params: Params; isEphemeral?: boolean; + taskInfo?: TaskInfo; } export interface ActionResult { diff --git a/x-pack/test/alerting_api_integration/common/config.ts b/x-pack/test/alerting_api_integration/common/config.ts index dd43606cc79b7..d9383306e1dc6 100644 --- a/x-pack/test/alerting_api_integration/common/config.ts +++ b/x-pack/test/alerting_api_integration/common/config.ts @@ -44,6 +44,7 @@ const enabledActionTypes = [ 'test.noop', 'test.delayed', 'test.rate-limit', + 'test.no-attempts-rate-limit', 'test.throw', ]; diff --git a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/action_types.ts b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/action_types.ts index a848207bf1b70..e7e48a0938084 100644 --- a/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/action_types.ts +++ b/x-pack/test/alerting_api_integration/common/fixtures/plugins/alerts/server/action_types.ts @@ -37,6 +37,7 @@ export function defineActionTypes( actions.registerType(getDelayedActionType()); actions.registerType(getFailingActionType()); actions.registerType(getRateLimitedActionType()); + actions.registerType(getNoAttemptsRateLimitedActionType()); actions.registerType(getAuthorizationActionType(core)); } @@ -183,6 +184,42 @@ function getRateLimitedActionType() { return result; } +function getNoAttemptsRateLimitedActionType() { + const paramsSchema = schema.object({ + index: schema.string(), + reference: schema.string(), + retryAt: schema.number(), + }); + type ParamsType = TypeOf; + const result: ActionType<{}, {}, ParamsType> = { + id: 'test.no-attempts-rate-limit', + name: 'Test: Rate Limit', + minimumLicenseRequired: 'gold', + maxAttempts: 0, + validate: { + params: paramsSchema, + }, + async executor({ config, params, services }) { + await services.scopedClusterClient.index({ + index: params.index, + refresh: 'wait_for', + body: { + params, + config, + reference: params.reference, + source: 'action:test.rate-limit', + }, + }); + return { + status: 'error', + retry: new Date(params.retryAt), + actionId: '', + }; + }, + }; + return result; +} + function getAuthorizationActionType(core: CoreSetup) { const paramsSchema = schema.object({ callClusterAuthorizationIndex: schema.string(), diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/actions/enqueue.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/actions/enqueue.ts index f937e63840937..533570ae4c16d 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/actions/enqueue.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/actions/enqueue.ts @@ -97,21 +97,8 @@ export default function ({ getService }: FtrProviderContext) { }, }) .expect(204); - await esTestIndexTool.waitForDocs('action:test.failing', reference, 1); - await supertest - .put( - `${getUrlPrefix( - Spaces.space1.id - )}/api/alerts_fixture/Actions-cleanup_failed_action_executions/reschedule_task` - ) - .set('kbn-xsrf', 'foo') - .send({ - runAt: new Date().toISOString(), - }) - .expect(200); - await retry.try(async () => { const searchResult = await es.search({ index: '.kibana_task_manager', @@ -139,5 +126,81 @@ export default function ({ getService }: FtrProviderContext) { expect((searchResult.body.hits.total as estypes.SearchTotalHits).value).to.eql(0); }); }); + + it('should never leaved a failed task, even if max attempts is reached', async () => { + // We have to provide the test.rate-limit the next runAt, for testing purposes + const retryDate = new Date(Date.now() + 1); + const { body: createdAction } = await supertest + .post(`${getUrlPrefix(Spaces.space1.id)}/api/actions/connector`) + .set('kbn-xsrf', 'foo') + .send({ + name: 'My action', + connector_type_id: 'test.no-attempts-rate-limit', + config: {}, + secrets: {}, + }) + .expect(200); + objectRemover.add(Spaces.space1.id, createdAction.id, 'action', 'actions'); + + const reference = `actions-enqueue-2:${Spaces.space1.id}:${createdAction.id}`; + await supertest + .post( + `${getUrlPrefix(Spaces.space1.id)}/api/alerts_fixture/${createdAction.id}/enqueue_action` + ) + .set('kbn-xsrf', 'foo') + .send({ + params: { + reference, + index: ES_TEST_INDEX_NAME, + retryAt: retryDate.getTime(), + }, + }) + .expect(204); + + await retry.try(async () => { + const runningSearchResult = await es.search({ + index: '.kibana_task_manager', + body: { + query: { + bool: { + must: [ + { + term: { + 'task.taskType': 'actions:test.no-attempts-rate-limit', + }, + }, + { + term: { + 'task.status': 'running', + }, + }, + ], + }, + }, + }, + }); + expect((runningSearchResult.body.hits.total as estypes.SearchTotalHits).value).to.eql(1); + }); + + await retry.try(async () => { + const searchResult = await es.search({ + index: '.kibana_task_manager', + body: { + query: { + bool: { + must: [ + { + term: { + 'task.taskType': 'actions:test.no-attempts-rate-limit', + }, + }, + ], + }, + }, + }, + }); + expect((searchResult.body.hits.total as estypes.SearchTotalHits).value).to.eql(0); + }); + }); }); }