Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Internal action for policy reassign #78493

Merged
merged 3 commits into from
Oct 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion x-pack/plugins/ingest_manager/common/types/models/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ export type AgentStatus =
| 'updating'
| 'degraded';

export type AgentActionType = 'POLICY_CHANGE' | 'UNENROLL' | 'UPGRADE';
export type AgentActionType =
| 'POLICY_CHANGE'
| 'UNENROLL'
| 'UPGRADE'
// INTERNAL* actions are mean to interupt long polling calls these actions will not be distributed to the agent
| 'INTERNAL_POLICY_REASSIGN';

export interface NewAgentAction {
type: AgentActionType;
data?: any;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ export async function getAgentActionsForCheckin(
nodeTypes.literal.buildNode(false),
])
),
nodeTypes.function.buildNode(
'not',
nodeTypes.function.buildNodeWithArgumentNodes('is', [
nodeTypes.literal.buildNode(`${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.type`),
nodeTypes.literal.buildNode('INTERNAL_POLICY_REASSIGN'),
nodeTypes.literal.buildNode(false),
])
),
nodeTypes.function.buildNodeWithArgumentNodes('is', [
nodeTypes.literal.buildNode(`${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.agent_id`),
nodeTypes.literal.buildNode(agentId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import semver from 'semver';
import { timer, from, Observable, TimeoutError } from 'rxjs';
import { timer, from, Observable, TimeoutError, of, EMPTY } from 'rxjs';
import { omit } from 'lodash';
import {
shareReplay,
share,
distinctUntilKeyChanged,
switchMap,
concatMap,
merge,
filter,
timeout,
Expand Down Expand Up @@ -38,6 +41,7 @@ import {
} from '../actions';
import { appContextService } from '../../app_context';
import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils';
import { getAgent } from '../crud';

function getInternalUserSOClient() {
const fakeRequest = ({
Expand Down Expand Up @@ -69,7 +73,8 @@ function createNewActionsSharedObservable(): Observable<AgentAction[]> {
lastTimestamp = new Date().toISOString();
return from(getNewActionsSince(internalSOClient, timestamp));
}),
shareReplay({ refCount: true, bufferSize: 1 })
filter((data) => data.length > 0),
share()
);
}

Expand Down Expand Up @@ -201,6 +206,18 @@ export function agentCheckinStateNewActionsFactory() {
rateLimiterMaxDelay
);

function getOrCreateAgentPolicyObservable(agentPolicyId: string) {
if (!agentPolicies$.has(agentPolicyId)) {
agentPolicies$.set(agentPolicyId, createAgentPolicyActionSharedObservable(agentPolicyId));
}
const agentPolicy$ = agentPolicies$.get(agentPolicyId);
if (!agentPolicy$) {
throw new Error(`Invalid state, no observable for policy ${agentPolicyId}`);
}

return agentPolicy$;
}

async function subscribeToNewActions(
soClient: SavedObjectsClientContract,
agent: Agent,
Expand All @@ -209,14 +226,7 @@ export function agentCheckinStateNewActionsFactory() {
if (!agent.policy_id) {
throw new Error('Agent does not have a policy');
}
const agentPolicyId = agent.policy_id;
if (!agentPolicies$.has(agentPolicyId)) {
agentPolicies$.set(agentPolicyId, createAgentPolicyActionSharedObservable(agentPolicyId));
}
const agentPolicy$ = agentPolicies$.get(agentPolicyId);
if (!agentPolicy$) {
throw new Error(`Invalid state, no observable for policy ${agentPolicyId}`);
}
const agentPolicy$ = getOrCreateAgentPolicyObservable(agent.policy_id);

const stream$ = agentPolicy$.pipe(
timeout(pollingTimeoutMs),
Expand All @@ -229,25 +239,43 @@ export function agentCheckinStateNewActionsFactory() {
(!agent.policy_revision || action.policy_revision > agent.policy_revision)
),
rateLimiter(),
switchMap((policyAction) => createAgentActionFromPolicyAction(soClient, agent, policyAction)),
concatMap((policyAction) => createAgentActionFromPolicyAction(soClient, agent, policyAction)),
merge(newActions$),
switchMap(async (data) => {
if (!data) {
return;
concatMap((data: AgentAction[] | undefined) => {
if (data === undefined) {
return EMPTY;
}
const newActions = data.filter((action) => action.agent_id === agent.id);
if (newActions.length === 0) {
return;
return EMPTY;
}

return newActions;
const hasConfigReassign = newActions.some(
(action) => action.type === 'INTERNAL_POLICY_REASSIGN'
);
if (hasConfigReassign) {
return from(getAgent(soClient, agent.id)).pipe(
concatMap((refreshedAgent) => {
if (!refreshedAgent.policy_id) {
throw new Error('Agent does not have a policy assigned');
}
const newAgentPolicy$ = getOrCreateAgentPolicyObservable(refreshedAgent.policy_id);
return newAgentPolicy$;
}),
rateLimiter(),
concatMap((policyAction) =>
createAgentActionFromPolicyAction(soClient, agent, policyAction)
)
);
}

return of(newActions);
}),
filter((data) => data !== undefined),
take(1)
);
try {
const data = await toPromiseAbortable(stream$, options?.signal);

return data || [];
} catch (err) {
if (err instanceof TimeoutError || err instanceof AbortError) {
Expand Down
20 changes: 19 additions & 1 deletion x-pack/plugins/ingest_manager/server/services/agents/reassign.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { AGENT_SAVED_OBJECT_TYPE } from '../../constants';
import { AgentSOAttributes } from '../../types';
import { agentPolicyService } from '../agent_policy';
import { getAgents, listAllAgents } from './crud';
import { createAgentAction, bulkCreateAgentActions } from './actions';

export async function reassignAgent(
soClient: SavedObjectsClientContract,
Expand All @@ -25,6 +26,12 @@ export async function reassignAgent(
policy_id: newAgentPolicyId,
policy_revision: null,
});

await createAgentAction(soClient, {
agent_id: agentId,
created_at: new Date().toISOString(),
type: 'INTERNAL_POLICY_REASSIGN',
});
}

export async function reassignAgents(
Expand Down Expand Up @@ -56,7 +63,7 @@ export async function reassignAgents(
const agentsToUpdate = agents.filter((agent) => agent.policy_id !== newAgentPolicyId);

// Update the necessary agents
return await soClient.bulkUpdate<AgentSOAttributes>(
const res = await soClient.bulkUpdate<AgentSOAttributes>(
agentsToUpdate.map((agent) => ({
type: AGENT_SAVED_OBJECT_TYPE,
id: agent.id,
Expand All @@ -66,4 +73,15 @@ export async function reassignAgents(
},
}))
);
const now = new Date().toISOString();
await bulkCreateAgentActions(
soClient,
agentsToUpdate.map((agent) => ({
agent_id: agent.id,
created_at: now,
type: 'INTERNAL_POLICY_REASSIGN',
}))
);

return res;
}
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/server/types/models/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export const NewAgentActionSchema = schema.object({
schema.literal('POLICY_CHANGE'),
schema.literal('UNENROLL'),
schema.literal('UPGRADE'),
schema.literal('INTERNAL_POLICY_REASSIGN'),
]),
data: schema.maybe(schema.any()),
ack_data: schema.maybe(schema.any()),
Expand Down