From 98f9eb3c6135ff9dc4c22e67d70f6bbfb2e5939f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 11 Sep 2018 15:21:31 -0400 Subject: [PATCH 01/12] Expose CCR stats to monitoring This commit exposes the CCR stats endpoint to monitoring collection. --- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 3 +- .../elasticsearch/xpack/ccr/CcrSettings.java | 8 +- .../ccr/action/TransportCcrStatsAction.java | 32 +-- .../xpack/ccr/rest/RestCcrStatsAction.java | 2 +- .../elasticsearch/xpack/ccr/CcrLicenseIT.java | 4 +- .../xpack/core/XPackSettings.java | 6 + .../core/ccr/ShardFollowNodeTaskStatus.java | 95 +++++---- .../xpack/core/ccr/action/CcrStatsAction.java | 46 ++-- .../xpack/core/ccr/client/CcrClient.java | 8 +- .../xpack/monitoring/Monitoring.java | 4 + .../collector/ccr/CcrStatsCollector.java | 88 ++++++++ .../collector/ccr/CcrStatsMonitoringDoc.java | 41 ++++ .../collector/ccr/CcrStatsCollectorTests.java | 196 ++++++++++++++++++ .../ccr/CcrStatsMonitoringDocTests.java | 171 +++++++++++++++ 14 files changed, 608 insertions(+), 96 deletions(-) create mode 100644 x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java create mode 100644 x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDoc.java create mode 100644 x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java create mode 100644 x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 66b8a5c9590db..b62fe16f34956 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -41,6 +41,7 @@ import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction; +import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction; import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction; @@ -76,7 +77,7 @@ import java.util.function.Supplier; import static java.util.Collections.emptyList; -import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_ENABLED_SETTING; +import static org.elasticsearch.xpack.core.XPackSettings.CCR_ENABLED_SETTING; import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTING; /** diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index a942990ea5a74..122f5a913d216 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -8,6 +8,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.XPackSettings; import java.util.Arrays; import java.util.List; @@ -22,11 +23,6 @@ private CcrSettings() { } - /** - * Setting for controlling whether or not CCR is enabled. - */ - static final Setting CCR_ENABLED_SETTING = Setting.boolSetting("xpack.ccr.enabled", true, Property.NodeScope); - /** * Index setting for a following index. */ @@ -46,7 +42,7 @@ private CcrSettings() { */ static List> getSettings() { return Arrays.asList( - CCR_ENABLED_SETTING, + XPackSettings.CCR_ENABLED_SETTING, CCR_FOLLOWING_INDEX_SETTING, CCR_AUTO_FOLLOW_POLL_INTERVAL); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java index d4425773fa1c8..f227a56f1582f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java @@ -34,8 +34,8 @@ public class TransportCcrStatsAction extends TransportTasksAction< ShardFollowNodeTask, - CcrStatsAction.TasksRequest, - CcrStatsAction.TasksResponse, CcrStatsAction.TaskResponse> { + CcrStatsAction.StatsRequest, + CcrStatsAction.StatsResponses, CcrStatsAction.StatsResponse> { private final IndexNameExpressionResolver resolver; private final CcrLicenseChecker ccrLicenseChecker; @@ -54,8 +54,8 @@ public TransportCcrStatsAction( clusterService, transportService, actionFilters, - CcrStatsAction.TasksRequest::new, - CcrStatsAction.TasksResponse::new, + CcrStatsAction.StatsRequest::new, + CcrStatsAction.StatsResponses::new, Ccr.CCR_THREAD_POOL_NAME); this.resolver = Objects.requireNonNull(resolver); this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker); @@ -64,8 +64,8 @@ public TransportCcrStatsAction( @Override protected void doExecute( final Task task, - final CcrStatsAction.TasksRequest request, - final ActionListener listener) { + final CcrStatsAction.StatsRequest request, + final ActionListener listener) { if (ccrLicenseChecker.isCcrAllowed() == false) { listener.onFailure(LicenseUtils.newComplianceException("ccr")); return; @@ -74,21 +74,21 @@ protected void doExecute( } @Override - protected CcrStatsAction.TasksResponse newResponse( - final CcrStatsAction.TasksRequest request, - final List taskResponses, + protected CcrStatsAction.StatsResponses newResponse( + final CcrStatsAction.StatsRequest request, + final List statsRespons, final List taskOperationFailures, final List failedNodeExceptions) { - return new CcrStatsAction.TasksResponse(taskOperationFailures, failedNodeExceptions, taskResponses); + return new CcrStatsAction.StatsResponses(taskOperationFailures, failedNodeExceptions, statsRespons); } @Override - protected CcrStatsAction.TaskResponse readTaskResponse(final StreamInput in) throws IOException { - return new CcrStatsAction.TaskResponse(in); + protected CcrStatsAction.StatsResponse readTaskResponse(final StreamInput in) throws IOException { + return new CcrStatsAction.StatsResponse(in); } @Override - protected void processTasks(final CcrStatsAction.TasksRequest request, final Consumer operation) { + protected void processTasks(final CcrStatsAction.StatsRequest request, final Consumer operation) { final ClusterState state = clusterService.state(); final Set concreteIndices = new HashSet<>(Arrays.asList(resolver.concreteIndexNames(state, request))); for (final Task task : taskManager.getTasks().values()) { @@ -103,10 +103,10 @@ protected void processTasks(final CcrStatsAction.TasksRequest request, final Con @Override protected void taskOperation( - final CcrStatsAction.TasksRequest request, + final CcrStatsAction.StatsRequest request, final ShardFollowNodeTask task, - final ActionListener listener) { - listener.onResponse(new CcrStatsAction.TaskResponse(task.getFollowShardId(), task.getStatus())); + final ActionListener listener) { + listener.onResponse(new CcrStatsAction.StatsResponse(task.getFollowShardId(), task.getStatus())); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java index 0cf0aaf2e49b1..de285dba19ec2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java @@ -33,7 +33,7 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { - final CcrStatsAction.TasksRequest request = new CcrStatsAction.TasksRequest(); + final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest(); request.setIndices(Strings.splitStringByCommaToArray(restRequest.param("index"))); request.setIndicesOptions(IndicesOptions.fromRequest(restRequest, request.indicesOptions())); return channel -> client.execute(CcrStatsAction.INSTANCE, request, new RestToXContentListener<>(channel)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index ecf2bd47fc7d9..f791be6a63331 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -90,9 +90,9 @@ public void onFailure(final Exception e) { public void testThatCcrStatsAreUnavailableWithNonCompliantLicense() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.TasksRequest(), new ActionListener() { + client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.StatsRequest(), new ActionListener() { @Override - public void onResponse(final CcrStatsAction.TasksResponse tasksResponse) { + public void onResponse(final CcrStatsAction.StatsResponses statsResponses) { latch.countDown(); fail(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java index fb4ce0b90f4a4..997f04e33bd77 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java @@ -35,6 +35,12 @@ private XPackSettings() { throw new IllegalStateException("Utility class should not be instantiated"); } + + /** + * Setting for controlling whether or not CCR is enabled. + */ + public static final Setting CCR_ENABLED_SETTING = Setting.boolSetting("xpack.ccr.enabled", true, Property.NodeScope); + /** Setting for enabling or disabling security. Defaults to true. */ public static final Setting SECURITY_ENABLED = Setting.boolSetting("xpack.security.enabled", true, Setting.Property.NodeScope); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java index 783999cf1836e..2f3c4efb9ad3b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java @@ -369,58 +369,63 @@ public void writeTo(final StreamOutput out) throws IOException { public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); { - builder.field(LEADER_INDEX.getPreferredName(), leaderIndex); - builder.field(SHARD_ID.getPreferredName(), shardId); - builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint); - builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo); - builder.field(FOLLOWER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), followerGlobalCheckpoint); - builder.field(FOLLOWER_MAX_SEQ_NO_FIELD.getPreferredName(), followerMaxSeqNo); - builder.field(LAST_REQUESTED_SEQ_NO_FIELD.getPreferredName(), lastRequestedSeqNo); - builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads); - builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites); - builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites); - builder.field(MAPPING_VERSION_FIELD.getPreferredName(), mappingVersion); - builder.humanReadableField( - TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(), - "total_fetch_time", - new TimeValue(totalFetchTimeMillis, TimeUnit.MILLISECONDS)); - builder.field(NUMBER_OF_SUCCESSFUL_FETCHES_FIELD.getPreferredName(), numberOfSuccessfulFetches); - builder.field(NUMBER_OF_FAILED_FETCHES_FIELD.getPreferredName(), numberOfFailedFetches); - builder.field(OPERATIONS_RECEIVED_FIELD.getPreferredName(), operationsReceived); - builder.humanReadableField( - TOTAL_TRANSFERRED_BYTES.getPreferredName(), - "total_transferred", - new ByteSizeValue(totalTransferredBytes, ByteSizeUnit.BYTES)); - builder.humanReadableField( - TOTAL_INDEX_TIME_MILLIS_FIELD.getPreferredName(), - "total_index_time", - new TimeValue(totalIndexTimeMillis, TimeUnit.MILLISECONDS)); - builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations); - builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations); - builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed); - builder.startArray(FETCH_EXCEPTIONS.getPreferredName()); - { - for (final Map.Entry entry : fetchExceptions.entrySet()) { + toXContentFragment(builder, params); + } + builder.endObject(); + return builder; + } + + public XContentBuilder toXContentFragment(final XContentBuilder builder, final Params params) throws IOException { + builder.field(LEADER_INDEX.getPreferredName(), leaderIndex); + builder.field(SHARD_ID.getPreferredName(), shardId); + builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint); + builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo); + builder.field(FOLLOWER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), followerGlobalCheckpoint); + builder.field(FOLLOWER_MAX_SEQ_NO_FIELD.getPreferredName(), followerMaxSeqNo); + builder.field(LAST_REQUESTED_SEQ_NO_FIELD.getPreferredName(), lastRequestedSeqNo); + builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads); + builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites); + builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites); + builder.field(MAPPING_VERSION_FIELD.getPreferredName(), mappingVersion); + builder.humanReadableField( + TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(), + "total_fetch_time", + new TimeValue(totalFetchTimeMillis, TimeUnit.MILLISECONDS)); + builder.field(NUMBER_OF_SUCCESSFUL_FETCHES_FIELD.getPreferredName(), numberOfSuccessfulFetches); + builder.field(NUMBER_OF_FAILED_FETCHES_FIELD.getPreferredName(), numberOfFailedFetches); + builder.field(OPERATIONS_RECEIVED_FIELD.getPreferredName(), operationsReceived); + builder.humanReadableField( + TOTAL_TRANSFERRED_BYTES.getPreferredName(), + "total_transferred", + new ByteSizeValue(totalTransferredBytes, ByteSizeUnit.BYTES)); + builder.humanReadableField( + TOTAL_INDEX_TIME_MILLIS_FIELD.getPreferredName(), + "total_index_time", + new TimeValue(totalIndexTimeMillis, TimeUnit.MILLISECONDS)); + builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations); + builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations); + builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed); + builder.startArray(FETCH_EXCEPTIONS.getPreferredName()); + { + for (final Map.Entry entry : fetchExceptions.entrySet()) { + builder.startObject(); + { + builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey()); + builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName()); builder.startObject(); { - builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey()); - builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName()); - builder.startObject(); - { - ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue()); - } - builder.endObject(); + ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue()); } builder.endObject(); } + builder.endObject(); } - builder.endArray(); - builder.humanReadableField( - TIME_SINCE_LAST_FETCH_MILLIS_FIELD.getPreferredName(), - "time_since_last_fetch", - new TimeValue(timeSinceLastFetchMillis, TimeUnit.MILLISECONDS)); } - builder.endObject(); + builder.endArray(); + builder.humanReadableField( + TIME_SINCE_LAST_FETCH_MILLIS_FIELD.getPreferredName(), + "time_since_last_fetch", + new TimeValue(timeSinceLastFetchMillis, TimeUnit.MILLISECONDS)); return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java index ace3d6bb194e2..1074b6905d33e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java @@ -29,7 +29,7 @@ import java.util.Map; import java.util.TreeMap; -public class CcrStatsAction extends Action { +public class CcrStatsAction extends Action { public static final String NAME = "cluster:monitor/ccr/stats"; @@ -40,41 +40,45 @@ private CcrStatsAction() { } @Override - public TasksResponse newResponse() { - return new TasksResponse(); + public StatsResponses newResponse() { + return new StatsResponses(); } - public static class TasksResponse extends BaseTasksResponse implements ToXContentObject { + public static class StatsResponses extends BaseTasksResponse implements ToXContentObject { - private final List taskResponses; + private final List statsResponse; - public TasksResponse() { + public List getStatsResponses() { + return statsResponse; + } + + public StatsResponses() { this(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); } - public TasksResponse( + public StatsResponses( final List taskFailures, final List nodeFailures, - final List taskResponses) { + final List statsResponse) { super(taskFailures, nodeFailures); - this.taskResponses = taskResponses; + this.statsResponse = statsResponse; } @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { // sort by index name, then shard ID - final Map> taskResponsesByIndex = new TreeMap<>(); - for (final TaskResponse taskResponse : taskResponses) { + final Map> taskResponsesByIndex = new TreeMap<>(); + for (final StatsResponse statsResponse : statsResponse) { taskResponsesByIndex.computeIfAbsent( - taskResponse.followerShardId().getIndexName(), - k -> new TreeMap<>()).put(taskResponse.followerShardId().getId(), taskResponse); + statsResponse.followerShardId().getIndexName(), + k -> new TreeMap<>()).put(statsResponse.followerShardId().getId(), statsResponse); } builder.startObject(); { - for (final Map.Entry> index : taskResponsesByIndex.entrySet()) { + for (final Map.Entry> index : taskResponsesByIndex.entrySet()) { builder.startArray(index.getKey()); { - for (final Map.Entry shard : index.getValue().entrySet()) { + for (final Map.Entry shard : index.getValue().entrySet()) { shard.getValue().status().toXContent(builder, params); } } @@ -86,7 +90,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa } } - public static class TasksRequest extends BaseTasksRequest implements IndicesRequest { + public static class StatsRequest extends BaseTasksRequest implements IndicesRequest { private String[] indices; @@ -144,26 +148,26 @@ public void writeTo(StreamOutput out) throws IOException { } - public static class TaskResponse implements Writeable { + public static class StatsResponse implements Writeable { private final ShardId followerShardId; - ShardId followerShardId() { + public ShardId followerShardId() { return followerShardId; } private final ShardFollowNodeTaskStatus status; - ShardFollowNodeTaskStatus status() { + public ShardFollowNodeTaskStatus status() { return status; } - public TaskResponse(final ShardId followerShardId, final ShardFollowNodeTaskStatus status) { + public StatsResponse(final ShardId followerShardId, final ShardFollowNodeTaskStatus status) { this.followerShardId = followerShardId; this.status = status; } - public TaskResponse(final StreamInput in) throws IOException { + public StatsResponse(final StreamInput in) throws IOException { this.followerShardId = ShardId.readShardId(in); this.status = new ShardFollowNodeTaskStatus(in); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java index e880b38482460..881979e3d7972 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java @@ -49,13 +49,13 @@ public ActionFuture follow(final FollowIndexAction.Request } public void stats( - final CcrStatsAction.TasksRequest request, - final ActionListener listener) { + final CcrStatsAction.StatsRequest request, + final ActionListener listener) { client.execute(CcrStatsAction.INSTANCE, request, listener); } - public ActionFuture stats(final CcrStatsAction.TasksRequest request) { - final PlainActionFuture listener = PlainActionFuture.newFuture(); + public ActionFuture stats(final CcrStatsAction.StatsRequest request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); client.execute(CcrStatsAction.INSTANCE, request, listener); return listener; } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java index 4f9119df589b1..9c463b0513dd3 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java @@ -33,12 +33,14 @@ import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.monitoring.MonitoringField; import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction; import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.monitoring.action.TransportMonitoringBulkAction; import org.elasticsearch.xpack.monitoring.cleaner.CleanerService; import org.elasticsearch.xpack.monitoring.collector.Collector; +import org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsCollector; import org.elasticsearch.xpack.monitoring.collector.cluster.ClusterStatsCollector; import org.elasticsearch.xpack.monitoring.collector.indices.IndexRecoveryCollector; import org.elasticsearch.xpack.monitoring.collector.indices.IndexStatsCollector; @@ -142,6 +144,7 @@ public Collection createComponents(Client client, ClusterService cluster collectors.add(new NodeStatsCollector(settings, clusterService, getLicenseState(), client)); collectors.add(new IndexRecoveryCollector(settings, clusterService, getLicenseState(), client)); collectors.add(new JobStatsCollector(settings, clusterService, getLicenseState(), client)); + collectors.add(new CcrStatsCollector(settings, clusterService, getLicenseState(), client)); final MonitoringService monitoringService = new MonitoringService(settings, clusterService, threadPool, collectors, exporters); @@ -179,6 +182,7 @@ public List> getSettings() { settings.add(IndexRecoveryCollector.INDEX_RECOVERY_ACTIVE_ONLY); settings.add(IndexStatsCollector.INDEX_STATS_TIMEOUT); settings.add(JobStatsCollector.JOB_STATS_TIMEOUT); + settings.add(CcrStatsCollector.CCR_STATS_TIMEOUT); settings.add(NodeStatsCollector.NODE_STATS_TIMEOUT); settings.addAll(Exporters.getSettings()); return Collections.unmodifiableList(settings); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java new file mode 100644 index 0000000000000..17787d88e4482 --- /dev/null +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.monitoring.collector.ccr; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.core.XPackClient; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; +import org.elasticsearch.xpack.core.ccr.client.CcrClient; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; +import org.elasticsearch.xpack.monitoring.collector.Collector; + +import java.util.Collection; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; +import static org.elasticsearch.xpack.monitoring.collector.ccr.CcrStatsMonitoringDoc.TYPE; + +public class CcrStatsCollector extends Collector { + + public static final Setting CCR_STATS_TIMEOUT = collectionTimeoutSetting("ccr.stats.timeout"); + + private final ThreadContext threadContext; + private final CcrClient ccrClient; + + public CcrStatsCollector( + final Settings settings, + final ClusterService clusterService, + final XPackLicenseState licenseState, + final Client client) { + this(settings, clusterService, licenseState, new XPackClient(client).ccr(), client.threadPool().getThreadContext()); + } + + CcrStatsCollector( + final Settings settings, + final ClusterService clusterService, + final XPackLicenseState licenseState, + final CcrClient ccrClient, + final ThreadContext threadContext) { + super(settings, TYPE, clusterService, CCR_STATS_TIMEOUT, licenseState); + this.ccrClient = ccrClient; + this.threadContext = threadContext; + } + + @Override + protected boolean shouldCollect(final boolean isElectedMaster) { + // this can only run when monitoring is allowed and CCR is enabled and allowed, but also only on the elected master node + return isElectedMaster + && super.shouldCollect(isElectedMaster) + && XPackSettings.CCR_ENABLED_SETTING.get(settings) + && licenseState.isCcrAllowed(); + } + + + @Override + protected Collection doCollect( + final MonitoringDoc.Node node, + final long interval, + final ClusterState clusterState) throws Exception { + try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, MONITORING_ORIGIN)) { + final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest(); + request.setIndices(Strings.EMPTY_ARRAY); + final CcrStatsAction.StatsResponses responses = ccrClient.stats(request).actionGet(getCollectionTimeout()); + + final long timestamp = timestamp(); + final String clusterUuid = clusterUuid(clusterState); + + return responses + .getStatsResponses() + .stream() + .map(stats -> new CcrStatsMonitoringDoc(clusterUuid, timestamp, interval, node, stats.status())) + .collect(Collectors.toList()); + } + } +} diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDoc.java new file mode 100644 index 0000000000000..c46f2da7219b0 --- /dev/null +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDoc.java @@ -0,0 +1,41 @@ +package org.elasticsearch.xpack.monitoring.collector.ccr; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; + +import java.io.IOException; +import java.util.Objects; + +public class CcrStatsMonitoringDoc extends MonitoringDoc { + + public static final String TYPE = "ccr_stats"; + + private final ShardFollowNodeTaskStatus status; + + public ShardFollowNodeTaskStatus status() { + return status; + } + + public CcrStatsMonitoringDoc( + final String cluster, + final long timestamp, + final long intervalMillis, + final MonitoringDoc.Node node, + final ShardFollowNodeTaskStatus status) { + super(cluster, timestamp, intervalMillis, node, MonitoredSystem.ES, TYPE, null); + this.status = Objects.requireNonNull(status, "status"); + } + + + @Override + protected void innerToXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(TYPE); + { + status.toXContentFragment(builder, params); + } + builder.endObject(); + } + +} diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java new file mode 100644 index 0000000000000..1ef433486744f --- /dev/null +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java @@ -0,0 +1,196 @@ +package org.elasticsearch.xpack.monitoring.collector.ccr; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; +import org.elasticsearch.xpack.core.ccr.client.CcrClient; +import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; +import org.elasticsearch.xpack.core.ml.action.util.QueryPage; +import org.elasticsearch.xpack.core.ml.client.MachineLearningClient; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; +import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase; +import org.elasticsearch.xpack.monitoring.collector.ml.JobStatsCollector; +import org.elasticsearch.xpack.monitoring.collector.ml.JobStatsMonitoringDoc; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import static java.util.Collections.emptyList; +import static org.elasticsearch.xpack.monitoring.MonitoringTestUtils.randomMonitoringNode; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.*; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CcrStatsCollectorTests extends BaseCollectorTestCase { + + public void testShouldCollectReturnsFalseIfMonitoringNotAllowed() { + final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings()); + final boolean ccrAllowed = randomBoolean(); + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); + + // this controls the blockage + when(licenseState.isMonitoringAllowed()).thenReturn(false); + when(licenseState.isCcrAllowed()).thenReturn(ccrAllowed); + + final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client); + + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } + } + + public void testShouldCollectReturnsFalseIfNotMaster() { + // regardless of CCR being enabled + final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings()); + + when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); + when(licenseState.isCcrAllowed()).thenReturn(randomBoolean()); + // this controls the blockage + final boolean isElectedMaster = false; + + final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client); + + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + } + + public void testShouldCollectReturnsFalseIfCCRIsDisabled() { + // this is controls the blockage + final Settings settings = ccrDisabledSettings(); + + when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); + when(licenseState.isCcrAllowed()).thenReturn(randomBoolean()); + + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); + + final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client); + + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } + } + + public void testShouldCollectReturnsFalseIfCCRIsNotAllowed() { + final Settings settings = randomFrom(ccrEnabledSettings(), ccrDisabledSettings()); + + when(licenseState.isMonitoringAllowed()).thenReturn(randomBoolean()); + // this is controls the blockage + when(licenseState.isCcrAllowed()).thenReturn(false); + final boolean isElectedMaster = randomBoolean(); + whenLocalNodeElectedMaster(isElectedMaster); + + final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client); + + assertThat(collector.shouldCollect(isElectedMaster), is(false)); + + if (isElectedMaster) { + verify(licenseState).isMonitoringAllowed(); + } + } + + public void testShouldCollectReturnsTrue() { + final Settings settings = ccrEnabledSettings(); + + when(licenseState.isMonitoringAllowed()).thenReturn(true); + when(licenseState.isCcrAllowed()).thenReturn(true); + final boolean isElectedMaster = true; + + final CcrStatsCollector collector = new CcrStatsCollector(settings, clusterService, licenseState, client); + + assertThat(collector.shouldCollect(isElectedMaster), is(true)); + + verify(licenseState).isMonitoringAllowed(); + } + + public void testDoCollect() throws Exception { + final String clusterUuid = randomAlphaOfLength(5); + whenClusterStateWithUUID(clusterUuid); + + final MonitoringDoc.Node node = randomMonitoringNode(random()); + final CcrClient client = mock(CcrClient.class); + final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + + final TimeValue timeout = TimeValue.timeValueSeconds(randomIntBetween(1, 120)); + withCollectionTimeout(CcrStatsCollector.CCR_STATS_TIMEOUT, timeout); + + final CcrStatsCollector collector = new CcrStatsCollector(Settings.EMPTY, clusterService, licenseState, client, threadContext); + assertEquals(timeout, collector.getCollectionTimeout()); + + final List statuses = mockStatuses(); + + @SuppressWarnings("unchecked") + final ActionFuture future = (ActionFuture)mock(ActionFuture.class); + final CcrStatsAction.StatsResponses responses = new CcrStatsAction.StatsResponses(emptyList(), emptyList(), statuses); + + final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest(); + request.setIndices(Strings.EMPTY_ARRAY); + when(client.stats(eq(request))).thenReturn(future); + when(future.actionGet(timeout)).thenReturn(responses); + + final long interval = randomNonNegativeLong(); + + final Collection documents = collector.doCollect(node, interval, clusterState); + verify(clusterState).metaData(); + verify(metaData).clusterUUID(); + + assertThat(documents, hasSize(statuses.size())); + + int index = 0; + for (final Iterator it = documents.iterator(); it.hasNext(); index++) { + final CcrStatsMonitoringDoc document = (CcrStatsMonitoringDoc)it.next(); + final CcrStatsAction.StatsResponse status = statuses.get(index); + + assertThat(document.getCluster(), is(clusterUuid)); + assertThat(document.getTimestamp(), greaterThan(0L)); + assertThat(document.getIntervalMillis(), equalTo(interval)); + assertThat(document.getNode(), equalTo(node)); + assertThat(document.getSystem(), is(MonitoredSystem.ES)); + assertThat(document.getType(), is(JobStatsMonitoringDoc.TYPE)); + assertThat(document.getId(), nullValue()); + assertThat(document.status(), is(status)); + } + } + + private List mockStatuses() { + final int count = randomIntBetween(1, 8); + final List statuses = new ArrayList<>(count); + + for (int i = 0; i < count; ++i) { + statuses.add(mock(CcrStatsAction.StatsResponse.class)); + } + + return statuses; + } + + private Settings ccrEnabledSettings() { + // since it's the default, we want to ensure we test both with/without it + return randomBoolean() ? Settings.EMPTY : Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), true).build(); + } + + private Settings ccrDisabledSettings() { + return Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false).build(); + } + +} \ No newline at end of file diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java new file mode 100644 index 0000000000000..1153b81bc3384 --- /dev/null +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java @@ -0,0 +1,171 @@ +package org.elasticsearch.xpack.monitoring.collector.ccr; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; +import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; +import org.elasticsearch.xpack.monitoring.collector.ml.JobStatsMonitoringDoc; +import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collections; +import java.util.NavigableMap; +import java.util.TreeMap; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase { + + private ShardFollowNodeTaskStatus status; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + status = mock(ShardFollowNodeTaskStatus.class); + } + + public void testConstructorStatusMustNotBeNull() { + final NullPointerException e = + expectThrows(NullPointerException.class, () -> new CcrStatsMonitoringDoc(cluster, timestamp, interval, node, null)); + assertThat(e, hasToString(containsString("status"))); + } + + @Override + protected CcrStatsMonitoringDoc createMonitoringDoc( + final String cluster, + final long timestamp, + final long interval, + final MonitoringDoc.Node node, + final MonitoredSystem system, + final String type, + final String id) { + return new CcrStatsMonitoringDoc(cluster, timestamp, interval, node, status); + } + + @Override + protected void assertMonitoringDoc(CcrStatsMonitoringDoc document) { + assertThat(document.getSystem(), is(MonitoredSystem.ES)); + assertThat(document.getType(), is(CcrStatsMonitoringDoc.TYPE)); + assertThat(document.getId(), nullValue()); + assertThat(document.status(), is(status)); + } + + @Override + public void testToXContent() throws IOException { + final long timestamp = System.currentTimeMillis(); + final long intervalMillis = System.currentTimeMillis(); + final long nodeTimestamp = System.currentTimeMillis(); + final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", nodeTimestamp); + // these random values do not need to be internally consistent, they are only for testing formatting + final int shardId = randomIntBetween(0, Integer.MAX_VALUE); + final long leaderGlobalCheckpoint = randomNonNegativeLong(); + final long leaderMaxSeqNo = randomNonNegativeLong(); + final long followerGlobalCheckpoint = randomNonNegativeLong(); + final long followerMaxSeqNo = randomNonNegativeLong(); + final long lastRequestedSeqNo = randomNonNegativeLong(); + final int numberOfConcurrentReads = randomIntBetween(1, Integer.MAX_VALUE); + final int numberOfConcurrentWrites = randomIntBetween(1, Integer.MAX_VALUE); + final int numberOfQueuedWrites = randomIntBetween(0, Integer.MAX_VALUE); + final long mappingVersion = randomIntBetween(0, Integer.MAX_VALUE); + final long totalFetchTimeMillis = randomLongBetween(0, 4096); + final long numberOfSuccessfulFetches = randomNonNegativeLong(); + final long numberOfFailedFetches = randomLongBetween(0, 8); + final long operationsReceived = randomNonNegativeLong(); + final long totalTransferredBytes = randomNonNegativeLong(); + final long totalIndexTimeMillis = randomNonNegativeLong(); + final long numberOfSuccessfulBulkOperations = randomNonNegativeLong(); + final long numberOfFailedBulkOperations = randomNonNegativeLong(); + final long numberOfOperationsIndexed = randomNonNegativeLong(); + final NavigableMap fetchExceptions = + new TreeMap<>(Collections.singletonMap(randomNonNegativeLong(), new ElasticsearchException("shard is sad"))); + final long timeSinceLastFetchMillis = randomNonNegativeLong(); + final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus( + "cluster_alias:leader_index", + shardId, + leaderGlobalCheckpoint, + leaderMaxSeqNo, + followerGlobalCheckpoint, + followerMaxSeqNo, + lastRequestedSeqNo, + numberOfConcurrentReads, + numberOfConcurrentWrites, + numberOfQueuedWrites, + mappingVersion, + totalFetchTimeMillis, + numberOfSuccessfulFetches, + numberOfFailedFetches, + operationsReceived, + totalTransferredBytes, + totalIndexTimeMillis, + numberOfSuccessfulBulkOperations, + numberOfFailedBulkOperations, + numberOfOperationsIndexed, + fetchExceptions, + timeSinceLastFetchMillis); + final CcrStatsMonitoringDoc document = new CcrStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, status); + final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false); + assertThat( + xContent.utf8ToString(), + equalTo( + "{" + + "\"cluster_uuid\":\"_cluster\"," + + "\"timestamp\":\"" + new DateTime(timestamp, DateTimeZone.UTC).toString() + "\"," + + "\"interval_ms\":" + intervalMillis + "," + + "\"type\":\"ccr_stats\"," + + "\"source_node\":{" + + "\"uuid\":\"_uuid\"," + + "\"host\":\"_host\"," + + "\"transport_address\":\"_addr\"," + + "\"ip\":\"_ip\"," + + "\"name\":\"_name\"," + + "\"timestamp\":\"" + new DateTime(nodeTimestamp, DateTimeZone.UTC).toString() + "\"" + + "}," + + "\"ccr_stats\":{" + + "\"leader_index\":\"cluster_alias:leader_index\"," + + "\"shard_id\":" + shardId + "," + + "\"leader_global_checkpoint\":" + leaderGlobalCheckpoint + "," + + "\"leader_max_seq_no\":" + leaderMaxSeqNo + "," + + "\"follower_global_checkpoint\":" + followerGlobalCheckpoint + "," + + "\"follower_max_seq_no\":" + followerMaxSeqNo + "," + + "\"last_requested_seq_no\":" + lastRequestedSeqNo + "," + + "\"number_of_concurrent_reads\":" + numberOfConcurrentReads + "," + + "\"number_of_concurrent_writes\":" + numberOfConcurrentWrites + "," + + "\"number_of_queued_writes\":" + numberOfQueuedWrites + "," + + "\"mapping_version\":" + mappingVersion + "," + + "\"total_fetch_time_millis\":" + totalFetchTimeMillis + "," + + "\"number_of_successful_fetches\":" + numberOfSuccessfulFetches + "," + + "\"number_of_failed_fetches\":" + numberOfFailedFetches + "," + + "\"operations_received\":" + operationsReceived + "," + + "\"total_transferred_bytes\":" + totalTransferredBytes + "," + + "\"total_index_time_millis\":" + totalIndexTimeMillis +"," + + "\"number_of_successful_bulk_operations\":" + numberOfSuccessfulBulkOperations + "," + + "\"number_of_failed_bulk_operations\":" + numberOfFailedBulkOperations + "," + + "\"number_of_operations_indexed\":" + numberOfOperationsIndexed + "," + + "\"fetch_exceptions\":[" + + "{" + + "\"from_seq_no\":" + fetchExceptions.keySet().iterator().next() + "," + + "\"exception\":{" + + "\"type\":\"exception\"," + + "\"reason\":\"shard is sad\"" + + "}" + + "}" + + "]," + + "\"time_since_last_fetch_millis\":" + timeSinceLastFetchMillis + + "}" + + "}")); + } + +} \ No newline at end of file From d5de41fdc9e30061d8e1f287ba72363519b102fe Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 11 Sep 2018 23:00:07 -0400 Subject: [PATCH 02/12] Add missing newline --- .../xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java index 1ef433486744f..9efeb34777af1 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java @@ -193,4 +193,4 @@ private Settings ccrDisabledSettings() { return Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false).build(); } -} \ No newline at end of file +} From c3944f00a9b4a96ed1da7d8f53fb5e3e1f43de82 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 11 Sep 2018 23:00:27 -0400 Subject: [PATCH 03/12] Add another missing newline --- .../xpack/monitoring/collector/ccr/CcrStatsCollector.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java index 17787d88e4482..fbb7505af4d07 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollector.java @@ -85,4 +85,5 @@ protected Collection doCollect( .collect(Collectors.toList()); } } + } From 1b95239bf3f9411eb3eca6bd301562d61092a07a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 11 Sep 2018 23:23:22 -0400 Subject: [PATCH 04/12] Fix imports --- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 17 ++++++++--------- .../xpack/ccr/action/ShardFollowNodeTask.java | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index b62fe16f34956..0a40626af5870 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -40,22 +40,17 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; -import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction; -import org.elasticsearch.xpack.core.XPackSettings; -import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; -import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction; -import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction; -import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction; import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction; -import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction; +import org.elasticsearch.xpack.ccr.action.TransportCreateAndFollowIndexAction; import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.action.TransportFollowIndexAction; import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; -import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction; +import org.elasticsearch.xpack.ccr.action.TransportUnfollowIndexAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; @@ -67,6 +62,10 @@ import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; +import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction; +import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; +import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction; import java.util.Arrays; import java.util.Collection; @@ -77,8 +76,8 @@ import java.util.function.Supplier; import static java.util.Collections.emptyList; -import static org.elasticsearch.xpack.core.XPackSettings.CCR_ENABLED_SETTING; import static org.elasticsearch.xpack.ccr.CcrSettings.CCR_FOLLOWING_INDEX_SETTING; +import static org.elasticsearch.xpack.core.XPackSettings.CCR_ENABLED_SETTING; /** * Container class for CCR functionality. diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 0a0a6877dc92a..07d894788664f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -18,8 +18,8 @@ import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; -import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import java.util.ArrayList; import java.util.Arrays; From ca642b0ca884df5d61e2b825b94d2458ce2d15d5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 12 Sep 2018 01:21:40 -0400 Subject: [PATCH 05/12] Fix imports --- .../main/java/org/elasticsearch/xpack/monitoring/Monitoring.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java index 9c463b0513dd3..bb2ed76831da2 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/Monitoring.java @@ -33,7 +33,6 @@ import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; -import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.monitoring.MonitoringField; import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction; import org.elasticsearch.xpack.core.ssl.SSLService; From 418b7185a488a87a39f8eab729b97dc473daf5e4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 12 Sep 2018 01:39:07 -0400 Subject: [PATCH 06/12] Remove imports --- .../monitoring/collector/ccr/CcrStatsCollectorTests.java | 9 --------- .../collector/ccr/CcrStatsMonitoringDocTests.java | 2 -- 2 files changed, 11 deletions(-) diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java index 9efeb34777af1..062bb61815d66 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java @@ -1,28 +1,20 @@ package org.elasticsearch.xpack.monitoring.collector.ccr; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.xpack.core.XPackSettings; -import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.client.CcrClient; -import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; -import org.elasticsearch.xpack.core.ml.action.util.QueryPage; -import org.elasticsearch.xpack.core.ml.client.MachineLearningClient; -import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase; -import org.elasticsearch.xpack.monitoring.collector.ml.JobStatsCollector; import org.elasticsearch.xpack.monitoring.collector.ml.JobStatsMonitoringDoc; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -33,7 +25,6 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.*; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java index 1153b81bc3384..9a1570fd24294 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java @@ -7,7 +7,6 @@ import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; -import org.elasticsearch.xpack.monitoring.collector.ml.JobStatsMonitoringDoc; import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -23,7 +22,6 @@ import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.*; import static org.mockito.Mockito.mock; public class CcrStatsMonitoringDocTests extends BaseMonitoringDocTestCase { From eb95aaf7a0e5721631438d9bd73cd3e5318d504d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 12 Sep 2018 01:43:52 -0400 Subject: [PATCH 07/12] Newline --- .../monitoring/collector/ccr/CcrStatsMonitoringDocTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java index 9a1570fd24294..90dd391b9465d 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java @@ -166,4 +166,4 @@ public void testToXContent() throws IOException { + "}")); } -} \ No newline at end of file +} From b1a6b38d28250ebead17a371657e74393c9a0dde Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 12 Sep 2018 02:06:25 -0400 Subject: [PATCH 08/12] License headers --- .../monitoring/collector/ccr/CcrStatsMonitoringDoc.java | 6 ++++++ .../monitoring/collector/ccr/CcrStatsCollectorTests.java | 6 ++++++ .../collector/ccr/CcrStatsMonitoringDocTests.java | 6 ++++++ 3 files changed, 18 insertions(+) diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDoc.java index c46f2da7219b0..45c6a8607d473 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDoc.java @@ -1,3 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + package org.elasticsearch.xpack.monitoring.collector.ccr; import org.elasticsearch.common.xcontent.XContentBuilder; diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java index 062bb61815d66..7cf36f0158ad1 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java @@ -1,3 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + package org.elasticsearch.xpack.monitoring.collector.ccr; import org.elasticsearch.action.ActionFuture; diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java index 90dd391b9465d..47f2bdf5d2e50 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java @@ -1,3 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + package org.elasticsearch.xpack.monitoring.collector.ccr; import org.elasticsearch.ElasticsearchException; From 6772a2ebc4c82656c0e9f1ebbbf3b76aebc490b0 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 12 Sep 2018 09:59:57 +0200 Subject: [PATCH 09/12] fixed test --- .../collector/ccr/CcrStatsCollectorTests.java | 36 +++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java index 7cf36f0158ad1..aaf3a61643b5e 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsCollectorTests.java @@ -12,14 +12,16 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.client.CcrClient; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.BaseCollectorTestCase; -import org.elasticsearch.xpack.monitoring.collector.ml.JobStatsMonitoringDoc; +import org.mockito.ArgumentMatcher; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -31,7 +33,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -143,7 +145,7 @@ public void testDoCollect() throws Exception { final CcrStatsAction.StatsRequest request = new CcrStatsAction.StatsRequest(); request.setIndices(Strings.EMPTY_ARRAY); - when(client.stats(eq(request))).thenReturn(future); + when(client.stats(statsRequestEq(request))).thenReturn(future); when(future.actionGet(timeout)).thenReturn(responses); final long interval = randomNonNegativeLong(); @@ -164,9 +166,9 @@ public void testDoCollect() throws Exception { assertThat(document.getIntervalMillis(), equalTo(interval)); assertThat(document.getNode(), equalTo(node)); assertThat(document.getSystem(), is(MonitoredSystem.ES)); - assertThat(document.getType(), is(JobStatsMonitoringDoc.TYPE)); + assertThat(document.getType(), is(CcrStatsMonitoringDoc.TYPE)); assertThat(document.getId(), nullValue()); - assertThat(document.status(), is(status)); + assertThat(document.status(), is(status.status())); } } @@ -175,7 +177,10 @@ private List mockStatuses() { final List statuses = new ArrayList<>(count); for (int i = 0; i < count; ++i) { - statuses.add(mock(CcrStatsAction.StatsResponse.class)); + CcrStatsAction.StatsResponse statsResponse = mock(CcrStatsAction.StatsResponse.class); + ShardFollowNodeTaskStatus status = mock(ShardFollowNodeTaskStatus.class); + when(statsResponse.status()).thenReturn(status); + statuses.add(statsResponse); } return statuses; @@ -190,4 +195,23 @@ private Settings ccrDisabledSettings() { return Settings.builder().put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false).build(); } + private static CcrStatsAction.StatsRequest statsRequestEq(CcrStatsAction.StatsRequest expected) { + return argThat(new StatsRequestMatches(expected)); + } + + private static class StatsRequestMatches extends ArgumentMatcher { + + private final CcrStatsAction.StatsRequest expected; + + private StatsRequestMatches(CcrStatsAction.StatsRequest expected) { + this.expected = expected; + } + + @Override + public boolean matches(Object o) { + CcrStatsAction.StatsRequest actual = (CcrStatsAction.StatsRequest) o; + return Arrays.equals(expected.indices(), actual.indices()); + } + } + } From 584b2597e248eec9f44af562da69efdee8296f5f Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 12 Sep 2018 11:47:29 +0200 Subject: [PATCH 10/12] also verify whether monitoring is indexing ccr stats docs --- .../plugin/ccr/qa/multi-cluster/build.gradle | 1 + .../xpack/ccr/FollowIndexIT.java | 37 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster/build.gradle index 396c247af40b0..b3b6372384888 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/build.gradle +++ b/x-pack/plugin/ccr/qa/multi-cluster/build.gradle @@ -27,6 +27,7 @@ followClusterTestCluster { dependsOn leaderClusterTestRunner numNodes = 1 clusterName = 'follow-cluster' + setting 'xpack.monitoring.collection.enabled', 'true' setting 'xpack.license.self_generated.type', 'trial' setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index 76d0e43813594..5dd269a261640 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -22,9 +22,11 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class FollowIndexIT extends ESRestTestCase { @@ -75,6 +77,7 @@ public void testFollowIndex() throws Exception { index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true"); } assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3)); + assertBusy(() -> verifyCcrMonitoring(leaderIndexName)); } } @@ -104,6 +107,7 @@ public void testAutoFollowPatterns() throws Exception { ensureYellow("logs-20190101"); verifyDocuments("logs-20190101", 5); }); + assertBusy(() -> verifyCcrMonitoring("logs-20190101")); } private static void index(RestClient client, String index, String id, Object... fields) throws IOException { @@ -155,6 +159,39 @@ private static void verifyDocuments(String index, int expectedNumDocs) throws IO } } + private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException { + ensureYellow(".monitoring-*"); + + Request request = new Request("GET", "/.monitoring-*/_search"); + request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_stats\"}}}"); + Map response = toMap(client().performRequest(request)); + + int numDocs = (int) XContentMapValues.extractValue("hits.total", response); + assertThat(numDocs, greaterThanOrEqualTo(1)); + + int numberOfOperationsReceived = 0; + int numberOfOperationsIndexed = 0; + + List hits = (List) XContentMapValues.extractValue("hits.hits", response); + for (int i = 0; i < numDocs; i++) { + Map hit = (Map) hits.get(i); + String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit); + if (leaderIndex.endsWith(expectedLeaderIndex) == false) { + continue; + } + + int foundNumberOfOperationsReceived = + (int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit); + numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived); + int foundNumberOfOperationsIndexed = + (int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit); + numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed); + } + + assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1)); + assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1)); + } + private static Map toMap(Response response) throws IOException { return toMap(EntityUtils.toString(response.getEntity())); } From 2c9bdbfa1d664ffc8e826007f914720c59c1c3ca Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 12 Sep 2018 11:52:35 +0200 Subject: [PATCH 11/12] and now in combination with security --- .../multi-cluster-with-security/build.gradle | 2 +- .../xpack/ccr/FollowIndexSecurityIT.java | 44 +++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle index d4fe9ee554c3d..e2c772d708846 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/build.gradle @@ -47,7 +47,7 @@ followClusterTestCluster { setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" setting 'xpack.license.self_generated.type', 'trial' setting 'xpack.security.enabled', 'true' - setting 'xpack.monitoring.enabled', 'false' + setting 'xpack.monitoring.collection.enabled', 'true' extraConfigFile 'roles.yml', 'roles.yml' setupCommand 'setupTestAdmin', 'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser" diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index d8357a74e8ebc..26389302ece49 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -30,6 +30,7 @@ import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; public class FollowIndexSecurityIT extends ESRestTestCase { @@ -80,6 +81,7 @@ public void testFollowIndex() throws Exception { createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex); assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs)); assertThat(countCcrNodeTasks(), equalTo(1)); + assertBusy(() -> verifyCcrMonitoring(allowedIndex)); assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow"))); // Make sure that there are no other ccr relates operations running: assertBusy(() -> { @@ -203,4 +205,46 @@ private static boolean indexExists(RestClient client, String index) throws IOExc return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode(); } + private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException { + ensureYellow(".monitoring-*"); + + Request request = new Request("GET", "/.monitoring-*/_search"); + request.setJsonEntity("{\"query\": {\"term\": {\"type\": \"ccr_stats\"}}}"); + Map response = toMap(adminClient().performRequest(request)); + + int numDocs = (int) XContentMapValues.extractValue("hits.total", response); + assertThat(numDocs, greaterThanOrEqualTo(1)); + + int numberOfOperationsReceived = 0; + int numberOfOperationsIndexed = 0; + + List hits = (List) XContentMapValues.extractValue("hits.hits", response); + for (int i = 0; i < numDocs; i++) { + Map hit = (Map) hits.get(i); + String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit); + if (leaderIndex.endsWith(expectedLeaderIndex) == false) { + continue; + } + + int foundNumberOfOperationsReceived = + (int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit); + numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived); + int foundNumberOfOperationsIndexed = + (int) XContentMapValues.extractValue("_source.ccr_stats.number_of_operations_indexed", hit); + numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed); + } + + assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1)); + assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1)); + } + + private static void ensureYellow(String index) throws IOException { + Request request = new Request("GET", "/_cluster/health/" + index); + request.addParameter("wait_for_status", "yellow"); + request.addParameter("wait_for_no_relocating_shards", "true"); + request.addParameter("timeout", "70s"); + request.addParameter("level", "shards"); + adminClient().performRequest(request); + } + } From 589e259cc5e0a98a83d698d7ea42cbe6c834543e Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 12 Sep 2018 12:46:44 +0200 Subject: [PATCH 12/12] fixed checkstyle violation --- .../src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index 5dd269a261640..0e56084e10c54 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo;