diff --git a/x-pack/plugins/cross_cluster_replication/fixtures/follower_index.js b/x-pack/plugins/cross_cluster_replication/fixtures/follower_index.js new file mode 100644 index 0000000000000..e3639d0a9d738 --- /dev/null +++ b/x-pack/plugins/cross_cluster_replication/fixtures/follower_index.js @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +const Chance = require('chance'); // eslint-disable-line import/no-extraneous-dependencies +const chance = new Chance(); + +export const getFollowerIndexMock = ( + name = chance.string(), + shards = [{ + id: chance.string(), + remoteCluster: chance.string(), + leaderIndex: chance.string(), + leaderGlobalCheckpoint: chance.integer(), + leaderMaxSequenceNum: chance.integer(), + followerGlobalCheckpoint: chance.integer(), + followerMaxSequenceNum: chance.integer(), + lastRequestedSequenceNum: chance.integer(), + outstandingReadRequestsCount: chance.integer(), + outstandingWriteRequestsCount: chance.integer(), + writeBufferOperationsCount: chance.integer(), + writeBufferSizeBytes: chance.integer(), + followerMappingVersion: chance.integer(), + followerSettingsVersion: chance.integer(), + totalReadTimeMs: chance.integer(), + totalReadRemoteExecTimeMs: chance.integer(), + successfulReadRequestCount: chance.integer(), + failedReadRequestsCount: chance.integer(), + operationsReadCount: chance.integer(), + bytesReadCount: chance.integer(), + totalWriteTimeMs: chance.integer(), + successfulWriteRequestsCount: chance.integer(), + failedWriteRequestsCount: chance.integer(), + operationsWrittenCount: chance.integer(), + readExceptions: [ chance.string() ], + timeSinceLastReadMs: chance.integer(), + }] +) => { + const serializeShard = ({ + id, + remoteCluster, + leaderIndex, + leaderGlobalCheckpoint, + leaderMaxSequenceNum, + followerGlobalCheckpoint, + followerMaxSequenceNum, + lastRequestedSequenceNum, + outstandingReadRequestsCount, + outstandingWriteRequestsCount, + writeBufferOperationsCount, + writeBufferSizeBytes, + followerMappingVersion, + followerSettingsVersion, + totalReadTimeMs, + totalReadRemoteExecTimeMs, + successfulReadRequestCount, + failedReadRequestsCount, + operationsReadCount, + bytesReadCount, + totalWriteTimeMs, + successfulWriteRequestsCount, + failedWriteRequestsCount, + operationsWrittenCount, + readExceptions, + timeSinceLastReadMs, + }) => ({ + shard_id: id, + remote_cluster: remoteCluster, + leader_index: leaderIndex, + leader_global_checkpoint: leaderGlobalCheckpoint, + leader_max_seq_no: leaderMaxSequenceNum, + follower_global_checkpoint: followerGlobalCheckpoint, + follower_max_seq_no: followerMaxSequenceNum, + last_requested_seq_no: lastRequestedSequenceNum, + outstanding_read_requests: outstandingReadRequestsCount, + outstanding_write_requests: outstandingWriteRequestsCount, + write_buffer_operation_count: writeBufferOperationsCount, + write_buffer_size_in_bytes: writeBufferSizeBytes, + follower_mapping_version: followerMappingVersion, + follower_settings_version: followerSettingsVersion, + total_read_time_millis: totalReadTimeMs, + total_read_remote_exec_time_millis: totalReadRemoteExecTimeMs, + successful_read_requests: successfulReadRequestCount, + failed_read_requests: failedReadRequestsCount, + operations_read: operationsReadCount, + bytes_read: bytesReadCount, + total_write_time_millis: totalWriteTimeMs, + successful_write_requests: successfulWriteRequestsCount, + failed_write_requests: failedWriteRequestsCount, + operations_written: operationsWrittenCount, + read_exceptions: readExceptions, + time_since_last_read_millis: timeSinceLastReadMs, + }); + + return { + index: name, + shards: shards.map(serializeShard), + }; +}; + +export const getFollowerIndexListMock = (total = 3) => { + const list = { + follow_stats: { + indices: [], + }, + }; + + let i = total; + while(i--) { + list.follow_stats.indices.push(getFollowerIndexMock()); + } + + return list; +}; diff --git a/x-pack/plugins/cross_cluster_replication/fixtures/index.js b/x-pack/plugins/cross_cluster_replication/fixtures/index.js index 20fbce562a087..fc1a2078ae5a9 100644 --- a/x-pack/plugins/cross_cluster_replication/fixtures/index.js +++ b/x-pack/plugins/cross_cluster_replication/fixtures/index.js @@ -10,3 +10,8 @@ export { } from './auto_follow_pattern'; export { esErrors } from './es_errors'; + +export { + getFollowerIndexMock, + getFollowerIndexListMock, +} from './follower_index'; diff --git a/x-pack/plugins/cross_cluster_replication/server/client/elasticsearch_ccr.js b/x-pack/plugins/cross_cluster_replication/server/client/elasticsearch_ccr.js index 146d0ba559748..8c03769f6ec14 100644 --- a/x-pack/plugins/cross_cluster_replication/server/client/elasticsearch_ccr.js +++ b/x-pack/plugins/cross_cluster_replication/server/client/elasticsearch_ccr.js @@ -62,4 +62,28 @@ export const elasticsearchJsPlugin = (Client, config, components) => { needBody: true, method: 'DELETE' }); + + ccr.followerIndices = ca({ + urls: [ + { + fmt: '/_ccr/stats', + } + ], + method: 'GET' + }); + + ccr.saveFollowerIndex = ca({ + urls: [ + { + fmt: '/<%=name%>/_ccr/follow', + req: { + name: { + type: 'string' + } + } + } + ], + needBody: true, + method: 'PUT' + }); }; diff --git a/x-pack/plugins/cross_cluster_replication/server/lib/follower_index_serialization.js b/x-pack/plugins/cross_cluster_replication/server/lib/follower_index_serialization.js new file mode 100644 index 0000000000000..7b5a0e453b65f --- /dev/null +++ b/x-pack/plugins/cross_cluster_replication/server/lib/follower_index_serialization.js @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* eslint-disable camelcase */ +export const deserializeShard = ({ + remote_cluster, + leader_index, + shard_id, + leader_global_checkpoint, + leader_max_seq_no, + follower_global_checkpoint, + follower_max_seq_no, + last_requested_seq_no, + outstanding_read_requests, + outstanding_write_requests, + write_buffer_operation_count, + write_buffer_size_in_bytes, + follower_mapping_version, + follower_settings_version, + total_read_time_millis, + total_read_remote_exec_time_millis, + successful_read_requests, + failed_read_requests, + operations_read, + bytes_read, + total_write_time_millis, + successful_write_requests, + failed_write_requests, + operations_written, + read_exceptions, + time_since_last_read_millis, +}) => ({ + id: shard_id, + remoteCluster: remote_cluster, + leaderIndex: leader_index, + leaderGlobalCheckpoint: leader_global_checkpoint, + leaderMaxSequenceNum: leader_max_seq_no, + followerGlobalCheckpoint: follower_global_checkpoint, + followerMaxSequenceNum: follower_max_seq_no, + lastRequestedSequenceNum: last_requested_seq_no, + outstandingReadRequestsCount: outstanding_read_requests, + outstandingWriteRequestsCount: outstanding_write_requests, + writeBufferOperationsCount: write_buffer_operation_count, + writeBufferSizeBytes: write_buffer_size_in_bytes, + followerMappingVersion: follower_mapping_version, + followerSettingsVersion: follower_settings_version, + totalReadTimeMs: total_read_time_millis, + totalReadRemoteExecTimeMs: total_read_remote_exec_time_millis, + successfulReadRequestCount: successful_read_requests, + failedReadRequestsCount: failed_read_requests, + operationsReadCount: operations_read, + bytesReadCount: bytes_read, + totalWriteTimeMs: total_write_time_millis, + successfulWriteRequestsCount: successful_write_requests, + failedWriteRequestsCount: failed_write_requests, + operationsWrittenCount: operations_written, + // This is an array of exception objects + readExceptions: read_exceptions, + timeSinceLastReadMs: time_since_last_read_millis, +}); +/* eslint-enable camelcase */ + +export const deserializeFollowerIndex = ({ index, shards }) => ({ + name: index, + shards: shards.map(deserializeShard), +}); + +export const deserializeListFollowerIndices = followerIndices => + followerIndices.map(deserializeFollowerIndex); + +export const serializeFollowerIndex = ({ remoteCluster, leaderIndex }) => ({ + remote_cluster: remoteCluster, + leader_index: leaderIndex, +}); diff --git a/x-pack/plugins/cross_cluster_replication/server/lib/follower_index_serialization.test.js b/x-pack/plugins/cross_cluster_replication/server/lib/follower_index_serialization.test.js new file mode 100644 index 0000000000000..db6c969b3c934 --- /dev/null +++ b/x-pack/plugins/cross_cluster_replication/server/lib/follower_index_serialization.test.js @@ -0,0 +1,191 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { + deserializeShard, + deserializeFollowerIndex, + deserializeListFollowerIndices, + serializeFollowerIndex, +} from './follower_index_serialization'; + +describe('[CCR] follower index serialization', () => { + describe('deserializeShard()', () => { + it('deserializes shard', () => { + const serializedShard = { + remote_cluster: 'remote cluster', + leader_index: 'leader index', + shard_id: 'shard id', + leader_global_checkpoint: 'leader global checkpoint', + leader_max_seq_no: 'leader max seq no', + follower_global_checkpoint: 'follower global checkpoint', + follower_max_seq_no: 'follower max seq no', + last_requested_seq_no: 'last requested seq no', + outstanding_read_requests: 'outstanding read requests', + outstanding_write_requests: 'outstanding write requests', + write_buffer_operation_count: 'write buffer operation count', + write_buffer_size_in_bytes: 'write buffer size in bytes', + follower_mapping_version: 'follower mapping version', + follower_settings_version: 'follower settings version', + total_read_time_millis: 'total read time millis', + total_read_remote_exec_time_millis: 'total read remote exec time millis', + successful_read_requests: 'successful read requests', + failed_read_requests: 'failed read requests', + operations_read: 'operations read', + bytes_read: 'bytes read', + total_write_time_millis: 'total write time millis', + successful_write_requests: 'successful write requests', + failed_write_requests: 'failed write requests', + operations_written: 'operations written', + read_exceptions: ['read exception'], + time_since_last_read_millis: 'time since last read millis', + }; + + const deserializedShard = { + id: 'shard id', + remoteCluster: 'remote cluster', + leaderIndex: 'leader index', + leaderGlobalCheckpoint: 'leader global checkpoint', + leaderMaxSequenceNum: 'leader max seq no', + followerGlobalCheckpoint: 'follower global checkpoint', + followerMaxSequenceNum: 'follower max seq no', + lastRequestedSequenceNum: 'last requested seq no', + outstandingReadRequestsCount: 'outstanding read requests', + outstandingWriteRequestsCount: 'outstanding write requests', + writeBufferOperationsCount: 'write buffer operation count', + writeBufferSizeBytes: 'write buffer size in bytes', + followerMappingVersion: 'follower mapping version', + followerSettingsVersion: 'follower settings version', + totalReadTimeMs: 'total read time millis', + totalReadRemoteExecTimeMs: 'total read remote exec time millis', + successfulReadRequestCount: 'successful read requests', + failedReadRequestsCount: 'failed read requests', + operationsReadCount: 'operations read', + bytesReadCount: 'bytes read', + totalWriteTimeMs: 'total write time millis', + successfulWriteRequestsCount: 'successful write requests', + failedWriteRequestsCount: 'failed write requests', + operationsWrittenCount: 'operations written', + readExceptions: ['read exception'], + timeSinceLastReadMs: 'time since last read millis', + }; + + expect(deserializeShard(serializedShard)).toEqual(deserializedShard); + }); + }); + + describe('deserializeFollowerIndex()', () => { + it('deserializes Elasticsearch follower index object', () => { + const serializedFollowerIndex = { + index: 'follower index name', + shards: [{ + shard_id: 'shard 1', + }, { + shard_id: 'shard 2', + }], + }; + + const deserializedFollowerIndex = { + name: 'follower index name', + shards: [{ + id: 'shard 1', + remoteCluster: undefined, + leaderIndex: undefined, + leaderGlobalCheckpoint: undefined, + leaderMaxSequenceNum: undefined, + followerGlobalCheckpoint: undefined, + followerMaxSequenceNum: undefined, + lastRequestedSequenceNum: undefined, + outstandingReadRequestsCount: undefined, + outstandingWriteRequestsCount: undefined, + writeBufferOperationsCount: undefined, + writeBufferSizeBytes: undefined, + followerMappingVersion: undefined, + followerSettingsVersion: undefined, + totalReadTimeMs: undefined, + totalReadRemoteExecTimeMs: undefined, + successfulReadRequestCount: undefined, + failedReadRequestsCount: undefined, + operationsReadCount: undefined, + bytesReadCount: undefined, + totalWriteTimeMs: undefined, + successfulWriteRequestsCount: undefined, + failedWriteRequestsCount: undefined, + operationsWrittenCount: undefined, + readExceptions: undefined, + timeSinceLastReadMs: undefined, + }, { + id: 'shard 2', + remoteCluster: undefined, + leaderIndex: undefined, + leaderGlobalCheckpoint: undefined, + leaderMaxSequenceNum: undefined, + followerGlobalCheckpoint: undefined, + followerMaxSequenceNum: undefined, + lastRequestedSequenceNum: undefined, + outstandingReadRequestsCount: undefined, + outstandingWriteRequestsCount: undefined, + writeBufferOperationsCount: undefined, + writeBufferSizeBytes: undefined, + followerMappingVersion: undefined, + followerSettingsVersion: undefined, + totalReadTimeMs: undefined, + totalReadRemoteExecTimeMs: undefined, + successfulReadRequestCount: undefined, + failedReadRequestsCount: undefined, + operationsReadCount: undefined, + bytesReadCount: undefined, + totalWriteTimeMs: undefined, + successfulWriteRequestsCount: undefined, + failedWriteRequestsCount: undefined, + operationsWrittenCount: undefined, + readExceptions: undefined, + timeSinceLastReadMs: undefined, + }], + }; + + expect(deserializeFollowerIndex(serializedFollowerIndex)).toEqual(deserializedFollowerIndex); + }); + }); + + describe('deserializeListFollowerIndices()', () => { + it('deserializes list of Elasticsearch follower index objects', () => { + const serializedFollowerIndexList = [{ + index: 'follower index 1', + shards: [], + }, { + index: 'follower index 2', + shards: [], + }]; + + const deserializedFollowerIndexList = [{ + name: 'follower index 1', + shards: [], + }, { + name: 'follower index 2', + shards: [], + }]; + + expect(deserializeListFollowerIndices(serializedFollowerIndexList)) + .toEqual(deserializedFollowerIndexList); + }); + }); + + describe('serializeFollowerIndex()', () => { + it('serializes object to Elasticsearch follower index object', () => { + const deserializedFollowerIndex = { + remoteCluster: 'remote cluster', + leaderIndex: 'leader index', + }; + + const serializedFollowerIndex = { + remote_cluster: 'remote cluster', + leader_index: 'leader index', + }; + + expect(serializeFollowerIndex(deserializedFollowerIndex)).toEqual(serializedFollowerIndex); + }); + }); +}); diff --git a/x-pack/plugins/cross_cluster_replication/server/routes/api/auto_follow_pattern.js b/x-pack/plugins/cross_cluster_replication/server/routes/api/auto_follow_pattern.js index 17354de2fb11e..4c6c34ce68479 100644 --- a/x-pack/plugins/cross_cluster_replication/server/routes/api/auto_follow_pattern.js +++ b/x-pack/plugins/cross_cluster_replication/server/routes/api/auto_follow_pattern.js @@ -15,8 +15,6 @@ import { import { licensePreRoutingFactory } from'../../lib/license_pre_routing_factory'; import { API_BASE_PATH } from '../../../common/constants'; -// import { esErrors } from '../../../fixtures'; // Temp for development to test ES error in UI - export const registerAutoFollowPatternRoutes = (server) => { const isEsError = isEsErrorFactory(server); const licensePreRouting = licensePreRoutingFactory(server); @@ -33,8 +31,6 @@ export const registerAutoFollowPatternRoutes = (server) => { handler: async (request) => { const callWithRequest = callWithRequestFactory(server, request); - // throw wrapEsError(esErrors[403]); // Temp for development to test ES error in UI. MUST be commented in CR - try { const response = await callWithRequest('ccr.autoFollowPatterns'); return ({ diff --git a/x-pack/plugins/cross_cluster_replication/server/routes/api/follower_index.js b/x-pack/plugins/cross_cluster_replication/server/routes/api/follower_index.js new file mode 100644 index 0000000000000..239d1f664eeff --- /dev/null +++ b/x-pack/plugins/cross_cluster_replication/server/routes/api/follower_index.js @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { callWithRequestFactory } from '../../lib/call_with_request_factory'; +import { isEsErrorFactory } from '../../lib/is_es_error_factory'; +import { wrapEsError, wrapUnknownError } from '../../lib/error_wrappers'; +import { + deserializeListFollowerIndices, + serializeFollowerIndex, +} from '../../lib/follower_index_serialization'; +import { licensePreRoutingFactory } from'../../lib/license_pre_routing_factory'; +import { API_BASE_PATH } from '../../../common/constants'; + +export const registerFollowerIndexRoutes = (server) => { + const isEsError = isEsErrorFactory(server); + const licensePreRouting = licensePreRoutingFactory(server); + + /** + * Returns a list of all Follower indices + */ + server.route({ + path: `${API_BASE_PATH}/follower_indices`, + method: 'GET', + config: { + pre: [ licensePreRouting ] + }, + handler: async (request) => { + const callWithRequest = callWithRequestFactory(server, request); + + try { + const response = await callWithRequest('ccr.followerIndices'); + return ({ + indices: deserializeListFollowerIndices(response.follow_stats.indices) + }); + } catch(err) { + if (isEsError(err)) { + throw wrapEsError(err); + } + throw wrapUnknownError(err); + } + }, + }); + + /** + * Create a follower index + */ + server.route({ + path: `${API_BASE_PATH}/follower_indices`, + method: 'POST', + config: { + pre: [ licensePreRouting ] + }, + handler: async (request) => { + const callWithRequest = callWithRequestFactory(server, request); + const { name, ...rest } = request.payload; + const body = serializeFollowerIndex(rest); + + try { + return await callWithRequest('ccr.saveFollowerIndex', { name, body }); + } catch(err) { + if (isEsError(err)) { + throw wrapEsError(err); + } + throw wrapUnknownError(err); + } + }, + }); +}; diff --git a/x-pack/plugins/cross_cluster_replication/server/routes/api/follower_index.test.js b/x-pack/plugins/cross_cluster_replication/server/routes/api/follower_index.test.js new file mode 100644 index 0000000000000..2fdb50ad7a96c --- /dev/null +++ b/x-pack/plugins/cross_cluster_replication/server/routes/api/follower_index.test.js @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { callWithRequestFactory } from '../../lib/call_with_request_factory'; +import { isEsErrorFactory } from '../../lib/is_es_error_factory'; +import { registerFollowerIndexRoutes } from './follower_index'; +import { getFollowerIndexMock, getFollowerIndexListMock } from '../../../fixtures'; +import { deserializeFollowerIndex } from '../../lib/follower_index_serialization'; + +jest.mock('../../lib/call_with_request_factory'); +jest.mock('../../lib/is_es_error_factory'); +jest.mock('../../lib/license_pre_routing_factory'); + +const DESERIALIZED_KEYS = Object.keys(deserializeFollowerIndex(getFollowerIndexMock())); + +/** + * Hashtable to save the route handlers + */ +const routeHandlers = {}; + +/** + * Helper to extract all the different server route handler so we can easily call them in our tests. + * + * Important: This method registers the handlers in the order that they appear in the file, so + * if a 'server.route()' call is moved or deleted, then the HANDLER_INDEX_TO_ACTION must be updated here. + */ +const registerHandlers = () => { + let index = 0; + + const HANDLER_INDEX_TO_ACTION = { + 0: 'list', + 1: 'create', + }; + + const server = { + route({ handler }) { + // Save handler and increment index + routeHandlers[HANDLER_INDEX_TO_ACTION[index]] = handler; + index++; + }, + }; + + registerFollowerIndexRoutes(server); +}; + +/** + * Queue to save request response and errors + * It allows us to fake multiple responses from the + * callWithRequestFactory() when the request handler call it + * multiple times. + */ +let requestResponseQueue = []; + +/** + * Helper to mock the response from the call to Elasticsearch + * + * @param {*} err The mock error to throw + * @param {*} response The response to return + */ +const setHttpRequestResponse = (error, response) => { + requestResponseQueue.push ({ error, response }); +}; + +const resetHttpRequestResponses = () => requestResponseQueue = []; + +const getNextResponseFromQueue = () => { + if (!requestResponseQueue.length) { + return null; + } + + const next = requestResponseQueue.shift(); + if (next.error) { + return Promise.reject(next.error); + } + return Promise.resolve(next.response); +}; + +describe('[CCR API Routes] Follower Index', () => { + let routeHandler; + + beforeAll(() => { + isEsErrorFactory.mockReturnValue(() => false); + callWithRequestFactory.mockReturnValue(getNextResponseFromQueue); + registerHandlers(); + }); + + describe('list()', () => { + beforeEach(() => { + routeHandler = routeHandlers.list; + }); + + it('deserializes the response from Elasticsearch', async () => { + const totalResult = 2; + setHttpRequestResponse(null, getFollowerIndexListMock(totalResult)); + + const response = await routeHandler(); + const autoFollowPattern = response.indices[0]; + + expect(response.indices.length).toEqual(totalResult); + expect(Object.keys(autoFollowPattern)).toEqual(DESERIALIZED_KEYS); + }); + }); + + describe('create()', () => { + beforeEach(() => { + resetHttpRequestResponses(); + routeHandler = routeHandlers.create; + }); + + it('should return 200 status when follower index is created', async () => { + setHttpRequestResponse(null, { acknowledge: true }); + + const response = await routeHandler({ + payload: { + name: 'follower_index', + remoteCluster: 'remote_cluster', + leaderIndex: 'leader_index', + }, + }); + + expect(response).toEqual({ acknowledge: true }); + }); + }); +}); diff --git a/x-pack/plugins/cross_cluster_replication/server/routes/register_routes.js b/x-pack/plugins/cross_cluster_replication/server/routes/register_routes.js index 486f9784c3367..f478b714e4c5c 100644 --- a/x-pack/plugins/cross_cluster_replication/server/routes/register_routes.js +++ b/x-pack/plugins/cross_cluster_replication/server/routes/register_routes.js @@ -5,7 +5,9 @@ */ import { registerAutoFollowPatternRoutes } from './api/auto_follow_pattern'; +import { registerFollowerIndexRoutes } from './api/follower_index'; export function registerRoutes(server) { registerAutoFollowPatternRoutes(server); + registerFollowerIndexRoutes(server); }