1717
1818import java .util .Optional ;
1919import java .util .Set ;
20- import java .util .UUID ;
20+ import java .util .function . UnaryOperator ;
2121import java .util .stream .Collectors ;
2222
2323import org .slf4j .Logger ;
3535import io .javaoperatorsdk .operator .processing .event .ResourceID ;
3636import io .javaoperatorsdk .operator .processing .event .source .PrimaryToSecondaryMapper ;
3737
38+ import static io .javaoperatorsdk .operator .api .reconciler .Constants .DEFAULT_COMPARABLE_RESOURCE_VERSIONS ;
39+
3840/**
3941 * Wraps informer(s) so they are connected to the eventing system of the framework. Note that since
4042 * this is built on top of Fabric8 client Informers, it also supports caching resources using
@@ -78,18 +80,17 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
7880 // we need direct control for the indexer to propagate the just update resource also to the index
7981 private final PrimaryToSecondaryIndex <R > primaryToSecondaryIndex ;
8082 private final PrimaryToSecondaryMapper <P > primaryToSecondaryMapper ;
81- private final String id = UUID .randomUUID ().toString ();
8283
8384 public InformerEventSource (
8485 InformerEventSourceConfiguration <R > configuration , EventSourceContext <P > context ) {
8586 this (
8687 configuration ,
8788 configuration .getKubernetesClient ().orElse (context .getClient ()),
88- configuration .parseResourceVersionsForEventFilteringAndCaching ());
89+ configuration .comparableResourceVersions ());
8990 }
9091
9192 InformerEventSource (InformerEventSourceConfiguration <R > configuration , KubernetesClient client ) {
92- this (configuration , client , true );
93+ this (configuration , client , DEFAULT_COMPARABLE_RESOURCE_VERSIONS );
9394 }
9495
9596 @ SuppressWarnings ({"unchecked" , "rawtypes" })
@@ -122,6 +123,22 @@ private InformerEventSource(
122123 genericFilter = informerConfig .getGenericFilter ();
123124 }
124125
126+ public R updateAndCacheResource (
127+ R resourceToUpdate , Context <?> context , UnaryOperator <R > updateMethod ) {
128+ ResourceID id = ResourceID .fromResource (resourceToUpdate );
129+ if (log .isDebugEnabled ()) {
130+ log .debug ("Update and cache: {}" , id );
131+ }
132+ try {
133+ temporaryResourceCache .startModifying (id );
134+ var updated = updateMethod .apply (resourceToUpdate );
135+ handleRecentResourceUpdate (id , updated , resourceToUpdate );
136+ return updated ;
137+ } finally {
138+ temporaryResourceCache .doneModifying (id );
139+ }
140+ }
141+
125142 @ Override
126143 public void onAdd (R newResource ) {
127144 if (log .isDebugEnabled ()) {
@@ -131,9 +148,7 @@ public void onAdd(R newResource) {
131148 resourceType ().getSimpleName (),
132149 newResource .getMetadata ().getResourceVersion ());
133150 }
134- primaryToSecondaryIndex .onAddOrUpdate (newResource );
135- onAddOrUpdate (
136- Operation .ADD , newResource , null , () -> InformerEventSource .super .onAdd (newResource ));
151+ onAddOrUpdate (Operation .ADD , newResource , null );
137152 }
138153
139154 @ Override
@@ -146,16 +161,11 @@ public void onUpdate(R oldObject, R newObject) {
146161 newObject .getMetadata ().getResourceVersion (),
147162 oldObject .getMetadata ().getResourceVersion ());
148163 }
149- primaryToSecondaryIndex .onAddOrUpdate (newObject );
150- onAddOrUpdate (
151- Operation .UPDATE ,
152- newObject ,
153- oldObject ,
154- () -> InformerEventSource .super .onUpdate (oldObject , newObject ));
164+ onAddOrUpdate (Operation .UPDATE , newObject , oldObject );
155165 }
156166
157167 @ Override
158- public void onDelete (R resource , boolean b ) {
168+ public synchronized void onDelete (R resource , boolean b ) {
159169 if (log .isDebugEnabled ()) {
160170 log .debug (
161171 "On delete event received for resource id: {} type: {}" ,
@@ -177,55 +187,28 @@ public synchronized void start() {
177187 manager ().list ().forEach (primaryToSecondaryIndex ::onAddOrUpdate );
178188 }
179189
180- private synchronized void onAddOrUpdate (
181- Operation operation , R newObject , R oldObject , Runnable superOnOp ) {
190+ private synchronized void onAddOrUpdate (Operation operation , R newObject , R oldObject ) {
191+ primaryToSecondaryIndex . onAddOrUpdate ( newObject );
182192 var resourceID = ResourceID .fromResource (newObject );
183193
184- if (canSkipEvent (newObject , oldObject , resourceID )) {
194+ if (temporaryResourceCache . onAddOrUpdateEvent (newObject )) {
185195 log .debug (
186196 "Skipping event propagation for {}, since was a result of a reconcile action. Resource"
187197 + " ID: {}" ,
188198 operation ,
189199 ResourceID .fromResource (newObject ));
190- superOnOp .run ();
200+ } else if (eventAcceptedByFilter (operation , newObject , oldObject )) {
201+ log .debug (
202+ "Propagating event for {}, resource with same version not result of a reconciliation."
203+ + " Resource ID: {}" ,
204+ operation ,
205+ resourceID );
206+ propagateEvent (newObject );
191207 } else {
192- superOnOp .run ();
193- if (eventAcceptedByFilter (operation , newObject , oldObject )) {
194- log .debug (
195- "Propagating event for {}, resource with same version not result of a reconciliation."
196- + " Resource ID: {}" ,
197- operation ,
198- resourceID );
199- propagateEvent (newObject );
200- } else {
201- log .debug ("Event filtered out for operation: {}, resourceID: {}" , operation , resourceID );
202- }
208+ log .debug ("Event filtered out for operation: {}, resourceID: {}" , operation , resourceID );
203209 }
204210 }
205211
206- private boolean canSkipEvent (R newObject , R oldObject , ResourceID resourceID ) {
207- return temporaryResourceCache .canSkipEvent (resourceID , newObject )
208- || isEventKnownFromAnnotation (newObject , oldObject );
209- }
210-
211- private boolean isEventKnownFromAnnotation (R newObject , R oldObject ) {
212- String previous = newObject .getMetadata ().getAnnotations ().get (PREVIOUS_ANNOTATION_KEY );
213- boolean known = false ;
214- if (previous != null ) {
215- String [] parts = previous .split ("," );
216- if (id .equals (parts [0 ])) {
217- if (oldObject == null && parts .length == 1 ) {
218- known = true ;
219- } else if (oldObject != null
220- && parts .length == 2
221- && oldObject .getMetadata ().getResourceVersion ().equals (parts [1 ])) {
222- known = true ;
223- }
224- }
225- }
226- return known ;
227- }
228-
229212 private void propagateEvent (R object ) {
230213 var primaryResourceIdSet =
231214 configuration ().getSecondaryToPrimaryMapper ().toPrimaryResourceIDs (object );
@@ -273,13 +256,13 @@ public Set<R> getSecondaryResources(P primary) {
273256 }
274257
275258 @ Override
276- public synchronized void handleRecentResourceUpdate (
259+ public void handleRecentResourceUpdate (
277260 ResourceID resourceID , R resource , R previousVersionOfResource ) {
278261 handleRecentCreateOrUpdate (Operation .UPDATE , resource , previousVersionOfResource );
279262 }
280263
281264 @ Override
282- public synchronized void handleRecentResourceCreate (ResourceID resourceID , R resource ) {
265+ public void handleRecentResourceCreate (ResourceID resourceID , R resource ) {
283266 handleRecentCreateOrUpdate (Operation .ADD , resource , null );
284267 }
285268
@@ -313,22 +296,6 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) {
313296 && (genericFilter == null || genericFilter .accept (resource ));
314297 }
315298
316- /**
317- * Add an annotation to the resource so that the subsequent will be omitted
318- *
319- * @param resourceVersion null if there is no prior version
320- * @param target mutable resource that will be returned
321- */
322- public R addPreviousAnnotation (String resourceVersion , R target ) {
323- target
324- .getMetadata ()
325- .getAnnotations ()
326- .put (
327- PREVIOUS_ANNOTATION_KEY ,
328- id + Optional .ofNullable (resourceVersion ).map (rv -> "," + rv ).orElse ("" ));
329- return target ;
330- }
331-
332299 private enum Operation {
333300 ADD ,
334301 UPDATE
0 commit comments