Skip to content

Commit

Permalink
IT plus fixes; going towards HealthNode direction
Browse files Browse the repository at this point in the history
  • Loading branch information
ldematte committed Feb 5, 2024
1 parent 2d7e95b commit 882f059
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 152 deletions.
1 change: 1 addition & 0 deletions modules/index-size/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* 2.0.
*/
apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.internal-cluster-test'

esplugin {
name 'index-size'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.metering.indexsize;

import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;

import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IndexSizeTaskIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(IndexSizePlugin.class)).collect(Collectors.toSet());
}

@After
public void cleanUp() {
updateClusterSettings(
Settings.builder()
.putNull(IndexSizeTaskExecutor.ENABLED_SETTING.getKey())
.putNull(IndexSizeTaskExecutor.POLL_INTERVAL_SETTING.getKey())
);
}

public void testTaskRemovedAfterCancellation() throws Exception {
updateClusterSettings(Settings.builder().put(IndexSizeTaskExecutor.ENABLED_SETTING.getKey(), true));
assertBusy(() -> {
var task = IndexSize.findTask(clusterService().state());
assertNotNull(task);
assertTrue(task.isAssigned());
});
assertBusy(() -> {
ListTasksResponse tasks = clusterAdmin().listTasks(new ListTasksRequest().setActions("index-size[c]")).actionGet();
assertEquals(1, tasks.getTasks().size());
});
updateClusterSettings(Settings.builder().put(IndexSizeTaskExecutor.ENABLED_SETTING.getKey(), false));
assertBusy(() -> {
ListTasksResponse tasks2 = clusterAdmin().listTasks(new ListTasksRequest().setActions("index-size[c]")).actionGet();
assertEquals(0, tasks2.getTasks().size());
});
}

private PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> getTask() {
return PersistentTasksCustomMetadata.getTaskWithId(clusterService().state(), IndexSize.TASK_NAME);
}
}
14 changes: 14 additions & 0 deletions modules/index-size/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module elasticsearch.modules.index.size.main {

requires org.elasticsearch.base;
requires org.elasticsearch.server;
requires org.elasticsearch.xcontent;
requires org.apache.logging.log4j;
requires org.apache.lucene.core;

provides org.elasticsearch.features.FeatureSpecification
with
org.elasticsearch.metering.indexsize.IndexSizeFeatures;

exports org.elasticsearch.metering.indexsize;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,15 @@

package org.elasticsearch.metering.indexsize;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Map;
import java.util.function.Supplier;

public class IndexSize extends AllocatedPersistentTask {

Expand All @@ -35,21 +29,13 @@ public class IndexSize extends AllocatedPersistentTask {
String action,
String description,
TaskId parentTask,
Map<String, String> headers,
ThreadPool threadPool, // Alternative B
Supplier<TimeValue> pollIntervalSupplier // Alternative B
Map<String, String> headers
) {
super(id, type, action, description, parentTask, headers);
this.threadPool = threadPool;
this.pollIntervalSupplier = pollIntervalSupplier;
}

@Override
protected void onCancelled() {
// Alternative B
if (scheduled != null) {
scheduled.cancel();
}
markAsCompleted();
}

Expand All @@ -69,54 +55,4 @@ public static DiscoveryNode findIndexSizeNode(ClusterState clusterState) {
}
return clusterState.nodes().get(task.getAssignment().getExecutorNode());
}

// "GeoIpDownloader" style (Alternative B)
private static final Logger logger = LogManager.getLogger(IndexSize.class);
private final ThreadPool threadPool;
private volatile Scheduler.ScheduledCancellable scheduled;
private final Supplier<TimeValue> pollIntervalSupplier;

void run() {
if (isCancelled() || isCompleted()) {
return;
}
try {
doSomething();
} catch (Exception e) {
logger.error("exception during IX action", e);
}
scheduleNextRun(pollIntervalSupplier.get());
}

private void doSomething() {
// TODO: call IndexSizeService
}

private void scheduleNextRun(TimeValue time) {
if (threadPool.scheduler().isShutdown() == false) {
scheduled = threadPool.schedule(this::run, time, threadPool.generic());
}
}

/**
* This method requests the task to be rescheduled and run immediately, presumably because a dynamic property supplied by
* pollIntervalSupplier has changed. This method does nothing if this task is cancelled, completed, or has not yet been
* scheduled to run for the first time. It cancels any existing scheduled run.
*/
void requestReschedule() {
if (isCancelled() || isCompleted()) {
return;
}
if (scheduled != null && scheduled.cancel()) {
scheduleNextRun(TimeValue.ZERO);
}
}

// Alternative B
// IndexSizeTaskState implements PersistentTaskState
// void updateTaskState() {
// PlainActionFuture<PersistentTasksCustomMetadata.PersistentTask<?>> future = new PlainActionFuture<>();
// updatePersistentTaskState(state, future);
// state = ((IndexSizeTaskState) future.actionGet().getState());
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
public class IndexSizeFeatures implements FeatureSpecification {
@Override
public Set<NodeFeature> getFeatures() {
// return Set.of(IndexSize.INDEX_SIZE_SUPPORTED);
return FeatureSpecification.super.getFeatures();
return Set.of(IndexSize.INDEX_SIZE_SUPPORTED);
//return FeatureSpecification.super.getFeatures();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,33 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

public class IndexSizePlugin extends Plugin implements PersistentTaskPlugin {

private IndexSizeTaskExecutor indexSizeTaskExecutor;
private IndexSizePeriodicConsumer indexSizePeriodicConsumer;

@Override
public List<Setting<?>> getSettings() {
return List.of(
IndexSizeTaskExecutor.ENABLED_SETTING,
IndexSizeTaskExecutor.POLL_INTERVAL_SETTING
);
}

@Override
public Collection<?> createComponents(Plugin.PluginServices services) {
Expand All @@ -34,8 +49,7 @@ public Collection<?> createComponents(Plugin.PluginServices services) {
);
indexSizeTaskExecutor.registerListeners(services.clusterService().getClusterSettings());

// Alternative A
var indexSizePeriodicConsumer = IndexSizePeriodicConsumer.create(
indexSizePeriodicConsumer = IndexSizePeriodicConsumer.create(
services.clusterService().getSettings(),
services.clusterService(),
services.client()
Expand All @@ -53,4 +67,31 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
) {
return List.of(indexSizeTaskExecutor);
}

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return List.of(
new NamedXContentRegistry.Entry(
PersistentTaskParams.class,
new ParseField(IndexSize.TASK_NAME),
IndexSizeTaskParams::fromXContent
)
);
}

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return List.of(
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, IndexSize.TASK_NAME, reader -> IndexSizeTaskParams.INSTANCE)
);
}

@Override
public void close() throws IOException {
super.close();
if (indexSizePeriodicConsumer != null) {
indexSizePeriodicConsumer.close();
indexSizePeriodicConsumer = null;
}
}
}
Loading

0 comments on commit 882f059

Please sign in to comment.