From f120b21ef7bdfb49c27e12464849b5bd40c4be8b Mon Sep 17 00:00:00 2001 From: John Niang Date: Thu, 2 Feb 2023 16:06:21 +0800 Subject: [PATCH 1/2] Fix concurrent issue of controller queue --- .../run/halo/app/core/extension/Plugin.java | 5 +- .../run/halo/app/core/extension/Role.java | 1 + .../java/run/halo/app/extension/Metadata.java | 2 +- .../ReactiveExtensionClientImpl.java | 9 +- .../run/halo/app/extension/Unstructured.java | 2 + .../controller/ControllerBuilder.java | 2 +- .../controller/DefaultDelayQueue.java | 85 ----------- .../extension/controller/DefaultQueue.java | 141 ++++++++++++++++++ .../extension/controller/RequestQueue.java | 4 + .../halo/app/extension/gc/GcReconciler.java | 4 +- .../run/halo/app/infra/ConditionList.java | 21 ++- .../app/metrics/ReplyEventReconciler.java | 4 +- .../app/metrics/VisitedEventReconciler.java | 4 +- .../app/metrics/VotedEventReconciler.java | 4 +- .../plugin/PluginCreatedEventReconciler.java | 4 +- .../app/search/post/PostEventReconciler.java | 4 +- .../controller/DefaultDelayQueueTest.java | 23 ++- 17 files changed, 199 insertions(+), 120 deletions(-) delete mode 100644 src/main/java/run/halo/app/extension/controller/DefaultDelayQueue.java create mode 100644 src/main/java/run/halo/app/extension/controller/DefaultQueue.java diff --git a/src/main/java/run/halo/app/core/extension/Plugin.java b/src/main/java/run/halo/app/core/extension/Plugin.java index 692ada66a0..1348ee6079 100644 --- a/src/main/java/run/halo/app/core/extension/Plugin.java +++ b/src/main/java/run/halo/app/core/extension/Plugin.java @@ -10,8 +10,6 @@ import java.util.Map; import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; import lombok.ToString; import org.pf4j.PluginState; import org.springframework.lang.NonNull; @@ -93,8 +91,7 @@ public static class PluginSpec { private String configMapName; } - @Getter - @Setter + @Data public static class License { private String name; private String url; diff --git a/src/main/java/run/halo/app/core/extension/Role.java b/src/main/java/run/halo/app/core/extension/Role.java index 6b6ddd47ce..0cc0d31b1e 100644 --- a/src/main/java/run/halo/app/core/extension/Role.java +++ b/src/main/java/run/halo/app/core/extension/Role.java @@ -53,6 +53,7 @@ public class Role extends AbstractExtension { * @since 2.0.0 */ @Getter + @EqualsAndHashCode public static class PolicyRule implements Comparable { /** * APIGroups is the name of the APIGroup that contains the resources. diff --git a/src/main/java/run/halo/app/extension/Metadata.java b/src/main/java/run/halo/app/extension/Metadata.java index 874afa2704..c6418b5bac 100644 --- a/src/main/java/run/halo/app/extension/Metadata.java +++ b/src/main/java/run/halo/app/extension/Metadata.java @@ -12,7 +12,7 @@ * @author johnniang */ @Data -@EqualsAndHashCode +@EqualsAndHashCode(exclude = "version") public class Metadata implements MetadataOperator { /** diff --git a/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java b/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java index 741d024429..d48910371d 100644 --- a/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java +++ b/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java @@ -7,6 +7,7 @@ import java.time.Instant; import java.util.Comparator; import java.util.List; +import java.util.Objects; import java.util.function.Predicate; import org.springframework.dao.DataIntegrityViolationException; import org.springframework.data.util.Predicates; @@ -136,15 +137,19 @@ public Mono update(E extension) { mono = get(extension.getClass(), extension.getMetadata().getName()); } return mono - .map(old -> { + .flatMap(old -> { // reset some fields var oldMetadata = old.getMetadata(); var newMetadata = extension.getMetadata(); newMetadata.setCreationTimestamp(oldMetadata.getCreationTimestamp()); newMetadata.setDeletionTimestamp(oldMetadata.getDeletionTimestamp()); extension.setMetadata(newMetadata); - return converter.convertTo(extension); + if (Objects.equals(old, extension)) { + return Mono.empty(); + } + return Mono.just(extension); }) + .map(converter::convertTo) .flatMap(extensionStore -> client.update(extensionStore.getName(), extensionStore.getVersion(), extensionStore.getData())) diff --git a/src/main/java/run/halo/app/extension/Unstructured.java b/src/main/java/run/halo/app/extension/Unstructured.java index 32b5f97308..2d3a078210 100644 --- a/src/main/java/run/halo/app/extension/Unstructured.java +++ b/src/main/java/run/halo/app/extension/Unstructured.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import lombok.EqualsAndHashCode; /** * Unstructured is a generic Extension, which wraps ObjectNode to maintain the Extension data, like @@ -65,6 +66,7 @@ public MetadataOperator getMetadata() { return new UnstructuredMetadata(); } + @EqualsAndHashCode(exclude = "version") class UnstructuredMetadata implements MetadataOperator { @Override diff --git a/src/main/java/run/halo/app/extension/controller/ControllerBuilder.java b/src/main/java/run/halo/app/extension/controller/ControllerBuilder.java index 337fdc7bb6..52f5810cee 100644 --- a/src/main/java/run/halo/app/extension/controller/ControllerBuilder.java +++ b/src/main/java/run/halo/app/extension/controller/ControllerBuilder.java @@ -106,7 +106,7 @@ public Controller build() { Assert.notNull(extension, "Extension must not be null"); Assert.notNull(reconciler, "Reconciler must not be null"); - var queue = new DefaultDelayQueue(nowSupplier, minDelay); + var queue = new DefaultQueue(nowSupplier, minDelay); var predicates = new WatcherPredicates.Builder() .withGroupVersionKind(extension.groupVersionKind()) .onAddPredicate(onAddPredicate) diff --git a/src/main/java/run/halo/app/extension/controller/DefaultDelayQueue.java b/src/main/java/run/halo/app/extension/controller/DefaultDelayQueue.java deleted file mode 100644 index 8401573a72..0000000000 --- a/src/main/java/run/halo/app/extension/controller/DefaultDelayQueue.java +++ /dev/null @@ -1,85 +0,0 @@ -package run.halo.app.extension.controller; - -import java.time.Duration; -import java.time.Instant; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.DelayQueue; -import java.util.function.Supplier; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class DefaultDelayQueue - extends DelayQueue> implements RequestQueue { - - private final Supplier nowSupplier; - - private volatile boolean disposed = false; - - private final Duration minDelay; - - private final Set processing; - - public DefaultDelayQueue(Supplier nowSupplier) { - this(nowSupplier, Duration.ZERO); - } - - public DefaultDelayQueue(Supplier nowSupplier, Duration minDelay) { - this.nowSupplier = nowSupplier; - this.minDelay = minDelay; - this.processing = new HashSet<>(); - } - - @Override - public boolean addImmediately(R request) { - log.debug("Adding request {} immediately", request); - var delayedEntry = new DelayedEntry<>(request, minDelay, nowSupplier); - return offer(delayedEntry); - } - - @Override - public boolean add(DelayedEntry entry) { - if (entry.getRetryAfter().compareTo(minDelay) < 0) { - log.warn("Request {} will be retried after {} ms, but minimum delay is {} ms", - entry.getEntry(), entry.getRetryAfter().toMillis(), minDelay.toMillis()); - entry = new DelayedEntry<>(entry.getEntry(), minDelay, nowSupplier); - } - return super.add(entry); - } - - @Override - public DelayedEntry take() throws InterruptedException { - var entry = super.take(); - processing.add(entry.getEntry()); - return entry; - } - - @Override - public void done(R request) { - processing.remove(request); - } - - @Override - public boolean offer(DelayedEntry entry) { - if (this.isDisposed() || processing.contains(entry.getEntry())) { - return false; - } - // remove the existing entry before adding the new one - // to refresh the delay. - this.remove(entry); - return super.offer(entry); - } - - @Override - public void dispose() { - this.disposed = true; - this.clear(); - this.processing.clear(); - } - - @Override - public boolean isDisposed() { - return this.disposed; - } - -} diff --git a/src/main/java/run/halo/app/extension/controller/DefaultQueue.java b/src/main/java/run/halo/app/extension/controller/DefaultQueue.java new file mode 100644 index 0000000000..22f38a4463 --- /dev/null +++ b/src/main/java/run/halo/app/extension/controller/DefaultQueue.java @@ -0,0 +1,141 @@ +package run.halo.app.extension.controller; + +import java.time.Duration; +import java.time.Instant; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DefaultQueue implements RequestQueue { + + private final Lock lock; + + private final DelayQueue> queue; + + private final Supplier nowSupplier; + + private volatile boolean disposed = false; + + private final Duration minDelay; + + private final Set processing; + + private final Set dirty; + + public DefaultQueue(Supplier nowSupplier) { + this(nowSupplier, Duration.ZERO); + } + + public DefaultQueue(Supplier nowSupplier, Duration minDelay) { + this.lock = new ReentrantLock(); + this.nowSupplier = nowSupplier; + this.minDelay = minDelay; + this.processing = new HashSet<>(); + this.dirty = new HashSet<>(); + this.queue = new DelayQueue<>(); + } + + @Override + public boolean addImmediately(R request) { + log.debug("Adding request {} immediately", request); + var delayedEntry = new DelayedEntry<>(request, minDelay, nowSupplier); + return add(delayedEntry); + } + + @Override + public boolean add(DelayedEntry entry) { + lock.lock(); + try { + if (isDisposed()) { + return false; + } + log.debug("Adding request {} after {}", entry.getEntry(), entry.getRetryAfter()); + if (entry.getRetryAfter().compareTo(minDelay) < 0) { + log.warn("Request {} will be retried after {} ms, but minimum delay is {} ms", + entry.getEntry(), entry.getRetryAfter().toMillis(), minDelay.toMillis()); + entry = new DelayedEntry<>(entry.getEntry(), minDelay, nowSupplier); + } + if (dirty.contains(entry.getEntry())) { + return false; + } + dirty.add(entry.getEntry()); + if (processing.contains(entry.getEntry())) { + return false; + } + + boolean added = queue.add(entry); + log.debug("Added request {} after {}", entry.getEntry(), entry.getRetryAfter()); + return added; + } finally { + lock.unlock(); + } + } + + @Override + public DelayedEntry take() throws InterruptedException { + var entry = queue.take(); + log.debug("Take request {} at {}", entry.getEntry(), Instant.now()); + lock.lockInterruptibly(); + try { + if (isDisposed()) { + throw new InterruptedException( + "Queue has been disposed. Cannot take any elements now"); + } + processing.add(entry.getEntry()); + dirty.remove(entry.getEntry()); + return entry; + } finally { + lock.unlock(); + } + } + + @Override + public void done(R request) { + lock.lock(); + try { + if (isDisposed()) { + return; + } + processing.remove(request); + if (dirty.contains(request)) { + queue.add(new DelayedEntry<>(request, minDelay, nowSupplier)); + } + } finally { + lock.unlock(); + } + } + + @Override + public long size() { + return queue.size(); + } + + @Override + public DelayedEntry peek() { + return queue.peek(); + } + + @Override + public void dispose() { + lock.lock(); + try { + disposed = true; + queue.clear(); + processing.clear(); + dirty.clear(); + } finally { + lock.unlock(); + } + } + + @Override + public boolean isDisposed() { + return this.disposed; + } + +} diff --git a/src/main/java/run/halo/app/extension/controller/RequestQueue.java b/src/main/java/run/halo/app/extension/controller/RequestQueue.java index 0b6b2ef301..564c1c5eac 100644 --- a/src/main/java/run/halo/app/extension/controller/RequestQueue.java +++ b/src/main/java/run/halo/app/extension/controller/RequestQueue.java @@ -18,6 +18,10 @@ public interface RequestQueue extends Disposable { void done(E request); + long size(); + + DelayedEntry peek(); + class DelayedEntry implements Delayed { private final E entry; diff --git a/src/main/java/run/halo/app/extension/gc/GcReconciler.java b/src/main/java/run/halo/app/extension/gc/GcReconciler.java index 44a759cd8a..47531f4949 100644 --- a/src/main/java/run/halo/app/extension/gc/GcReconciler.java +++ b/src/main/java/run/halo/app/extension/gc/GcReconciler.java @@ -13,7 +13,7 @@ import run.halo.app.extension.controller.Controller; import run.halo.app.extension.controller.ControllerBuilder; import run.halo.app.extension.controller.DefaultController; -import run.halo.app.extension.controller.DefaultDelayQueue; +import run.halo.app.extension.controller.DefaultQueue; import run.halo.app.extension.controller.Reconciler; import run.halo.app.extension.store.ExtensionStoreClient; @@ -55,7 +55,7 @@ public Result reconcile(GcRequest request) { @Override public Controller setupWith(ControllerBuilder builder) { - var queue = new DefaultDelayQueue(Instant::now, Duration.ofMillis(500)); + var queue = new DefaultQueue(Instant::now, Duration.ofMillis(500)); var synchronizer = new GcSynchronizer(client, queue, schemeManager); return new DefaultController<>( "garbage-collector-controller", diff --git a/src/main/java/run/halo/app/infra/ConditionList.java b/src/main/java/run/halo/app/infra/ConditionList.java index 60a5771918..6bd31ab2f7 100644 --- a/src/main/java/run/halo/app/infra/ConditionList.java +++ b/src/main/java/run/halo/app/infra/ConditionList.java @@ -1,9 +1,9 @@ package run.halo.app.infra; import java.util.AbstractCollection; -import java.util.ArrayDeque; import java.util.Deque; import java.util.Iterator; +import java.util.LinkedList; import java.util.Objects; import java.util.function.Consumer; import org.springframework.lang.NonNull; @@ -20,7 +20,7 @@ */ public class ConditionList extends AbstractCollection { private static final int EVICT_THRESHOLD = 20; - private final Deque conditions = new ArrayDeque<>(); + private final Deque conditions = new LinkedList<>(); @Override public boolean add(@NonNull Condition condition) { @@ -113,4 +113,21 @@ public Iterator iterator() { public void forEach(Consumer action) { conditions.forEach(action); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ConditionList that = (ConditionList) o; + return Objects.equals(conditions, that.conditions); + } + + @Override + public int hashCode() { + return Objects.hash(conditions); + } } diff --git a/src/main/java/run/halo/app/metrics/ReplyEventReconciler.java b/src/main/java/run/halo/app/metrics/ReplyEventReconciler.java index 96bed710bb..6d1074f5a7 100644 --- a/src/main/java/run/halo/app/metrics/ReplyEventReconciler.java +++ b/src/main/java/run/halo/app/metrics/ReplyEventReconciler.java @@ -16,7 +16,7 @@ import run.halo.app.extension.controller.Controller; import run.halo.app.extension.controller.ControllerBuilder; import run.halo.app.extension.controller.DefaultController; -import run.halo.app.extension.controller.DefaultDelayQueue; +import run.halo.app.extension.controller.DefaultQueue; import run.halo.app.extension.controller.Reconciler; import run.halo.app.extension.controller.RequestQueue; @@ -37,7 +37,7 @@ public class ReplyEventReconciler implements Reconciler, SmartLifecy public ReplyEventReconciler(ExtensionClient client) { this.client = client; - replyEventQueue = new DefaultDelayQueue<>(Instant::now); + replyEventQueue = new DefaultQueue<>(Instant::now); replyEventController = this.setupWith(null); } diff --git a/src/main/java/run/halo/app/metrics/VisitedEventReconciler.java b/src/main/java/run/halo/app/metrics/VisitedEventReconciler.java index 735052e7a5..3e11bf7b0d 100644 --- a/src/main/java/run/halo/app/metrics/VisitedEventReconciler.java +++ b/src/main/java/run/halo/app/metrics/VisitedEventReconciler.java @@ -17,7 +17,7 @@ import run.halo.app.extension.controller.Controller; import run.halo.app.extension.controller.ControllerBuilder; import run.halo.app.extension.controller.DefaultController; -import run.halo.app.extension.controller.DefaultDelayQueue; +import run.halo.app.extension.controller.DefaultQueue; import run.halo.app.extension.controller.Reconciler; import run.halo.app.extension.controller.RequestQueue; @@ -41,7 +41,7 @@ public class VisitedEventReconciler public VisitedEventReconciler(ExtensionClient client) { this.client = client; - visitedEventQueue = new DefaultDelayQueue<>(Instant::now); + visitedEventQueue = new DefaultQueue<>(Instant::now); visitedEventController = this.setupWith(null); } diff --git a/src/main/java/run/halo/app/metrics/VotedEventReconciler.java b/src/main/java/run/halo/app/metrics/VotedEventReconciler.java index aade9ce33b..244b3bc11c 100644 --- a/src/main/java/run/halo/app/metrics/VotedEventReconciler.java +++ b/src/main/java/run/halo/app/metrics/VotedEventReconciler.java @@ -15,7 +15,7 @@ import run.halo.app.extension.controller.Controller; import run.halo.app.extension.controller.ControllerBuilder; import run.halo.app.extension.controller.DefaultController; -import run.halo.app.extension.controller.DefaultDelayQueue; +import run.halo.app.extension.controller.DefaultQueue; import run.halo.app.extension.controller.Reconciler; import run.halo.app.extension.controller.RequestQueue; @@ -36,7 +36,7 @@ public class VotedEventReconciler implements Reconciler, SmartLifecy public VotedEventReconciler(ExtensionClient client) { this.client = client; - votedEventQueue = new DefaultDelayQueue<>(Instant::now); + votedEventQueue = new DefaultQueue<>(Instant::now); votedEventController = this.setupWith(null); } diff --git a/src/main/java/run/halo/app/plugin/PluginCreatedEventReconciler.java b/src/main/java/run/halo/app/plugin/PluginCreatedEventReconciler.java index 112bcfac22..711938386f 100644 --- a/src/main/java/run/halo/app/plugin/PluginCreatedEventReconciler.java +++ b/src/main/java/run/halo/app/plugin/PluginCreatedEventReconciler.java @@ -15,7 +15,7 @@ import run.halo.app.extension.controller.Controller; import run.halo.app.extension.controller.ControllerBuilder; import run.halo.app.extension.controller.DefaultController; -import run.halo.app.extension.controller.DefaultDelayQueue; +import run.halo.app.extension.controller.DefaultQueue; import run.halo.app.extension.controller.Reconciler; import run.halo.app.extension.controller.RequestQueue; import run.halo.app.plugin.event.PluginCreatedEvent; @@ -42,7 +42,7 @@ public class PluginCreatedEventReconciler public PluginCreatedEventReconciler(ReactiveExtensionClient client) { this.client = client; - pluginEventQueue = new DefaultDelayQueue<>(Instant::now); + pluginEventQueue = new DefaultQueue<>(Instant::now); pluginEventController = this.setupWith(null); } diff --git a/src/main/java/run/halo/app/search/post/PostEventReconciler.java b/src/main/java/run/halo/app/search/post/PostEventReconciler.java index b7c0993b8c..793c637f77 100644 --- a/src/main/java/run/halo/app/search/post/PostEventReconciler.java +++ b/src/main/java/run/halo/app/search/post/PostEventReconciler.java @@ -17,7 +17,7 @@ import run.halo.app.extension.controller.Controller; import run.halo.app.extension.controller.ControllerBuilder; import run.halo.app.extension.controller.DefaultController; -import run.halo.app.extension.controller.DefaultDelayQueue; +import run.halo.app.extension.controller.DefaultQueue; import run.halo.app.extension.controller.Reconciler; import run.halo.app.extension.controller.RequestQueue; import run.halo.app.plugin.extensionpoint.ExtensionGetter; @@ -43,7 +43,7 @@ public PostEventReconciler(ExtensionGetter extensionGetter, this.extensionGetter = extensionGetter; this.postFinder = postFinder; - postEventQueue = new DefaultDelayQueue<>(Instant::now); + postEventQueue = new DefaultQueue<>(Instant::now); postEventController = this.setupWith(null); } diff --git a/src/test/java/run/halo/app/extension/controller/DefaultDelayQueueTest.java b/src/test/java/run/halo/app/extension/controller/DefaultDelayQueueTest.java index 8ddb12efb6..4ac93aa8b4 100644 --- a/src/test/java/run/halo/app/extension/controller/DefaultDelayQueueTest.java +++ b/src/test/java/run/halo/app/extension/controller/DefaultDelayQueueTest.java @@ -4,9 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; import java.time.Duration; import java.time.Instant; @@ -20,13 +17,13 @@ class DefaultDelayQueueTest { Instant now = Instant.now(); - DefaultDelayQueue queue; + DefaultQueue queue; final Duration minDelay = Duration.ofMillis(1); @BeforeEach void setUp() { - queue = new DefaultDelayQueue<>(() -> now, minDelay); + queue = new DefaultQueue<>(() -> now, minDelay); } @Test @@ -82,23 +79,23 @@ void shouldNotAddAfterDisposing() { @Test void shouldNotAddRepeatedlyIfNotDone() throws InterruptedException { - var entrySpy = spy(new DelayedEntry<>(newRequest("fake-name"), minDelay, () -> now)); + queue = new DefaultQueue<>(() -> now, Duration.ZERO); + var fakeEntry = new DelayedEntry<>(newRequest("fake-name"), Duration.ZERO, + () -> this.now); - doReturn(0L).when(entrySpy).getDelay(any()); - - queue.add(entrySpy); + queue.add(fakeEntry); assertEquals(1, queue.size()); - assertEquals(entrySpy, queue.peek()); + assertEquals(fakeEntry, queue.peek()); queue.take(); assertEquals(0, queue.size()); - queue.add(entrySpy); + queue.add(fakeEntry); assertEquals(0, queue.size()); queue.done(newRequest("fake-name")); - queue.add(entrySpy); + queue.add(fakeEntry); assertEquals(1, queue.size()); - assertEquals(entrySpy, queue.peek()); + assertEquals(fakeEntry, queue.peek()); } Request newRequest(String name) { From 109f4aa882010c93513c9ce93ff2668ad417a5d5 Mon Sep 17 00:00:00 2001 From: John Niang Date: Thu, 2 Feb 2023 17:25:34 +0800 Subject: [PATCH 2/2] Fix test issue --- .../ReactiveExtensionClientImpl.java | 3 +- .../ReactiveExtensionClientTest.java | 65 ++++++++++++++++--- 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java b/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java index d48910371d..c07abd8808 100644 --- a/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java +++ b/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java @@ -154,7 +154,8 @@ public Mono update(E extension) { extensionStore.getVersion(), extensionStore.getData())) .map(updated -> converter.convertFrom((Class) extension.getClass(), updated)) - .doOnNext(updated -> watchers.onUpdate(extension, updated)); + .doOnNext(updated -> watchers.onUpdate(extension, updated)) + .switchIfEmpty(Mono.defer(() -> Mono.just(extension))); } @Override diff --git a/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java b/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java index edc2786e90..abf01c8d18 100644 --- a/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java +++ b/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -395,6 +396,7 @@ void shouldCreateUsingUnstructuredSuccessfully() throws JsonProcessingException @Test void shouldUpdateSuccessfully() { var fake = createFakeExtension("fake", 2L); + fake.getMetadata().setLabels(Map.of("new", "true")); var storeName = "/registry/fake.halo.run/fakes/fake"; when(converter.convertTo(any())).thenReturn( createExtensionStore(storeName, 2L)); @@ -402,17 +404,47 @@ void shouldUpdateSuccessfully() { Mono.just(createExtensionStore(storeName, 2L))); when(storeClient.fetchByName(storeName)).thenReturn( Mono.just(createExtensionStore(storeName, 1L))); - when(converter.convertFrom(same(FakeExtension.class), any())).thenReturn(fake); + + var oldFake = createFakeExtension("fake", 2L); + oldFake.getMetadata().setLabels(Map.of("old", "true")); + + var updatedFake = createFakeExtension("fake", 3L); + updatedFake.getMetadata().setLabels(Map.of("updated", "true")); + when(converter.convertFrom(same(FakeExtension.class), any())) + .thenReturn(oldFake) + .thenReturn(updatedFake); StepVerifier.create(client.update(fake)) - .expectNext(fake) + .expectNext(updatedFake) .verifyComplete(); - verify(converter, times(1)).convertTo(eq(fake)); - verify(storeClient, times(1)) + verify(storeClient).fetchByName(storeName); + verify(converter).convertTo(eq(fake)); + verify(converter, times(2)).convertFrom(same(FakeExtension.class), any()); + verify(storeClient) .update(eq("/registry/fake.halo.run/fakes/fake"), eq(2L), any()); } + @Test + void shouldNotUpdateIfExtensionNotChange() { + var fake = createFakeExtension("fake", 2L); + var storeName = "/registry/fake.halo.run/fakes/fake"; + when(storeClient.fetchByName(storeName)).thenReturn( + Mono.just(createExtensionStore(storeName, 1L))); + + var oldFake = createFakeExtension("fake", 2L); + when(converter.convertFrom(same(FakeExtension.class), any())).thenReturn(oldFake); + + StepVerifier.create(client.update(fake)) + .expectNext(fake) + .verifyComplete(); + + verify(storeClient).fetchByName(storeName); + verify(converter).convertFrom(same(FakeExtension.class), any()); + verify(converter, never()).convertTo(any()); + verify(storeClient, never()).update(any(), any(), any()); + } + @Test void shouldUpdateUnstructuredSuccessfully() throws JsonProcessingException { var fake = createUnstructured(); @@ -423,14 +455,24 @@ void shouldUpdateUnstructuredSuccessfully() throws JsonProcessingException { .thenReturn(Mono.just(createExtensionStore(name, 12345L))); when(storeClient.fetchByName(name)) .thenReturn(Mono.just(createExtensionStore(name, 12346L))); - when(converter.convertFrom(same(Unstructured.class), any())).thenReturn(fake); + + var oldFake = createUnstructured(); + oldFake.getMetadata().setLabels(Map.of("old", "true")); + + var updatedFake = createUnstructured(); + updatedFake.getMetadata().setLabels(Map.of("updated", "true")); + when(converter.convertFrom(same(Unstructured.class), any())) + .thenReturn(oldFake) + .thenReturn(updatedFake); StepVerifier.create(client.update(fake)) - .expectNext(fake) + .expectNext(updatedFake) .verifyComplete(); - verify(converter, times(1)).convertTo(eq(fake)); - verify(storeClient, times(1)) + verify(storeClient).fetchByName(name); + verify(converter).convertTo(eq(fake)); + verify(converter, times(2)).convertFrom(same(Unstructured.class), any()); + verify(storeClient) .update(eq("/registry/fake.halo.run/fakes/fake"), eq(12345L), any()); } @@ -480,6 +522,13 @@ void shouldWatchOnUpdateSuccessfully() { verify(watcher, times(1)).onUpdate(any(), any()); } + @Test + void shouldNotWatchOnUpdateIfExtensionNotChange() { + shouldNotUpdateIfExtensionNotChange(); + + verify(watcher, never()).onUpdate(any(), any()); + } + @Test void shouldWatchOnDeleteSuccessfully() { doNothing().when(watcher).onDelete(any());