Skip to content

fix: startup resource cache access - alternate version #2880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;

class DefaultPrimaryToSecondaryIndex<R extends HasMetadata> implements PrimaryToSecondaryIndex<R> {
class DefaultTemporalPrimaryToSecondaryIndex<R extends HasMetadata>
implements TemporalPrimaryToSecondaryIndex<R> {

private final SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private final Map<ResourceID, Set<ResourceID>> index = new HashMap<>();

public DefaultPrimaryToSecondaryIndex(SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper) {
public DefaultTemporalPrimaryToSecondaryIndex(
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper) {
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
}

@Override
public synchronized void onAddOrUpdate(R resource) {
public synchronized void explicitAddOrUpdate(R resource) {
Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);
primaryResources.forEach(
primaryResource -> {
Expand All @@ -28,7 +30,7 @@ public synchronized void onAddOrUpdate(R resource) {
}

@Override
public synchronized void onDelete(R resource) {
public synchronized void cleanupForResource(R resource) {
Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);
primaryResources.forEach(
primaryResource -> {
Expand All @@ -51,7 +53,7 @@ public synchronized Set<ResourceID> getSecondaryResources(ResourceID primary) {
if (resourceIDs == null) {
return Collections.emptySet();
} else {
return Collections.unmodifiableSet(resourceIDs);
return new HashSet<>(resourceIDs);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -58,10 +59,11 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
extends ManagedInformerEventSource<R, P, InformerEventSourceConfiguration<R>>
implements ResourceEventHandler<R> {

public static final String PRIMARY_TO_SECONDARY_INDEX_NAME = "primaryToSecondary";

public static final String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous";
private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);
// we need direct control for the indexer to propagate the just update resource also to the index
private final PrimaryToSecondaryIndex<R> primaryToSecondaryIndex;
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;
private final String id = UUID.randomUUID().toString();

Expand Down Expand Up @@ -95,12 +97,14 @@ private InformerEventSource(
parseResourceVersions);
// If there is a primary to secondary mapper there is no need for primary to secondary index.
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
if (primaryToSecondaryMapper == null) {
primaryToSecondaryIndex =
// The index uses the secondary to primary mapper (always present) to build the index
new DefaultPrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper());
} else {
primaryToSecondaryIndex = NOOPPrimaryToSecondaryIndex.getInstance();
if (useSecondaryToPrimaryIndex()) {
addIndexers(
Map.of(
PRIMARY_TO_SECONDARY_INDEX_NAME,
(R r) ->
configuration.getSecondaryToPrimaryMapper().toPrimaryResourceIDs(r).stream()
.map(InformerEventSource::resourceIdToString)
.toList()));
}

final var informerConfig = configuration.getInformerConfig();
Expand All @@ -119,7 +123,6 @@ public void onAdd(R newResource) {
resourceType().getSimpleName(),
newResource.getMetadata().getResourceVersion());
}
primaryToSecondaryIndex.onAddOrUpdate(newResource);
onAddOrUpdate(
Operation.ADD, newResource, null, () -> InformerEventSource.super.onAdd(newResource));
}
Expand All @@ -134,7 +137,7 @@ public void onUpdate(R oldObject, R newObject) {
newObject.getMetadata().getResourceVersion(),
oldObject.getMetadata().getResourceVersion());
}
primaryToSecondaryIndex.onAddOrUpdate(newObject);

onAddOrUpdate(
Operation.UPDATE,
newObject,
Expand All @@ -150,7 +153,6 @@ public void onDelete(R resource, boolean b) {
ResourceID.fromResource(resource),
resourceType().getSimpleName());
}
primaryToSecondaryIndex.onDelete(resource);
super.onDelete(resource, b);
if (acceptedByDeleteFilters(resource, b)) {
propagateEvent(resource);
Expand All @@ -160,7 +162,6 @@ public void onDelete(R resource, boolean b) {
private synchronized void onAddOrUpdate(
Operation operation, R newObject, R oldObject, Runnable superOnOp) {
var resourceID = ResourceID.fromResource(newObject);

if (canSkipEvent(newObject, oldObject, resourceID)) {
log.debug(
"Skipping event propagation for {}, since was a result of a reconcile action. Resource"
Expand Down Expand Up @@ -244,42 +245,73 @@ private void propagateEvent(R object) {

@Override
public Set<R> getSecondaryResources(P primary) {
Set<ResourceID> secondaryIDs;

if (useSecondaryToPrimaryIndex()) {
var primaryResourceID = ResourceID.fromResource(primary);
secondaryIDs = primaryToSecondaryIndex.getSecondaryResources(primaryResourceID);
var primaryID = ResourceID.fromResource(primary);
// Note that the order matter is these lines. This method is not synchronized
// because of performance reasons. If it was in reverse order, it could happen
// that we did not receive yet an event in the informer so the index would not
// be updated. However, before reading it from temp IDs the event arrives and erases
// the temp index. So in case of Add not id would be found.
var sid = resourceIdToString(primaryID);
var temporalIds =
temporaryResourceCache
.getTemporalPrimaryToSecondaryIndex()
.getSecondaryResources(primaryID);
var resources = byIndex(PRIMARY_TO_SECONDARY_INDEX_NAME, sid);

log.debug(
"Using PrimaryToSecondaryIndex to find secondary resources for primary: {}. Found"
+ " secondary ids: {} ",
primaryResourceID,
secondaryIDs);
"Using informer primary to secondary index to find secondary resources for primary name:"
+ " {} namespace: {}. Found number {}, String id: {}. All resources: {}",
primary.getMetadata().getName(),
primary.getMetadata().getNamespace(),
resources.size(),
sid,
manager().list().map(ResourceID::fromResource).toList());

log.debug("Complementary ids: {}", temporalIds);
var res =
resources.stream()
.map(
r -> {
var resourceId = ResourceID.fromResource(r);
temporalIds.remove(resourceId);
Optional<R> resource = temporaryResourceCache.getResourceFromCache(resourceId);
return resource.orElse(r);
})
.collect(Collectors.toSet());
temporalIds.forEach(
id -> {
Optional<R> resource = get(id);
resource.ifPresentOrElse(res::add, () -> log.warn("Resource not found: {}", id));
});
return res;
} else {
secondaryIDs = primaryToSecondaryMapper.toSecondaryResourceIDs(primary);
Set<ResourceID> secondaryIDs = primaryToSecondaryMapper.toSecondaryResourceIDs(primary);
log.debug(
"Using PrimaryToSecondaryMapper to find secondary resources for primary: {}. Found"
+ " secondary ids: {} ",
primary,
secondaryIDs);
return secondaryIDs.stream()
.map(this::get)
.flatMap(Optional::stream)
.collect(Collectors.toSet());
}
return secondaryIDs.stream()
.map(this::get)
.flatMap(Optional::stream)
.collect(Collectors.toSet());
}

@Override
public synchronized void handleRecentResourceUpdate(
ResourceID resourceID, R resource, R previousVersionOfResource) {
handleRecentCreateOrUpdate(Operation.UPDATE, resource, previousVersionOfResource);
handleRecentCreateOrUpdate(resource, previousVersionOfResource);
}

@Override
public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) {
handleRecentCreateOrUpdate(Operation.ADD, resource, null);
handleRecentCreateOrUpdate(resource, null);
}

private void handleRecentCreateOrUpdate(Operation operation, R newResource, R oldResource) {
primaryToSecondaryIndex.onAddOrUpdate(newResource);
private void handleRecentCreateOrUpdate(R newResource, R oldResource) {
temporaryResourceCache.putResource(
newResource,
Optional.ofNullable(oldResource)
Expand Down Expand Up @@ -332,4 +364,20 @@ private enum Operation {
ADD,
UPDATE
}

private static String resourceIdToString(ResourceID resourceID) {
return resourceID.getName() + "#" + resourceID.getNamespace().orElse("$na");
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
protected TemporaryResourceCache<R> temporaryResourceCache() {
return new TemporaryResourceCache<>(
this,
useSecondaryToPrimaryIndex()
? new DefaultTemporalPrimaryToSecondaryIndex(
configuration().getSecondaryToPrimaryMapper())
: NOOPTemporalPrimaryToSecondaryIndex.getInstance(),
parseResourceVersions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public abstract class ManagedInformerEventSource<

private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class);
private InformerManager<R, C> cache;
private final boolean parseResourceVersions;
protected final boolean parseResourceVersions;
private ControllerConfiguration<R> controllerConfiguration;
private final C configuration;
private final Map<String, Function<R, List<String>>> indexers = new HashMap<>();
Expand Down Expand Up @@ -87,7 +87,7 @@ public synchronized void start() {
if (isRunning()) {
return;
}
temporaryResourceCache = new TemporaryResourceCache<>(this, parseResourceVersions);
temporaryResourceCache = temporaryResourceCache();
this.cache = new InformerManager<>(client, configuration, this);
cache.setControllerConfiguration(controllerConfiguration);
cache.addIndexers(indexers);
Expand Down Expand Up @@ -133,6 +133,11 @@ public Optional<R> get(ResourceID resourceID) {
}
}

protected TemporaryResourceCache temporaryResourceCache() {
return new TemporaryResourceCache<>(
this, NOOPTemporalPrimaryToSecondaryIndex.getInstance(), parseResourceVersions);
}

@SuppressWarnings("unused")
public Optional<R> getCachedValue(ResourceID resourceID) {
return get(resourceID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,27 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

class NOOPPrimaryToSecondaryIndex<R extends HasMetadata> implements PrimaryToSecondaryIndex<R> {
class NOOPTemporalPrimaryToSecondaryIndex<R extends HasMetadata>
implements TemporalPrimaryToSecondaryIndex<R> {

@SuppressWarnings("rawtypes")
private static final NOOPPrimaryToSecondaryIndex instance = new NOOPPrimaryToSecondaryIndex();
private static final NOOPTemporalPrimaryToSecondaryIndex instance =
new NOOPTemporalPrimaryToSecondaryIndex();

@SuppressWarnings("unchecked")
public static <T extends HasMetadata> NOOPPrimaryToSecondaryIndex<T> getInstance() {
public static <T extends HasMetadata> NOOPTemporalPrimaryToSecondaryIndex<T> getInstance() {
return instance;
}

private NOOPPrimaryToSecondaryIndex() {}
private NOOPTemporalPrimaryToSecondaryIndex() {}

@Override
public void onAddOrUpdate(R resource) {
public void explicitAddOrUpdate(R resource) {
// empty method because of noop implementation
}

@Override
public void onDelete(R resource) {
public void cleanupForResource(R resource) {
// empty method because of noop implementation
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public interface PrimaryToSecondaryIndex<R extends HasMetadata> {
public interface TemporalPrimaryToSecondaryIndex<R extends HasMetadata> {

void onAddOrUpdate(R resource);
void explicitAddOrUpdate(R resource);

void onDelete(R resource);
void cleanupForResource(R resource);

Set<ResourceID> getSecondaryResources(ResourceID primary);
}
Loading