1414import io .javaoperatorsdk .operator .api .config .ConfigurationServiceProvider ;
1515import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
1616import io .javaoperatorsdk .operator .api .monitoring .Metrics ;
17- import io .javaoperatorsdk .operator .api .reconciler .RetryInfo ;
1817import io .javaoperatorsdk .operator .processing .LifecycleAware ;
1918import io .javaoperatorsdk .operator .processing .MDCUtils ;
2019import io .javaoperatorsdk .operator .processing .event .rate .RateLimiter ;
@@ -41,7 +40,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
4140 private final Metrics metrics ;
4241 private final Cache <R > cache ;
4342 private final EventSourceManager <R > eventSourceManager ;
44- private final EventMarker eventMarker = new EventMarker ();
43+ // private final EventMarker eventMarker = new EventMarker();
4544 private final RateLimiter <? extends RateLimitState > rateLimiter ;
4645
4746 private final ResourceStateManager resourceStateManager = new ResourceStateManager ();
@@ -108,77 +107,76 @@ public synchronized void handleEvent(Event event) {
108107 log .debug ("Received event: {}" , event );
109108
110109 final var resourceID = event .getRelatedCustomResourceID ();
110+ final var state = resourceStateManager .getOrCreate (event .getRelatedCustomResourceID ());
111111 MDCUtils .addResourceIDInfo (resourceID );
112112 metrics .receivedEvent (event );
113- handleEventMarking (event );
113+ handleEventMarking (event , state );
114114 if (!this .running ) {
115115 // events are received and marked, but will be processed when started, see start() method.
116116 log .debug ("Skipping event: {} because the event processor is not started" , event );
117117 return ;
118118 }
119- handleMarkedEventForResource (resourceID );
119+ handleMarkedEventForResource (state );
120120 } finally {
121121 MDCUtils .removeResourceIDInfo ();
122122 }
123123 }
124124
125- private void handleMarkedEventForResource (ResourceID resourceID ) {
126- if (eventMarker .deleteEventPresent (resourceID )) {
127- cleanupForDeletedEvent (resourceID );
128- } else if (!eventMarker .processedMarkForDeletionPresent (resourceID )) {
129- submitReconciliationExecution (resourceID );
125+ private void handleMarkedEventForResource (ResourceState state ) {
126+ if (state .deleteEventPresent ()) {
127+ cleanupForDeletedEvent (state . getId () );
128+ } else if (!state .processedMarkForDeletionPresent ()) {
129+ submitReconciliationExecution (state );
130130 }
131131 }
132132
133- private void submitReconciliationExecution (ResourceID resourceID ) {
133+ private void submitReconciliationExecution (ResourceState state ) {
134134 try {
135- boolean controllerUnderExecution = isControllerUnderExecution (resourceID );
136- Optional <R > latest = cache .get (resourceID );
135+ boolean controllerUnderExecution = isControllerUnderExecution (state );
136+ Optional <R > latest = cache .get (state . getId () );
137137 latest .ifPresent (MDCUtils ::addResourceInfo );
138138 if (!controllerUnderExecution && latest .isPresent ()) {
139- final var resourceState = resourceStateManager .getOrCreate (resourceID );
140- var rateLimit = resourceState .getRateLimit ();
139+ var rateLimit = state .getRateLimit ();
141140 if (rateLimit == null ) {
142141 rateLimit = rateLimiter .initState ();
143- resourceState .setRateLimit (rateLimit );
142+ state .setRateLimit (rateLimit );
144143 }
145144 var rateLimiterPermission = rateLimiter .isLimited (rateLimit );
146145 if (rateLimiterPermission .isPresent ()) {
147- handleRateLimitedSubmission (resourceID , rateLimiterPermission .get ());
146+ handleRateLimitedSubmission (state . getId () , rateLimiterPermission .get ());
148147 return ;
149148 }
150- setUnderExecutionProcessing ( resourceID );
151- final var retryInfo = retryInfo ( resourceID );
149+ state . setUnderProcessing ( true );
150+ final var retryInfo = state . getRetry ( );
152151 ExecutionScope <R > executionScope = new ExecutionScope <>(latest .get (), retryInfo );
153- eventMarker .unMarkEventReceived (resourceID );
154- metrics .reconcileCustomResource (resourceID , retryInfo );
152+ state .unMarkEventReceived ();
153+ metrics .reconcileCustomResource (state . getId () , retryInfo );
155154 log .debug ("Executing events for custom resource. Scope: {}" , executionScope );
156155 executor .execute (new ControllerExecution (executionScope ));
157156 } else {
158157 log .debug (
159158 "Skipping executing controller for resource id: {}. Controller in execution: {}. Latest Resource present: {}" ,
160- resourceID ,
159+ state ,
161160 controllerUnderExecution ,
162161 latest .isPresent ());
163162 if (latest .isEmpty ()) {
164- log .debug ("no custom resource found in cache for ResourceID: {}" , resourceID );
163+ log .debug ("no custom resource found in cache for ResourceID: {}" , state );
165164 }
166165 }
167166 } finally {
168167 MDCUtils .removeResourceInfo ();
169168 }
170169 }
171170
172- private void handleEventMarking (Event event ) {
171+ private void handleEventMarking (Event event , ResourceState state ) {
173172 final var relatedCustomResourceID = event .getRelatedCustomResourceID ();
174173 if (event instanceof ResourceEvent ) {
175174 var resourceEvent = (ResourceEvent ) event ;
176175 if (resourceEvent .getAction () == ResourceAction .DELETED ) {
177176 log .debug ("Marking delete event received for: {}" , relatedCustomResourceID );
178- eventMarker .markDeleteEventReceived (event );
177+ state .markDeleteEventReceived ();
179178 } else {
180- if (eventMarker .processedMarkForDeletionPresent (relatedCustomResourceID )
181- && isResourceMarkedForDeletion (resourceEvent )) {
179+ if (state .processedMarkForDeletionPresent () && isResourceMarkedForDeletion (resourceEvent )) {
182180 log .debug (
183181 "Skipping mark of event received, since already processed mark for deletion and resource marked for deletion: {}" ,
184182 relatedCustomResourceID );
@@ -190,22 +188,21 @@ && isResourceMarkedForDeletion(resourceEvent)) {
190188 // removed, but also the informers websocket is disconnected and later reconnected. So
191189 // meanwhile the resource could be deleted and recreated. In this case we just mark a new
192190 // event as below.
193- markEventReceived (event );
191+ markEventReceived (state );
194192 }
195- } else if (!eventMarker .deleteEventPresent (relatedCustomResourceID ) ||
196- !eventMarker .processedMarkForDeletionPresent (relatedCustomResourceID )) {
197- markEventReceived (event );
193+ } else if (!state .deleteEventPresent () || !state .processedMarkForDeletionPresent ()) {
194+ markEventReceived (state );
198195 } else if (log .isDebugEnabled ()) {
199196 log .debug (
200197 "Skipped marking event as received. Delete event present: {}, processed mark for deletion: {}" ,
201- eventMarker .deleteEventPresent (relatedCustomResourceID ),
202- eventMarker .processedMarkForDeletionPresent (relatedCustomResourceID ));
198+ state .deleteEventPresent (),
199+ state .processedMarkForDeletionPresent ());
203200 }
204201 }
205202
206- private void markEventReceived (Event event ) {
207- log .debug ("Marking event received for: {}" , event . getRelatedCustomResourceID ());
208- eventMarker .markEventReceived (event );
203+ private void markEventReceived (ResourceState state ) {
204+ log .debug ("Marking event received for: {}" , state . getId ());
205+ state .markEventReceived ();
209206 }
210207
211208 private boolean isResourceMarkedForDeletion (ResourceEvent resourceEvent ) {
@@ -220,16 +217,13 @@ private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimal
220217 Math .max (minimalDurationMillis , MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION ));
221218 }
222219
223- private RetryInfo retryInfo (ResourceID resourceID ) {
224- return resourceStateManager .getOrCreate (resourceID ).getRetry ();
225- }
226-
227220 synchronized void eventProcessingFinished (
228221 ExecutionScope <R > executionScope , PostExecutionControl <R > postExecutionControl ) {
229222 if (!running ) {
230223 return ;
231224 }
232225 ResourceID resourceID = executionScope .getResourceID ();
226+ final var state = resourceStateManager .getOrCreate (resourceID );
233227 log .debug (
234228 "Event processing finished. Scope: {}, PostExecutionControl: {}" ,
235229 executionScope ,
@@ -241,17 +235,17 @@ synchronized void eventProcessingFinished(
241235 // Either way we don't want to retry.
242236 if (isRetryConfigured ()
243237 && postExecutionControl .exceptionDuringExecution ()
244- && !eventMarker .deleteEventPresent (resourceID )) {
238+ && !state .deleteEventPresent ()) {
245239 handleRetryOnException (
246240 executionScope , postExecutionControl .getRuntimeException ().orElseThrow ());
247241 return ;
248242 }
249243 cleanupOnSuccessfulExecution (executionScope );
250244 metrics .finishedReconciliation (resourceID );
251- if (eventMarker .deleteEventPresent (resourceID )) {
245+ if (state .deleteEventPresent ()) {
252246 cleanupForDeletedEvent (executionScope .getResourceID ());
253247 } else if (postExecutionControl .isFinalizerRemoved ()) {
254- eventMarker .markProcessedMarkForDeletion (resourceID );
248+ state .markProcessedMarkForDeletion ();
255249 } else {
256250 postExecutionControl
257251 .getUpdatedCustomResource ()
@@ -264,8 +258,8 @@ synchronized void eventProcessingFinished(
264258 ResourceID .fromResource (r ), r , executionScope .getResource ());
265259 }
266260 });
267- if (eventMarker .eventPresent (resourceID )) {
268- submitReconciliationExecution (resourceID );
261+ if (state .eventPresent ()) {
262+ submitReconciliationExecution (state );
269263 } else {
270264 reScheduleExecutionIfInstructed (postExecutionControl , executionScope .getResource ());
271265 }
@@ -296,17 +290,17 @@ TimerEventSource<R> retryEventSource() {
296290 */
297291 private void handleRetryOnException (
298292 ExecutionScope <R > executionScope , Exception exception ) {
299- RetryExecution execution = getOrInitRetryExecution (executionScope );
300- var resourceID = executionScope . getResourceID ();
301- boolean eventPresent = eventMarker .eventPresent (resourceID );
302- eventMarker .markEventReceived (resourceID );
293+ final var state = getOrInitRetryExecution (executionScope );
294+ var resourceID = state . getId ();
295+ boolean eventPresent = state .eventPresent ();
296+ state .markEventReceived ();
303297
304298 if (eventPresent ) {
305299 log .debug ("New events exists for for resource id: {}" , resourceID );
306- submitReconciliationExecution (resourceID );
300+ submitReconciliationExecution (state );
307301 return ;
308302 }
309- Optional <Long > nextDelay = execution .nextDelay ();
303+ Optional <Long > nextDelay = state . getRetry () .nextDelay ();
310304
311305 nextDelay .ifPresentOrElse (
312306 delay -> {
@@ -329,29 +323,24 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
329323 retryEventSource ().cancelOnceSchedule (executionScope .getResourceID ());
330324 }
331325
332- private RetryExecution getOrInitRetryExecution (ExecutionScope <R > executionScope ) {
326+ private ResourceState getOrInitRetryExecution (ExecutionScope <R > executionScope ) {
333327 final var state = resourceStateManager .getOrCreate (executionScope .getResourceID ());
334328 RetryExecution retryExecution = state .getRetry ();
335329 if (retryExecution == null ) {
336330 retryExecution = retry .initExecution ();
337331 state .setRetry (retryExecution );
338332 }
339- return retryExecution ;
333+ return state ;
340334 }
341335
342336 private void cleanupForDeletedEvent (ResourceID resourceID ) {
343337 log .debug ("Cleaning up for delete event for: {}" , resourceID );
344- eventMarker .cleanup (resourceID );
345338 resourceStateManager .remove (resourceID );
346339 metrics .cleanupDoneFor (resourceID );
347340 }
348341
349- private boolean isControllerUnderExecution (ResourceID resourceID ) {
350- return resourceStateManager .getOrCreate (resourceID ).isUnderProcessing ();
351- }
352-
353- private void setUnderExecutionProcessing (ResourceID resourceID ) {
354- resourceStateManager .getOrCreate (resourceID ).setUnderProcessing (true );
342+ private boolean isControllerUnderExecution (ResourceState state ) {
343+ return state .isUnderProcessing ();
355344 }
356345
357346 private void unsetUnderExecution (ResourceID resourceID ) {
@@ -374,8 +363,8 @@ public void start() throws OperatorException {
374363 }
375364
376365 private void handleAlreadyMarkedEvents () {
377- for (ResourceID resourceID : eventMarker . resourceIDsWithEventPresent ()) {
378- handleMarkedEventForResource (resourceID );
366+ for (var state : resourceStateManager . resourcesWithEventPresent ()) {
367+ handleMarkedEventForResource (state );
379368 }
380369 }
381370
@@ -411,6 +400,6 @@ public String toString() {
411400 }
412401
413402 public synchronized boolean isUnderProcessing (ResourceID resourceID ) {
414- return isControllerUnderExecution (resourceID );
403+ return isControllerUnderExecution (resourceStateManager . getOrCreate ( resourceID ) );
415404 }
416405}
0 commit comments