Skip to content

Commit

Permalink
Fleet Usage telemetry extension (elastic#145353)
Browse files Browse the repository at this point in the history
## Summary

Closes elastic/ingest-dev#1261

Added a snippet to the telemetry that I added for each requirement.
Please review and let me know if any changes are needed.
Also asked a few questions below. @jlind23 @kpollich 

6. is blocked by [elasticsearch
change](elastic/elasticsearch#91701) to give
kibana_system the missing privilege to read logs-elastic_agent* indices.

Took inspiration for task versioning from
https://github.com/elastic/kibana/pull/144494/files#diff-0c7c49bf5c55c45c19e9c42d5428e99e52c3a39dd6703633f427724d36108186

- [x] 1. Elastic Agent versions
Versions of all the Elastic Agent running: `agent.version` field on
`.fleet-agents` documents

```
"agent_versions": [
    "8.6.0"
  ],
```

- [x] 2. Fleet server configuration
Think we can query for `.fleet-policies` where some `input` has `type:
'fleet-server'` for this, as well as use the `Fleet Server Hosts`
settings that we define via saved objects in Fleet


```
  "fleet_server_config": {
    "policies": [
      {
        "input_config": {
          "server": {
            "limits.max_agents": 10000
          },
          "server.runtime": "gc_percent:20"
        }
      }
    ]
  }
```

- [x] 3. Number of policies
Count of `.fleet-policies` index 

To confirm, did we mean agent policies here?

```
 "agent_policies": {
    "count": 7,
```

- [x] 4. Output type contained in those policies
Collecting this from ts logic, querying from `.fleet-policies` index.
The alternative would be to write a painless script (because the
`outputs` are an object with dynamic keys, we can't do an aggregation
directly).

```
"agent_policies": {
    "output_types": [
      "elasticsearch"
    ]
  }
```

Did we mean to just collect the types here, or any other info? e.g.
output urls

- [x] 5. Average number of checkin failures
We only have the most recent checkin status and timestamp on
`.fleet-agents`.

Do we mean here to publish the total last checkin failure count? E.g. 3
if 3 agents are in failure checkin status currently.
Or do we mean to publish specific info for all agents
(`last_checkin_status`, `last_checkin` time, `last_checkin_message`)?
Are the only statuses `error` and `degraded` that we want to send?

```
  "agent_last_checkin_status": {
    "error": 0,
    "degraded": 0
  },
```

- [ ] 6. Top 3 most common errors in the Elastic Agent logs

Do we mean here elastic-agent logs only, or fleet-server logs as well
(maybe separately)?

I found an alternative way to query the message field using sampler and
categorize text aggregation:
```
GET logs-elastic_agent*/_search
{
    "size": 0,
    "query": {
        "bool": {
            "must": [
                {
                    "term": {
                        "log.level": "error"
                    }
                },
                {
                    "range": {
                        "@timestamp": {
                            "gte": "now-1h"
                        }
                    }
                }
            ]
        }
    },
    "aggregations": {
        "message_sample": {
            "sampler": {
                "shard_size": 200
            },
            "aggs": {
                "categories": {
                    "categorize_text": {
                        "field": "message",
                        "size": 10
                    }
                }
            }
        }
    }
}
```
Example response:
```
"aggregations": {
    "message_sample": {
      "doc_count": 112,
      "categories": {
        "buckets": [
          {
            "doc_count": 73,
            "key": "failed to unenroll offline agents",
            "regex": ".*?failed.+?to.+?unenroll.+?offline.+?agents.*?",
            "max_matching_length": 36
          },
          {
            "doc_count": 7,
            "key": """stderr panic close of closed channel n ngoroutine running Stop ngithub.com/elastic/beats/v7/libbeat/cmd/instance Beat launch.func5 \n\t/go/src/github.com/elastic/beats/libbeat/cmd/instance/beat.go n
```


- [x] 7.  Number of checkin failure over the past period of time

I think this is almost the same as #5. The difference would be to report
new failures happened only in the last hour, or report all agents in
failure state. (which would be an increasing number if the agent stays
in failed state).
Do we want these 2 separate telemetry fields?

EDIT: removed the last1hr query, instead added a new field to report
agents enrolled per policy (top 10). See comments below.

```
  "agent_checkin_status": {
    "error": 3,
    "degraded": 0
  },
  "agents_per_policy": [2, 1000],
```

- [x] 8. Number of Elastic Agent and number of fleet server

This is already there in the existing telemetry:
```
  "agents": {
    "total_enrolled": 0,
    "healthy": 0,
    "unhealthy": 0,
    "offline": 0,
    "total_all_statuses": 1,
    "updating": 0
  },
  "fleet_server": {
    "total_enrolled": 0,
    "healthy": 0,
    "unhealthy": 0,
    "offline": 0,
    "updating": 0,
    "total_all_statuses": 0,
    "num_host_urls": 1
  },
```




### Checklist

- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
juliaElastic and kibanamachine authored Nov 23, 2022
1 parent f1cdc08 commit e00e26e
Show file tree
Hide file tree
Showing 10 changed files with 783 additions and 204 deletions.
85 changes: 83 additions & 2 deletions x-pack/plugins/fleet/server/collectors/agent_collectors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

import type { SavedObjectsClient, ElasticsearchClient } from '@kbn/core/server';

import type { FleetConfigType } from '../../common/types';
import { AGENTS_INDEX } from '../../common';
import * as AgentService from '../services/agents';
import { appContextService } from '../services';

export interface AgentUsage {
total_enrolled: number;
Expand All @@ -20,7 +21,6 @@ export interface AgentUsage {
}

export const getAgentUsage = async (
config: FleetConfigType,
soClient?: SavedObjectsClient,
esClient?: ElasticsearchClient
): Promise<AgentUsage> => {
Expand All @@ -47,3 +47,84 @@ export const getAgentUsage = async (
updating,
};
};

export interface AgentData {
agent_versions: string[];
agent_checkin_status: {
error: number;
degraded: number;
};
agents_per_policy: number[];
}

const DEFAULT_AGENT_DATA = {
agent_versions: [],
agent_checkin_status: { error: 0, degraded: 0 },
agents_per_policy: [],
};

export const getAgentData = async (
esClient: ElasticsearchClient,
abortController: AbortController
): Promise<AgentData> => {
try {
const transformLastCheckinStatusBuckets = (resp: any) =>
((resp?.aggregations?.last_checkin_status as any).buckets ?? []).reduce(
(acc: any, bucket: any) => {
if (acc[bucket.key] !== undefined) acc[bucket.key] = bucket.doc_count;
return acc;
},
{ error: 0, degraded: 0 }
);
const response = await esClient.search(
{
index: AGENTS_INDEX,
query: {
bool: {
filter: [
{
term: {
active: 'true',
},
},
],
},
},
size: 0,
aggs: {
versions: {
terms: { field: 'agent.version' },
},
last_checkin_status: {
terms: { field: 'last_checkin_status' },
},
policies: {
terms: { field: 'policy_id' },
},
},
},
{ signal: abortController.signal }
);
const versions = ((response?.aggregations?.versions as any).buckets ?? []).map(
(bucket: any) => bucket.key
);
const statuses = transformLastCheckinStatusBuckets(response);

const agentsPerPolicy = ((response?.aggregations?.policies as any).buckets ?? []).map(
(bucket: any) => bucket.doc_count
);

return {
agent_versions: versions,
agent_checkin_status: statuses,
agents_per_policy: agentsPerPolicy,
};
} catch (error) {
if (error.statusCode === 404) {
appContextService.getLogger().debug('Index .fleet-agents does not exist yet.');
} else {
throw error;
}
return DEFAULT_AGENT_DATA;
}
};
61 changes: 61 additions & 0 deletions x-pack/plugins/fleet/server/collectors/agent_policies.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { ElasticsearchClient } from '@kbn/core/server';

import { AGENT_POLICY_INDEX } from '../../common';
import { ES_SEARCH_LIMIT } from '../../common/constants';
import { appContextService } from '../services';

export interface AgentPoliciesUsage {
count: number;
output_types: string[];
}

const DEFAULT_AGENT_POLICIES_USAGE = {
count: 0,
output_types: [],
};

export const getAgentPoliciesUsage = async (
esClient: ElasticsearchClient,
abortController: AbortController
): Promise<AgentPoliciesUsage> => {
try {
const res = await esClient.search(
{
index: AGENT_POLICY_INDEX,
size: ES_SEARCH_LIMIT,
track_total_hits: true,
rest_total_hits_as_int: true,
},
{ signal: abortController.signal }
);

const agentPolicies = res.hits.hits;

const outputTypes = new Set<string>();
agentPolicies.forEach((item) => {
const source = (item._source as any) ?? {};
Object.keys(source.data.outputs).forEach((output) => {
outputTypes.add(source.data.outputs[output].type);
});
});

return {
count: res.hits.total as number,
output_types: Array.from(outputTypes),
};
} catch (error) {
if (error.statusCode === 404) {
appContextService.getLogger().debug('Index .fleet-policies does not exist yet.');
} else {
throw error;
}
return DEFAULT_AGENT_POLICIES_USAGE;
}
};
46 changes: 46 additions & 0 deletions x-pack/plugins/fleet/server/collectors/fleet_server_collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import type { SavedObjectsClient, ElasticsearchClient } from '@kbn/core/server';

import { PACKAGE_POLICY_SAVED_OBJECT_TYPE, SO_SEARCH_LIMIT } from '../constants';

import { packagePolicyService } from '../services';
import { getAgentStatusForAgentPolicy } from '../services/agents';
import { listFleetServerHosts } from '../services/fleet_server_host';
Expand Down Expand Up @@ -84,3 +86,47 @@ export const getFleetServerUsage = async (
num_host_urls: numHostsUrls,
};
};

export const getFleetServerConfig = async (soClient: SavedObjectsClient): Promise<any> => {
const res = await packagePolicyService.list(soClient, {
page: 1,
perPage: SO_SEARCH_LIMIT,
kuery: `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.package.name:fleet_server`,
});
const getInputConfig = (item: any) => {
const config = (item.inputs[0] ?? {}).compiled_input;
if (config?.server) {
// whitelist only server limits, timeouts and runtime, sometimes fields are coming in "server.limits" format instead of nested object
const newConfig = Object.keys(config)
.filter((key) => key.startsWith('server'))
.reduce((acc: any, curr: string) => {
if (curr === 'server') {
acc.server = {};
Object.keys(config.server)
.filter(
(key) =>
key.startsWith('limits') ||
key.startsWith('timeouts') ||
key.startsWith('runtime')
)
.forEach((serverKey: string) => {
acc.server[serverKey] = config.server[serverKey];
return acc;
});
} else {
acc[curr] = config[curr];
}
return acc;
}, {});

return newConfig;
} else {
return {};
}
};
const policies = res.items.map((item) => ({
input_config: getInputConfig(item),
}));

return { policies };
};
33 changes: 28 additions & 5 deletions x-pack/plugins/fleet/server/collectors/register.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import type { CoreSetup } from '@kbn/core/server';
import type { FleetConfigType } from '..';

import { getIsAgentsEnabled } from './config_collectors';
import { getAgentUsage } from './agent_collectors';
import { getAgentUsage, getAgentData } from './agent_collectors';
import type { AgentUsage } from './agent_collectors';
import { getInternalClients } from './helpers';
import { getPackageUsage } from './package_collectors';
import type { PackageUsage } from './package_collectors';
import { getFleetServerUsage } from './fleet_server_collector';
import { getFleetServerUsage, getFleetServerConfig } from './fleet_server_collector';
import type { FleetServerUsage } from './fleet_server_collector';
import { getAgentPoliciesUsage } from './agent_policies';

export interface Usage {
agents_enabled: boolean;
Expand All @@ -26,11 +27,33 @@ export interface Usage {
fleet_server: FleetServerUsage;
}

export const fetchUsage = async (core: CoreSetup, config: FleetConfigType) => {
export const fetchFleetUsage = async (
core: CoreSetup,
config: FleetConfigType,
abortController: AbortController
) => {
const [soClient, esClient] = await getInternalClients(core);
if (!soClient || !esClient) {
return;
}
const usage = {
agents_enabled: getIsAgentsEnabled(config),
agents: await getAgentUsage(soClient, esClient),
fleet_server: await getFleetServerUsage(soClient, esClient),
packages: await getPackageUsage(soClient),
...(await getAgentData(esClient, abortController)),
fleet_server_config: await getFleetServerConfig(soClient),
agent_policies: await getAgentPoliciesUsage(esClient, abortController),
};
return usage;
};

// used by kibana daily collector
const fetchUsage = async (core: CoreSetup, config: FleetConfigType) => {
const [soClient, esClient] = await getInternalClients(core);
const usage = {
agents_enabled: getIsAgentsEnabled(config),
agents: await getAgentUsage(config, soClient, esClient),
agents: await getAgentUsage(soClient, esClient),
fleet_server: await getFleetServerUsage(soClient, esClient),
packages: await getPackageUsage(soClient),
};
Expand All @@ -41,7 +64,7 @@ export const fetchAgentsUsage = async (core: CoreSetup, config: FleetConfigType)
const [soClient, esClient] = await getInternalClients(core);
const usage = {
agents_enabled: getIsAgentsEnabled(config),
agents: await getAgentUsage(config, soClient, esClient),
agents: await getAgentUsage(soClient, esClient),
fleet_server: await getFleetServerUsage(soClient, esClient),
};
return usage;
Expand Down
Loading

0 comments on commit e00e26e

Please sign in to comment.