Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Set the default timeout to 5 minutes with a 20-second margin and improve rate limiter #79307

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions x-pack/plugins/ingest_manager/common/constants/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ export const AGENT_TYPE_EPHEMERAL = 'EPHEMERAL';
export const AGENT_TYPE_TEMPORARY = 'TEMPORARY';

// Agent long-polling timing constants; the `_MS` suffix denotes milliseconds.
export const AGENT_POLLING_REQUEST_TIMEOUT_MS = 300000; // 5 minutes
export const AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS = 20000; // 20s

export const AGENT_POLLING_THRESHOLD_MS = 30000; // 30s
export const AGENT_POLLING_INTERVAL = 1000; // presumably milliseconds (1s); the name carries no unit — TODO confirm
export const AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS = 30000; // 30s
export const AGENT_UPDATE_ACTIONS_INTERVAL_MS = 5000; // 5s

// NOTE(review): each of the two rate-limit constants below is declared twice in this view
// (5000 ms / 25 requests, then 1000 ms / 5 requests). This looks like the old and new sides
// of a diff shown together — confirm which pair is current before relying on either.
export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS = 5000;
export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL = 25;
export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS = 1000;
export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL = 5;
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/server/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export {
AGENT_TYPE_EPHEMERAL,
AGENT_TYPE_TEMPORARY,
AGENT_POLLING_THRESHOLD_MS,
AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS,
AGENT_POLLING_INTERVAL,
AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS,
AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ describe('createRateLimiter', () => {

scheduler.run(({ expectObservable, cold }) => {
const source = cold('a-b-c-d-e-f|');
const rateLimiter = createRateLimiter(10, 1, 2, scheduler);
const intervalMs = 10;
const perInterval = 1;
const maxDelayMs = 50;
const rateLimiter = createRateLimiter(intervalMs, perInterval, maxDelayMs, scheduler);
const obs = source.pipe(rateLimiter());
const results = 'a 9ms b 9ms c 9ms d 9ms e 9ms (f|)';
// f should be dropped because of maxDelay
const results = 'a 9ms b 9ms c 9ms d 9ms (e|)';
expectObservable(obs).toBe(results);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,18 @@ export function createRateLimiter(
let countInCurrentInterval = 0;

function createRateLimitOperator<T>(): Rx.OperatorFunction<T, T> {
const maxIntervalEnd = scheduler.now() + maxDelay;

return Rx.pipe(
concatMap(function rateLimit(value: T) {
const now = scheduler.now();
if (intervalEnd <= now) {
countInCurrentInterval = 1;
intervalEnd = now + ratelimitIntervalMs;
return Rx.of(value);
} else if (intervalEnd >= now + maxDelay) {
// re-rate limit in the future to avoid to schedule too far in the future as some observer can unsubscribe
return Rx.of(value).pipe(delay(maxDelay, scheduler), createRateLimitOperator<T>());
} else if (intervalEnd >= maxIntervalEnd) {
// drop the value: it can never succeed, because the long-polling timeout will fire before we can send the policy
return Rx.EMPTY;
} else {
if (++countInCurrentInterval > ratelimitRequestPerInterval) {
countInCurrentInterval = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import * as APIKeysService from '../../api_keys';
import {
AGENT_SAVED_OBJECT_TYPE,
AGENT_UPDATE_ACTIONS_INTERVAL_MS,
AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS,
AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS,
AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL,
} from '../../../constants';
Expand All @@ -38,8 +39,6 @@ import {
import { appContextService } from '../../app_context';
import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils';

const RATE_LIMIT_MAX_DELAY_MS = 5 * 60 * 1000; // 5 minutes

function getInternalUserSOClient() {
const fakeRequest = ({
headers: {},
Expand Down Expand Up @@ -166,19 +165,29 @@ export async function createAgentActionFromPolicyAction(
return [newAgentAction];
}

/**
 * Computes the timeout, in milliseconds, to apply to an agent polling request.
 *
 * The configured `fleet.pollingRequestTimeout` (falling back to 0 when the config or
 * the setting is absent) is shortened by AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS so
 * that an empty response can still be sent before the socket itself times out.
 *
 * @returns the shortened timeout, never lower than the margin itself.
 */
function getPollingTimeoutMs() {
  const marginMs = AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS;
  const configuredTimeoutMs = appContextService.getConfig()?.fleet.pollingRequestTimeout ?? 0;

  // Leave `marginMs` of headroom before the real timeout, but use the margin as a floor
  // so a small (or missing) configured timeout does not yield a tiny or negative value.
  const timeoutWithHeadroomMs = configuredTimeoutMs - marginMs;
  return Math.max(timeoutWithHeadroomMs, marginMs);
}

export function agentCheckinStateNewActionsFactory() {
// Shared Observables
const agentPolicies$ = new Map<string, Observable<AgentPolicyAction>>();
const newActions$ = createNewActionsSharedObservable();
// Rx operators
const pollingTimeoutMs = appContextService.getConfig()?.fleet.pollingRequestTimeout ?? 0;
const pollingTimeoutMs = getPollingTimeoutMs();

const rateLimiterIntervalMs =
appContextService.getConfig()?.fleet.agentPolicyRolloutRateLimitIntervalMs ??
AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS;
const rateLimiterRequestPerInterval =
appContextService.getConfig()?.fleet.agentPolicyRolloutRateLimitRequestPerInterval ??
AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL;
const rateLimiterMaxDelay = Math.min(RATE_LIMIT_MAX_DELAY_MS, pollingTimeoutMs);
const rateLimiterMaxDelay = pollingTimeoutMs;

const rateLimiter = createRateLimiter(
rateLimiterIntervalMs,
Expand All @@ -204,10 +213,7 @@ export function agentCheckinStateNewActionsFactory() {
}

const stream$ = agentPolicy$.pipe(
timeout(
// Set a timeout 3s before the real timeout to have a chance to respond an empty response before socket timeout
Math.max(pollingTimeoutMs - 3000, 3000)
),
timeout(pollingTimeoutMs),
filter(
(action) =>
agent.policy_id !== undefined &&
Expand Down