Skip to content

Commit

Permalink
Store cache status info in a map by type url (#135)
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Schepens <sebastian.schepens@mercadolibre.com>
  • Loading branch information
sschepens authored Apr 22, 2020
1 parent 4d38183 commit df5fac6
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.envoyproxy.controlplane.cache;

import java.util.ArrayList;
import java.util.Collection;
import javax.annotation.concurrent.ThreadSafe;

/**
* {@code GroupCacheStatusInfo} provides an implementation of {@link StatusInfo} for a group of {@link CacheStatusInfo}.
*/
@ThreadSafe
class GroupCacheStatusInfo<T> implements StatusInfo<T> {
private final Collection<CacheStatusInfo<T>> statuses;

public GroupCacheStatusInfo(Collection<CacheStatusInfo<T>> statuses) {
this.statuses = new ArrayList<>(statuses);
}

/**
* {@inheritDoc}
*/
@Override
public long lastWatchRequestTime() {
return statuses.stream().mapToLong(CacheStatusInfo::lastWatchRequestTime).max().orElse(0);
}

/**
* {@inheritDoc}
*/
@Override
public T nodeGroup() {
return statuses.stream().map(CacheStatusInfo::nodeGroup).findFirst().orElse(null);
}

/**
* {@inheritDoc}
*/
@Override
public int numWatches() {
return statuses.stream().mapToInt(CacheStatusInfo::numWatches).sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class SimpleCache<T> implements SnapshotCache<T> {

@GuardedBy("lock")
private final Map<T, Snapshot> snapshots = new HashMap<>();
private final ConcurrentMap<T, CacheStatusInfo<T>> statuses = new ConcurrentHashMap<>();
private final ConcurrentMap<T, ConcurrentMap<String, CacheStatusInfo<T>>> statuses = new ConcurrentHashMap<>();

private AtomicLong watchCount = new AtomicLong();

Expand All @@ -64,10 +64,10 @@ public boolean clearSnapshot(T group) {
// we take a writeLock to prevent watches from being created
writeLock.lock();
try {
CacheStatusInfo<T> status = statuses.get(group);
Map<String, CacheStatusInfo<T>> status = statuses.get(group);

// If we don't know about this group, do nothing.
if (status != null && status.numWatches() > 0) {
if (status != null && status.values().stream().mapToLong(CacheStatusInfo::numWatches).sum() > 0) {
LOGGER.warn("tried to clear snapshot for group with existing watches, group={}", group);

return false;
Expand Down Expand Up @@ -106,7 +106,8 @@ public Watch createWatch(
// doesn't conflict
readLock.lock();
try {
CacheStatusInfo<T> status = statuses.computeIfAbsent(group, g -> new CacheStatusInfo<>(group));
CacheStatusInfo<T> status = statuses.computeIfAbsent(group, g -> new ConcurrentHashMap<>())
.computeIfAbsent(request.getTypeUrl(), s -> new CacheStatusInfo<>(group));
status.setLastWatchRequestTime(System.currentTimeMillis());

Snapshot snapshot = snapshots.get(group);
Expand Down Expand Up @@ -212,7 +213,7 @@ public Collection<T> groups() {
@Override
public synchronized void setSnapshot(T group, Snapshot snapshot) {
// we take a writeLock to prevent watches from being created while we update the snapshot
CacheStatusInfo<T> status;
ConcurrentMap<String, CacheStatusInfo<T>> status;
writeLock.lock();
try {
// Update the existing snapshot entry.
Expand All @@ -238,15 +239,26 @@ public StatusInfo statusInfo(T group) {
readLock.lock();

try {
return statuses.get(group);
ConcurrentMap<String, CacheStatusInfo<T>> statusMap = statuses.get(group);
if (statusMap == null || statusMap.isEmpty()) {
return null;
}

return new GroupCacheStatusInfo<>(statusMap.values());
} finally {
readLock.unlock();
}
}

@VisibleForTesting
protected void respondWithSpecificOrder(T group, Snapshot snapshot, CacheStatusInfo<T> status) {
protected void respondWithSpecificOrder(T group, Snapshot snapshot,
ConcurrentMap<String, CacheStatusInfo<T>> statusMap) {
for (String typeUrl : Resources.TYPE_URLS) {
CacheStatusInfo<T> status = statusMap.get(typeUrl);
if (status == null) {
continue;
}

status.watchesRemoveIf((id, watch) -> {
if (!watch.request().getTypeUrl().equals(typeUrl)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.envoyproxy.envoy.api.v2.core.Http2ProtocolOptions;
import io.grpc.netty.NettyServerBuilder;
import io.restassured.http.ContentType;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -192,7 +193,8 @@ public CustomCache(NodeGroup<T> groups) {
}

@Override
protected void respondWithSpecificOrder(T group, Snapshot snapshot, CacheStatusInfo<T> status) {
protected void respondWithSpecificOrder(T group, Snapshot snapshot,
ConcurrentMap<String, CacheStatusInfo<T>> status) {
// This code has been removed to show specific case which is hard to reproduce in integration test:
// 1. Envoy connects to control-plane
// 2. Snapshot already exists in control-plane <- other instance share same group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class EnvoyContainer extends GenericContainer<EnvoyContainer> {
private final Supplier<Integer> controlPlanePortSupplier;

EnvoyContainer(String config, Supplier<Integer> controlPlanePortSupplier) {
super("envoyproxy/envoy-alpine:latest");
super("envoyproxy/envoy-alpine-dev:latest");

this.config = config;
this.controlPlanePortSupplier = controlPlanePortSupplier;
Expand Down

0 comments on commit df5fac6

Please sign in to comment.