Skip to content

Commit

Permalink
[Ingest Manager] Set the default timeout to 5 minute with a 20 second…
Browse files Browse the repository at this point in the history
…s margin and improve the rate limitter (#79307)
  • Loading branch information
nchaulet authored Oct 2, 2020
1 parent 9973667 commit 54fa55d
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 15 deletions.
6 changes: 4 additions & 2 deletions x-pack/plugins/ingest_manager/common/constants/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ export const AGENT_TYPE_EPHEMERAL = 'EPHEMERAL';
export const AGENT_TYPE_TEMPORARY = 'TEMPORARY';

export const AGENT_POLLING_REQUEST_TIMEOUT_MS = 300000; // 5 minutes
export const AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS = 20000; // 20s

export const AGENT_POLLING_THRESHOLD_MS = 30000;
export const AGENT_POLLING_INTERVAL = 1000;
export const AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS = 30000;
export const AGENT_UPDATE_ACTIONS_INTERVAL_MS = 5000;

export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS = 5000;
export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL = 25;
export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS = 1000;
export const AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL = 5;
1 change: 1 addition & 0 deletions x-pack/plugins/ingest_manager/server/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export {
AGENT_TYPE_EPHEMERAL,
AGENT_TYPE_TEMPORARY,
AGENT_POLLING_THRESHOLD_MS,
AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS,
AGENT_POLLING_INTERVAL,
AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS,
AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ describe('createRateLimiter', () => {

scheduler.run(({ expectObservable, cold }) => {
const source = cold('a-b-c-d-e-f|');
const rateLimiter = createRateLimiter(10, 1, 2, scheduler);
const intervalMs = 10;
const perInterval = 1;
const maxDelayMs = 50;
const rateLimiter = createRateLimiter(intervalMs, perInterval, maxDelayMs, scheduler);
const obs = source.pipe(rateLimiter());
const results = 'a 9ms b 9ms c 9ms d 9ms e 9ms (f|)';
// f should be dropped because of maxDelay
const results = 'a 9ms b 9ms c 9ms d 9ms (e|)';
expectObservable(obs).toBe(results);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,18 @@ export function createRateLimiter(
let countInCurrentInterval = 0;

function createRateLimitOperator<T>(): Rx.OperatorFunction<T, T> {
const maxIntervalEnd = scheduler.now() + maxDelay;

return Rx.pipe(
concatMap(function rateLimit(value: T) {
const now = scheduler.now();
if (intervalEnd <= now) {
countInCurrentInterval = 1;
intervalEnd = now + ratelimitIntervalMs;
return Rx.of(value);
} else if (intervalEnd >= now + maxDelay) {
// re-rate limit in the future to avoid to schedule too far in the future as some observer can unsubscribe
return Rx.of(value).pipe(delay(maxDelay, scheduler), createRateLimitOperator<T>());
} else if (intervalEnd >= maxIntervalEnd) {
// drop the value as it's never going to success as long polling timeout is going to happen before we can send the policy
return Rx.EMPTY;
} else {
if (++countInCurrentInterval > ratelimitRequestPerInterval) {
countInCurrentInterval = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import * as APIKeysService from '../../api_keys';
import {
AGENT_SAVED_OBJECT_TYPE,
AGENT_UPDATE_ACTIONS_INTERVAL_MS,
AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS,
AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS,
AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL,
} from '../../../constants';
Expand All @@ -38,8 +39,6 @@ import {
import { appContextService } from '../../app_context';
import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils';

const RATE_LIMIT_MAX_DELAY_MS = 5 * 60 * 1000; // 5 minutes

function getInternalUserSOClient() {
const fakeRequest = ({
headers: {},
Expand Down Expand Up @@ -166,19 +165,29 @@ export async function createAgentActionFromPolicyAction(
return [newAgentAction];
}

function getPollingTimeoutMs() {
const pollingTimeoutMs = appContextService.getConfig()?.fleet.pollingRequestTimeout ?? 0;
// Set a timeout 20s before the real timeout to have a chance to respond an empty response before socket timeout
return Math.max(
pollingTimeoutMs - AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS,
AGENT_POLLING_REQUEST_TIMEOUT_MARGIN_MS
);
}

export function agentCheckinStateNewActionsFactory() {
// Shared Observables
const agentPolicies$ = new Map<string, Observable<AgentPolicyAction>>();
const newActions$ = createNewActionsSharedObservable();
// Rx operators
const pollingTimeoutMs = appContextService.getConfig()?.fleet.pollingRequestTimeout ?? 0;
const pollingTimeoutMs = getPollingTimeoutMs();

const rateLimiterIntervalMs =
appContextService.getConfig()?.fleet.agentPolicyRolloutRateLimitIntervalMs ??
AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS;
const rateLimiterRequestPerInterval =
appContextService.getConfig()?.fleet.agentPolicyRolloutRateLimitRequestPerInterval ??
AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL;
const rateLimiterMaxDelay = Math.min(RATE_LIMIT_MAX_DELAY_MS, pollingTimeoutMs);
const rateLimiterMaxDelay = pollingTimeoutMs;

const rateLimiter = createRateLimiter(
rateLimiterIntervalMs,
Expand All @@ -204,10 +213,7 @@ export function agentCheckinStateNewActionsFactory() {
}

const stream$ = agentPolicy$.pipe(
timeout(
// Set a timeout 3s before the real timeout to have a chance to respond an empty response before socket timeout
Math.max(pollingTimeoutMs - 3000, 3000)
),
timeout(pollingTimeoutMs),
filter(
(action) =>
agent.policy_id !== undefined &&
Expand Down

0 comments on commit 54fa55d

Please sign in to comment.