Skip to content

Commit

Permalink
feat(mongo): Optimize the blocking calls of the responsive API (#611)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ahoo-Wang authored Aug 13, 2024
1 parent 22c645d commit bda992f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,15 @@

package me.ahoo.cosid.mongo.reactive;

import me.ahoo.cosid.CosId;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class BlockingAdapter {
public static final Duration DEFAULT_TIME_OUT = Duration.ofSeconds(10);
private static final Scheduler SCHEDULER = Schedulers.newSingle(CosId.COSID_PREFIX + BlockingAdapter.class.getSimpleName());

private BlockingAdapter() {
}
Expand All @@ -39,17 +33,11 @@ public static <R> R block(Publisher<R> publisher) {

public static <R> R block(Mono<R> mono) {
try {

return mono.subscribeOn(SCHEDULER)
.timeout(DEFAULT_TIME_OUT)
.toFuture().get(DEFAULT_TIME_OUT.toMillis(), TimeUnit.MILLISECONDS);
BlockingAdapterSubscriber<R> blockingAdapterSubscriber = new BlockingAdapterSubscriber<>();
mono.subscribe(blockingAdapterSubscriber);
return blockingAdapterSubscriber.block(DEFAULT_TIME_OUT.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(e.getCause());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright [2021-present] [ahoo wang <ahoowang@qq.com> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.cosid.mongo.reactive;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.SignalType;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingAdapterSubscriber<T> extends BaseSubscriber<T> {
private final CountDownLatch latch;

private T value;
private Throwable error;

public BlockingAdapterSubscriber() {
this.latch = new CountDownLatch(1);
}

@Override
protected void hookOnNext(final T value) {
this.value = value;
}

@Override
protected void hookOnError(Throwable throwable) {
this.error = throwable;
}

public T getValue() {
return value;
}

public T block(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
return await(timeout, unit).getValue();
}

public Throwable getError() {
return error;
}

@Override
protected void hookFinally(final SignalType type) {
latch.countDown();
}

public BlockingAdapterSubscriber<T> await(final long timeout, final TimeUnit unit) throws TimeoutException, InterruptedException {
if (!latch.await(timeout, unit)) {
throw new TimeoutException("Timeout after " + timeout + " " + unit);
}
if (getError() != null) {
if (getError() instanceof RuntimeException) {
throw (RuntimeException) getError();
}
throw new RuntimeException(getError());
}
return this;
}
}

0 comments on commit bda992f

Please sign in to comment.