Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add missing Locale in String.format
Browse files Browse the repository at this point in the history
  • Loading branch information
kaituo committed Jan 27, 2021
1 parent 4ad4706 commit 5201c71
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -87,7 +88,7 @@ public void executeDetector(AnomalyDetector detector, Instant startTime, Instant
new MultiResponsesDelegateActionListener<EntityAnomalyResult>(
entityAnomalyResultListener,
entities.size(),
String.format("Fail to get preview result for multi entity detector %s", detector.getDetectorId()),
String.format(Locale.US, "Fail to get preview result for multi entity detector %s", detector.getDetectorId()),
true
);
for (Entity entity : entities) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
Expand Down Expand Up @@ -418,7 +419,7 @@ public Optional<Entry<double[][], Integer>> getFeaturesForSampledPeriods(
Map<Long, double[]> cache = new HashMap<>();
int currentStride = maxStride;
Optional<double[][]> features = Optional.empty();
logger.info(String.format("Getting features for detector %s starting %d", detector.getDetectorId(), endTime));
logger.info(String.format(Locale.US, "Getting features for detector %s starting %d", detector.getDetectorId(), endTime));
while (currentStride >= 1) {
boolean isInterpolatable = currentStride < maxStride;
features = getFeaturesForSampledPeriods(detector, maxSamples, currentStride, endTime, cache, isInterpolatable);
Expand Down Expand Up @@ -511,7 +512,7 @@ public void getFeaturesForSampledPeriods(
ActionListener<Optional<Entry<double[][], Integer>>> listener
) {
Map<Long, double[]> cache = new HashMap<>();
logger.info(String.format("Getting features for detector %s ending at %d", detector.getDetectorId(), endTime));
logger.info(String.format(Locale.US, "Getting features for detector %s ending at %d", detector.getDetectorId(), endTime));
getFeatureSamplesWithCache(detector, maxSamples, maxStride, endTime, cache, maxStride, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
Expand Down Expand Up @@ -223,7 +224,7 @@ private void onCheckpointNotExist(Map<String, Object> source, String modelId, bo
saveModelCheckpointSync(source, modelId);
}
} else {
logger.error(String.format("Unexpected error creating index %s", indexName), exception);
logger.error(String.format(Locale.US, "Unexpected error creating index %s", indexName), exception);
}
}));
}
Expand Down Expand Up @@ -271,7 +272,7 @@ public void flush() {
// It is possible the index has been created while we sending the create request
flush(bulkRequest);
} else {
logger.error(String.format("Unexpected error creating index %s", indexName), exception);
logger.error(String.format(Locale.US, "Unexpected error creating index %s", indexName), exception);
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ public List<ModelState<?>> getAllModels() {
*/
@Deprecated
public void stopModel(String detectorId, String modelId) {
    // Locale.US keeps the formatted log message deterministic regardless of the
    // JVM's default locale (the point of this commit: add missing Locale to String.format).
    logger.info(String.format(Locale.US, "Stopping detector %s model %s", detectorId, modelId));
    // Checkpoint and evict both model types associated with this model id.
    stopModel(forests, modelId, this::toCheckpoint);
    stopModel(thresholds, modelId, this::toCheckpoint);
}
Expand All @@ -458,7 +458,7 @@ private <T> void stopModel(Map<String, ModelState<T>> models, String modelId, Fu
* @param listener onResponse is called with null when the operation is completed
*/
public void stopModel(String detectorId, String modelId, ActionListener<Void> listener) {
logger.info(String.format("Stopping detector %s model %s", detectorId, modelId));
logger.info(String.format(Locale.US, "Stopping detector %s model %s", detectorId, modelId));
stopModel(
forests,
modelId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.ad.ml;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Locale;
import java.util.Map.Entry;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -133,7 +134,7 @@ public Entry<Integer, Integer> getPartitionedForestSizes(RandomCutForest forest,
* @return ID for the RCF model partition
*/
public String getRcfModelId(String detectorId, int partitionNumber) {
    // Locale.US makes the generated model ID stable across JVM default locales
    // (e.g. avoids locale-specific digit rendering of the partition number).
    return String.format(Locale.US, RCF_MODEL_ID_PATTERN, detectorId, partitionNumber);
}

/**
Expand All @@ -143,6 +144,6 @@ public String getRcfModelId(String detectorId, int partitionNumber) {
* @return ID for the thresholding model
*/
public String getThresholdModelId(String detectorId) {
    // Locale.US makes the generated threshold-model ID stable across JVM default locales.
    return String.format(Locale.US, THRESHOLD_MODEL_ID_PATTERN, detectorId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ public List<RestHandler.Route> routes() {
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, COUNT)
),
// get if a detector name exists with name
new RestHandler.Route(RestRequest.Method.GET, String.format("%s/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, MATCH))
new RestHandler.Route(
RestRequest.Method.GET,
String.format(Locale.US, "%s/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, MATCH)
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -391,13 +392,13 @@ private void validateCategoricalField(String detectorId) {
}

if (foundField == false) {
listener.onFailure(new IllegalArgumentException(String.format(NOT_FOUND_ERR_MSG, categoryField0)));
listener.onFailure(new IllegalArgumentException(String.format(Locale.US, NOT_FOUND_ERR_MSG, categoryField0)));
return;
}

searchAdInputIndices(detectorId);
}, error -> {
String message = String.format("Fail to get the index mapping of %s", anomalyDetector.getIndices());
String message = String.format(Locale.US, "Fail to get the index mapping of %s", anomalyDetector.getIndices());
logger.error(message, error);
listener.onFailure(new IllegalArgumentException(message));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.elasticsearch.index.IndexingPressure.MAX_INDEXING_BYTES;

import java.io.IOException;
import java.util.Locale;
import java.util.Random;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -121,7 +122,7 @@ private void addResult(BulkRequest bulkRequest, AnomalyResult result) {
IndexRequest indexRequest = new IndexRequest(indexName).source(result.toXContent(builder, RestHandlerUtils.XCONTENT_WITH_TYPE));
bulkRequest.add(indexRequest);
} catch (IOException e) {
LOG.error(String.format("Failed to prepare bulk %s", indexName), e);
LOG.error(String.format(Locale.US, "Failed to prepare bulk %s", indexName), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ private boolean shouldStart(
}

if (stateManager.isMuted(thresholdNodeId)) {
listener.onFailure(new InternalFailure(adID, String.format(NODE_UNRESPONSIVE_ERR_MSG + " %s", thresholdModelID)));
listener.onFailure(new InternalFailure(adID, String.format(Locale.US, NODE_UNRESPONSIVE_ERR_MSG + " %s", thresholdModelID)));
return false;
}

Expand Down Expand Up @@ -1060,7 +1060,7 @@ private Optional<AnomalyDetectionException> coldStartIfNoCheckPoint(AnomalyDetec
LOG.info("Trigger cold start for {}", detectorId);
coldStart(detector);
} else {
String errorMsg = String.format("Fail to get checkpoint state for %s", detectorId);
String errorMsg = String.format(Locale.US, "Fail to get checkpoint state for %s", detectorId);
LOG.error(errorMsg, exception);
stateManager.setLastColdStartException(detectorId, new AnomalyDetectionException(errorMsg, exception));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.ad.transport;

import java.io.IOException;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -152,7 +153,7 @@ public String executor() {
}
);
} catch (Exception e) {
LOG.error(String.format("Fail to get entity profile for detector {}, entity {}", adID, entityValue), e);
LOG.error(String.format(Locale.US, "Fail to get entity profile for detector {}, entity {}", adID, entityValue), e);
listener.onFailure(new AnomalyDetectionException(adID, FAIL_TO_GET_ENTITY_PROFILE_MSG, e));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.ad.transport;

import java.io.IOException;
import java.util.Locale;
import java.util.Optional;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -139,7 +140,7 @@ public String executor() {

});
} catch (Exception e) {
LOG.error(String.format("Fail to poll RCF models for {}", adID), e);
LOG.error(String.format(Locale.US, "Fail to poll RCF models for {}", adID), e);
listener.onFailure(new AnomalyDetectionException(adID, FAIL_TO_GET_RCF_UPDATE_MSG, e));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void index(T toSave, String detectorId) {
} else {
throw new AnomalyDetectionException(
detectorId,
String.format("Unexpected error creating index %s", indexName),
String.format(Locale.US, "Unexpected error creating index %s", indexName),
exception
);
}
Expand All @@ -154,7 +154,10 @@ private void onCreateIndexResponse(CreateIndexResponse response, T toSave, Strin
if (response.isAcknowledged()) {
save(toSave, detectorId);
} else {
throw new AnomalyDetectionException(detectorId, String.format("Creating %s with mappings call not acknowledged.", indexName));
throw new AnomalyDetectionException(
detectorId,
String.format(Locale.US, "Creating %s with mappings call not acknowledged.", indexName)
);
}
}

Expand All @@ -167,8 +170,8 @@ protected void save(T toSave, String detectorId) {

saveIteration(indexRequest, detectorId, savingBackoffPolicy.iterator());
} catch (Exception e) {
LOG.error(String.format("Failed to save %s", indexName), e);
throw new AnomalyDetectionException(detectorId, String.format("Cannot save %s", indexName));
LOG.error(String.format(Locale.US, "Failed to save %s", indexName), e);
throw new AnomalyDetectionException(detectorId, String.format(Locale.US, "Cannot save %s", indexName));
}
}

Expand All @@ -177,33 +180,37 @@ void saveIteration(IndexRequest indexRequest, String detectorId, Iterator<TimeVa
.<IndexRequest, IndexResponse>asyncRequest(
indexRequest,
client::index,
ActionListener.<IndexResponse>wrap(response -> { LOG.debug(String.format(SUCCESS_SAVING_MSG, detectorId)); }, exception -> {
// Elasticsearch has a thread pool and a queue for write per node. A thread
// pool will have N number of workers ready to handle the requests. When a
// request comes and if a worker is free , this is handled by the worker. Now by
// default the number of workers is equal to the number of cores on that CPU.
// When the workers are full and there are more write requests, the request
// will go to queue. The size of queue is also limited. If by default size is,
// say, 200 and if there happens more parallel requests than this, then those
// requests would be rejected as you can see EsRejectedExecutionException.
// So EsRejectedExecutionException is the way that Elasticsearch tells us that
// it cannot keep up with the current indexing rate.
// When it happens, we should pause indexing a bit before trying again, ideally
// with randomized exponential backoff.
Throwable cause = ExceptionsHelper.unwrapCause(exception);
if (!(cause instanceof EsRejectedExecutionException) || !backoff.hasNext()) {
LOG.error(String.format(FAIL_TO_SAVE_ERR_MSG, detectorId), cause);
} else {
TimeValue nextDelay = backoff.next();
LOG.warn(String.format(RETRY_SAVING_ERR_MSG, detectorId), cause);
// copy original request's source without other information like autoGeneratedTimestamp
// otherwise, an exception will be thrown indicating autoGeneratedTimestamp should not be set
// while request id is already set (id is set because we have already sent the request before).
IndexRequest newReuqest = new IndexRequest(indexRequest.index());
newReuqest.source(indexRequest.source(), indexRequest.getContentType());
threadPool.schedule(() -> saveIteration(newReuqest, detectorId, backoff), nextDelay, ThreadPool.Names.SAME);
}
})
ActionListener
.<IndexResponse>wrap(
response -> { LOG.debug(String.format(Locale.US, SUCCESS_SAVING_MSG, detectorId)); },
exception -> {
// Elasticsearch has a thread pool and a queue for write per node. A thread
// pool will have N number of workers ready to handle the requests. When a
// request comes and if a worker is free , this is handled by the worker. Now by
// default the number of workers is equal to the number of cores on that CPU.
// When the workers are full and there are more write requests, the request
// will go to queue. The size of queue is also limited. If by default size is,
// say, 200 and if there happens more parallel requests than this, then those
// requests would be rejected as you can see EsRejectedExecutionException.
// So EsRejectedExecutionException is the way that Elasticsearch tells us that
// it cannot keep up with the current indexing rate.
// When it happens, we should pause indexing a bit before trying again, ideally
// with randomized exponential backoff.
Throwable cause = ExceptionsHelper.unwrapCause(exception);
if (!(cause instanceof EsRejectedExecutionException) || !backoff.hasNext()) {
LOG.error(String.format(Locale.US, FAIL_TO_SAVE_ERR_MSG, detectorId), cause);
} else {
TimeValue nextDelay = backoff.next();
LOG.warn(String.format(Locale.US, RETRY_SAVING_ERR_MSG, detectorId), cause);
// copy original request's source without other information like autoGeneratedTimestamp
// otherwise, an exception will be thrown indicating autoGeneratedTimestamp should not be set
// while request id is already set (id is set because we have already sent the request before).
IndexRequest newReuqest = new IndexRequest(indexRequest.index());
newReuqest.source(indexRequest.source(), indexRequest.getContentType());
threadPool.schedule(() -> saveIteration(newReuqest, detectorId, backoff), nextDelay, ThreadPool.Names.SAME);
}
}
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void flush(ADResultBulkRequest currentBulkRequest, String detectorId) {
} else {
throw new AnomalyDetectionException(
detectorId,
String.format("Unexpected error creating index %s", indexName),
String.format(Locale.US, "Unexpected error creating index %s", indexName),
exception
);
}
Expand Down Expand Up @@ -166,15 +166,16 @@ private void bulk(ADResultBulkRequest currentBulkRequest, String detectorId) {
.execute(
ADResultBulkAction.INSTANCE,
currentBulkRequest,
ActionListener.<BulkResponse>wrap(response -> LOG.debug(String.format(SUCCESS_SAVING_MSG, detectorId)), exception -> {
LOG.error(String.format(FAIL_TO_SAVE_ERR_MSG, detectorId), exception);
Throwable cause = Throwables.getRootCause(exception);
// too much indexing pressure
// TODO: pause indexing a bit before trying again, ideally with randomized exponential backoff.
if (cause instanceof RejectedExecutionException) {
nodeStateManager.setLastIndexThrottledTime(clock.instant());
}
})
ActionListener
.<BulkResponse>wrap(response -> LOG.debug(String.format(Locale.US, SUCCESS_SAVING_MSG, detectorId)), exception -> {
LOG.error(String.format(Locale.US, FAIL_TO_SAVE_ERR_MSG, detectorId), exception);
Throwable cause = Throwables.getRootCause(exception);
// too much indexing pressure
// TODO: pause indexing a bit before trying again, ideally with randomized exponential backoff.
if (cause instanceof RejectedExecutionException) {
nodeStateManager.setLastIndexThrottledTime(clock.instant());
}
})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void onFailure(Exception e) {
private void finish() {
if (this.returnOnPartialResults || this.exceptions.size() == 0) {
if (this.exceptions.size() > 0) {
LOG.error(String.format("Although returning result, there exists exceptions: %s", this.exceptions));
LOG.error(String.format(Locale.US, "Although returning result, there exists exceptions: %s", this.exceptions));
}
handleSavedResponses();
} else {
Expand Down

0 comments on commit 5201c71

Please sign in to comment.