Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vhvapm 444 #1653

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package rocks.inspectit.ocelot.core.metrics;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagContextBuilder;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.Tags;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -25,17 +21,13 @@
import rocks.inspectit.ocelot.core.instrumentation.context.InspectitContextImpl;
import rocks.inspectit.ocelot.core.instrumentation.hook.actions.IHookAction;
import rocks.inspectit.ocelot.core.instrumentation.hook.actions.model.MetricAccessor;
import rocks.inspectit.ocelot.core.metrics.tagGuards.PersistedTagsReaderWriter;
import rocks.inspectit.ocelot.core.selfmonitoring.AgentHealthManager;
import rocks.inspectit.ocelot.core.tags.CommonTagsManager;
import rocks.inspectit.ocelot.core.tags.TagUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Future;
Expand All @@ -46,43 +38,38 @@
@Slf4j
public class MeasureTagValueGuard {
private static final String tagOverFlowMessageTemplate = "Overflow in measure %s for tag key %s";

/**
* Map of measure names and their related set of tag keys, which are currently blocked.
*/
private final Map<String, Set<String>> blockedTagKeysByMeasure = Maps.newHashMap();
PersistedTagsReaderWriter fileReaderWriter;
@Autowired
private InspectitEnvironment env;

@Autowired
private AgentHealthManager agentHealthManager;

/**
* Common tags manager needed for gathering common tags when recording metrics.
*/
@Autowired
private CommonTagsManager commonTagsManager;

@Autowired
private ScheduledExecutorService executor;

private PersistedTagsReaderWriter fileReaderWriter;

private volatile boolean isShuttingDown = false;

private boolean hasTagValueOverflow = false;

/**
* Map of measure names and their related set of tag keys, which are currently blocked.
*/
private final Map<String, Set<String>> blockedTagKeysByMeasure = Maps.newHashMap();

private Set<TagsHolder> latestTags = Collections.synchronizedSet(new HashSet<>());

private Future<?> blockTagValuesFuture;

@PostConstruct
protected void init() {
TagGuardSettings tagGuardSettings = env.getCurrentConfig().getMetrics().getTagGuard();
if (!tagGuardSettings.isEnabled()) return;

fileReaderWriter = new PersistedTagsReaderWriter(tagGuardSettings.getDatabaseFile(), new ObjectMapper());
final String filename = tagGuardSettings.getDatabaseFile();
if (StringUtils.isBlank(filename)) {

}
fileReaderWriter = PersistedTagsReaderWriter.of(filename);
jenniferWitzig marked this conversation as resolved.
Show resolved Hide resolved

scheduleTagGuardJob();

log.info(String.format("TagValueGuard started with scheduleDelay %s and database file %s", tagGuardSettings.getScheduleDelay(), tagGuardSettings.getDatabaseFile()));
Expand All @@ -101,78 +88,6 @@ protected void stop() {
blockTagValuesFuture.cancel(true);
}

/**
* Task, which reads the persisted tag values to determine, which tags should be blocked, because of exceeding
* the specific tag value limit.
* If new tags values have been created, they will be persisted.
*/
@VisibleForTesting
Runnable blockTagValuesTask = () -> {
if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) return;

// read current tag value database
Map<String, Map<String, Set<String>>> availableTagsByMeasure = fileReaderWriter.read();

Set<TagsHolder> copy = latestTags;
latestTags = Collections.synchronizedSet(new HashSet<>());

// process new tags
copy.forEach(tagsHolder -> {
String measureName = tagsHolder.getMeasureName();
Map<String, String> newTags = tagsHolder.getTags();
int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());

Map<String, Set<String>> tagValuesByTagKey = availableTagsByMeasure.computeIfAbsent(measureName, k -> Maps.newHashMap());
newTags.forEach((tagKey, tagValue) -> {
Set<String> tagValues = tagValuesByTagKey.computeIfAbsent(tagKey, (x) -> new HashSet<>());
// if tag value is new AND max values per tag is already reached
if (!tagValues.contains(tagValue) && tagValues.size() >= maxValuesPerTag) {
boolean isNewBlockedTag = blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet()).add(tagKey);
if(isNewBlockedTag) {
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(),
String.format(tagOverFlowMessageTemplate, measureName, tagKey));
hasTagValueOverflow = true;
}
} else {
tagValues.add(tagValue);
}
});

});

fileReaderWriter.write(availableTagsByMeasure);

// remove all blocked tags, if no values are stored in the database file
if(availableTagsByMeasure.isEmpty()) blockedTagKeysByMeasure.clear();

// independent of processing new tags, check if tags should be blocked or unblocked due to their tag value limit
availableTagsByMeasure.forEach((measureName, tags) -> {
int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());
tags.forEach((tagKey, tagValues) -> {
if(tagValues.size() >= maxValuesPerTag) {
boolean isNewBlockedTag = blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet())
.add(tagKey);
if(isNewBlockedTag) {
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(),
String.format(tagOverFlowMessageTemplate, measureName, tagKey));
hasTagValueOverflow = true;
}
} else {
blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet()).remove(tagKey);
}
});
});

// invalidate incident, if tag overflow was detected, but no more tags are blocked
boolean noBlockedTagKeys = blockedTagKeysByMeasure.values().stream().allMatch(Set::isEmpty);
if(hasTagValueOverflow && noBlockedTagKeys) {
agentHealthManager.invalidateIncident(this.getClass(), "Overflow for tags resolved");
hasTagValueOverflow = false;
}

if (!isShuttingDown) scheduleTagGuardJob();
};

/**
* Gets the max value amount per tag for the given measure by hierarchically extracting
* {@link MetricDefinitionSettings#maxValuesPerTag} (prio 1),
Expand All @@ -198,7 +113,8 @@ int getMaxValuesPerTag(String measureName, InspectitConfig config) {

/**
* Creates the full tag context, including all specified tags, for the current measure
* @param context current context
*
* @param context current context
* @param metricAccessor accessor for the measure as well as the particular tags
* @return TagContext including all tags for the current measure
*/
Expand Down Expand Up @@ -247,7 +163,77 @@ public TagContext getTagContext(IHookAction.ExecutionContext context, MetricAcce

return tagContextBuilder.build();

}
} /**
* Task, which reads the persisted tag values to determine, which tags should be blocked, because of exceeding
* the specific tag value limit.
* If new tags values have been created, they will be persisted.
*/
@VisibleForTesting
Runnable blockTagValuesTask = () -> {
if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) return;

// read current tag value database
Map<String, Map<String, Set<String>>> availableTagsByMeasure = fileReaderWriter.read();

Set<TagsHolder> copy = latestTags;
latestTags = Collections.synchronizedSet(new HashSet<>());

// process new tags
copy.forEach(tagsHolder -> {
String measureName = tagsHolder.getMeasureName();
Map<String, String> newTags = tagsHolder.getTags();
int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());

Map<String, Set<String>> tagValuesByTagKey = availableTagsByMeasure.computeIfAbsent(measureName, k -> Maps.newHashMap());
newTags.forEach((tagKey, tagValue) -> {
Set<String> tagValues = tagValuesByTagKey.computeIfAbsent(tagKey, (x) -> new HashSet<>());
// if tag value is new AND max values per tag is already reached
if (!tagValues.contains(tagValue) && tagValues.size() >= maxValuesPerTag) {
boolean isNewBlockedTag = blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet()).add(tagKey);
if (isNewBlockedTag) {
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(),
String.format(tagOverFlowMessageTemplate, measureName, tagKey));
hasTagValueOverflow = true;
}
} else {
tagValues.add(tagValue);
}
});

});

fileReaderWriter.write(availableTagsByMeasure);

// remove all blocked tags, if no values are stored in the database file
if (availableTagsByMeasure.isEmpty()) blockedTagKeysByMeasure.clear();

// independent of processing new tags, check if tags should be blocked or unblocked due to their tag value limit
availableTagsByMeasure.forEach((measureName, tags) -> {
int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());
tags.forEach((tagKey, tagValues) -> {
if (tagValues.size() >= maxValuesPerTag) {
boolean isNewBlockedTag = blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet())
.add(tagKey);
if (isNewBlockedTag) {
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(),
String.format(tagOverFlowMessageTemplate, measureName, tagKey));
hasTagValueOverflow = true;
}
} else {
blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet()).remove(tagKey);
}
});
});

// invalidate incident, if tag overflow was detected, but no more tags are blocked
boolean noBlockedTagKeys = blockedTagKeysByMeasure.values().stream().allMatch(Set::isEmpty);
if (hasTagValueOverflow && noBlockedTagKeys) {
agentHealthManager.invalidateIncident(this.getClass(), "Overflow for tags resolved");
hasTagValueOverflow = false;
}

if (!isShuttingDown) scheduleTagGuardJob();
};

@Value
@EqualsAndHashCode
Expand All @@ -259,46 +245,9 @@ private static class TagsHolder {

}

@AllArgsConstructor
EddeCCC marked this conversation as resolved.
Show resolved Hide resolved
static class PersistedTagsReaderWriter {

@NonNull
private String fileName;

@NonNull
private ObjectMapper mapper;

public Map<String, Map<String, Set<String>>> read() {
if (!StringUtils.isBlank(fileName)) {
Path path = Paths.get(fileName);
if (Files.exists(path)) {
try {
byte[] content = Files.readAllBytes(path);
@SuppressWarnings("unchecked") Map<String, Map<String, Set<String>>> tags = mapper.readValue(content, new TypeReference<Map<String, Map<String, Set<String>>>>() {
});
return tags;
} catch (Exception e) {
log.error("Error loading tag-guard database from persistence file '{}'", fileName, e);
}
} else {
log.info("Could not find tag-guard database file. File will be created during next write");
}
}
return Maps.newHashMap();
}

public void write(Map<String, Map<String, Set<String>>> tagValues) {
if (!StringUtils.isBlank(fileName)) {
try {
Path path = Paths.get(fileName);
Files.createDirectories(path.getParent());
String tagValuesString = mapper.writeValueAsString(tagValues);
Files.write(path, tagValuesString.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
log.error("Error writing tag-guard database to file '{}'", fileName, e);
}
}
}
}


}


Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package rocks.inspectit.ocelot.core.metrics.tagGuards;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;


@Getter
@Slf4j
public class PersistedTagsReaderWriter {

private final ObjectMapper mapper;
@NotNull
private final Path path;

private PersistedTagsReaderWriter(final ObjectMapper mapper, @NotNull final Path path) {
this.mapper = mapper;
this.path = path;
}

public static PersistedTagsReaderWriter of(final String filenameInput) {
final Path path = Paths.get(filenameInput);
return new PersistedTagsReaderWriter(new ObjectMapper(), path);
}

public Map<String, Map<String, Set<String>>> read() {
if (!Files.exists(path)) {
log.info("Could not find tag-guard database file. File will be created during next write");
return new HashMap<>();
}

try {
byte[] content = Files.readAllBytes(path);
return mapper.readValue(content, new TypeReference<Map<String, Map<String, Set<String>>>>() {
});
} catch (final Exception e) {
log.error("Error loading tag-guard database from persistence file", e);
return new HashMap<>();
}
}

public void write(Map<String, Map<String, Set<String>>> tagValues) {
try {
final Path parent = path.getParent();
if (Objects.isNull(parent) || !Files.isWritable(parent) ) {
log.error("Cannot find write the file because of an invalid path.");
jenniferWitzig marked this conversation as resolved.
Show resolved Hide resolved
return;
}

createFileDirectory(parent);
final String tagValuesString = mapper.writeValueAsString(tagValues);
Files.writeString(path, tagValuesString);
jenniferWitzig marked this conversation as resolved.
Show resolved Hide resolved
} catch (final IOException e) {
log.error("Error writing tag-guard database to file", e);
}
}

private void createFileDirectory(final Path parent) throws IOException {
if (!Files.isDirectory(parent)) {
Files.createDirectories(parent);
}
}
}
Loading
Loading