From 687987aa9ce56ce359f722485330179a4807d79a Mon Sep 17 00:00:00 2001 From: Julia Bardi <90178898+juliaElastic@users.noreply.github.com> Date: Tue, 20 Dec 2022 10:36:36 +0100 Subject: [PATCH] [Fleet] refactored bulk update tags retry (#147594) ## Summary Fixes https://github.com/elastic/kibana/issues/144161 As discussed [here](https://github.com/elastic/kibana/issues/144161#issuecomment-1348668610), the existing implementation of update tags doesn't work well with real agents, as there are many version conflicts with agent check-ins, even when trying to add/remove a single tag. Refactored the logic to make retries more efficient: - Instead of aborting the whole bulk action on conflicts, changed the conflict strategy to 'proceed'. This means that if an action on 50k agents has 1k conflicts, only the 1k conflicted agents are retried rather than all 50k, which makes conflicts less likely on retry. - Because of this, on retry we have to know which agents don't yet have the tag added/removed. For this, added an additional filter to the `updateByQuery` request. The filter is only added when there is exactly one tag in `tagsToAdd` and none in `tagsToRemove`, or exactly one tag in `tagsToRemove` and none in `tagsToAdd`. This is the main use case from the UI, and handling other cases would complicate the logic more (each additional tag to add/remove would result in another OR query, which would match more agents, making conflicts more likely). - Added this additional filter to the initial request as well (not only retries) to avoid unnecessary work, e.g. if the user tries to add a tag to 50k agents but 48k already have it, it is enough to update the remaining 2k agents. - A side effect of this improvement is that 'Agent activity' shows the number of agents actually updated, not the total selected. I think this is not really a problem for update tags. - Cleaned up some of the UI logic, because conflicts are now fully handled on the backend. - Locally I couldn't reproduce the conflict with agent checkins, even with 1k horde agents. I'll try to test in cloud with more real agents. 
To verify: - Enroll 50k agents (I used 50k with create_agents script, and 1k with horde). Enroll 50k with horde if possible. - Select all on UI and try to add/remove one or more tags - Expect the changes to propagate quickly (up to 1m). It might take a few refreshes to see the result on agent list and tags list, because the UI polls the agents every 30s. It is expected that the tags list temporarily shows incorrect data because the action is async. E.g. removed `test3` tag and added `add` tag quickly: image image The logs show the details of how many `version_conflicts` were there, and it decreased with retries. ``` [2022-12-15T10:32:12.937+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 90acd541-19ac-4738-b3d3-db32789233de, total agents: 52000 [2022-12-15T10:32:12.981+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:check:90acd541-19ac-4738-b3d3-db32789233de [2022-12-15T10:32:16.477+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 29e9da70-7194-4e52-8004-2c1b19f6dfd5, total agents: 52000 [2022-12-15T10:32:16.537+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:check:29e9da70-7194-4e52-8004-2c1b19f6dfd5 [2022-12-15T10:32:22.893+01:00][DEBUG][plugins.fleet] {"took":9886,"timed_out":false,"total":52000,"updated":41143,"deleted":0,"batches":52,"version_conflicts":10857,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1,"throttled_until_millis":0,"failures":[]} [2022-12-15T10:32:26.066+01:00][DEBUG][plugins.fleet] {"took":9518,"timed_out":false,"total":52000,"updated":25755,"deleted":0,"batches":52,"version_conflicts":26245,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1,"throttled_until_millis":0,"failures":[]} [2022-12-15T10:32:27.401+01:00][ERROR][plugins.fleet] Action failed: version conflict of 10857 agents [2022-12-15T10:32:27.461+01:00][INFO ][plugins.fleet] Scheduling task 
fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de [2022-12-15T10:32:27.462+01:00][INFO ][plugins.fleet] Retrying in task: fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de [2022-12-15T10:32:29.274+01:00][ERROR][plugins.fleet] Action failed: version conflict of 26245 agents [2022-12-15T10:32:29.353+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:29e9da70-7194-4e52-8004-2c1b19f6dfd5 [2022-12-15T10:32:29.353+01:00][INFO ][plugins.fleet] Retrying in task: fleet:update_agent_tags:retry:29e9da70-7194-4e52-8004-2c1b19f6dfd5 [2022-12-15T10:32:31.480+01:00][INFO ][plugins.fleet] Running bulk action retry task [2022-12-15T10:32:31.481+01:00][DEBUG][plugins.fleet] Retry #1 of task fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de [2022-12-15T10:32:31.481+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 90acd541-19ac-4738-b3d3-db32789233de, total agents: 52000 [2022-12-15T10:32:31.481+01:00][INFO ][plugins.fleet] Completed bulk action retry task [2022-12-15T10:32:31.485+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:check:90acd541-19ac-4738-b3d3-db32789233de [2022-12-15T10:32:33.841+01:00][DEBUG][plugins.fleet] {"took":2347,"timed_out":false,"total":10857,"updated":9857,"deleted":0,"batches":11,"version_conflicts":1000,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1,"throttled_until_millis":0,"failures":[]} [2022-12-15T10:32:34.556+01:00][INFO ][plugins.fleet] Running bulk action retry task [2022-12-15T10:32:34.557+01:00][DEBUG][plugins.fleet] Retry #1 of task fleet:update_agent_tags:retry:29e9da70-7194-4e52-8004-2c1b19f6dfd5 [2022-12-15T10:32:34.557+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 29e9da70-7194-4e52-8004-2c1b19f6dfd5, total agents: 52000 [2022-12-15T10:32:34.557+01:00][INFO ][plugins.fleet] Completed bulk action retry task [2022-12-15T10:32:34.560+01:00][INFO 
][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:check:29e9da70-7194-4e52-8004-2c1b19f6dfd5 [2022-12-15T10:32:35.388+01:00][ERROR][plugins.fleet] Retry #1 of task fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de failed: version conflict of 1000 agents [2022-12-15T10:32:35.468+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de [2022-12-15T10:32:35.468+01:00][INFO ][plugins.fleet] Retrying in task: fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de {"took":5509,"timed_out":false,"total":26245,"updated":26245,"deleted":0,"batches":27,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1,"throttled_until_millis":0,"failures":[]} [2022-12-15T10:32:42.722+01:00][INFO ][plugins.fleet] processed 26245 agents, took 5509ms [2022-12-15T10:32:42.723+01:00][INFO ][plugins.fleet] Removing task fleet:update_agent_tags:retry:check:29e9da70-7194-4e52-8004-2c1b19f6dfd5 [2022-12-15T10:32:46.705+01:00][INFO ][plugins.fleet] Running bulk action retry task [2022-12-15T10:32:46.706+01:00][DEBUG][plugins.fleet] Retry #2 of task fleet:update_agent_tags:retry:90acd541-19ac-4738-b3d3-db32789233de [2022-12-15T10:32:46.707+01:00][INFO ][plugins.fleet] Running action asynchronously, actionId: 90acd541-19ac-4738-b3d3-db32789233de, total agents: 52000 [2022-12-15T10:32:46.707+01:00][INFO ][plugins.fleet] Completed bulk action retry task [2022-12-15T10:32:46.711+01:00][INFO ][plugins.fleet] Scheduling task fleet:update_agent_tags:retry:check:90acd541-19ac-4738-b3d3-db32789233de [2022-12-15T10:32:47.099+01:00][DEBUG][plugins.fleet] {"took":379,"timed_out":false,"total":1000,"updated":1000,"deleted":0,"batches":1,"version_conflicts":0,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1,"throttled_until_millis":0,"failures":[]} [2022-12-15T10:32:47.623+01:00][INFO ][plugins.fleet] processed 1000 agents, 
took 379ms [2022-12-15T10:32:47.623+01:00][INFO ][plugins.fleet] Removing task fleet:update_agent_tags:retry:check:90acd541-19ac-4738-b3d3-db32789233de ``` ### Checklist - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> --- .../components/tags_add_remove.test.tsx | 22 ++- .../components/tags_add_remove.tsx | 26 +-- .../server/services/agents/action_runner.ts | 6 +- .../server/services/agents/action_status.ts | 15 +- .../fleet/server/services/agents/crud.ts | 5 +- .../services/agents/update_agent_tags.test.ts | 153 +++++++++++++++++- .../services/agents/update_agent_tags.ts | 41 ++--- .../agents/update_agent_tags_action_runner.ts | 70 ++++---- .../apis/agents/update_agent_tags.ts | 91 ++++++----- 9 files changed, 284 insertions(+), 145 deletions(-) diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agents/agent_list_page/components/tags_add_remove.test.tsx b/x-pack/plugins/fleet/public/applications/fleet/sections/agents/agent_list_page/components/tags_add_remove.test.tsx index 465db5236338c..4f4e7e24097f4 100644 --- a/x-pack/plugins/fleet/public/applications/fleet/sections/agents/agent_list_page/components/tags_add_remove.test.tsx +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/agents/agent_list_page/components/tags_add_remove.test.tsx @@ -292,7 +292,16 @@ describe('TagsAddRemove', () => { expect(mockBulkUpdateTags).toHaveBeenCalledWith( 'query', - ['newTag2', 'newTag'], + ['newTag'], + [], + expect.anything(), + 'Tag created', + 'Tag creation failed' + ); + + expect(mockBulkUpdateTags).toHaveBeenCalledWith( + 'query', + ['newTag2'], [], expect.anything(), 'Tag created', @@ -316,7 +325,16 @@ describe('TagsAddRemove', () => { expect(mockBulkUpdateTags).toHaveBeenCalledWith( '', [], - ['tag2', 'tag1'], + ['tag1'], + expect.anything(), + undefined, + 
undefined + ); + + expect(mockBulkUpdateTags).toHaveBeenCalledWith( + '', + [], + ['tag2'], expect.anything(), undefined, undefined diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/agents/agent_list_page/components/tags_add_remove.tsx b/x-pack/plugins/fleet/public/applications/fleet/sections/agents/agent_list_page/components/tags_add_remove.tsx index 8307bc3467cc2..8e539954e5204 100644 --- a/x-pack/plugins/fleet/public/applications/fleet/sections/agents/agent_list_page/components/tags_add_remove.tsx +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/agents/agent_list_page/components/tags_add_remove.tsx @@ -120,32 +120,10 @@ export const TagsAddRemove: React.FC = ({ errorMessage ); } else { - // sending updated tags to add/remove, in case multiple actions are done quickly and the first one is not yet propagated - const updatedTagsToAdd = tagsToAdd.concat( - labels - .filter( - (tag) => - tag.checked === 'on' && - !selectedTags.includes(tag.label) && - !tagsToRemove.includes(tag.label) - ) - .map((tag) => tag.label) - ); - const updatedTagsToRemove = tagsToRemove.concat( - labels - .filter( - (tag) => - tag.checked !== 'on' && - selectedTags.includes(tag.label) && - !tagsToAdd.includes(tag.label) - ) - .map((tag) => tag.label) - ); - updateTagsHook.bulkUpdateTags( agents!, - updatedTagsToAdd, - updatedTagsToRemove, + tagsToAdd, + tagsToRemove, (hasCompleted) => handleTagsUpdated(tagsToAdd, tagsToRemove, hasCompleted), successMessage, errorMessage diff --git a/x-pack/plugins/fleet/server/services/agents/action_runner.ts b/x-pack/plugins/fleet/server/services/agents/action_runner.ts index 18af331980238..83b61a340bfed 100644 --- a/x-pack/plugins/fleet/server/services/agents/action_runner.ts +++ b/x-pack/plugins/fleet/server/services/agents/action_runner.ts @@ -22,6 +22,8 @@ import { getAgentActions } from './actions'; import { closePointInTime, getAgentsByKuery } from './crud'; import type { BulkActionsResolver } from 
'./bulk_actions_resolver'; +export const MAX_RETRY_COUNT = 3; + export interface ActionParams { kuery: string; showInactive?: boolean; @@ -110,8 +112,8 @@ export abstract class ActionRunner { `Retry #${this.retryParams.retryCount} of task ${this.retryParams.taskId} failed: ${error.message}` ); - if (this.retryParams.retryCount === 3) { - const errorMessage = 'Stopping after 3rd retry. Error: ' + error.message; + if (this.retryParams.retryCount === MAX_RETRY_COUNT) { + const errorMessage = `Stopping after ${MAX_RETRY_COUNT}rd retry. Error: ${error.message}`; appContextService.getLogger().warn(errorMessage); // clean up tasks after 3rd retry reached diff --git a/x-pack/plugins/fleet/server/services/agents/action_status.ts b/x-pack/plugins/fleet/server/services/agents/action_status.ts index 5c6753425cbc7..c36e13a4441ca 100644 --- a/x-pack/plugins/fleet/server/services/agents/action_status.ts +++ b/x-pack/plugins/fleet/server/services/agents/action_status.ts @@ -69,12 +69,15 @@ export async function getActionStatuses( const nbAgentsActioned = action.nbAgentsActioned || action.nbAgentsActionCreated; const cardinalityCount = (matchingBucket?.agent_count as any)?.value ?? 0; const docCount = matchingBucket?.doc_count ?? 0; - const nbAgentsAck = Math.min( - docCount, - // only using cardinality count when count lower than precision threshold - docCount > PRECISION_THRESHOLD ? docCount : cardinalityCount, - nbAgentsActioned - ); + const nbAgentsAck = + action.type === 'UPDATE_TAGS' + ? Math.min(docCount, nbAgentsActioned) + : Math.min( + docCount, + // only using cardinality count when count lower than precision threshold + docCount > PRECISION_THRESHOLD ? 
docCount : cardinalityCount, + nbAgentsActioned + ); const completionTime = (matchingBucket?.max_timestamp as any)?.value_as_string; const complete = nbAgentsAck >= nbAgentsActioned; const cancelledAction = cancelledActions.find((a) => a.actionId === action.actionId); diff --git a/x-pack/plugins/fleet/server/services/agents/crud.ts b/x-pack/plugins/fleet/server/services/agents/crud.ts index 97efef4f226c5..c3ad82518625f 100644 --- a/x-pack/plugins/fleet/server/services/agents/crud.ts +++ b/x-pack/plugins/fleet/server/services/agents/crud.ts @@ -155,7 +155,8 @@ export function getElasticsearchQuery( kuery: string, showInactive = false, includeHosted = false, - hostedPolicies: string[] = [] + hostedPolicies: string[] = [], + extraFilters: string[] = [] ): estypes.QueryDslQueryContainer | undefined { const filters = []; @@ -171,6 +172,8 @@ export function getElasticsearchQuery( filters.push('NOT (policy_id:{policyIds})'.replace('{policyIds}', hostedPolicies.join(','))); } + filters.push(...extraFilters); + const kueryNode = _joinFilters(filters); return kueryNode ? 
toElasticsearchQuery(kueryNode) : undefined; } diff --git a/x-pack/plugins/fleet/server/services/agents/update_agent_tags.test.ts b/x-pack/plugins/fleet/server/services/agents/update_agent_tags.test.ts index a37165483f136..c357ed0e11edf 100644 --- a/x-pack/plugins/fleet/server/services/agents/update_agent_tags.test.ts +++ b/x-pack/plugins/fleet/server/services/agents/update_agent_tags.test.ts @@ -10,6 +10,7 @@ import { elasticsearchServiceMock, savedObjectsClientMock } from '@kbn/core/serv import { createClientMock } from './action.mock'; import { updateAgentTags } from './update_agent_tags'; +import { updateTagsBatch } from './update_agent_tags_action_runner'; jest.mock('../app_context', () => { return { @@ -28,6 +29,7 @@ jest.mock('../agent_policy', () => { return { agentPolicyService: { getByIDs: jest.fn().mockResolvedValue([{ id: 'hosted-agent-policy', is_managed: true }]), + list: jest.fn().mockResolvedValue({ items: [] }), }, }; }); @@ -73,7 +75,7 @@ describe('update_agent_tags', () => { expect(esClient.updateByQuery).toHaveBeenCalledWith( expect.objectContaining({ - conflicts: 'abort', + conflicts: 'proceed', index: '.fleet-agents', query: { terms: { _id: ['agent1'] } }, script: expect.objectContaining({ @@ -90,6 +92,9 @@ describe('update_agent_tags', () => { }); it('should update action results on success', async () => { + esClient.updateByQuery.mockReset(); + esClient.updateByQuery.mockResolvedValue({ failures: [], updated: 1, total: 1 } as any); + await updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], []); const agentAction = esClient.create.mock.calls[0][0] as any; @@ -110,11 +115,32 @@ describe('update_agent_tags', () => { expect(actionResults.body[1].error).not.toBeDefined(); }); - it('should write error action results for hosted agent when agentIds are passed', async () => { + it('should update action results on success - kuery', async () => { + await updateTagsBatch( + soClient, + esClient, + [], + {}, + { + tagsToAdd: ['new'], 
+ tagsToRemove: [], + kuery: '', + } + ); + + const actionResults = esClient.bulk.mock.calls[0][0] as any; + const agentIds = actionResults?.body + ?.filter((i: any) => i.agent_id) + .map((i: any) => i.agent_id); + expect(agentIds[0]).toHaveLength(36); // uuid + expect(actionResults.body[1].error).not.toBeDefined(); + }); + + it('should skip hosted agent from total when agentIds are passed', async () => { const { esClient: esClientMock, agentInHostedDoc } = createClientMock(); esClientMock.updateByQuery.mockReset(); - esClientMock.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: '0' } as any); + esClientMock.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: 0 } as any); await updateAgentTags( soClient, @@ -130,13 +156,9 @@ describe('update_agent_tags', () => { action_id: expect.anything(), agents: [], type: 'UPDATE_TAGS', - total: 1, + total: 0, }) ); - - const errorResults = esClientMock.bulk.mock.calls[0][0] as any; - expect(errorResults.body[1].agent_id).toEqual(agentInHostedDoc._id); - expect(errorResults.body[1].error).toEqual('Cannot modify tags on a hosted agent'); }); it('should write error action results when failures are returned', async () => { @@ -152,6 +174,46 @@ describe('update_agent_tags', () => { expect(errorResults.body[1].error).toEqual('error reason'); }); + it('should throw error on version conflicts', async () => { + esClient.updateByQuery.mockReset(); + esClient.updateByQuery.mockResolvedValue({ + failures: [], + updated: 0, + version_conflicts: 100, + } as any); + + await expect( + updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], []) + ).rejects.toThrowError('version conflict of 100 agents'); + }); + + it('should write out error results on last retry with version conflicts', async () => { + esClient.updateByQuery.mockReset(); + esClient.updateByQuery.mockResolvedValue({ + failures: [], + updated: 0, + version_conflicts: 100, + } as any); + + await expect( + updateTagsBatch( + soClient, 
+ esClient, + [], + {}, + { + tagsToAdd: ['new'], + tagsToRemove: [], + kuery: '', + total: 100, + retryCount: 3, + } + ) + ).rejects.toThrowError('version conflict of 100 agents'); + const errorResults = esClient.bulk.mock.calls[0][0] as any; + expect(errorResults.body[1].error).toEqual('version conflict on 3rd retry'); + }); + it('should run add tags async when actioning more agents than batch size', async () => { esClient.search.mockResolvedValue({ hits: { @@ -180,4 +242,79 @@ describe('update_agent_tags', () => { expect(mockRunAsync).toHaveBeenCalled(); }); + + it('should add tags filter if only one tag to add', async () => { + await updateTagsBatch( + soClient, + esClient, + [], + {}, + { + tagsToAdd: ['new'], + tagsToRemove: [], + kuery: '', + } + ); + + const updateByQuery = esClient.updateByQuery.mock.calls[0][0] as any; + expect(updateByQuery.query).toEqual({ + bool: { + filter: [ + { bool: { minimum_should_match: 1, should: [{ match: { active: true } }] } }, + { + bool: { + must_not: { bool: { minimum_should_match: 1, should: [{ match: { tags: 'new' } }] } }, + }, + }, + ], + }, + }); + }); + + it('should add tags filter if only one tag to remove', async () => { + await updateTagsBatch( + soClient, + esClient, + [], + {}, + { + tagsToAdd: [], + tagsToRemove: ['remove'], + kuery: '', + } + ); + + const updateByQuery = esClient.updateByQuery.mock.calls[0][0] as any; + expect(JSON.stringify(updateByQuery.query)).toContain( + '{"bool":{"should":[{"match":{"tags":"remove"}}],"minimum_should_match":1}}' + ); + }); + + it('should write total from updateByQuery result if query returns less results', async () => { + esClient.updateByQuery.mockReset(); + esClient.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: 50 } as any); + + await updateTagsBatch( + soClient, + esClient, + [], + {}, + { + tagsToAdd: ['new'], + tagsToRemove: [], + kuery: '', + total: 100, + } + ); + + const agentAction = esClient.create.mock.calls[0][0] as any; + 
expect(agentAction?.body).toEqual( + expect.objectContaining({ + action_id: expect.anything(), + agents: [], + type: 'UPDATE_TAGS', + total: 50, + }) + ); + }); }); diff --git a/x-pack/plugins/fleet/server/services/agents/update_agent_tags.ts b/x-pack/plugins/fleet/server/services/agents/update_agent_tags.ts index 79636ecae1015..dad2053f7ed59 100644 --- a/x-pack/plugins/fleet/server/services/agents/update_agent_tags.ts +++ b/x-pack/plugins/fleet/server/services/agents/update_agent_tags.ts @@ -11,9 +11,7 @@ import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/ import type { Agent } from '../../types'; import { AgentReassignmentError } from '../../errors'; -import { SO_SEARCH_LIMIT } from '../../constants'; - -import { getAgentDocuments, getAgentsByKuery } from './crud'; +import { getAgentDocuments } from './crud'; import type { GetAgentsOptions } from '.'; import { searchHitToAgent } from './helpers'; import { UpdateAgentTagsActionRunner, updateTagsBatch } from './update_agent_tags_action_runner'; @@ -30,7 +28,7 @@ export async function updateAgentTags( tagsToRemove: string[] ): Promise<{ actionId: string }> { const outgoingErrors: Record = {}; - let givenAgents: Agent[] = []; + const givenAgents: Agent[] = []; if ('agentIds' in options) { const givenAgentsResults = await getAgentDocuments(esClient, options.agentIds); @@ -44,30 +42,17 @@ export async function updateAgentTags( } } } else if ('kuery' in options) { - const batchSize = options.batchSize ?? SO_SEARCH_LIMIT; - const res = await getAgentsByKuery(esClient, { - kuery: options.kuery, - showInactive: options.showInactive ?? 
false, - page: 1, - perPage: batchSize, - }); - if (res.total <= batchSize) { - givenAgents = res.agents; - } else { - return await new UpdateAgentTagsActionRunner( - esClient, - soClient, - { - ...options, - batchSize, - total: res.total, - kuery: options.kuery, - tagsToAdd, - tagsToRemove, - }, - { pitId: '' } - ).runActionAsyncWithRetry(); - } + return await new UpdateAgentTagsActionRunner( + esClient, + soClient, + { + ...options, + kuery: options.kuery, + tagsToAdd, + tagsToRemove, + }, + { pitId: '' } + ).runActionAsyncWithRetry(); } return await updateTagsBatch(soClient, esClient, givenAgents, outgoingErrors, { diff --git a/x-pack/plugins/fleet/server/services/agents/update_agent_tags_action_runner.ts b/x-pack/plugins/fleet/server/services/agents/update_agent_tags_action_runner.ts index 042707b80df24..af538260bb163 100644 --- a/x-pack/plugins/fleet/server/services/agents/update_agent_tags_action_runner.ts +++ b/x-pack/plugins/fleet/server/services/agents/update_agent_tags_action_runner.ts @@ -20,7 +20,7 @@ import { agentPolicyService } from '../agent_policy'; import { SO_SEARCH_LIMIT } from '../../../common/constants'; -import { ActionRunner } from './action_runner'; +import { ActionRunner, MAX_RETRY_COUNT } from './action_runner'; import { BulkActionTaskType } from './bulk_actions_resolver'; import { filterHostedPolicies } from './filter_hosted_agents'; @@ -63,6 +63,7 @@ export class UpdateAgentTagsActionRunner extends ActionRunner { actionId: this.actionParams.actionId, total: this.actionParams.total, kuery: this.actionParams.kuery, + retryCount: this.retryParams.retryCount, } ); @@ -82,6 +83,7 @@ export async function updateTagsBatch( actionId?: string; total?: number; kuery?: string; + retryCount?: number; } ): Promise<{ actionId: string; updated?: number; took?: number }> { const errors: Record = { ...outgoingErrors }; @@ -94,11 +96,6 @@ export async function updateTagsBatch( hostedAgentError ); const agentIds = filteredAgents.map((agent) => agent.id); 
- const hostedAgentIds = givenAgents - .filter( - (agent) => filteredAgents.find((filteredAgent) => filteredAgent.id === agent.id) === undefined - ) - .map((agent) => agent.id); let query: estypes.QueryDslQueryContainer | undefined; if (options.kuery !== undefined) { @@ -108,7 +105,13 @@ export async function updateTagsBatch( }); const hostedIds = hostedPolicies.items.map((item) => item.id); - query = getElasticsearchQuery(options.kuery, false, false, hostedIds); + const extraFilters = []; + if (options.tagsToAdd.length === 1 && options.tagsToRemove.length === 0) { + extraFilters.push(`NOT (tags:${options.tagsToAdd[0]})`); + } else if (options.tagsToRemove.length === 1 && options.tagsToAdd.length === 0) { + extraFilters.push(`tags:${options.tagsToRemove[0]}`); + } + query = getElasticsearchQuery(options.kuery, false, false, hostedIds, extraFilters); } else { query = { terms: { @@ -150,7 +153,7 @@ export async function updateTagsBatch( updatedAt: new Date().toISOString(), }, }, - conflicts: 'abort', // relying on the task to retry in case of conflicts + conflicts: 'proceed', // relying on the task to retry in case of conflicts - retry only conflicted agents }); } catch (error) { throw new Error('Caught error: ' + JSON.stringify(error).slice(0, 1000)); @@ -159,27 +162,27 @@ export async function updateTagsBatch( appContextService.getLogger().debug(JSON.stringify(res).slice(0, 1000)); const actionId = options.actionId ?? uuid(); - const total = options.total ?? givenAgents.length; - // creating an action doc so that update tags shows up in activity - await createAgentAction(esClient, { - id: actionId, - agents: options.kuery === undefined ? agentIds : [], - created_at: new Date().toISOString(), - type: 'UPDATE_TAGS', - total, - }); + if (options.retryCount === undefined) { + // creating an action doc so that update tags shows up in activity + await createAgentAction(esClient, { + id: actionId, + agents: options.kuery === undefined ? 
agentIds : [], + created_at: new Date().toISOString(), + type: 'UPDATE_TAGS', + total: res.total, + }); + } - // creating unique 0...n ids to use as agentId, as we don't have all agent ids in case of action by kuery - const getArray = (count: number) => [...Array(count).keys()]; + // creating unique ids to use as agentId, as we don't have all agent ids in case of action by kuery + const getUuidArray = (count: number) => Array.from({ length: count }, () => uuid()); // writing successful action results if (res.updated ?? 0 > 0) { await bulkCreateAgentActionResults( esClient, - - (options.kuery === undefined ? agentIds : getArray(res.updated!)).map((id) => ({ - agentId: id + '', + (options.kuery === undefined ? agentIds : getUuidArray(res.updated!)).map((id) => ({ + agentId: id, actionId, })) ); @@ -197,18 +200,19 @@ export async function updateTagsBatch( ); } - // writing hosted agent errors - hosted agents filtered out - if ((res.total ?? total) < total) { - await bulkCreateAgentActionResults( - esClient, - (options.kuery === undefined ? hostedAgentIds : getArray(total - (res.total ?? total))).map( - (id) => ({ - agentId: id + '', + if (res.version_conflicts ?? 
0 > 0) { + // write out error results on last retry, so action is not stuck in progress + if (options.retryCount === MAX_RETRY_COUNT) { + await bulkCreateAgentActionResults( + esClient, + getUuidArray(res.version_conflicts!).map((id) => ({ + agentId: id, actionId, - error: hostedAgentError, - }) - ) - ); + error: 'version conflict on 3rd retry', + })) + ); + } + throw new Error(`version conflict of ${res.version_conflicts} agents`); } return { actionId, updated: res.updated, took: res.took }; diff --git a/x-pack/test/fleet_api_integration/apis/agents/update_agent_tags.ts b/x-pack/test/fleet_api_integration/apis/agents/update_agent_tags.ts index ed3a147fe8ec7..05c1a1f8ae6c7 100644 --- a/x-pack/test/fleet_api_integration/apis/agents/update_agent_tags.ts +++ b/x-pack/test/fleet_api_integration/apis/agents/update_agent_tags.ts @@ -68,69 +68,78 @@ export default function (providerContext: FtrProviderContext) { expect(agent2data.body.item.tags).to.eql(['existingTag']); }); - it('should allow to update tags of multiple agents by kuery', async () => { - await supertest + async function pollResult( + actionId: string, + nbAgentsAck: number, + verifyActionResult: Function + ) { + await new Promise((resolve, reject) => { + let attempts = 0; + const intervalId = setInterval(async () => { + if (attempts > 4) { + clearInterval(intervalId); + reject('action timed out'); + } + ++attempts; + const { + body: { items: actionStatuses }, + } = await supertest.get(`/api/fleet/agents/action_status`).set('kbn-xsrf', 'xxx'); + const action = actionStatuses.find((a: any) => a.actionId === actionId); + if (action && action.nbAgentsAck === nbAgentsAck) { + clearInterval(intervalId); + await verifyActionResult(); + resolve({}); + } + }, 1000); + }).catch((e) => { + throw e; + }); + } + + it('should bulk update tags of multiple agents by kuery - add', async () => { + const { body: actionBody } = await supertest .post(`/api/fleet/agents/bulk_update_agent_tags`) .set('kbn-xsrf', 'xxx') .send({ 
agents: 'active: true', tagsToAdd: ['newTag'], - tagsToRemove: ['existingTag'], + tagsToRemove: [], }) .expect(200); - const { body } = await supertest.get(`/api/fleet/agents`).set('kbn-xsrf', 'xxx'); - expect(body.total).to.eql(4); - body.items.forEach((agent: any) => { - expect(agent.tags.includes('newTag')).to.be(true); - expect(agent.tags.includes('existingTag')).to.be(false); - }); + const actionId = actionBody.actionId; + + const verifyActionResult = async () => { + const { body } = await supertest + .get(`/api/fleet/agents?kuery=tags:newTag`) + .set('kbn-xsrf', 'xxx'); + expect(body.total).to.eql(4); + }; + + await pollResult(actionId, 4, verifyActionResult); }); - it('should bulk update tags of multiple agents by kuery in batches', async () => { + it('should bulk update tags of multiple agents by kuery - remove', async () => { const { body: actionBody } = await supertest .post(`/api/fleet/agents/bulk_update_agent_tags`) .set('kbn-xsrf', 'xxx') .send({ agents: 'active: true', - tagsToAdd: ['newTag'], + tagsToAdd: [], tagsToRemove: ['existingTag'], - batchSize: 3, }) .expect(200); const actionId = actionBody.actionId; const verifyActionResult = async () => { - const { body } = await supertest.get(`/api/fleet/agents`).set('kbn-xsrf', 'xxx'); - expect(body.total).to.eql(4); - body.items.forEach((agent: any) => { - expect(agent.tags.includes('newTag')).to.be(true); - expect(agent.tags.includes('existingTag')).to.be(false); - }); + const { body } = await supertest + .get(`/api/fleet/agents?kuery=tags:existingTag`) + .set('kbn-xsrf', 'xxx'); + expect(body.total).to.eql(0); }; - await new Promise((resolve, reject) => { - let attempts = 0; - const intervalId = setInterval(async () => { - if (attempts > 4) { - clearInterval(intervalId); - reject('action timed out'); - } - ++attempts; - const { - body: { items: actionStatuses }, - } = await supertest.get(`/api/fleet/agents/action_status`).set('kbn-xsrf', 'xxx'); - const action = actionStatuses.find((a: any) => 
a.actionId === actionId); - if (action && action.nbAgentsAck === 4) { - clearInterval(intervalId); - await verifyActionResult(); - resolve({}); - } - }, 1000); - }).catch((e) => { - throw e; - }); + await pollResult(actionId, 2, verifyActionResult); }); it('should return a 403 if user lacks fleet all permissions', async () => { @@ -180,8 +189,8 @@ export default function (providerContext: FtrProviderContext) { .get(`/api/fleet/agents/action_status`) .set('kbn-xsrf', 'xxx'); const actionStatus = body.items[0]; - expect(actionStatus.status).to.eql('FAILED'); - expect(actionStatus.nbAgentsFailed).to.eql(1); + expect(actionStatus.status).to.eql('COMPLETE'); + expect(actionStatus.nbAgentsAck).to.eql(1); }); }); });