This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

add spotless to format code (#22)
wnbts authored Jan 15, 2020
1 parent 5a728ae commit 1637128
Showing 151 changed files with 4,363 additions and 2,701 deletions.
362 changes: 362 additions & 0 deletions .eclipseformat.xml

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions README.md
@@ -46,6 +46,7 @@ Currently we just put RCF jar in lib as dependency. Plan to publish to Maven and
1. `./gradlew :run` launches a single node cluster with the AD plugin installed
1. `./gradlew :integTest` launches a single node cluster with the AD plugin installed and runs all integration tests
1. ` ./gradlew :integTest --tests="**.test execute foo"` runs a single integration test class or method
1. `./gradlew spotlessApply` formats the code. Alternatively, import the formatting rules in `.eclipseformat.xml` into your IDE.

When launching a cluster using one of the above commands, logs are placed in `/build/cluster/run node0/elasticsearch-<version>/logs`. Though the logs are teed to the console, in practice it's best to check the actual log file.

11 changes: 9 additions & 2 deletions build.gradle
@@ -33,6 +33,7 @@ buildscript {

plugins {
id 'nebula.ospackage' version "5.3.0"
id "com.diffplug.gradle.spotless" version "3.26.1"
}

repositories {
@@ -54,7 +55,6 @@ apply plugin: 'elasticsearch.esplugin'
apply plugin: 'base'
apply plugin: 'jacoco'


allprojects {
group = 'com.amazon.opendistroforelasticsearch'

@@ -257,7 +257,8 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction'
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction',
'com.amazon.opendistroforelasticsearch.ad.util.ParseUtils'
]

jacocoTestCoverageVerification {
@@ -360,3 +361,9 @@ afterEvaluate {
tasks = ['build', 'buildRpm', 'buildDeb']
}
}

spotless {
java {
eclipse().configFile rootProject.file('.eclipseformat.xml')
}
}


@@ -55,59 +55,70 @@ public AnomalyDetectorRunner(ModelManager modelManager, FeatureManager featureMa
* @param endTime detection period end time
* @param listener handle anomaly result
*/
public void run(AnomalyDetector detector, Instant startTime, Instant endTime,
ActionListener<List<AnomalyResult>> listener) {
public void run(AnomalyDetector detector, Instant startTime, Instant endTime, ActionListener<List<AnomalyResult>> listener) {
executeDetector(detector, startTime, endTime, listener);
}

private void executeDetector(AnomalyDetector detector, Instant startTime, Instant endTime,
ActionListener<List<AnomalyResult>> listener) {
featureManager.getPreviewFeatures(detector, startTime.toEpochMilli(), endTime.toEpochMilli(),
ActionListener.wrap(features -> {
try {
List<ThresholdingResult> results = modelManager
.getPreviewResults(features.getProcessedFeatures());
listener.onResponse(sample(parsePreviewResult(detector, features, results), 200));
} catch (Exception e) {
logger.error("Fail to execute anomaly detector " + detector.getDetectorId(), e);
listener.onResponse(parsePreviewResult(detector, features, null));
}
}, listener::onFailure));
private void executeDetector(
AnomalyDetector detector,
Instant startTime,
Instant endTime,
ActionListener<List<AnomalyResult>> listener
) {
featureManager.getPreviewFeatures(detector, startTime.toEpochMilli(), endTime.toEpochMilli(), ActionListener.wrap(features -> {
try {
List<ThresholdingResult> results = modelManager.getPreviewResults(features.getProcessedFeatures());
listener.onResponse(sample(parsePreviewResult(detector, features, results), 200));
} catch (Exception e) {
logger.error("Fail to execute anomaly detector " + detector.getDetectorId(), e);
listener.onResponse(parsePreviewResult(detector, features, null));
}
}, listener::onFailure));

}


private List<AnomalyResult> parsePreviewResult(AnomalyDetector detector, Features features,
List<ThresholdingResult> results) {
private List<AnomalyResult> parsePreviewResult(AnomalyDetector detector, Features features, List<ThresholdingResult> results) {
// unprocessedFeatures[][], each row is for one date range.
// For example, unprocessedFeatures[0][2] is for the first time range, the third feature
double[][] unprocessedFeatures = features.getUnprocessedFeatures();
List<Map.Entry<Long, Long>> timeRanges = features.getTimeRanges();
List<Feature> featureAttributes = detector.getFeatureAttributes().stream()
.filter(Feature::getEnabled).collect(Collectors.toList());
List<Feature> featureAttributes = detector.getFeatureAttributes().stream().filter(Feature::getEnabled).collect(Collectors.toList());

List<AnomalyResult> anomalyResults = new ArrayList<>();
if (timeRanges != null && timeRanges.size() > 0) {
for (int i = 0; i < timeRanges.size(); i++) {
Map.Entry<Long, Long> timeRange = timeRanges.get(i);

List<FeatureData> featureDatas = new ArrayList<>();
for (int j=0;j<featureAttributes.size();j++) {
for (int j = 0; j < featureAttributes.size(); j++) {
double value = unprocessedFeatures[i][j];
Feature feature = featureAttributes.get(j);
FeatureData data = new FeatureData(feature.getId(), feature.getName(), value);
featureDatas.add(data);
}

AnomalyResult result;
if (results!=null && results.size() > i) {
if (results != null && results.size() > i) {
ThresholdingResult thresholdingResult = results.get(i);
result = new AnomalyResult(detector.getDetectorId(), null,
thresholdingResult.getGrade(), thresholdingResult.getConfidence(), featureDatas,
Instant.ofEpochMilli(timeRange.getKey()), Instant.ofEpochMilli(timeRange.getValue()));
result = new AnomalyResult(
detector.getDetectorId(),
null,
thresholdingResult.getGrade(),
thresholdingResult.getConfidence(),
featureDatas,
Instant.ofEpochMilli(timeRange.getKey()),
Instant.ofEpochMilli(timeRange.getValue())
);
} else {
result = new AnomalyResult(detector.getDetectorId(), null, null, null, featureDatas,
Instant.ofEpochMilli(timeRange.getKey()), Instant.ofEpochMilli(timeRange.getValue()));
result = new AnomalyResult(
detector.getDetectorId(),
null,
null,
null,
featureDatas,
Instant.ofEpochMilli(timeRange.getKey()),
Instant.ofEpochMilli(timeRange.getValue())
);
}

anomalyResults.add(result);
@@ -120,10 +131,10 @@ private List<AnomalyResult> sample(List<AnomalyResult> results, int sampleSize)
if (results.size() <= sampleSize) {
return results;
} else {
double stepSize = (results.size() - 1.0)/(sampleSize - 1.0);
double stepSize = (results.size() - 1.0) / (sampleSize - 1.0);
List<AnomalyResult> samples = new ArrayList<>(sampleSize);
for (int i = 0; i < sampleSize; i++) {
int index = Math.min((int)(stepSize * i), results.size() - 1);
int index = Math.min((int) (stepSize * i), results.size() - 1);
samples.add(results.get(index));
}
return samples;
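The reformatted sample method above downsamples preview results: it keeps at most sampleSize entries, spaced evenly from the first result to the last. A self-contained sketch of the same index arithmetic, shown on plain integers (the class name and demo values are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class SampleSketch {

    // Same arithmetic as AnomalyDetectorRunner.sample: step evenly through the
    // list so that up to sampleSize elements, including the first and last, remain.
    static <T> List<T> sample(List<T> results, int sampleSize) {
        if (results.size() <= sampleSize) {
            return results;
        }
        double stepSize = (results.size() - 1.0) / (sampleSize - 1.0);
        List<T> samples = new ArrayList<>(sampleSize);
        for (int i = 0; i < sampleSize; i++) {
            int index = Math.min((int) (stepSize * i), results.size() - 1);
            samples.add(results.get(index));
        }
        return samples;
    }

    public static void main(String[] args) {
        List<Integer> results = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            results.add(i);
        }
        // stepSize = (5 - 1.0) / (3 - 1.0) = 2.0, so indices 0, 2 and 4 survive.
        System.out.println(sample(results, 3)); // prints [0, 2, 4]
    }
}
```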
@@ -24,7 +24,7 @@
* Jacoco will ignore the annotated code.
* Similar to Lombok's Generated annotation. We define our own equivalent since we don't depend on Lombok.
*/
@Target({ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.FIELD, ElementType.TYPE})
@Target({ ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.FIELD, ElementType.TYPE })
@Retention(RetentionPolicy.CLASS)
public @interface Generated {
}
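As the javadoc above notes, Jacoco ignores annotated code (it filters members carrying a retained annotation whose name contains "Generated"). A hypothetical usage sketch — the class and accessor below are illustrative, not part of the plugin:

```java
// Assumes the Generated annotation defined above is available on the classpath.
public class TrivialAccessor {

    private String detectorId;

    @Generated // excluded from Jacoco coverage reports, as with Lombok's @Generated
    public String getDetectorId() {
        return detectorId;
    }
}
```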
@@ -71,7 +71,7 @@ public CircuitBreaker getBreaker(String name) {
* @return ADCircuitBreakerService
*/
public ADCircuitBreakerService init() {
//Register memory circuit breaker
// Register memory circuit breaker
registerBreaker(BreakerName.MEM.getName(), new MemoryCircuitBreaker(this.jvmService));
logger.info("Registered memory breaker.");

@@ -59,10 +59,17 @@ public class ADMetaData implements Custom {

static {
PARSER = new ObjectParser<>("ad_meta", true, Builder::new);
PARSER.declareObjectArray(Builder::addAllDeletedDetector, AnomalyDetectorGraveyard.getParser(),
new ParseField(DETECTOR_GRAVEYARD_FIELD));
XCONTENT_REGISTRY = new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(TYPE),
it -> PARSER.parse(it, null).build());
PARSER
.declareObjectArray(
Builder::addAllDeletedDetector,
AnomalyDetectorGraveyard.getParser(),
new ParseField(DETECTOR_GRAVEYARD_FIELD)
);
XCONTENT_REGISTRY = new NamedXContentRegistry.Entry(
MetaData.Custom.class,
new ParseField(TYPE),
it -> PARSER.parse(it, null).build()
);
}

private Set<AnomalyDetectorGraveyard> deadDetectors;
@@ -75,7 +82,7 @@ public ADMetaData(Set<AnomalyDetectorGraveyard> deadDetectors) {

public ADMetaData(AnomalyDetectorGraveyard... deadDetectors) {
Set<AnomalyDetectorGraveyard> deletedDetectorSet = new HashSet<>();
for(AnomalyDetectorGraveyard deletedDetector : deadDetectors) {
for (AnomalyDetectorGraveyard deletedDetector : deadDetectors) {
deletedDetectorSet.add(deletedDetector);
}
this.deadDetectors = Collections.unmodifiableSet(deletedDetectorSet);
@@ -84,22 +91,22 @@ public ADMetaData(AnomalyDetectorGraveyard... deadDetectors) {
public ADMetaData(StreamInput in) throws IOException {
int size = in.readVInt();
Set<AnomalyDetectorGraveyard> deadDetectors = new HashSet<>();
for (int i=0; i<size; i++) {
for (int i = 0; i < size; i++) {
deadDetectors.add(new AnomalyDetectorGraveyard(in));
}
this.deadDetectors = Collections.unmodifiableSet(deadDetectors);
}

@Override
public Diff<Custom> diff(Custom previousState) {
return new ADMetaDataDiff((ADMetaData)previousState, this);
return new ADMetaDataDiff((ADMetaData) previousState, this);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.write(deadDetectors.size());
for(AnomalyDetectorGraveyard deadDetector : deadDetectors) {
deadDetector.writeTo(out);
for (AnomalyDetectorGraveyard deadDetector : deadDetectors) {
deadDetector.writeTo(out);
}
}

@@ -116,7 +123,7 @@ public Version getMinimalSupportedVersion() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(DETECTOR_GRAVEYARD_FIELD);
for(AnomalyDetectorGraveyard deadDetector : deadDetectors) {
for (AnomalyDetectorGraveyard deadDetector : deadDetectors) {
deadDetector.toXContent(builder, params);
}
builder.endArray();
@@ -145,8 +152,10 @@ public static ADMetaData getADMetaData(ClusterState state) {
@Generated
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ADMetaData that = (ADMetaData) o;
return Objects.equal(deadDetectors, that.getAnomalyDetectorGraveyard());
}
@@ -194,14 +203,14 @@ public ADMetaDataDiff(ADMetaData currentMeta, ADMetaData newMeta) {
}

public ADMetaDataDiff(StreamInput in) throws IOException {
this.addedDetectorGraveyard = Collections.unmodifiableSet(in.readSet(AnomalyDetectorGraveyard::new));
this.removedDetectorGraveyard = Collections.unmodifiableSet(in.readSet(AnomalyDetectorGraveyard::new));
this.addedDetectorGraveyard = Collections.unmodifiableSet(in.readSet(AnomalyDetectorGraveyard::new));
this.removedDetectorGraveyard = Collections.unmodifiableSet(in.readSet(AnomalyDetectorGraveyard::new));
}

@Override
public Custom apply(Custom current) {
// currentDeadDetectors is unmodifiable
Set<AnomalyDetectorGraveyard> currentDeadDetectors = ((ADMetaData)current).deadDetectors;
Set<AnomalyDetectorGraveyard> currentDeadDetectors = ((ADMetaData) current).deadDetectors;
Set<AnomalyDetectorGraveyard> newDeadDetectors = new HashSet<>(currentDeadDetectors);
newDeadDetectors.addAll(addedDetectorGraveyard);
newDeadDetectors.removeAll(removedDetectorGraveyard);
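The apply method above is plain set arithmetic: the next graveyard equals the current one plus the added entries minus the removed ones. A dependency-free sketch of that semantics, substituting strings for AnomalyDetectorGraveyard entries (an illustrative simplification):

```java
import java.util.HashSet;
import java.util.Set;

public class DiffApplySketch {
    public static void main(String[] args) {
        Set<String> current = Set.of("detectorA", "detectorB");
        Set<String> added = Set.of("detectorC");
        Set<String> removed = Set.of("detectorA");

        // Mirrors ADMetaDataDiff.apply: copy the current set, add additions, drop removals.
        Set<String> next = new HashSet<>(current);
        next.addAll(added);
        next.removeAll(removed);

        System.out.println(next); // [detectorB, detectorC] (iteration order may vary)
    }
}
```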
@@ -34,16 +34,15 @@
* Data structure defining detector id and when this detector gets deleted
*
*/
public class AnomalyDetectorGraveyard extends AbstractDiffable<AnomalyDetectorGraveyard>
implements Writeable, ToXContentObject {
public class AnomalyDetectorGraveyard extends AbstractDiffable<AnomalyDetectorGraveyard> implements Writeable, ToXContentObject {
static final String DETECTOR_ID_KEY = "adID";
static final String DELETE_TIME_KEY = "deleteMillis";
private static final ObjectParser<Builder, Void> PARSER;

static {
PARSER = new ObjectParser<>("adGraveyard", true, Builder::new);
PARSER.declareString(Builder::detectorID, new ParseField(DETECTOR_ID_KEY));
PARSER.declareLong(Builder::deleteEpochMillis, new ParseField(DELETE_TIME_KEY));
PARSER.declareString(Builder::detectorID, new ParseField(DETECTOR_ID_KEY));
PARSER.declareLong(Builder::deleteEpochMillis, new ParseField(DELETE_TIME_KEY));
}

private String detectorID;
@@ -85,11 +84,12 @@ public long getDeleteEpochMillis() {
@Generated
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
AnomalyDetectorGraveyard that = (AnomalyDetectorGraveyard) o;
return Objects.equal(detectorID, that.getDetectorID()) &&
Objects.equal(deleteEpochMillis, that.getDeleteEpochMillis());
return Objects.equal(detectorID, that.getDetectorID()) && Objects.equal(deleteEpochMillis, that.getDeleteEpochMillis());
}

@Generated
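AnomalyDetectorGraveyard above is essentially an immutable (detector id, deletion time) pair, serialized under the adID and deleteMillis keys. A dependency-free sketch of that shape — the class below is illustrative, not the plugin's implementation:

```java
import java.time.Instant;

public final class GraveyardEntrySketch {

    private final String detectorId;      // serialized under "adID"
    private final long deleteEpochMillis; // serialized under "deleteMillis"

    public GraveyardEntrySketch(String detectorId, long deleteEpochMillis) {
        this.detectorId = detectorId;
        this.deleteEpochMillis = deleteEpochMillis;
    }

    @Override
    public String toString() {
        return "{\"adID\":\"" + detectorId + "\",\"deleteMillis\":" + deleteEpochMillis + "}";
    }

    public static void main(String[] args) {
        System.out.println(new GraveyardEntrySketch("detector-1", Instant.now().toEpochMilli()));
    }
}
```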
@@ -42,8 +42,7 @@ public class DailyCron implements Runnable {
private final Client client;
private final Duration checkpointTtl;

public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client,
Duration checkpointTtl) {
public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client, Duration checkpointTtl) {
this.deleteUtil = deleteUtil;
this.clock = clock;
this.client = client;
@@ -53,21 +52,37 @@ public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client,
@Override
public void run() {
DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(CommonName.CHECKPOINT_INDEX_NAME)
.setQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.rangeQuery(CheckpointDao.TIMESTAMP)
.lte(clock.millis() - checkpointTtl.toMillis()).format(CommonName.EPOCH_MILLIS_FORMAT)))
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
client.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> {
// if 0 docs get deleted, it means our query cannot find any matching doc
LOG.info("{} " + CHECKPOINT_DELETED_MSG, response.getDeleted());
}, exception -> {
if (exception instanceof IndexNotFoundException) {
LOG.info(CHECKPOINT_NOT_EXIST_MSG);
} else {
// Gonna eventually delete in maintenance window.
LOG.error(CANNOT_DELETE_OLD_CHECKPOINT_MSG, exception);
}
}));
.setQuery(
QueryBuilders
.boolQuery()
.filter(
QueryBuilders
.rangeQuery(CheckpointDao.TIMESTAMP)
.lte(clock.millis() - checkpointTtl.toMillis())
.format(CommonName.EPOCH_MILLIS_FORMAT)
)
)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
client
.execute(
DeleteByQueryAction.INSTANCE,
deleteRequest,
ActionListener
.wrap(
response -> {
// if 0 docs get deleted, it means our query cannot find any matching doc
LOG.info("{} " + CHECKPOINT_DELETED_MSG, response.getDeleted());
},
exception -> {
if (exception instanceof IndexNotFoundException) {
LOG.info(CHECKPOINT_NOT_EXIST_MSG);
} else {
// Gonna eventually delete in maintenance window.
LOG.error(CANNOT_DELETE_OLD_CHECKPOINT_MSG, exception);
}
}
)
);

deleteUtil.deleteDetectorResult(client);
}
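The range filter in run above matches checkpoints whose timestamp is at or before clock.millis() - checkpointTtl.toMillis(); injecting the Clock keeps that cutoff deterministic in tests. A minimal sketch of just the cutoff computation (the TTL value below is an assumption for illustration, not the plugin's default):

```java
import java.time.Clock;
import java.time.Duration;

public class CheckpointCutoffSketch {
    public static void main(String[] args) {
        Clock clock = Clock.systemUTC();
        Duration checkpointTtl = Duration.ofDays(3); // illustrative TTL only

        // Checkpoint docs with TIMESTAMP <= cutoffMillis match the delete-by-query.
        long cutoffMillis = clock.millis() - checkpointTtl.toMillis();
        System.out.println("deleting checkpoints with timestamp <= " + cutoffMillis);
    }
}
```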
