Skip to content

Commit

Permalink
refactor: client writable check for extension
Browse files Browse the repository at this point in the history
  • Loading branch information
guqing committed Jan 19, 2024
1 parent be59a7d commit e38778a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import lombok.NonNull;
Expand Down Expand Up @@ -55,7 +57,8 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {

private final IndexedQueryEngine indexedQueryEngine;

private final AtomicBoolean ready = new AtomicBoolean(false);
private final ConcurrentMap<GroupKind, AtomicBoolean> indexBuildingState =
new ConcurrentHashMap<>();

@Override
public <E extends Extension> Flux<E> list(Class<E> type, Predicate<E> predicate,
Expand Down Expand Up @@ -150,7 +153,7 @@ private Mono<Unstructured> get(GroupVersionKind gvk, String name) {
@Override
@Transactional
public <E extends Extension> Mono<E> create(E extension) {
checkClientReady();
checkClientWritable(extension);
return Mono.just(extension)
.doOnNext(ext -> {
var metadata = extension.getMetadata();
Expand Down Expand Up @@ -184,7 +187,7 @@ && hasText(extension.getMetadata().getGenerateName()))
@Override
@Transactional
public <E extends Extension> Mono<E> update(E extension) {
checkClientReady();
checkClientWritable(extension);
// Refactor the atomic reference if we have a better solution.
return getLatest(extension).flatMap(old -> {
var oldJsonExt = new JsonExtension(objectMapper, old);
Expand Down Expand Up @@ -222,7 +225,7 @@ private Mono<? extends Extension> getLatest(Extension extension) {
@Override
@Transactional
public <E extends Extension> Mono<E> delete(E extension) {
checkClientReady();
checkClientWritable(extension);
// set deletionTimestamp
extension.getMetadata().setDeletionTimestamp(Instant.now());
var extensionStore = converter.convertTo(extension);
Expand Down Expand Up @@ -259,19 +262,21 @@ <E extends Extension> Mono<E> doUpdate(E oldExtension, String name, Long version
});
}

private void checkClientReady() {
if (!ready.get()) {
throw new IllegalStateException("Service is not ready yet");
/**
* If the extension is being updated, we should the index is not building index for the
* extension, otherwise the {@link IllegalStateException} will be thrown.
*/
private <E extends Extension> void checkClientWritable(E extension) {
var buildingState = indexBuildingState.get(extension.groupVersionKind().groupKind());
if (buildingState != null && buildingState.get()) {
throw new IllegalStateException("Index is building for " + extension.groupVersionKind()
+ ", please wait for a moment and try again.");
}
}

/**
* Internal method for changing the ready state of the client.
*
* @param ready ready state
*/
void setReady(boolean ready) {
this.ready.set(ready);
void setIndexBuildingStateFor(GroupKind groupKind, boolean building) {
indexBuildingState.computeIfAbsent(groupKind, k -> new AtomicBoolean(building))
.set(building);
}

@Override
Expand Down Expand Up @@ -334,13 +339,11 @@ public void startBuildingIndex() {
final long startTimeMs = System.currentTimeMillis();
log.info("Start building index for all extensions, please wait...");
schemeManager.schemes()
.forEach(scheme -> indexerFactory.createIndexerFor(scheme.type(),
createExtensionIterator(scheme)));
.forEach(this::createIndexerFor);

schemeWatcherManager.register(event -> {
if (event instanceof SchemeWatcherManager.SchemeRegistered schemeRegistered) {
var scheme = schemeRegistered.getNewScheme();
indexerFactory.createIndexerFor(scheme.type(), createExtensionIterator(scheme));
createIndexerFor(schemeRegistered.getNewScheme());
return;
}
if (event instanceof SchemeWatcherManager.SchemeUnregistered schemeUnregistered) {
Expand All @@ -350,9 +353,12 @@ public void startBuildingIndex() {
});
log.info("Successfully built index in {}ms, Preparing to lunch application...",
System.currentTimeMillis() - startTimeMs);
}

// mark client as ready
setReady(true);
private void createIndexerFor(Scheme scheme) {
setIndexBuildingStateFor(scheme.groupVersionKind().groupKind(), true);
indexerFactory.createIndexerFor(scheme.type(), createExtensionIterator(scheme));
setIndexBuildingStateFor(scheme.groupVersionKind().groupKind(), false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class ReactiveExtensionClientTest {

@BeforeEach
void setUp() {
client.setReady(true);
lenient().when(schemeManager.get(eq(FakeExtension.class)))
.thenReturn(fakeScheme);
lenient().when(schemeManager.get(eq(fakeScheme.groupVersionKind()))).thenReturn(fakeScheme);
Expand Down

0 comments on commit e38778a

Please sign in to comment.