
Endpoint Telemetry: Agents Metrics + Policy Config / Response #102171

Merged — 37 commits, merged Jun 30, 2021
Changes shown are from 28 of the 37 commits.

Commits
1432e1b  [PH] Initial setup for endpoint task telemetry. (pjhampton, Jun 15, 2021)
4fa12e3  Refactor / Add daily task for collecting fleet detail / policy resp /… (pjhampton, Jun 15, 2021)
4c75e55  [PH CD] Code walkthrough. Start fetching fleet policy configs. (pjhampton, Jun 15, 2021)
92e8e0d  [PH] pass in fleet agent service rather than homebrew kuerys. (pjhampton, Jun 16, 2021)
26420dc  [PH] prepare to move away from legacy es client. Get fleet ep agents. (pjhampton, Jun 16, 2021)
e9a3c4c  Fetch agent policy configs. (pjhampton, Jun 16, 2021)
9a72714  Stub ep policy responses. (pjhampton, Jun 16, 2021)
f2ddc22  Fix CI + Types. Fix dep injection. Reimagine SO client creation. (pjhampton, Jun 17, 2021)
5975ed7  Create SO client properly (pjhampton, Jun 17, 2021)
7b9d43c  Merge branch 'master' into pjhampton/endpoint-telemetry (kibanamachine, Jun 17, 2021)
7b6a365  Fetch EP Policy responses. (pjhampton, Jun 18, 2021)
0661f22  Fetch EP Policy responses. (pjhampton, Jun 18, 2021)
a2308c8  Merge branch 'pjhampton/endpoint-telemetry' of github.com:elastic/kib… (pjhampton, Jun 18, 2021)
699e068  Remove unused import (pjhampton, Jun 18, 2021)
b810618  Fetch failed policy responses from EP data stream. (pjhampton, Jun 21, 2021)
6932470  Remove unused imports. (pjhampton, Jun 21, 2021)
05e6f6b  Combine failed policy responses with policy configs. (pjhampton, Jun 22, 2021)
9b4e310  Attach fleet agent + ep agent ids (pjhampton, Jun 22, 2021)
f9b7e92  Merge branch 'master' into pjhampton/endpoint-telemetry (kibanamachine, Jun 22, 2021)
4ccd4df  Add dedicated channel sender. Temp disable with feature flag. (pjhampton, Jun 23, 2021)
1fb9c30  Remove ublock from the failed policy response. (pjhampton, Jun 23, 2021)
3d7eaa8  Merge branch 'master' into pjhampton/endpoint-telemetry (kibanamachine, Jun 23, 2021)
841edd6  Fetch endpoint metrics. (pjhampton, Jun 23, 2021)
3d5c511  Fix bad merge commit. (pjhampton, Jun 23, 2021)
c590280  Get EP telemetry. (pjhampton, Jun 24, 2021)
c64ea5c  Merge branch 'master' into pjhampton/endpoint-telemetry (kibanamachine, Jun 24, 2021)
8e54c0d  Merge branch 'master' into pjhampton/endpoint-telemetry (kibanamachine, Jun 24, 2021)
0781c54  Merge branch 'master' into pjhampton/endpoint-telemetry (kibanamachine, Jun 28, 2021)
ee38ae7  Record last execution time of endpoint task (pjhampton, Jun 29, 2021)
eb75a93  Merge branch 'master' into pjhampton/endpoint-telemetry (kibanamachine, Jun 29, 2021)
cbd499d  Remove send on demand feature flag. (pjhampton, Jun 29, 2021)
464db0d  Simplify cache conditional. (pjhampton, Jun 29, 2021)
4e5804b  Refactor into Promise.allSettled (pjhampton, Jun 29, 2021)
d769be8  Fix type error. (pjhampton, Jun 29, 2021)
d88d151  Bail if there is no endpoint metrics (pjhampton, Jun 29, 2021)
99ceeef  Bump interval to 24h. (pjhampton, Jun 29, 2021)
c4316b0  Merge branch 'master' into pjhampton/endpoint-telemetry (kibanamachine, Jun 30, 2021)
@@ -5,13 +5,10 @@
* 2.0.
*/

import moment from 'moment';
import { loggingSystemMock } from 'src/core/server/mocks';

import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { TaskStatus } from '../../../../task_manager/server';

import { TelemetryDiagTask, TelemetryDiagTaskConstants } from './task';
import { TelemetryDiagTask, TelemetryDiagTaskConstants } from './diagnostic_task';
import { createMockTelemetryEventsSender, MockTelemetryDiagnosticTask } from './mocks';

describe('test', () => {
@@ -22,7 +19,7 @@ describe('test', () => {
});

describe('basic diagnostic alert telemetry sanity checks', () => {
test('task can register', () => {
test('diagnostic task can register', () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
@@ -40,7 +37,7 @@
expect(mockTaskManager.registerTaskDefinitions).toHaveBeenCalled();
});

test('task should be scheduled', async () => {
test('diagnostic task should be scheduled', async () => {
const mockTaskManagerSetup = taskManagerMock.createSetup();
const telemetryDiagTask = new TelemetryDiagTask(
logger,
@@ -53,7 +50,7 @@
expect(mockTaskManagerStart.ensureScheduled).toHaveBeenCalled();
});

test('task should run', async () => {
test('diagnostic task should run', async () => {
const mockContext = createMockTelemetryEventsSender(true);
const mockTaskManager = taskManagerMock.createSetup();
const telemetryDiagTask = new MockTelemetryDiagnosticTask(logger, mockTaskManager, mockContext);
@@ -79,7 +76,7 @@
expect(telemetryDiagTask.runTask).toHaveBeenCalled();
});

test('task should not query elastic if telemetry is not opted in', async () => {
test('diagnostic task should not query elastic if telemetry is not opted in', async () => {
const mockSender = createMockTelemetryEventsSender(false);
const mockTaskManager = taskManagerMock.createSetup();
new MockTelemetryDiagnosticTask(logger, mockTaskManager, mockSender);
@@ -104,48 +101,4 @@
await taskRunner.run();
expect(mockSender.fetchDiagnosticAlerts).not.toHaveBeenCalled();
});

test('test -5 mins is returned when there is no previous task run', async () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);

const executeTo = moment().utc().toISOString();
const executeFrom = undefined;
const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom);

expect(newExecuteFrom).toEqual(moment(executeTo).subtract(5, 'minutes').toISOString());
});

test('test -6 mins is returned when there was a previous task run', async () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);

const executeTo = moment().utc().toISOString();
const executeFrom = moment(executeTo).subtract(6, 'minutes').toISOString();
const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom);

expect(newExecuteFrom).toEqual(executeFrom);
});

// it's possible if Kibana is down for a prolonged period the stored lastRun would have drifted
// if that is the case we will just roll it back to a 10 min search window
test('test 10 mins is returned when previous task run took longer than 10 minutes', async () => {
const telemetryDiagTask = new TelemetryDiagTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);

const executeTo = moment().utc().toISOString();
const executeFrom = moment(executeTo).subtract(142, 'minutes').toISOString();
const newExecuteFrom = telemetryDiagTask.getLastExecutionTimestamp(executeTo, executeFrom);

expect(newExecuteFrom).toEqual(moment(executeTo).subtract(10, 'minutes').toISOString());
});
});
@@ -12,6 +12,7 @@ import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../task_manager/server';
import { getLastTaskExecutionTimestamp } from './helpers';
import { TelemetryEventsSender, TelemetryEvent } from './sender';

export const TelemetryDiagTaskConstants = {
@@ -43,7 +44,7 @@ export class TelemetryDiagTask {
return {
run: async () => {
const executeTo = moment().utc().toISOString();
const executeFrom = this.getLastExecutionTimestamp(
const executeFrom = getLastTaskExecutionTimestamp(
executeTo,
taskInstance.state?.lastExecutionTimestamp
);
@@ -64,20 +65,6 @@
});
}

public getLastExecutionTimestamp(executeTo: string, lastExecutionTimestamp?: string) {
if (lastExecutionTimestamp === undefined) {
this.logger.debug(`No last execution timestamp defined`);
return moment(executeTo).subtract(5, 'minutes').toISOString();
}

if (moment(executeTo).diff(lastExecutionTimestamp, 'minutes') >= 10) {
this.logger.debug(`last execution timestamp was greater than 10 minutes`);
return moment(executeTo).subtract(10, 'minutes').toISOString();
}

return lastExecutionTimestamp;
}

public start = async (taskManager: TaskManagerStartContract) => {
try {
await taskManager.ensureScheduled({
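The removed getLastExecutionTimestamp method is replaced by the shared getLastTaskExecutionTimestamp helper imported from './helpers' above. The helper file itself is not part of the hunks shown here, but based on the removed method body and the new call site it presumably looks roughly like the following sketch (not the exact merged code; the debug logging from the old method is omitted because the new call site no longer has a logger in scope):

import moment from 'moment';

// Sketch: derive the "from" timestamp for the next telemetry search window.
export const getLastTaskExecutionTimestamp = (
  executeTo: string,
  lastExecutionTimestamp?: string
) => {
  if (lastExecutionTimestamp === undefined) {
    // No previous run recorded: fall back to a 5 minute window.
    return moment(executeTo).subtract(5, 'minutes').toISOString();
  }

  if (moment(executeTo).diff(lastExecutionTimestamp, 'minutes') >= 10) {
    // The stored lastRun drifted (e.g. Kibana was down); clamp to a 10 minute window.
    return moment(executeTo).subtract(10, 'minutes').toISOString();
  }

  return lastExecutionTimestamp;
};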
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { loggingSystemMock } from 'src/core/server/mocks';
import { taskManagerMock } from '../../../../task_manager/server/mocks';
import { TelemetryEndpointTask } from './endpoint_task';
import { createMockTelemetryEventsSender } from './mocks';

describe('test', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;

beforeEach(() => {
logger = loggingSystemMock.createLogger();
});

describe('endpoint alert telemetry checks', () => {
test('the task can register', () => {
const telemetryEndpointTask = new TelemetryEndpointTask(
logger,
taskManagerMock.createSetup(),
createMockTelemetryEventsSender(true)
);

expect(telemetryEndpointTask).toBeInstanceOf(TelemetryEndpointTask);
});
});

test('the endpoint task should be registered', () => {
const mockTaskManager = taskManagerMock.createSetup();
new TelemetryEndpointTask(logger, mockTaskManager, createMockTelemetryEventsSender(true));

expect(mockTaskManager.registerTaskDefinitions).toHaveBeenCalled();
});

test('the endpoint task should be scheduled', async () => {
const mockTaskManagerSetup = taskManagerMock.createSetup();
const telemetryEndpointTask = new TelemetryEndpointTask(
logger,
mockTaskManagerSetup,
createMockTelemetryEventsSender(true)
);

const mockTaskManagerStart = taskManagerMock.createStart();
await telemetryEndpointTask.start(mockTaskManagerStart);
expect(mockTaskManagerStart.ensureScheduled).toHaveBeenCalled();
});
});
x-pack/plugins/security_solution/server/lib/telemetry/endpoint_task.ts (197 additions, 0 deletions)
@@ -0,0 +1,197 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import moment from 'moment';
import { Logger } from 'src/core/server';
import {
ConcreteTaskInstance,
TaskManagerSetupContract,
TaskManagerStartContract,
} from '../../../../task_manager/server';
import { TelemetryEventsSender } from './sender';
import { FullAgentPolicyInput } from '../../../../fleet/common/types/models/agent_policy';
import {
EndpointMetricsAggregation,
EndpointPolicyResponseAggregation,
EndpointPolicyResponseDocument,
FleetAgentCacheItem,
} from './types';

export const TelemetryEndpointTaskConstants = {
TIMEOUT: '5m',
TYPE: 'security:endpoint-meta-telemetry',
INTERVAL: '24h',
VERSION: '1.0.0',
};

export class TelemetryEndpointTask {
private readonly logger: Logger;
private readonly sender: TelemetryEventsSender;

constructor(
logger: Logger,
taskManager: TaskManagerSetupContract,
sender: TelemetryEventsSender
) {
this.logger = logger;
this.sender = sender;

taskManager.registerTaskDefinitions({
[TelemetryEndpointTaskConstants.TYPE]: {
title: 'Security Solution Telemetry Endpoint Metrics and Info task',
timeout: TelemetryEndpointTaskConstants.TIMEOUT,
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
const { state } = taskInstance;

return {
run: async () => {
const lastExecutionTimestamp = moment().utc().toISOString();

const hits = await this.runTask(taskInstance.id);
this.logger.debug(`hits: ${hits}`);

return {
state: {
lastExecutionTimestamp,
runs: (state.runs || 0) + 1,
},
};
},
cancel: async () => {},
};
},
},
});
}

public start = async (taskManager: TaskManagerStartContract) => {
try {
await taskManager.ensureScheduled({
id: this.getTaskId(),
taskType: TelemetryEndpointTaskConstants.TYPE,
scope: ['securitySolution'],
schedule: {
interval: TelemetryEndpointTaskConstants.INTERVAL,
},
state: { runs: 0 },
params: { version: TelemetryEndpointTaskConstants.VERSION },
});
} catch (e) {
this.logger.error(`Error scheduling task, received ${e.message}`);
}
};

private getTaskId = (): string => {
return `${TelemetryEndpointTaskConstants.TYPE}:${TelemetryEndpointTaskConstants.VERSION}`;
};

public runTask = async (taskId: string) => {
if (taskId !== this.getTaskId()) {
this.logger.debug(`Outdated task running: ${taskId}`);
return 0;
}

const isOptedIn = await this.sender.isTelemetryOptedIn();
if (!isOptedIn) {
this.logger.debug(`Telemetry is not opted-in.`);
return 0;
}

const {
body: endpointMetricsResponse,
} = ((await this.sender.fetchEndpointMetrics()) as unknown) as {
body: EndpointMetricsAggregation;
};
const endpointMetrics = endpointMetricsResponse.aggregations.endpoint_agents.buckets.map(
(epMetrics) => {
return {
endpoint_agent: epMetrics.latest_metrics.hits.hits[0]._source.agent.id,
endpoint_metrics: epMetrics.latest_metrics.hits.hits[0]._source,
};
}
);

if (endpointMetrics.length === 0) {
this.logger.debug('no reported endpoint metrics');
return 0;
Reviewer (Contributor):

Would there ever be a case wherein we have a policy failure but no installed endpoints? Would we want to submit an endpoint-metadata payload for that day, so that we'd know Cluster X has an agent with a failed endpoint install?

pjhampton (Contributor, Author) — Jun 29, 2021:

It is possible, yeah. The reasoning behind this is that there should be at least one running endpoint agent for a cluster to be considered a cluster of interest.

However, capturing both in a single sweep (which is what we have now) is algorithmically awkward, and the security team is going to have a hard time maintaining and scaling it, because it leads to deeply nested ifs and conditional branching that are hard to follow. If we want to make sure we always capture both, we should split them into two separate telemetry docs and send them to two dedicated channels:

Metrics / Policy Document

[
  {
    "agent_id": "****-****-****-****",
    "endpoint_id": "****-****-****-****",
    "metrics": {
      ...
    },
    "policy_config": {
      ...
    }
  }
]

Policy Response

[
  {
    "seen_times": 44,
    "config": {
      ...
    },
    "response": {
      ...
    }
  }
]

We could still do it in the one task. This would remove a lot of complexity from the code and be easier to maintain. It will also prevent us from sending the same config / response many times - particularly if a user's fleet is having a bad day. I'm not sure the policy response adds much to the metrics. I think separating is the best course of action. What do you think?

pjhampton (Contributor, Author):

After meditating on this for a while, I think we can do this in a more efficient way - but it's not going to be much better to maintain. I will implement it this afternoon.

}

const agentsResponse = await this.sender.fetchFleetAgents();
if (agentsResponse === undefined) {
this.logger.debug('no agents to report');
return 0;
}

const fleetAgents = agentsResponse?.agents.reduce((cache, agent) => {
cache.set(agent.id, { policy_id: agent.policy_id, policy_version: agent.policy_revision });
return cache;
}, new Map<string, FleetAgentCacheItem>());

const endpointPolicyCache = new Map<string, FullAgentPolicyInput>();
for (const policyInfo of fleetAgents.values()) {
if (policyInfo.policy_id !== null && policyInfo.policy_id !== undefined) {
Reviewer (Contributor):

Just a personal preference here, but it might make the intent a bit easier to read if we store the boolean logic in a variable:

const shouldCachePolicy =
  policyInfo.policy_id !== null &&
  policyInfo.policy_id !== undefined &&
  !endpointPolicyCache.has(policyInfo.policy_id);

pjhampton (Contributor, Author):

Yeah, that is slick. Thanks for the feedback!

if (!endpointPolicyCache.has(policyInfo.policy_id)) {
const packagePolicies = await this.sender.fetchEndpointPolicyConfigs(
policyInfo.policy_id
);
packagePolicies?.inputs.forEach((input) => {
if (input.type === 'endpoint' && policyInfo.policy_id !== undefined) {
endpointPolicyCache.set(policyInfo.policy_id, input);
}
});
}
}
}
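For reference, a sketch of how this caching loop could read once the shouldCachePolicy suggestion from the thread above is applied (illustrative only, not the code merged in this revision; it assumes a TypeScript version whose control-flow analysis narrows the aliased condition):

const endpointPolicyCache = new Map<string, FullAgentPolicyInput>();
for (const policyInfo of fleetAgents.values()) {
  const policyId = policyInfo.policy_id;
  // Hoist the guard conditions into a single named boolean, as suggested above.
  const shouldCachePolicy =
    policyId !== null && policyId !== undefined && !endpointPolicyCache.has(policyId);

  if (shouldCachePolicy) {
    const packagePolicies = await this.sender.fetchEndpointPolicyConfigs(policyId);
    packagePolicies?.inputs.forEach((input) => {
      if (input.type === 'endpoint') {
        endpointPolicyCache.set(policyId, input);
      }
    });
  }
}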

const {
body: failedPolicyResponses,
} = ((await this.sender.fetchFailedEndpointPolicyResponses()) as unknown) as {
body: EndpointPolicyResponseAggregation;
};
const policyResponses = failedPolicyResponses.aggregations.policy_responses.buckets.reduce(
(cache, bucket) => {
const doc = bucket.latest_response.hits.hits[0];
cache.set(bucket.key, doc);
return cache;
},
new Map<string, EndpointPolicyResponseDocument>()
);

const telemetryPayloads = endpointMetrics.map((endpoint) => {
let policyConfig = null;
let failedPolicy = null;

const fleetAgentId = endpoint.endpoint_metrics.elastic.agent.id;
const endpointAgentId = endpoint.endpoint_agent;

const policyInformation = fleetAgents.get(fleetAgentId);
if (policyInformation?.policy_id) {
policyConfig = endpointPolicyCache.get(policyInformation?.policy_id);
if (policyConfig) {
failedPolicy = policyResponses.get(policyConfig?.id);
}
}

return {
agent_id: fleetAgentId,
endpoint_id: endpointAgentId,
endpoint_metrics: {
os: endpoint.endpoint_metrics.host.os,
cpu: endpoint.endpoint_metrics.Endpoint.metrics.cpu,
memory: endpoint.endpoint_metrics.Endpoint.metrics.memory,
uptime: endpoint.endpoint_metrics.Endpoint.metrics.uptime,
},
policy_config: policyConfig,
policy_failure: failedPolicy,
};
});

// Feature flag disabling channel send for now
this.sender.sendOnDemand('endpoint-metadata', telemetryPayloads, true);
return telemetryPayloads.length;
};
}
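For context, the construction and scheduling exercised by the tests above is how the task is expected to be wired into the plugin lifecycle. A minimal sketch follows; the helper function names and module layout here are illustrative and not part of this PR:

import { Logger } from 'src/core/server';
import {
  TaskManagerSetupContract,
  TaskManagerStartContract,
} from '../../../../task_manager/server';
import { TelemetryEndpointTask } from './endpoint_task';
import { TelemetryEventsSender } from './sender';

// Hypothetical wiring helpers; the real plugin does this inside its setup()/start() methods.
export const setupEndpointTelemetryTask = (
  logger: Logger,
  taskManager: TaskManagerSetupContract,
  sender: TelemetryEventsSender
): TelemetryEndpointTask => {
  // Registers the 'security:endpoint-meta-telemetry' task definition with Task Manager.
  return new TelemetryEndpointTask(logger, taskManager, sender);
};

export const startEndpointTelemetryTask = async (
  task: TelemetryEndpointTask,
  taskManager: TaskManagerStartContract
) => {
  // ensureScheduled() keeps a single recurring instance on the 24h interval.
  await task.start(taskManager);
};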