77import java .util .Set ;
88import java .util .concurrent .ExecutorService ;
99import java .util .concurrent .ScheduledThreadPoolExecutor ;
10- import java .util .concurrent .locks .ReentrantLock ;
1110
1211import org .slf4j .Logger ;
1312import org .slf4j .LoggerFactory ;
@@ -40,7 +39,6 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
4039 private final Map <ResourceID , RetryExecution > retryState = new HashMap <>();
4140 private final ExecutorService executor ;
4241 private final String controllerName ;
43- private final ReentrantLock lock = new ReentrantLock ();
4442 private final Metrics metrics ;
4543 private volatile boolean running ;
4644 private final Cache <R > cache ;
@@ -97,8 +95,7 @@ private EventProcessor(
9795 }
9896
9997 @ Override
100- public void handleEvent (Event event ) {
101- lock .lock ();
98+ public synchronized void handleEvent (Event event ) {
10299 try {
103100 log .debug ("Received event: {}" , event );
104101
@@ -113,7 +110,6 @@ public void handleEvent(Event event) {
113110 }
114111 handleMarkedEventForResource (resourceID );
115112 } finally {
116- lock .unlock ();
117113 MDCUtils .removeResourceIDInfo ();
118114 }
119115 }
@@ -201,57 +197,53 @@ private RetryInfo retryInfo(ResourceID resourceID) {
201197 return retryState .get (resourceID );
202198 }
203199
204- void eventProcessingFinished (
200+ synchronized void eventProcessingFinished (
205201 ExecutionScope <R > executionScope , PostExecutionControl <R > postExecutionControl ) {
206- lock .lock ();
207- try {
208- if (!running ) {
209- return ;
210- }
211- ResourceID resourceID = executionScope .getResourceID ();
212- log .debug (
213- "Event processing finished. Scope: {}, PostExecutionControl: {}" ,
214- executionScope ,
215- postExecutionControl );
216- unsetUnderExecution (resourceID );
217-
218- // If a delete event present at this phase, it was received during reconciliation.
219- // So we either removed the finalizer during reconciliation or we don't use finalizers.
220- // Either way we don't want to retry.
221- if (isRetryConfigured ()
222- && postExecutionControl .exceptionDuringExecution ()
223- && !eventMarker .deleteEventPresent (resourceID )) {
224- handleRetryOnException (
225- executionScope , postExecutionControl .getRuntimeException ().orElseThrow ());
226- return ;
227- }
228- cleanupOnSuccessfulExecution (executionScope );
229- metrics .finishedReconciliation (resourceID );
230- if (eventMarker .deleteEventPresent (resourceID )) {
231- cleanupForDeletedEvent (executionScope .getResourceID ());
232- } else if (postExecutionControl .isFinalizerRemoved ()) {
233- eventMarker .markProcessedMarkForDeletion (resourceID );
202+ if (!running ) {
203+ return ;
204+ }
205+ ResourceID resourceID = executionScope .getResourceID ();
206+ log .debug (
207+ "Event processing finished. Scope: {}, PostExecutionControl: {}" ,
208+ executionScope ,
209+ postExecutionControl );
210+ unsetUnderExecution (resourceID );
211+
212+ // If a delete event present at this phase, it was received during reconciliation.
213+ // So we either removed the finalizer during reconciliation or we don't use finalizers.
214+ // Either way we don't want to retry.
215+ if (isRetryConfigured ()
216+ && postExecutionControl .exceptionDuringExecution ()
217+ && !eventMarker .deleteEventPresent (resourceID )) {
218+ handleRetryOnException (
219+ executionScope , postExecutionControl .getRuntimeException ().orElseThrow ());
220+ return ;
221+ }
222+ cleanupOnSuccessfulExecution (executionScope );
223+ metrics .finishedReconciliation (resourceID );
224+ if (eventMarker .deleteEventPresent (resourceID )) {
225+ cleanupForDeletedEvent (executionScope .getResourceID ());
226+ } else if (postExecutionControl .isFinalizerRemoved ()) {
227+ eventMarker .markProcessedMarkForDeletion (resourceID );
228+ } else {
229+ postExecutionControl
230+ .getUpdatedCustomResource ()
231+ .ifPresent (
232+ r -> {
233+ if (!postExecutionControl .updateIsStatusPatch ()) {
234+ eventSourceManager
235+ .getControllerResourceEventSource ()
236+ .handleRecentResourceUpdate (
237+ ResourceID .fromResource (r ), r , executionScope .getResource ());
238+ }
239+ });
240+ if (eventMarker .eventPresent (resourceID )) {
241+ submitReconciliationExecution (resourceID );
234242 } else {
235- postExecutionControl
236- .getUpdatedCustomResource ()
237- .ifPresent (
238- r -> {
239- if (!postExecutionControl .updateIsStatusPatch ()) {
240- eventSourceManager
241- .getControllerResourceEventSource ()
242- .handleRecentResourceUpdate (
243- ResourceID .fromResource (r ), r , executionScope .getResource ());
244- }
245- });
246- if (eventMarker .eventPresent (resourceID )) {
247- submitReconciliationExecution (resourceID );
248- } else {
249- reScheduleExecutionIfInstructed (postExecutionControl , executionScope .getResource ());
250- }
243+ reScheduleExecutionIfInstructed (postExecutionControl , executionScope .getResource ());
251244 }
252- } finally {
253- lock .unlock ();
254245 }
246+
255247 }
256248
257249 private void reScheduleExecutionIfInstructed (
@@ -343,24 +335,14 @@ private boolean isRetryConfigured() {
343335 }
344336
345337 @ Override
346- public void stop () {
347- lock .lock ();
348- try {
349- this .running = false ;
350- } finally {
351- lock .unlock ();
352- }
338+ public synchronized void stop () {
339+ this .running = false ;
353340 }
354341
355342 @ Override
356343 public void start () throws OperatorException {
357- lock .lock ();
358- try {
359- this .running = true ;
360- handleAlreadyMarkedEvents ();
361- } finally {
362- lock .unlock ();
363- }
344+ this .running = true ;
345+ handleAlreadyMarkedEvents ();
364346 }
365347
366348 private void handleAlreadyMarkedEvents () {
0 commit comments