-
Notifications
You must be signed in to change notification settings - Fork 236
feat: add a DependentResource implementation that's also an EventSource #1094
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
af466f9
748f865
958a471
7e8e06d
fcace92
af774d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| package io.javaoperatorsdk.operator.processing; | ||
|
|
||
| import java.util.Optional; | ||
|
|
||
| import io.fabric8.kubernetes.api.model.HasMetadata; | ||
|
|
||
| public interface ResourceOwner<R, P extends HasMetadata> { | ||
|
|
||
| /** | ||
| * Retrieves the resource type associated with this ResourceOwner | ||
| * | ||
| * @return the resource type associated with this ResourceOwner | ||
| */ | ||
| Class<R> resourceType(); | ||
|
|
||
| /** | ||
| * Retrieves the resource associated with the specified primary one, returning the actual state of | ||
| * the resource. Typically, this state might come from a local cache, updated after | ||
| * reconciliation. | ||
| * | ||
| * @param primary the primary resource for which we want to retrieve the secondary resource | ||
| * @return an {@link Optional} containing the secondary resource or {@link Optional#empty()} if it | ||
| * doesn't exist | ||
| */ | ||
| Optional<R> getAssociatedResource(P primary); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,9 +7,6 @@ | |
| import io.javaoperatorsdk.operator.api.reconciler.Context; | ||
| import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter; | ||
| import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; | ||
| import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider; | ||
| import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; | ||
| import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationEventFilter; | ||
| import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult; | ||
| import io.javaoperatorsdk.operator.processing.event.ResourceID; | ||
|
|
||
|
|
@@ -31,14 +28,12 @@ public AbstractDependentResource() { | |
|
|
||
| @Override | ||
| public ReconcileResult<R> reconcile(P primary, Context<P> context) { | ||
| var maybeActual = getResource(primary); | ||
| var maybeActual = getAssociatedResource(primary); | ||
| if (creatable || updatable) { | ||
| if (maybeActual.isEmpty()) { | ||
| if (creatable) { | ||
| var desired = desired(primary, context); | ||
| log.info("Creating {} for primary {}", desired.getClass().getSimpleName(), | ||
| ResourceID.fromResource(primary)); | ||
| log.debug("Creating dependent {} for primary {}", desired, primary); | ||
| logForOperation("Creating", primary, desired); | ||
| var createdResource = handleCreate(desired, primary, context); | ||
| return ReconcileResult.resourceCreated(createdResource); | ||
| } | ||
|
|
@@ -48,9 +43,7 @@ public ReconcileResult<R> reconcile(P primary, Context<P> context) { | |
| final var match = updater.match(actual, primary, context); | ||
| if (!match.matched()) { | ||
| final var desired = match.computedDesired().orElse(desired(primary, context)); | ||
| log.info("Updating {} for primary {}", desired.getClass().getSimpleName(), | ||
| ResourceID.fromResource(primary)); | ||
| log.debug("Updating dependent {} for primary {}", desired, primary); | ||
| logForOperation("Updating", primary, desired); | ||
| var updatedResource = handleUpdate(actual, desired, primary, context); | ||
| return ReconcileResult.resourceUpdated(updatedResource); | ||
| } | ||
|
|
@@ -66,91 +59,47 @@ public ReconcileResult<R> reconcile(P primary, Context<P> context) { | |
| return ReconcileResult.noOperation(maybeActual.orElse(null)); | ||
| } | ||
|
|
||
| protected R handleCreate(R desired, P primary, Context<P> context) { | ||
| ResourceID resourceID = ResourceID.fromResource(primary); | ||
| R created = null; | ||
| try { | ||
| prepareEventFiltering(desired, resourceID); | ||
| created = creator.create(desired, primary, context); | ||
| cacheAfterCreate(resourceID, created); | ||
| return created; | ||
| } catch (RuntimeException e) { | ||
| cleanupAfterEventFiltering(desired, resourceID, created); | ||
| throw e; | ||
| } | ||
| } | ||
|
|
||
| private void cleanupAfterEventFiltering(R desired, ResourceID resourceID, R created) { | ||
| if (isFilteringEventSource()) { | ||
| eventSourceAsRecentOperationEventFilter() | ||
| .cleanupOnCreateOrUpdateEventFiltering(resourceID, created); | ||
| } | ||
| } | ||
|
|
||
| private void cacheAfterCreate(ResourceID resourceID, R created) { | ||
| if (isRecentOperationCacheFiller()) { | ||
| eventSourceAsRecentOperationCacheFiller().handleRecentResourceCreate(resourceID, created); | ||
| } | ||
| private void logForOperation(String operation, P primary, R desired) { | ||
| final var desiredDesc = desired instanceof HasMetadata | ||
| ? "'" + ((HasMetadata) desired).getMetadata().getName() + "' " | ||
| + ((HasMetadata) desired).getKind() | ||
| : desired.getClass().getSimpleName(); | ||
| log.info("{} {} for primary {}", operation, desiredDesc, ResourceID.fromResource(primary)); | ||
| log.debug("{} dependent {} for primary {}", operation, desired, primary); | ||
| } | ||
|
|
||
| private void cacheAfterUpdate(R actual, ResourceID resourceID, R updated) { | ||
| if (isRecentOperationCacheFiller()) { | ||
| eventSourceAsRecentOperationCacheFiller().handleRecentResourceUpdate(resourceID, updated, | ||
| actual); | ||
| } | ||
| protected R handleCreate(R desired, P primary, Context<P> context) { | ||
| ResourceID resourceID = ResourceID.fromResource(primary); | ||
| R created = creator.create(desired, primary, context); | ||
| processPostCreate(resourceID, created); | ||
| return created; | ||
| } | ||
|
|
||
| private void prepareEventFiltering(R desired, ResourceID resourceID) { | ||
| if (isFilteringEventSource()) { | ||
| eventSourceAsRecentOperationEventFilter().prepareForCreateOrUpdateEventFiltering(resourceID, | ||
| desired); | ||
| } | ||
| } | ||
| /** | ||
| * Allows sub-classes to perform additional processing (e.g. caching) on the created resource if | ||
| * needed. | ||
| * | ||
| * @param primaryResourceId the {@link ResourceID} of the primary resource associated with the | ||
| * newly created resource | ||
| * @param created the newly created resource | ||
| */ | ||
| protected abstract void processPostCreate(ResourceID primaryResourceId, R created); | ||
|
||
|
|
||
| /** | ||
| * Allows sub-classes to perform additional processing on the updated resource if needed. | ||
| * | ||
| * @param primaryResourceId the {@link ResourceID} of the primary resource associated with the | ||
| * newly updated resource | ||
| * @param updated the updated resource | ||
| * @param actual the resource as it was before the update | ||
| */ | ||
| protected abstract void processPostUpdate(ResourceID primaryResourceId, R updated, R actual); | ||
csviri marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| protected R handleUpdate(R actual, R desired, P primary, Context<P> context) { | ||
| ResourceID resourceID = ResourceID.fromResource(primary); | ||
| R updated = null; | ||
| try { | ||
| prepareEventFiltering(desired, resourceID); | ||
| updated = updater.update(actual, desired, primary, context); | ||
| cacheAfterUpdate(actual, resourceID, updated); | ||
| return updated; | ||
| } catch (RuntimeException e) { | ||
| cleanupAfterEventFiltering(desired, resourceID, updated); | ||
| throw e; | ||
| } | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private RecentOperationEventFilter<R> eventSourceAsRecentOperationEventFilter() { | ||
| return (RecentOperationEventFilter<R>) ((EventSourceProvider<P>) this).getEventSource(); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private RecentOperationCacheFiller<R> eventSourceAsRecentOperationCacheFiller() { | ||
| return (RecentOperationCacheFiller<R>) ((EventSourceProvider<P>) this).getEventSource(); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| // this cannot be done in constructor since event source might be initialized later | ||
| protected boolean isFilteringEventSource() { | ||
| if (this instanceof EventSourceProvider) { | ||
| final var eventSource = ((EventSourceProvider<P>) this).getEventSource(); | ||
| return eventSource instanceof RecentOperationEventFilter; | ||
| } else { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| // this cannot be done in constructor since event source might be initialized later | ||
| protected boolean isRecentOperationCacheFiller() { | ||
| if (this instanceof EventSourceProvider) { | ||
| final var eventSource = ((EventSourceProvider<P>) this).getEventSource(); | ||
| return eventSource instanceof RecentOperationCacheFiller; | ||
| } else { | ||
| return false; | ||
| } | ||
| R updated = updater.update(actual, desired, primary, context); | ||
| processPostUpdate(resourceID, updated, actual); | ||
| return updated; | ||
| } | ||
|
|
||
| protected R desired(P primary, Context<P> context) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.