Skip to content

Commit

Permalink
[Monitoring] Read from metricbeat-* for ES node_stats (#76015) (#76963)
Browse files Browse the repository at this point in the history
* Read from metricbeat-* for node_stats

* Fix tests

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
chrisronline and elasticmachine authored Sep 8, 2020
1 parent f5d89b9 commit 8bde2b3
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 16 deletions.
3 changes: 3 additions & 0 deletions x-pack/plugins/monitoring/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ describe('config schema', () => {
"index": "filebeat-*",
},
"max_bucket_size": 10000,
"metricbeat": Object {
"index": "metricbeat-*",
},
"min_interval_seconds": 10,
"show_license_expiration": true,
},
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/monitoring/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export const configSchema = schema.object({
logs: schema.object({
index: schema.string({ defaultValue: 'filebeat-*' }),
}),
metricbeat: schema.object({
index: schema.string({ defaultValue: 'metricbeat-*' }),
}),
max_bucket_size: schema.number({ defaultValue: 10000 }),
elasticsearch: monitoringElasticsearchConfigSchema,
container: schema.object({
Expand Down
21 changes: 18 additions & 3 deletions x-pack/plugins/monitoring/server/lib/ccs_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@
*/
import { isFunction, get } from 'lodash';

export function appendMetricbeatIndex(config, indexPattern) {
// Leverage this function to also append the dynamic metricbeat index too
let mbIndex = null;
// TODO: NP
// This function is called with both NP config and LP config
if (isFunction(config.get)) {
mbIndex = config.get('monitoring.ui.metricbeat.index');
} else {
mbIndex = get(config, 'monitoring.ui.metricbeat.index');
}

const newIndexPattern = `${indexPattern},${mbIndex}`;
return newIndexPattern;
}

/**
* Prefix all comma separated index patterns within the original {@code indexPattern}.
*
Expand All @@ -27,18 +42,18 @@ export function prefixIndexPattern(config, indexPattern, ccs) {
}

if (!ccsEnabled || !ccs) {
return indexPattern;
return appendMetricbeatIndex(config, indexPattern);
}

const patterns = indexPattern.split(',');
const prefixedPattern = patterns.map((pattern) => `${ccs}:${pattern}`).join(',');

// if a wildcard is used, then we also want to search the local indices
if (ccs === '*') {
return `${prefixedPattern},${indexPattern}`;
return appendMetricbeatIndex(config, `${prefixedPattern},${indexPattern}`);
}

return prefixedPattern;
return appendMetricbeatIndex(config, prefixedPattern);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/monitoring/server/lib/create_query.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export function createQuery(options) {

let typeFilter;
if (type) {
typeFilter = { term: { type } };
typeFilter = { bool: { should: [{ term: { type } }, { term: { 'metricset.name': type } }] } };
}

let clusterUuidFilter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,23 @@ export function handleResponse(clusterState, shardStats, nodeUuid) {
return (response) => {
let nodeSummary = {};
const nodeStatsHits = get(response, 'hits.hits', []);
const nodes = nodeStatsHits.map((hit) => hit._source.source_node); // using [0] value because query results are sorted desc per timestamp
const nodes = nodeStatsHits.map((hit) =>
get(hit, '_source.elasticsearch.node', hit._source.source_node)
); // using [0] value because query results are sorted desc per timestamp
const node = nodes[0] || getDefaultNodeFromId(nodeUuid);
const sourceStats = get(response, 'hits.hits[0]._source.node_stats');
const sourceStats =
get(response, 'hits.hits[0]._source.elasticsearch.node.stats') ||
get(response, 'hits.hits[0]._source.node_stats');
const clusterNode = get(clusterState, ['nodes', nodeUuid]);
const stats = {
resolver: nodeUuid,
node_ids: nodes.map((node) => node.uuid),
node_ids: nodes.map((node) => node.id || node.uuid),
attributes: node.attributes,
transport_address: node.transport_address,
transport_address: get(
response,
'hits.hits[0]._source.service.address',
node.transport_address
),
name: node.name,
type: node.type,
};
Expand All @@ -45,10 +53,17 @@ export function handleResponse(clusterState, shardStats, nodeUuid) {
totalShards: _shardStats.shardCount,
indexCount: _shardStats.indexCount,
documents: get(sourceStats, 'indices.docs.count'),
dataSize: get(sourceStats, 'indices.store.size_in_bytes'),
freeSpace: get(sourceStats, 'fs.total.available_in_bytes'),
totalSpace: get(sourceStats, 'fs.total.total_in_bytes'),
usedHeap: get(sourceStats, 'jvm.mem.heap_used_percent'),
dataSize:
get(sourceStats, 'indices.store.size_in_bytes') ||
get(sourceStats, 'indices.store.size.bytes'),
freeSpace:
get(sourceStats, 'fs.total.available_in_bytes') ||
get(sourceStats, 'fs.summary.available.bytes'),
totalSpace:
get(sourceStats, 'fs.total.total_in_bytes') || get(sourceStats, 'fs.summary.total.bytes'),
usedHeap:
get(sourceStats, 'jvm.mem.heap_used_percent') ||
get(sourceStats, 'jvm.mem.heap.used.pct'),
status: i18n.translate('xpack.monitoring.es.nodes.onlineStatusLabel', {
defaultMessage: 'Online',
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export async function getNodeIds(req, indexPattern, { clusterUuid }, size) {
filterPath: ['aggregations.composite_data.buckets'],
body: {
query: createQuery({
type: 'node_stats',
start,
end,
metric: ElasticsearchMetric.getMetricFields(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export async function getNodes(req, esIndexPattern, pageOfNodes, clusterStats, n
},
filterPath: [
'hits.hits._source.source_node',
'hits.hits._source.elasticsearch.node',
'aggregations.nodes.buckets.key',
...LISTING_METRICS_PATHS,
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,29 @@ export function mapNodesInfo(nodeHits, clusterStats, nodesShardCount) {
const clusterState = get(clusterStats, 'cluster_state', { nodes: {} });

return nodeHits.reduce((prev, node) => {
const sourceNode = get(node, '_source.source_node');
const sourceNode = get(node, '_source.source_node') || get(node, '_source.elasticsearch.node');

const calculatedNodeType = calculateNodeType(sourceNode, get(clusterState, 'master_node'));
const { nodeType, nodeTypeLabel, nodeTypeClass } = getNodeTypeClassLabel(
sourceNode,
calculatedNodeType
);
const isOnline = !isUndefined(get(clusterState, ['nodes', sourceNode.uuid]));
const isOnline = !isUndefined(get(clusterState, ['nodes', sourceNode.uuid || sourceNode.id]));

return {
...prev,
[sourceNode.uuid]: {
[sourceNode.uuid || sourceNode.id]: {
name: sourceNode.name,
transport_address: sourceNode.transport_address,
type: nodeType,
isOnline,
nodeTypeLabel: nodeTypeLabel,
nodeTypeClass: nodeTypeClass,
shardCount: get(nodesShardCount, `nodes[${sourceNode.uuid}].shardCount`, 0),
shardCount: get(
nodesShardCount,
`nodes[${sourceNode.uuid || sourceNode.id}].shardCount`,
0
),
},
};
}, {});
Expand Down

0 comments on commit 8bde2b3

Please sign in to comment.