55import java .util .Map ;
66import java .util .Optional ;
77import java .util .concurrent .ExecutorService ;
8- import java .util .concurrent .ScheduledThreadPoolExecutor ;
98
109import org .slf4j .Logger ;
1110import org .slf4j .LoggerFactory ;
1211
1312import io .fabric8 .kubernetes .api .model .HasMetadata ;
1413import io .javaoperatorsdk .operator .OperatorException ;
15- import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
1614import io .javaoperatorsdk .operator .api .config .ConfigurationServiceProvider ;
1715import io .javaoperatorsdk .operator .api .config .ControllerConfiguration ;
1816import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
@@ -40,20 +38,19 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
4038 private final ControllerConfiguration <?> controllerConfiguration ;
4139 private final ReconciliationDispatcher <P > reconciliationDispatcher ;
4240 private final Retry retry ;
43- private final ExecutorService executor ;
4441 private final Metrics metrics ;
4542 private final Cache <P > cache ;
4643 private final EventSourceManager <P > eventSourceManager ;
4744 private final RateLimiter <? extends RateLimitState > rateLimiter ;
4845 private final ResourceStateManager resourceStateManager = new ResourceStateManager ();
4946 private final Map <String , Object > metricsMetadata ;
47+ private ExecutorService executor ;
5048
5149
5250 public EventProcessor (EventSourceManager <P > eventSourceManager ) {
5351 this (
5452 eventSourceManager .getController ().getConfiguration (),
5553 eventSourceManager .getControllerResourceEventSource (),
56- ExecutorServiceManager .instance ().executorService (),
5754 new ReconciliationDispatcher <>(eventSourceManager .getController ()),
5855 ConfigurationServiceProvider .instance ().getMetrics (),
5956 eventSourceManager );
@@ -68,7 +65,6 @@ public EventProcessor(EventSourceManager<P> eventSourceManager) {
6865 this (
6966 controllerConfiguration ,
7067 eventSourceManager .getControllerResourceEventSource (),
71- null ,
7268 reconciliationDispatcher ,
7369 metrics ,
7470 eventSourceManager );
@@ -78,17 +74,11 @@ public EventProcessor(EventSourceManager<P> eventSourceManager) {
7874 private EventProcessor (
7975 ControllerConfiguration controllerConfiguration ,
8076 Cache <P > cache ,
81- ExecutorService executor ,
8277 ReconciliationDispatcher <P > reconciliationDispatcher ,
8378 Metrics metrics ,
8479 EventSourceManager <P > eventSourceManager ) {
8580 this .controllerConfiguration = controllerConfiguration ;
8681 this .running = false ;
87- this .executor =
88- executor == null
89- ? new ScheduledThreadPoolExecutor (
90- ConfigurationService .DEFAULT_RECONCILIATION_THREADS_NUMBER )
91- : executor ;
9282 this .reconciliationDispatcher = reconciliationDispatcher ;
9383 this .retry = controllerConfiguration .getRetry ();
9484 this .cache = cache ;
@@ -376,6 +366,7 @@ public synchronized void stop() {
376366
377367 @ Override
378368 public void start () throws OperatorException {
369+ executor = ExecutorServiceManager .instance ().executorService ();
379370 this .running = true ;
380371 handleAlreadyMarkedEvents ();
381372 }
0 commit comments