Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,46 +1,57 @@
package io.javaoperatorsdk.operator.processing.event.internal;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.cache.Store;
import io.javaoperatorsdk.operator.processing.event.AbstractEventSource;

public class InformerEventSource<T extends HasMetadata> extends AbstractEventSource {

private final SharedInformer<T> sharedInformer;
private final ResourceToRelatedCustomResourceUIDMapper<T> mapper;
private final Function<T, Set<String>> resourceToUIDs;
private final Function<HasMetadata, T> associatedWith;
private final boolean skipUpdateEventPropagationIfNoChange;

public InformerEventSource(SharedInformer<T> sharedInformer,
ResourceToRelatedCustomResourceUIDMapper<T> mapper) {
this(sharedInformer, mapper, true);
Function<T, Set<String>> resourceToUIDs) {
this(sharedInformer, resourceToUIDs, null, true);
}

InformerEventSource(KubernetesClient client, Class<T> type,
ResourceToRelatedCustomResourceUIDMapper<T> mapper) {
this(client, type, mapper, false);
public InformerEventSource(KubernetesClient client, Class<T> type,
Function<T, Set<String>> resourceToUIDs) {
this(client, type, resourceToUIDs, false);
}

InformerEventSource(KubernetesClient client, Class<T> type,
ResourceToRelatedCustomResourceUIDMapper<T> mapper,
Function<T, Set<String>> resourceToUIDs,
boolean skipUpdateEventPropagationIfNoChange) {
this(client.informers().sharedIndexInformerFor(type, 0), mapper,
this(client.informers().sharedIndexInformerFor(type, 0), resourceToUIDs, null,
skipUpdateEventPropagationIfNoChange);
}

public InformerEventSource(SharedInformer<T> sharedInformer,
ResourceToRelatedCustomResourceUIDMapper<T> mapper,
Function<T, Set<String>> resourceToUIDs,
Function<HasMetadata, T> associatedWith,
boolean skipUpdateEventPropagationIfNoChange) {
this.sharedInformer = sharedInformer;
this.mapper = mapper;
this.resourceToUIDs = resourceToUIDs;
this.skipUpdateEventPropagationIfNoChange = skipUpdateEventPropagationIfNoChange;

sharedInformer.addEventHandler(new ResourceEventHandler<T>() {
this.associatedWith = Objects.requireNonNullElseGet(associatedWith, () -> cr -> {
final var metadata = cr.getMetadata();
return getStore().getByKey(Cache.namespaceKeyFunc(metadata.getNamespace(),
metadata.getName()));
});

sharedInformer.addEventHandler(new ResourceEventHandler<>() {
@Override
public void onAdd(T t) {
propagateEvent(InformerEvent.Action.ADD, t, null);
Expand All @@ -64,7 +75,7 @@ public void onDelete(T t, boolean b) {
}

private void propagateEvent(InformerEvent.Action action, T object, T oldObject) {
var uids = mapper.map(object);
var uids = resourceToUIDs.apply(object);
if (uids.isEmpty()) {
return;
}
Expand All @@ -88,12 +99,19 @@ public Store<T> getStore() {
return sharedInformer.getStore();
}

public SharedInformer<T> getSharedInformer() {
return sharedInformer;
/**
* Retrieves the informed resource associated with the specified primary resource as defined by
* the function provided when this InformerEventSource was created
*
* @param resource the primary resource we want to retrieve the associated resource for
* @return the informed resource associated with the specified primary resource
*/
public T getAssociated(HasMetadata resource) {
return associatedWith.apply(resource);
}

public interface ResourceToRelatedCustomResourceUIDMapper<T> {
List<String> map(T resource);
}

public SharedInformer<T> getSharedInformer() {
return sharedInformer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.javaoperatorsdk.operator.processing.event.internal;

import java.util.Collections;
import java.util.Set;
import java.util.function.Function;

import io.fabric8.kubernetes.api.model.HasMetadata;

public class Mappers {
public static <T extends HasMetadata> Function<T, Set<String>> fromAnnotation(
String annotationKey) {
return fromMetadata(annotationKey, false);
}

public static <T extends HasMetadata> Function<T, Set<String>> fromLabel(
String labelKey) {
return fromMetadata(labelKey, true);
}

private static <T extends HasMetadata> Function<T, Set<String>> fromMetadata(
String key, boolean isLabel) {
return resource -> {
final var metadata = resource.getMetadata();
if (metadata == null) {
return Collections.emptySet();
} else {
final var map = isLabel ? metadata.getLabels() : metadata.getAnnotations();
return map != null ? Set.of(map.get(key)) : Collections.emptySet();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
package io.javaoperatorsdk.operator.sample.informereventsource;

import java.util.Arrays;
import java.util.Collections;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.SharedInformer;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.javaoperatorsdk.operator.api.Context;
import io.javaoperatorsdk.operator.api.Controller;
import io.javaoperatorsdk.operator.api.ResourceController;
import io.javaoperatorsdk.operator.api.UpdateControl;
import io.javaoperatorsdk.operator.junit.KubernetesClientAware;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import io.javaoperatorsdk.operator.processing.event.internal.InformerEventSource;
import io.javaoperatorsdk.operator.processing.event.internal.Mappers;

import static io.javaoperatorsdk.operator.api.Controller.NO_FINALIZER;

Expand All @@ -36,19 +31,13 @@ public class InformerEventSourceTestCustomResourceController implements
public static final String TARGET_CONFIG_MAP_KEY = "targetStatus";

private KubernetesClient kubernetesClient;
private SharedInformer<ConfigMap> informer;
private InformerEventSource<ConfigMap> eventSource;

@Override
public void init(EventSourceManager eventSourceManager) {
SharedInformerFactory sharedInformerFactory = kubernetesClient.informers();
informer = sharedInformerFactory.sharedIndexInformerFor(ConfigMap.class, 0);
eventSourceManager.registerEventSource("configmap", new InformerEventSource<>(informer,
resource -> {
if (resource.getMetadata() == null || resource.getMetadata().getAnnotations() == null) {
return Collections.emptyList();
}
return Arrays.asList(resource.getMetadata().getAnnotations().get(RELATED_RESOURCE_UID));
}));
eventSource = new InformerEventSource<>(kubernetesClient, ConfigMap.class,
Mappers.fromAnnotation(RELATED_RESOURCE_UID));
eventSourceManager.registerEventSource("configmap", eventSource);
}

@Override
Expand All @@ -58,9 +47,7 @@ public UpdateControl<InformerEventSourceTestCustomResource> createOrUpdateResour

// Reading the config map from the informer not from the API
// name of the config map same as custom resource for sake of simplicity
ConfigMap configMap =
informer.getStore().getByKey(Cache.namespaceKeyFunc(resource.getMetadata().getNamespace(),
resource.getMetadata().getName()));
ConfigMap configMap = eventSource.getAssociated(resource);

String targetStatus = configMap.getData().get(TARGET_CONFIG_MAP_KEY);
LOGGER.debug("Setting target status for CR: {}", targetStatus);
Expand Down