diff --git a/x-pack/plugins/ingest_manager/common/constants/agent.ts b/x-pack/plugins/ingest_manager/common/constants/agent.ts index 82d2ad712ef023..30b8a6b7406090 100644 --- a/x-pack/plugins/ingest_manager/common/constants/agent.ts +++ b/x-pack/plugins/ingest_manager/common/constants/agent.ts @@ -13,10 +13,12 @@ export const AGENT_TYPE_EPHEMERAL = 'EPHEMERAL'; export const AGENT_TYPE_TEMPORARY = 'TEMPORARY'; export const AGENT_POLLING_REQUEST_TIMEOUT_MS = 300000; // 5 minutes +export const AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS = 20000; // 20s + export const AGENT_POLLING_THRESHOLD_MS = 30000; export const AGENT_POLLING_INTERVAL = 1000; export const AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS = 30000; export const AGENT_UPDATE_ACTIONS_INTERVAL_MS = 5000; -export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS = 5000; -export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL = 25; +export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS = 1000; +export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL = 5; diff --git a/x-pack/plugins/ingest_manager/server/constants/index.ts b/x-pack/plugins/ingest_manager/server/constants/index.ts index 3965e27da05427..c69ee7e4b60925 100644 --- a/x-pack/plugins/ingest_manager/server/constants/index.ts +++ b/x-pack/plugins/ingest_manager/server/constants/index.ts @@ -8,6 +8,7 @@ export { AGENT_TYPE_EPHEMERAL, AGENT_TYPE_TEMPORARY, AGENT_POLLING_THRESHOLD_MS, + AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS, AGENT_POLLING_INTERVAL, AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS, AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL, diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts index 5e84e3a50bb440..2909899418ec24 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.test.ts @@ -15,9 +15,13 @@ describe('createRateLimiter', () => { scheduler.run(({ expectObservable, cold }) => { const source = cold('a-b-c-d-e-f|'); - const rateLimiter = createRateLimiter(10, 1, 2, scheduler); + const intervalMs = 10; + const perInterval = 1; + const maxDelayMs = 50; + const rateLimiter = createRateLimiter(intervalMs, perInterval, maxDelayMs, scheduler); const obs = source.pipe(rateLimiter()); - const results = 'a 9ms b 9ms c 9ms d 9ms e 9ms (f|)'; + // f should be dropped because of maxDelay + const results = 'a 9ms b 9ms c 9ms d 9ms (e|)'; expectObservable(obs).toBe(results); }); }); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts index 3bbfbbd4ec1bfb..bbdaa9975eeacd 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts @@ -54,6 +54,8 @@ export function createRateLimiter( let countInCurrentInterval = 0; function createRateLimitOperator(): Rx.OperatorFunction { + const maxIntervalEnd = scheduler.now() + maxDelay; + return Rx.pipe( concatMap(function rateLimit(value: T) { const now = scheduler.now(); @@ -61,9 +63,9 @@ export function createRateLimiter( countInCurrentInterval = 1; intervalEnd = now + ratelimitIntervalMs; return Rx.of(value); - } else if (intervalEnd >= now + maxDelay) { - // re-rate limit in the future to avoid to schedule too far in the future as some observer can unsubscribe - return Rx.of(value).pipe(delay(maxDelay, scheduler), createRateLimitOperator()); + } else if (intervalEnd >= maxIntervalEnd) { + // drop the value as it's never going to success as long polling timeout is going to happen before we can send the policy + return Rx.EMPTY; } else { if (++countInCurrentInterval > ratelimitRequestPerInterval) { countInCurrentInterval = 1; diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts index 51ccdc8eb1c7c9..8ae151577fefa2 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -27,6 +27,7 @@ import * as APIKeysService from '../../api_keys'; import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS, + AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS, AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS, AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL, } from '../../../constants'; @@ -38,8 +39,6 @@ import { import { appContextService } from '../../app_context'; import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils'; -const RATE_LIMIT_MAX_DELAY_MS = 5 * 60 * 1000; // 5 minutes - function getInternalUserSOClient() { const fakeRequest = ({ headers: {}, @@ -166,19 +165,29 @@ export async function createAgentActionFromPolicyAction( return [newAgentAction]; } +function getPollingTimeoutMs() { + const pollingTimeoutMs = appContextService.getConfig()?.fleet.pollingRequestTimeout ?? 0; + // Set a timeout 20s before the real timeout to have a chance to respond an empty response before socket timeout + return Math.max( + pollingTimeoutMs - AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS, + AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS + ); +} + export function agentCheckinStateNewActionsFactory() { // Shared Observables const agentPolicies$ = new Map>(); const newActions$ = createNewActionsSharedObservable(); // Rx operators - const pollingTimeoutMs = appContextService.getConfig()?.fleet.pollingRequestTimeout ?? 0; + const pollingTimeoutMs = getPollingTimeoutMs(); + const rateLimiterIntervalMs = appContextService.getConfig()?.fleet.agentPolicyRolloutRateLimitIntervalMs ?? AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS; const rateLimiterRequestPerInterval = appContextService.getConfig()?.fleet.agentPolicyRolloutRateLimitRequestPerInterval ?? AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL; - const rateLimiterMaxDelay = Math.min(RATE_LIMIT_MAX_DELAY_MS, pollingTimeoutMs); + const rateLimiterMaxDelay = pollingTimeoutMs; const rateLimiter = createRateLimiter( rateLimiterIntervalMs, @@ -204,10 +213,7 @@ export function agentCheckinStateNewActionsFactory() { } const stream$ = agentPolicy$.pipe( - timeout( - // Set a timeout 3s before the real timeout to have a chance to respond an empty response before socket timeout - Math.max(pollingTimeoutMs - 3000, 3000) - ), + timeout(pollingTimeoutMs), filter( (action) => agent.policy_id !== undefined &&