Skip to content

Commit

Permalink
Custom healthcheck with filters
Browse files Browse the repository at this point in the history
Enable filtering with custom health checks based on node attributes:
```
opensearch.optimizedHealthcheck.filters: {
  attribute_key: "attribute_value",
}
```

Also, fixes issue that expects the response to array when it was a
dictionary.

Issue:
opensearch-project#2214
opensearch-project#2203

Signed-off-by: Kawika Avilla <kavilla414@gmail.com>
  • Loading branch information
kavilla committed Aug 31, 2022
1 parent 65005be commit c67058b
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 27 deletions.
5 changes: 4 additions & 1 deletion config/opensearch_dashboards.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@
# This node attribute should assign all nodes of the same cluster an integer value that increments with each new cluster that is spun up
# e.g. in opensearch.yml file you would set the value to a setting using node.attr.cluster_id:
# Should only be enabled if there is a corresponding node attribute created in your OpenSearch config that matches the value here
#opensearch.optimizedHealthcheckId: "cluster_id"
#opensearch.optimizedHealthcheck.id: "cluster_id"
#opensearch.optimizedHealthcheck.filters: {
# attribute_key: "attribute_value",
#}

# If your OpenSearch is protected with basic authentication, these settings provide
# the username and password that the OpenSearch Dashboards server uses to perform maintenance on the OpenSearch Dashboards
Expand Down
12 changes: 8 additions & 4 deletions src/core/server/opensearch/opensearch_config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ export const configSchema = schema.object({
requestTimeout: schema.duration({ defaultValue: '30s' }),
pingTimeout: schema.duration({ defaultValue: schema.siblingRef('requestTimeout') }),
logQueries: schema.boolean({ defaultValue: false }),
optimizedHealthcheckId: schema.maybe(schema.string()),
optimizedHealthcheck: schema.object({
id: schema.maybe(schema.string()),
filters: schema.maybe(schema.recordOf(schema.string(), schema.string(), { defaultValue: {} })),
}),
ssl: schema.object(
{
verificationMode: schema.oneOf(
Expand Down Expand Up @@ -158,7 +161,8 @@ const deprecations: ConfigDeprecationProvider = ({ renameFromRoot, renameFromRoo
renameFromRoot('elasticsearch.requestTimeout', 'opensearch.requestTimeout'),
renameFromRoot('elasticsearch.pingTimeout', 'opensearch.pingTimeout'),
renameFromRoot('elasticsearch.logQueries', 'opensearch.logQueries'),
renameFromRoot('elasticsearch.optimizedHealthcheckId', 'opensearch.optimizedHealthcheckId'),
renameFromRoot('elasticsearch.optimizedHealthcheckId', 'opensearch.optimizedHealthcheck.id'),
renameFromRoot('opensearch.optimizedHealthcheckId', 'opensearch.optimizedHealthcheck.id'),
renameFromRoot('elasticsearch.ssl', 'opensearch.ssl'),
renameFromRoot('elasticsearch.apiVersion', 'opensearch.apiVersion'),
renameFromRoot('elasticsearch.healthCheck', 'opensearch.healthCheck'),
Expand Down Expand Up @@ -226,7 +230,7 @@ export class OpenSearchConfig {
* Specifies whether Dashboards should only query the local OpenSearch node when
* all nodes in the cluster have the same node attribute value
*/
public readonly optimizedHealthcheckId?: string;
public readonly optimizedHealthcheck?: OpenSearchConfigType['optimizedHealthcheck'];

/**
* Hosts that the client will connect to. If sniffing is enabled, this list will
Expand Down Expand Up @@ -314,7 +318,7 @@ export class OpenSearchConfig {
this.ignoreVersionMismatch = rawConfig.ignoreVersionMismatch;
this.apiVersion = rawConfig.apiVersion;
this.logQueries = rawConfig.logQueries;
this.optimizedHealthcheckId = rawConfig.optimizedHealthcheckId;
this.optimizedHealthcheck = rawConfig.optimizedHealthcheck;
this.hosts = Array.isArray(rawConfig.hosts) ? rawConfig.hosts : [rawConfig.hosts];
this.requestHeadersWhitelist = Array.isArray(rawConfig.requestHeadersWhitelist)
? rawConfig.requestHeadersWhitelist
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/opensearch/opensearch_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class OpenSearchService

const opensearchNodesCompatibility$ = pollOpenSearchNodesVersion({
internalClient: this.client.asInternalUser,
optimizedHealthcheckId: config.optimizedHealthcheckId,
optimizedHealthcheck: config.optimizedHealthcheck,
log: this.log,
ignoreVersionMismatch: config.ignoreVersionMismatch,
opensearchVersionCheckInterval: config.healthCheckDelay.asMilliseconds(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,35 @@ function createNodes(...versions: string[]): NodesInfo {
return { nodes };
}

function createNodesWithAttribute(
targetId: string,
filterId: string,
targetAttributeValue: string,
filterAttributeValue: string,
...versions: string[]
): NodesInfo {
const nodes = {} as any;
versions
.map((version, i) => {
return {
version,
http: {
publish_address: 'http_address',
},
ip: 'ip',
attributes: {
cluster_id: i % 2 === 0 ? targetId : filterId,
custom_attribute: i % 2 === 0 ? targetAttributeValue : filterAttributeValue,
},
};
})
.forEach((node, i) => {
nodes[`node-${i}`] = node;
});

return { nodes };
}

describe('mapNodesVersionCompatibility', () => {
function createNodesInfoWithoutHTTP(version: string): NodesInfo {
return { nodes: { 'node-without-http': { version, ip: 'ip' } } } as any;
Expand Down Expand Up @@ -179,7 +208,7 @@ describe('pollOpenSearchNodesVersion', () => {

pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheckId: 'cluster_id',
optimizedHealthcheck: { id: 'cluster_id' },
opensearchVersionCheckInterval: 1,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand All @@ -203,7 +232,104 @@ describe('pollOpenSearchNodesVersion', () => {

pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheckId: 'cluster_id',
optimizedHealthcheck: { id: 'cluster_id' },
opensearchVersionCheckInterval: 1,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
log: mockLogger,
})
.pipe(take(1))
.subscribe({
next: (result) => {
expect(result).toEqual(
mapNodesVersionCompatibility(nodes, OPENSEARCH_DASHBOARDS_VERSION, false)
);
},
complete: done,
error: done,
});
});

it('returns compatibility results and isCompatible=true with filters', (done) => {
expect.assertions(2);
const target = {
id: '0',
attribute: 'foo',
};
const filter = {
id: '1',
attribute: 'bar',
};

// will filter out every odd index
const nodes = createNodesWithAttribute(
target.id,
filter.id,
target.attribute,
filter.attribute,
'5.1.0',
'6.2.0',
'5.1.0',
'5.1.1-Beta1'
);

// @ts-expect-error we need to return an incompatible type to use the testScheduler here
internalClient.cluster.state.mockReturnValueOnce({ body: nodes });

nodeInfosSuccessOnce(nodes);

pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheck: { id: target.id, filters: { custom_attribute: filter.attribute } },
opensearchVersionCheckInterval: 1,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
log: mockLogger,
})
.pipe(take(1))
.subscribe({
next: (result) => {
expect(result).toEqual(
mapNodesVersionCompatibility(nodes, OPENSEARCH_DASHBOARDS_VERSION, false)
);
expect(result.isCompatible).toBe(true);
},
complete: done,
error: done,
});
});

it('returns compatibility results and isCompatible=false with filters', (done) => {
expect.assertions(2);
const target = {
id: '0',
attribute: 'foo',
};
const filter = {
id: '1',
attribute: 'bar',
};

// will filter out every odd index
const nodes = createNodesWithAttribute(
target.id,
filter.id,
target.attribute,
filter.attribute,
'5.1.0',
'5.1.0',
'6.2.0',
'5.1.1-Beta1'
);

// @ts-expect-error we need to return an incompatible type to use the testScheduler here
internalClient.cluster.state.mockReturnValueOnce({ body: nodes });

nodeInfosSuccessOnce(nodes);

pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheck: { id: target.id, filters: { custom_attribute: filter.attribute } },
opensearchVersionCheckInterval: 1,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand All @@ -215,6 +341,7 @@ describe('pollOpenSearchNodesVersion', () => {
expect(result).toEqual(
mapNodesVersionCompatibility(nodes, OPENSEARCH_DASHBOARDS_VERSION, false)
);
expect(result.isCompatible).toBe(false);
},
complete: done,
error: done,
Expand All @@ -232,7 +359,7 @@ describe('pollOpenSearchNodesVersion', () => {

pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheckId: 'cluster_id',
optimizedHealthcheck: { id: 'cluster_id' },
opensearchVersionCheckInterval: 1,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand Down Expand Up @@ -268,7 +395,7 @@ describe('pollOpenSearchNodesVersion', () => {

const opensearchNodesCompatibility$ = pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheckId: 'cluster_id',
optimizedHealthcheck: { id: 'cluster_id' },
opensearchVersionCheckInterval: 100,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand Down Expand Up @@ -308,7 +435,7 @@ describe('pollOpenSearchNodesVersion', () => {

const opensearchNodesCompatibility$ = pollOpenSearchNodesVersion({
internalClient,
optimizedHealthcheckId: 'cluster_id',
optimizedHealthcheck: { id: 'cluster_id' },
opensearchVersionCheckInterval: 10,
ignoreVersionMismatch: false,
opensearchDashboardsVersion: OPENSEARCH_DASHBOARDS_VERSION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,38 +46,68 @@ import type { OpenSearchClient } from '../client';

/**
* Checks if all nodes in the cluster have the same cluster id node attribute
* that is supplied through the healthcheckAttributeName param. This node attribute is configurable
* in opensearch_dashboards.yml.
* that is supplied through the healthcheck param. This node attribute is configurable
* in opensearch_dashboards.yml. It can also filter attributes out by key-value pair.
* If all nodes have the same cluster id then we do not fan out the healthcheck and use '_local' node
* If there are multiple cluster ids then we use the default fan out behavior
* If the supplied node attribute is missing then we return null and use default fan out behavior
* @param {OpenSearchClient} internalClient
* @param {string} healthcheckAttributeName
* @param {OptimizedHealthcheck} healthcheck
* @returns {string|null} '_local' if all nodes have the same cluster_id, otherwise null
*/
export const getNodeId = async (
internalClient: OpenSearchClient,
healthcheckAttributeName: string
healthcheck: OptimizedHealthcheck
): Promise<string | null> => {
try {
let path = `nodes.*.attributes.${healthcheck.id}`;
const filters = healthcheck.filters;
if (filters) {
Object.keys(filters).forEach((key) => {
path += `,nodes.*.attributes.${key}`;
});
}

const state = (await internalClient.cluster.state({
metric: 'nodes',
filter_path: [`nodes.*.attributes.${healthcheckAttributeName}`],
filter_path: [path],
})) as ApiResponse;
/* Aggregate different cluster_ids from the OpenSearch nodes
* if all the nodes have the same cluster_id, retrieve nodes.info from _local node only
* Using _cluster/state/nodes to retrieve the cluster_id of each node from cluster manager node which is considered to be a lightweight operation
* else if the nodes have different cluster_ids then fan out the request to all nodes
* else there are no nodes in the cluster
*/
const sharedClusterId =
state.body.nodes.length > 0
? get(state.body.nodes[0], `attributes.${healthcheckAttributeName}`, null)
: null;
const nodes = state.body.nodes;
let nodeIds = Object.keys(nodes);
if (nodeIds.length === 0) {
return null;
}

/*
* If filters are set look for the key and value and filter out any node that matches
* the value for that attribute.
*/
if (filters) {
nodeIds.forEach((id) => {
Object.keys(filters).forEach((key) => {
const attributeValue = get(nodes[id], `attributes.${key}`, null);
if (attributeValue === filters[key]) {
delete nodes[id];
}
});
});

nodeIds = Object.keys(nodes);
if (nodeIds.length === 0) {
return null;
}
}

const sharedClusterId = get(nodes[nodeIds[0]], `attributes.${healthcheck.id}`, null);

return sharedClusterId === null ||
state.body.nodes.find(
(node: any) => sharedClusterId !== get(node, `attributes.${healthcheckAttributeName}`, null)
)
nodes.find((node: any) => sharedClusterId !== get(node, `attributes.${healthcheck.id}`, null))
? null
: '_local';
} catch (e) {
Expand All @@ -87,7 +117,7 @@ export const getNodeId = async (

export interface PollOpenSearchNodesVersionOptions {
internalClient: OpenSearchClient;
optimizedHealthcheckId?: string;
optimizedHealthcheck?: OptimizedHealthcheck;
log: Logger;
opensearchDashboardsVersion: string;
ignoreVersionMismatch: boolean;
Expand Down Expand Up @@ -118,6 +148,13 @@ export interface NodesVersionCompatibility {
nodesInfoRequestError?: Error;
}

export interface OptimizedHealthcheck {
id?: string;
filters?: {
[key: string]: string;
};
}

function getHumanizedNodeName(node: NodeInfo) {
const publishAddress = node?.http?.publish_address + ' ' || '';
return 'v' + node.version + ' @ ' + publishAddress + '(' + node.ip + ')';
Expand Down Expand Up @@ -201,7 +238,7 @@ function compareNodes(prev: NodesVersionCompatibility, curr: NodesVersionCompati

export const pollOpenSearchNodesVersion = ({
internalClient,
optimizedHealthcheckId,
optimizedHealthcheck,
log,
opensearchDashboardsVersion,
ignoreVersionMismatch,
Expand All @@ -216,8 +253,8 @@ export const pollOpenSearchNodesVersion = ({
* For better dashboards resilience, the behaviour is changed to only query the local node when all the nodes have the same cluster_id
* Using _cluster/state/nodes to retrieve the cluster_id of each node from the cluster manager node
*/
if (optimizedHealthcheckId) {
return from(getNodeId(internalClient, optimizedHealthcheckId)).pipe(
if (optimizedHealthcheck) {
return from(getNodeId(internalClient, optimizedHealthcheck)).pipe(
mergeMap((nodeId: any) =>
from(
internalClient.nodes.info<NodesInfo>({
Expand Down

0 comments on commit c67058b

Please sign in to comment.