Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Integration with Ultrawarm - Follow up #97

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;
import com.amazon.opendistroforelasticsearch.ad.util.ColdStartRunner;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension;
Expand All @@ -92,7 +92,6 @@
import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
Expand Down Expand Up @@ -123,7 +122,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
Expand All @@ -148,7 +146,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ADStats adStats;
private NamedXContentRegistry xContentRegistry;
private ClientUtil clientUtil;
private ClusterStateUtils clusterStateUtils;
private DiscoveryNodeFilterer nodeFilter;

static {
SpecialPermission.check();
Expand Down Expand Up @@ -208,7 +206,7 @@ public List<RestHandler> getRestHandlers(
RestStatsAnomalyDetectorAction statsAnomalyDetectorAction = new RestStatsAnomalyDetectorAction(
restController,
adStats,
this.clusterStateUtils
this.nodeFilter
);
RestAnomalyDetectorJobAction anomalyDetectorJobAction = new RestAnomalyDetectorJobAction(
settings,
Expand Down Expand Up @@ -267,11 +265,10 @@ public Collection<Object> createComponents(
RandomCutForestSerDe rcfSerde = new RandomCutForestSerDe();
CheckpointDao checkpoint = new CheckpointDao(client, clientUtil, CommonName.CHECKPOINT_INDEX_NAME);

HashMap<String, String> ignoredAttributes = new HashMap<>();
ignoredAttributes.put(CommonName.BOX_TYPE_KEY, CommonName.WARM_BOX_TYPE);
this.clusterStateUtils = new ClusterStateUtils(clusterService, ignoredAttributes);
this.nodeFilter = new DiscoveryNodeFilterer(this.clusterService);

ModelManager modelManager = new ModelManager(
clusterStateUtils,
nodeFilter,
jvmService,
rcfSerde,
checkpoint,
Expand All @@ -295,7 +292,7 @@ public Collection<Object> createComponents(
AnomalyDetectorSettings.SHINGLE_SIZE
);

HashRing hashRing = new HashRing(clusterStateUtils, clock, settings);
HashRing hashRing = new HashRing(nodeFilter, clock, settings);
ADStateManager stateManager = new ADStateManager(
client,
xContentRegistry,
Expand Down Expand Up @@ -364,12 +361,12 @@ public Collection<Object> createComponents(
clock,
stateManager,
runner,
new ADClusterEventListener(clusterService, hashRing, modelManager, clusterStateUtils),
new ADClusterEventListener(clusterService, hashRing, modelManager, nodeFilter),
deleteUtil,
adCircuitBreakerService,
adStats,
new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, clusterStateUtils),
clusterStateUtils
new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil, nodeFilter),
nodeFilter
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.concurrent.Semaphore;

import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager;
import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -45,27 +45,27 @@ public class ADClusterEventListener implements ClusterStateListener {
private HashRing hashRing;
private ModelManager modelManager;
private final ClusterService clusterService;
private final ClusterStateUtils clusterStateUtils;
private final DiscoveryNodeFilterer nodeFilter;

@Inject
public ADClusterEventListener(
ClusterService clusterService,
HashRing hashRing,
ModelManager modelManager,
ClusterStateUtils clusterStateUtils
DiscoveryNodeFilterer nodeFilter
) {
this.clusterService = clusterService;
this.clusterService.addListener(this);
this.hashRing = hashRing;
this.modelManager = modelManager;
this.inProgress = new Semaphore(1);
this.clusterStateUtils = clusterStateUtils;
this.nodeFilter = nodeFilter;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {

if (clusterStateUtils.isIgnoredNode(event.state().nodes().getLocalNode())) {
if (!nodeFilter.isEligibleNode(event.state().nodes().getLocalNode())) {
LOG.debug(NODE_NOT_APPLIED_MSG);
return;
}
Expand All @@ -88,7 +88,7 @@ public void clusterChanged(ClusterChangedEvent event) {
// Check whether it was a data node that was removed
boolean dataNodeRemoved = false;
for (DiscoveryNode removedNode : delta.removedNodes()) {
if (!clusterStateUtils.isIgnoredNode(removedNode)) {
if (nodeFilter.isEligibleNode(removedNode)) {
LOG.info(NODE_REMOVED_MSG + " {}", removedNode.getId());
dataNodeRemoved = true;
break;
Expand All @@ -98,7 +98,7 @@ public void clusterChanged(ClusterChangedEvent event) {
// Check whether it was a data node that was added
boolean dataNodeAdded = false;
for (DiscoveryNode addedNode : delta.addedNodes()) {
if (!clusterStateUtils.isIgnoredNode(addedNode)) {
if (nodeFilter.isEligibleNode(addedNode)) {
LOG.info(NODE_ADDED_MSG + " {}", addedNode.getId());
dataNodeAdded = true;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;

public class HashRing {
private static final Logger LOG = LogManager.getLogger(HashRing.class);
Expand All @@ -43,7 +42,7 @@ public class HashRing {
static final String COOLDOWN_MSG = "Hash ring doesn't respond to cluster state change within the cooldown period.";

private final int VIRTUAL_NODE_COUNT = 100;
private final ClusterStateUtils clusterStateUtils;
private final DiscoveryNodeFilterer nodeFilter;
private TreeMap<Integer, DiscoveryNode> circle;
private Semaphore inProgress;
// the UTC epoch milliseconds of the most recent successful update
Expand All @@ -52,9 +51,9 @@ public class HashRing {
private final Clock clock;
private AtomicBoolean membershipChangeRequied;

public HashRing(ClusterStateUtils clusterStateUtils, Clock clock, Settings settings) {
public HashRing(DiscoveryNodeFilterer nodeFilter, Clock clock, Settings settings) {
this.circle = new TreeMap<Integer, DiscoveryNode>();
this.clusterStateUtils = clusterStateUtils;
this.nodeFilter = nodeFilter;
this.inProgress = new Semaphore(1);
this.clock = clock;
this.coolDownPeriod = COOLDOWN_MINUTES.get(settings);
Expand Down Expand Up @@ -93,8 +92,7 @@ public boolean build() {
TreeMap<Integer, DiscoveryNode> newCircle = new TreeMap<>();

try {
for (ObjectCursor<DiscoveryNode> cursor : clusterStateUtils.getEligibleDataNodes().values()) {
DiscoveryNode curNode = cursor.value;
for (DiscoveryNode curNode : nodeFilter.getEligibleDataNodes()) {
for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
newCircle.put(Murmur3HashFunction.hash(curNode.getId() + i), curNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,24 @@

import com.amazon.opendistroforelasticsearch.ad.transport.CronAction;
import com.amazon.opendistroforelasticsearch.ad.transport.CronRequest;
import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;

public class HourlyCron implements Runnable {
private static final Logger LOG = LogManager.getLogger(HourlyCron.class);
static final String SUCCEEDS_LOG_MSG = "Hourly maintenance succeeds";
static final String NODE_EXCEPTION_LOG_MSG = "Hourly maintenance of node has exception";
static final String EXCEPTION_LOG_MSG = "Hourly maintenance has exception.";
private ClusterStateUtils clusterStateUtils;
private DiscoveryNodeFilterer nodeFilter;
private Client client;

public HourlyCron(Client client, ClusterStateUtils clusterStateUtils) {
this.clusterStateUtils = clusterStateUtils;
public HourlyCron(Client client, DiscoveryNodeFilterer nodeFilter) {
this.nodeFilter = nodeFilter;
this.client = client;
}

@Override
public void run() {
DiscoveryNode[] dataNodes = clusterStateUtils.getEligibleDataNodes().values().toArray(DiscoveryNode.class);
DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes();

// we also add the cancel query function here based on query text from the negative cache.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;

import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -40,7 +40,7 @@ public class MasterEventListener implements LocalNodeMasterListener {
private Client client;
private Clock clock;
private ClientUtil clientUtil;
private ClusterStateUtils clusterStateUtils;
private DiscoveryNodeFilterer nodeFilter;

public MasterEventListener(
ClusterService clusterService,
Expand All @@ -49,7 +49,7 @@ public MasterEventListener(
Client client,
Clock clock,
ClientUtil clientUtil,
ClusterStateUtils clusterStateUtils
DiscoveryNodeFilterer nodeFilter
) {
this.clusterService = clusterService;
this.threadPool = threadPool;
Expand All @@ -58,14 +58,13 @@ public MasterEventListener(
this.clusterService.addLocalNodeMasterListener(this);
this.clock = clock;
this.clientUtil = clientUtil;
this.clusterStateUtils = clusterStateUtils;
this.nodeFilter = nodeFilter;
}

@Override
public void onMaster() {
if (hourlyCron == null) {
hourlyCron = threadPool
.scheduleWithFixedDelay(new HourlyCron(client, clusterStateUtils), TimeValue.timeValueHours(1), executorName());
hourlyCron = threadPool.scheduleWithFixedDelay(new HourlyCron(client, nodeFilter), TimeValue.timeValueHours(1), executorName());
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class CommonName {
// Ultrawarm node attributes
// ======================================

// hot node
public static String HOT_BOX_TYPE = "hot";

// warm node
public static String WARM_BOX_TYPE = "warm";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.ml.rcf.CombinedRcfResult;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.serialize.RandomCutForestSerDe;

Expand Down Expand Up @@ -103,7 +103,7 @@ public String getName() {
private final Duration checkpointInterval;

// dependencies
private final ClusterStateUtils clusterStateUtils;
private final DiscoveryNodeFilterer nodeFilter;
private final JvmService jvmService;
private final RandomCutForestSerDe rcfSerde;
private final CheckpointDao checkpointDao;
Expand All @@ -119,7 +119,7 @@ public String getName() {
/**
* Constructor.
*
* @param clusterStateUtils cluster info
     * @param nodeFilter utility class to select nodes
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: typo nodes r

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

* @param jvmService jvm info
* @param rcfSerde RCF model serialization
* @param checkpointDao model checkpoint storage
Expand All @@ -143,7 +143,7 @@ public String getName() {
* @param shingleSize size of a shingle
*/
public ModelManager(
ClusterStateUtils clusterStateUtils,
DiscoveryNodeFilterer nodeFilter,
JvmService jvmService,
RandomCutForestSerDe rcfSerde,
CheckpointDao checkpointDao,
Expand All @@ -167,7 +167,7 @@ public ModelManager(
int shingleSize
) {

this.clusterStateUtils = clusterStateUtils;
this.nodeFilter = nodeFilter;
this.jvmService = jvmService;
this.rcfSerde = rcfSerde;
this.checkpointDao = checkpointDao;
Expand Down Expand Up @@ -260,7 +260,7 @@ public Entry<Integer, Integer> getPartitionedForestSizes(RandomCutForest forest,
int numPartitions = (int) Math.ceil((double) totalSize / (double) partitionSize);
int forestSize = (int) Math.ceil((double) forest.getNumberOfTrees() / (double) numPartitions);

int numNodes = clusterStateUtils.getEligibleDataNodes().size();
int numNodes = nodeFilter.getEligibleDataNodes().length;
if (numPartitions > numNodes) {
// partition by cluster size
partitionSize = (long) Math.ceil((double) totalSize / (double) numNodes);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,7 @@
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsRequest;
import com.amazon.opendistroforelasticsearch.ad.util.ClusterStateUtils;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -42,22 +42,22 @@ public class RestStatsAnomalyDetectorAction extends BaseRestHandler {

private static final String STATS_ANOMALY_DETECTOR_ACTION = "stats_anomaly_detector";
private ADStats adStats;
private ClusterStateUtils clusterStateUtils;
private DiscoveryNodeFilterer nodeFilter;

/**
* Constructor
*
* @param controller Rest Controller
* @param adStats ADStats object
* @param clusterStateUtils util to get eligible data nodes
* @param nodeFilter util class to get eligible data nodes
*/
public RestStatsAnomalyDetectorAction(RestController controller, ADStats adStats, ClusterStateUtils clusterStateUtils) {
public RestStatsAnomalyDetectorAction(RestController controller, ADStats adStats, DiscoveryNodeFilterer nodeFilter) {
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/{nodeId}/stats/", this);
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/{nodeId}/stats/{stat}", this);
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/stats/", this);
controller.registerHandler(RestRequest.Method.GET, AD_BASE_URI + "/stats/{stat}", this);
this.adStats = adStats;
this.clusterStateUtils = clusterStateUtils;
this.nodeFilter = nodeFilter;
}

@Override
Expand Down Expand Up @@ -87,7 +87,7 @@ private ADStatsRequest getRequest(RestRequest request) {
String[] nodeIdsArr = nodesIdsStr.split(",");
adStatsRequest = new ADStatsRequest(nodeIdsArr);
} else {
DiscoveryNode[] dataNodes = clusterStateUtils.getEligibleDataNodes().values().toArray(DiscoveryNode.class);
DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes();
adStatsRequest = new ADStatsRequest(dataNodes);
}

Expand Down
Loading