Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Update comments, add a limit on tracker size, and change from entityM…
Browse files Browse the repository at this point in the history
…odelId to entityId
  • Loading branch information
kaituo committed Feb 24, 2021
1 parent c5946d0 commit ab741f5
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
public class CacheBuffer implements ExpiringState, MaintenanceState {
private static final Logger LOG = LogManager.getLogger(CacheBuffer.class);

// max entities to track per detector
private final int MAX_TRACKING_ENTITIES = 1000000;

private final int minimumCapacity;
// key -> value
private final ConcurrentHashMap<String, ModelState<EntityModel>> items;
Expand Down Expand Up @@ -100,7 +103,7 @@ public CacheBuffer(

this.reservedBytes = memoryConsumptionPerEntity * minimumCapacity;
this.clock = clock;
this.priorityTracker = new PriorityTracker(clock, intervalSecs, clock.instant().getEpochSecond());
this.priorityTracker = new PriorityTracker(clock, intervalSecs, clock.instant().getEpochSecond(), MAX_TRACKING_ENTITIES);
}

/**
Expand Down Expand Up @@ -204,7 +207,7 @@ public ModelState<EntityModel> remove() {
// The removed one loses references and soon GC will collect it.
// We have memory tracking correction to fix incorrect memory usage record.
// put: not a problem as it is unlikely we are removing and putting the same thing
Optional<String> key = priorityTracker.getMinimumPriorityEntityModelId();
Optional<String> key = priorityTracker.getMinimumPriorityEntityId();
if (key.isPresent()) {
return remove(key.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ public long getTotalUpdates(String detectorId) {
return Optional
.of(activeEnities)
.map(entities -> entities.get(detectorId))
.map(buffer -> buffer.getPriorityTracker().getHighestPriorityEntityModelId())
.map(buffer -> buffer.getPriorityTracker().getHighestPriorityEntityId())
.map(entityModelIdOptional -> entityModelIdOptional.get())
.map(entityModelId -> getTotalUpdates(detectorId, entityModelId))
.orElse(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,21 +149,26 @@ public int compare(PriorityNode priority, PriorityNode priority2) {
// We chose 0.125 because multiplying 0.125 can be implemented efficiently using 3 right
// shift and the half life is not too fast or slow .
private final int DECAY_CONSTANT;
// the max number of entities to track
private final int maxEntities;

/**
* Constructor
* Create a priority tracker for a detector. Detector and priority tracker
* have 1:1 mapping.
*
* @param clock Used to get current time.
* @param intervalSecs Detector interval seconds.
* @param landmarkEpoch The epoch time when the priority tracking starts.
* @param maxEntities the max number of entities to track
*/
public PriorityTracker(Clock clock, long intervalSecs, long landmarkEpoch) {
public PriorityTracker(Clock clock, long intervalSecs, long landmarkEpoch, int maxEntities) {
this.key2Priority = new ConcurrentHashMap<>();
this.clock = clock;
this.intervalSecs = intervalSecs;
this.landmarkEpoch = landmarkEpoch;
this.priorityList = new ConcurrentSkipListSet<>(new PriorityNodeComparator());
this.DECAY_CONSTANT = 3;
this.maxEntities = maxEntities;
}

/**
Expand All @@ -188,51 +193,67 @@ public Entry<String, Float> getMinimumPriority() {

/**
*
* @return the minimum priority entity's model Id
* @return the minimum priority entity's Id
*/
public Optional<String> getMinimumPriorityEntityModelId() {
public Optional<String> getMinimumPriorityEntityId() {
return Optional.of(priorityList).map(list -> list.first()).map(node -> node.key);
}

/**
*
* @return Get maximum priority entity's model Id
* @return Get maximum priority entity's Id
*/
public Optional<String> getHighestPriorityEntityModelId() {
public Optional<String> getHighestPriorityEntityId() {
return Optional.of(priorityList).map(list -> list.last()).map(node -> node.key);
}

/**
* Update an entity's priority with count increment
* @param entityModelId Entity model Id
* @param entityId Entity Id
*/
protected void updatePriority(String entityModelId) {
PriorityNode node = key2Priority.computeIfAbsent(entityModelId, k -> new PriorityNode(entityModelId, 0f));
public void updatePriority(String entityId) {
PriorityNode node = key2Priority.computeIfAbsent(entityId, k -> new PriorityNode(entityId, 0f));
// reposition this node
this.priorityList.remove(node);
node.priority = getUpdatedPriority(node.priority);
this.priorityList.add(node);

adjustSizeIfRequired();
}

/**
* Associate the specified priority with the entity model Id
* @param entityModelId Entity model Id
* Associate the specified priority with the entity Id
* @param entityId Entity Id
* @param priority priority
*/
protected void addPriority(String entityModelId, float priority) {
PriorityNode node = new PriorityNode(entityModelId, priority);
key2Priority.put(entityModelId, node);
protected void addPriority(String entityId, float priority) {
PriorityNode node = new PriorityNode(entityId, priority);
key2Priority.put(entityId, node);
priorityList.add(node);

adjustSizeIfRequired();
}

/**
* Adjust tracking list if the size exceeded the limit
*/
private void adjustSizeIfRequired() {
if (key2Priority.size() > maxEntities) {
Optional<String> minPriorityId = getMinimumPriorityEntityId();
if (minPriorityId.isPresent()) {
removePriority(minPriorityId.get());
}
}
}

/**
* Remove an entity in the tracker
* @param entityModelId Entity model Id
* @param entityId Entity Id
*/
protected void removePriority(String entityModelId) {
protected void removePriority(String entityId) {
// remove if the key matches; priority does not matter
priorityList.remove(new PriorityNode(entityModelId, 0));
key2Priority.remove(entityModelId);
priorityList.remove(new PriorityNode(entityId, 0));
key2Priority.remove(entityId);
}

/**
Expand All @@ -254,7 +275,7 @@ protected void clearPriority() {
* detector is enabled. i - L measures the elapsed periods since detector starts.
* 0.125 is the decay constant.
*
* Since g(p−L) is changing and they are the same for all entities of the same detector,
* Since g(i−L) is changing and they are the same for all entities of the same detector,
* we can compare entities' priorities by considering the accumulated sum of g(i−L).
*
* @param oldPriority Existing priority
Expand Down Expand Up @@ -317,4 +338,12 @@ public List<String> getTopNEntities(int n) {
}
return entities;
}

/**
*
* @return the number of tracked entities
*/
public int size() {
return key2Priority.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void testCanRemove() {
cacheBuffer.replace(modelId3, MLUtil.randomModelState(initialPriority, modelId3));
assertTrue(cacheBuffer.isActive(modelId2));
assertTrue(cacheBuffer.isActive(modelId3));
assertEquals(modelId3, cacheBuffer.getPriorityTracker().getHighestPriorityEntityModelId().get());
assertEquals(modelId3, cacheBuffer.getPriorityTracker().getHighestPriorityEntityId().get());
assertEquals(2, cacheBuffer.getActiveEntities());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void setUp() throws Exception {
super.setUp();
clock = mock(Clock.class);
now = Instant.now();
tracker = new PriorityTracker(clock, 1, now.getEpochSecond());
tracker = new PriorityTracker(clock, 1, now.getEpochSecond(), 3);
entity1 = "entity1";
entity2 = "entity2";
entity3 = "entity3";
Expand Down Expand Up @@ -83,4 +83,15 @@ public void testOverflow() {
// overflow happens, we use increment as the new priority
assertEquals(0, tracker.getMinimumScaledPriority().getValue().floatValue(), 0.001);
}

public void testTooManyEntities() {
when(clock.instant()).thenReturn(now);
tracker = new PriorityTracker(clock, 1, now.getEpochSecond(), 2);
tracker.updatePriority(entity1);
tracker.updatePriority(entity3);
assertEquals(2, tracker.size());
tracker.updatePriority(entity2);
// one entity is kicked out due to the size limit is reached.
assertEquals(2, tracker.size());
}
}

0 comments on commit ab741f5

Please sign in to comment.