Skip to content

Commit

Permalink
[BUG] fix healthcheck logic to expect object and return ids (opensear…
Browse files Browse the repository at this point in the history
…ch-project#2277)

Original implementation incorrectly assumed the return list of
nodes was an object array.
This PR:
opensearch-project#2232

Addressed the return but didn't catch the nodes.find in the return
which is a function for an array.

Also, refactors to return a list of node_ids because the original
implementation indicated that it should but it can return node ids
but it never did. It only returned `null` or `_local`, the problem
with this approach is that it doesn't expect valid node version
with different DIs or filter out nodes when we pass `null` as the node
ID for the node info call because it was fan out the request to all
nodes.

Now this function will return `_local` if all the nodes share the
same cluster_id using a greedy approach since we can assume it is
all the same version.

Will return node ids, if the cluster_id are different so it will
pass a CSV to the node info call and return the info for those nodes.
And null if no cluster_id is present, ie, fan out the response.

Original issue:
opensearch-project#2203

Signed-off-by: Kawika Avilla <kavilla414@gmail.com>
Signed-off-by: Sergey V. Osipov <sipopo@yandex.ru>
  • Loading branch information
kavilla authored and sipopo committed Sep 14, 2022
1 parent 176d7b3 commit a813370
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,18 +253,18 @@ describe('pollOpenSearchNodesVersion', () => {
it('returns compatibility results and isCompatible=true with filters', (done) => {
expect.assertions(2);
const target = {
id: '0',
cluster_id: '0',
attribute: 'foo',
};
const filter = {
id: '1',
cluster_id: '1',
attribute: 'bar',
};

// will filter out every odd index
const nodes = createNodesWithAttribute(
target.id,
filter.id,
target.cluster_id,
filter.cluster_id,
target.attribute,
filter.attribute,
'5.1.0',
Expand All @@ -273,14 +273,18 @@ describe('pollOpenSearchNodesVersion', () => {
'5.1.1-Beta1'
);

const filteredNodes = nodes;
delete filteredNodes.nodes['node-1'];
delete filteredNodes.nodes['node-3'];

// @ts-expect-error we need to return an incompatible type to use the testScheduler here
internalClient.cluster.state.mockReturnValueOnce({ body: nodes });

nodeInfosSuccessOnce(nodes);

pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheck: { id: target.id, filters: { custom_attribute: filter.attribute } },
optimizedHealthcheck: { id: 'cluster_id', filters: { custom_attribute: filter.attribute } },
opensearchVersionCheckInterval: 1,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand All @@ -290,7 +294,7 @@ describe('pollOpenSearchNodesVersion', () => {
.subscribe({
next: (result) => {
expect(result).toEqual(
mapNodesVersionCompatibility(nodes, OPENSEARCH_DASHBOARDS_VERSION, false)
mapNodesVersionCompatibility(filteredNodes, OPENSEARCH_DASHBOARDS_VERSION, false)
);
expect(result.isCompatible).toBe(true);
},
Expand All @@ -302,18 +306,18 @@ describe('pollOpenSearchNodesVersion', () => {
it('returns compatibility results and isCompatible=false with filters', (done) => {
expect.assertions(2);
const target = {
id: '0',
cluster_id: '0',
attribute: 'foo',
};
const filter = {
id: '1',
cluster_id: '1',
attribute: 'bar',
};

// will filter out every odd index
const nodes = createNodesWithAttribute(
target.id,
filter.id,
target.cluster_id,
filter.cluster_id,
target.attribute,
filter.attribute,
'5.1.0',
Expand All @@ -322,14 +326,18 @@ describe('pollOpenSearchNodesVersion', () => {
'5.1.1-Beta1'
);

const filteredNodes = nodes;
delete filteredNodes.nodes['node-1'];
delete filteredNodes.nodes['node-3'];

// @ts-expect-error we need to return an incompatible type to use the testScheduler here
internalClient.cluster.state.mockReturnValueOnce({ body: nodes });

nodeInfosSuccessOnce(nodes);

pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheck: { id: target.id, filters: { custom_attribute: filter.attribute } },
optimizedHealthcheck: { id: 'cluster_id', filters: { custom_attribute: filter.attribute } },
opensearchVersionCheckInterval: 1,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand All @@ -339,7 +347,7 @@ describe('pollOpenSearchNodesVersion', () => {
.subscribe({
next: (result) => {
expect(result).toEqual(
mapNodesVersionCompatibility(nodes, OPENSEARCH_DASHBOARDS_VERSION, false)
mapNodesVersionCompatibility(filteredNodes, OPENSEARCH_DASHBOARDS_VERSION, false)
);
expect(result.isCompatible).toBe(false);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@

import { timer, of, from, Observable } from 'rxjs';
import { map, distinctUntilChanged, catchError, exhaustMap, mergeMap } from 'rxjs/operators';
import { get } from 'lodash';
import { ApiResponse } from '@opensearch-project/opensearch';
import {
opensearchVersionCompatibleWithOpenSearchDashboards,
Expand All @@ -49,67 +48,66 @@ import type { OpenSearchClient } from '../client';
* that is supplied through the healthcheck param. This node attribute is configurable
* in opensearch_dashboards.yml. It can also filter attributes out by key-value pair.
* If all nodes have the same cluster id then we do not fan out the healthcheck and use '_local' node
* If there are multiple cluster ids then we use the default fan out behavior
* If there are multiple cluster ids then we return an array of node ids to check.
* If the supplied node attribute is missing then we return null and use default fan out behavior
* @param {OpenSearchClient} internalClient
* @param {OptimizedHealthcheck} healthcheck
* @returns {string|null} '_local' if all nodes have the same cluster_id, otherwise null
* @returns {string|string[]|null} '_local' if all nodes have the same cluster_id, array of node ids if different cluster_id, null if no cluster_id or nodes returned
*/
export const getNodeId = async (
internalClient: OpenSearchClient,
healthcheck: OptimizedHealthcheck
): Promise<string | null> => {
): Promise<'_local' | string[] | null> => {
try {
// If missing an id, we have nothing to check
if (!healthcheck.id) return null;

let path = `nodes.*.attributes.${healthcheck.id}`;
const filters = healthcheck.filters;
if (filters) {
Object.keys(filters).forEach((key) => {
path += `,nodes.*.attributes.${key}`;
});
const filterKeys = filters ? Object.keys(filters) : [];

for (const key of filterKeys) {
path += `,nodes.*.attributes.${key}`;
}

/*
* Using _cluster/state/nodes to retrieve the cluster_id of each node from cluster manager node which
* is considered to be a lightweight operation to aggegrate different cluster_ids from the OpenSearch nodes.
*/
const state = (await internalClient.cluster.state({
metric: 'nodes',
filter_path: [path],
})) as ApiResponse;
/* Aggregate different cluster_ids from the OpenSearch nodes
* if all the nodes have the same cluster_id, retrieve nodes.info from _local node only
* Using _cluster/state/nodes to retrieve the cluster_id of each node from cluster manager node which is considered to be a lightweight operation
* else if the nodes have different cluster_ids then fan out the request to all nodes
* else there are no nodes in the cluster
*/

const nodes = state.body.nodes;
let nodeIds = Object.keys(nodes);
if (nodeIds.length === 0) {
return null;
}
const nodeIds = new Set(Object.keys(nodes));

/*
* If filters are set look for the key and value and filter out any node that matches
* the value for that attribute.
*/
if (filters) {
nodeIds.forEach((id) => {
Object.keys(filters).forEach((key) => {
const attributeValue = get(nodes[id], `attributes.${key}`, null);
if (attributeValue === filters[key]) {
delete nodes[id];
}
});
});
for (const id of nodeIds) {
for (const key of filterKeys) {
const attributeValue = nodes[id].attributes?.[key] ?? null;

nodeIds = Object.keys(nodes);
if (nodeIds.length === 0) {
return null;
if (attributeValue === filters![key]) nodeIds.delete(id);
}
}

const sharedClusterId = get(nodes[nodeIds[0]], `attributes.${healthcheck.id}`, null);
if (nodeIds.size === 0) return null;

const [firstNodeId] = nodeIds;
const sharedClusterId = nodes[firstNodeId].attributes?.[healthcheck.id] ?? null;
// If cluster_id is not set then fan out
if (sharedClusterId === null) return null;

// If a node is found to have a different cluster_id, return node ids
for (const id of nodeIds) {
if (nodes[id].attributes?.[healthcheck.id] !== sharedClusterId) return Array.from(nodeIds);
}

return sharedClusterId === null ||
nodes.find((node: any) => sharedClusterId !== get(node, `attributes.${healthcheck.id}`, null))
? null
: '_local';
// When all nodes share the same cluster_id, return _local
return '_local';
} catch (e) {
return null;
}
Expand Down

0 comments on commit a813370

Please sign in to comment.