Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent Reactor Pools #179

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions reactor-pool/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ task japicmp(type: JapicmpTask) {
classExcludes = [
]
methodExcludes = [
"reactor.pool.decorators.InstrumentedPoolDecorators#concurrentPools(int, org.reactivestreams.Publisher, java.util.function.Function)"
]
}
check.dependsOn japicmp
Expand Down
20 changes: 15 additions & 5 deletions reactor-pool/src/main/java/reactor/pool/AbstractPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -390,7 +391,7 @@ static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable
static final Disposable TIMEOUT_DISPOSED = Disposables.disposed();

final CoreSubscriber<? super AbstractPooledRef<POOLABLE>> actual;
final AbstractPool<POOLABLE> pool;
final AtomicReference<AbstractPool<POOLABLE>> pool;
final Duration pendingAcquireTimeout;

long pendingAcquireStart;
Expand All @@ -400,7 +401,7 @@ static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable
AbstractPool<POOLABLE> pool,
Duration pendingAcquireTimeout) {
this.actual = actual;
this.pool = pool;
this.pool = new AtomicReference<>(pool);
this.pendingAcquireTimeout = pendingAcquireTimeout;
this.timeoutTask = TIMEOUT_DISPOSED;
}
Expand All @@ -414,7 +415,7 @@ public void run() {
if (Borrower.this.compareAndSet(false, true)) {
// this is failure, a timeout was observed
stopPendingCountdown(false);
pool.cancelAcquire(Borrower.this);
pool().cancelAcquire(Borrower.this);
actual.onError(new PoolAcquireTimeoutException(pendingAcquireTimeout));
}
}
Expand All @@ -423,7 +424,7 @@ public void run() {
public void request(long n) {
if (Operators.validate(n)) {
// doAcquire will check for acquire timeout
pool.doAcquire(this);
pool().doAcquire(this);
}
}

Expand All @@ -432,6 +433,7 @@ public void request(long n) {
*/
void stopPendingCountdown(boolean success) {
if (pendingAcquireStart > 0) {
AbstractPool<POOLABLE> pool = pool();
if (success) {
pool.metricsRecorder.recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart);
} else {
Expand All @@ -446,7 +448,7 @@ void stopPendingCountdown(boolean success) {
@Override
public void cancel() {
set(true);
pool.cancelAcquire(this);
pool().cancelAcquire(this);
stopPendingCountdown(true); // this is not failure, the subscription was canceled
}

Expand Down Expand Up @@ -485,6 +487,14 @@ void fail(Throwable error) {
public String toString() {
return get() ? "Borrower(cancelled)" : "Borrower";
}

AbstractPool<POOLABLE> pool() {
return pool.get();
}

void setPool(AbstractPool<POOLABLE> replace) {
pool.set(replace);
}
}

}
32 changes: 30 additions & 2 deletions reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +27,6 @@
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* A default {@link PoolConfig} that can be extended to bear more configuration options
Expand All @@ -50,6 +49,7 @@ public class DefaultPoolConfig<POOLABLE> implements PoolConfig<POOLABLE> {
protected final PoolMetricsRecorder metricsRecorder;
protected final Clock clock;
protected final boolean isIdleLRU;
protected final ResourceManager resourceManager;

public DefaultPoolConfig(Mono<POOLABLE> allocator,
AllocationStrategy allocationStrategy,
Expand All @@ -64,6 +64,26 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
PoolMetricsRecorder metricsRecorder,
Clock clock,
boolean isIdleLRU) {
this(allocator, allocationStrategy, maxPending, pendingAcquireTimer, releaseHandler, destroyHandler,
evictionPredicate, evictInBackgroundInterval, evictInBackgroundScheduler, acquisitionScheduler,
metricsRecorder, clock, isIdleLRU,
PoolBuilder.DEFAULT_RESOURCE_MANAGER);
}

public DefaultPoolConfig(Mono<POOLABLE> allocator,
AllocationStrategy allocationStrategy,
int maxPending,
BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer,
Function<POOLABLE, ? extends Publisher<Void>> releaseHandler,
Function<POOLABLE, ? extends Publisher<Void>> destroyHandler,
BiPredicate<POOLABLE, PooledRefMetadata> evictionPredicate,
Duration evictInBackgroundInterval,
Scheduler evictInBackgroundScheduler,
Scheduler acquisitionScheduler,
PoolMetricsRecorder metricsRecorder,
Clock clock,
boolean isIdleLRU,
ResourceManager resourceManager) {
this.pendingAcquireTimer = pendingAcquireTimer;
this.allocator = allocator;
this.allocationStrategy = allocationStrategy;
Expand All @@ -77,6 +97,7 @@ public DefaultPoolConfig(Mono<POOLABLE> allocator,
this.metricsRecorder = metricsRecorder;
this.clock = clock;
this.isIdleLRU = isIdleLRU;
this.resourceManager = resourceManager;
}

/**
Expand All @@ -101,6 +122,7 @@ protected DefaultPoolConfig(PoolConfig<POOLABLE> toCopy) {
this.metricsRecorder = toCopyDpc.metricsRecorder;
this.clock = toCopyDpc.clock;
this.isIdleLRU = toCopyDpc.isIdleLRU;
this.resourceManager = toCopyDpc.resourceManager;
}
else {
this.allocator = toCopy.allocator();
Expand All @@ -116,6 +138,7 @@ protected DefaultPoolConfig(PoolConfig<POOLABLE> toCopy) {
this.metricsRecorder = toCopy.metricsRecorder();
this.clock = toCopy.clock();
this.isIdleLRU = toCopy.reuseIdleResourcesInLruOrder();
this.resourceManager = toCopy.resourceManager();
}
}

Expand Down Expand Up @@ -183,4 +206,9 @@ public Clock clock() {
public boolean reuseIdleResourcesInLruOrder() {
return isIdleLRU;
}

@Override
public ResourceManager resourceManager() {
return resourceManager;
}
}
12 changes: 11 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/InstrumentedPool.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,16 @@ public interface InstrumentedPool<POOLABLE> extends Pool<POOLABLE> {
*/
PoolMetrics metrics();

/**
* Estimates if the pool can currently either reuse or create some resources
* @return true if the pool can currently either reuse or create some resources, false if no idles resources are
* currently available and no more resources can be currently created.
*/
default boolean hasAvailableResources() {
PoolMetrics pm = metrics();
return (pm.idleSize() + config().allocationStrategy().estimatePermitCount()) - pm.pendingAcquireSize() >= 0;
}

/**
* An object that can be used to get live information about a {@link Pool}, suitable
* for gauge metrics.
Expand Down
12 changes: 11 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/Pool.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -178,4 +178,14 @@ default void dispose() {
* @return a Mono triggering the shutdown of the pool once subscribed.
*/
Mono<Void> disposeLater();

/**
* Transfer some pending borrowers from another pool into this pool.
*
* @param from another pool to steal resources from
* @return true if some borrowers have been moved from <code>from</code> into this pool instance
*/
default boolean transferBorrowersFrom(InstrumentedPool<POOLABLE> from) {
return false;
}
}
14 changes: 12 additions & 2 deletions reactor-pool/src/main/java/reactor/pool/PoolBuilder.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,6 +79,8 @@ public static <T> PoolBuilder<T, PoolConfig<T>> from(Publisher<? extends T> allo
boolean idleLruOrder = true;
BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer = DEFAULT_PENDING_ACQUIRE_TIMER;

ResourceManager resourceManager = DEFAULT_RESOURCE_MANAGER;

PoolBuilder(Mono<T> allocator, Function<PoolConfig<T>, CONF> configModifier) {
this.allocator = allocator;
this.configModifier = configModifier;
Expand Down Expand Up @@ -445,6 +447,11 @@ public PoolBuilder<T, CONF> idleResourceReuseOrder(boolean isLru) {
return this;
}

public PoolBuilder<T, CONF> resourceManager(ResourceManager resourceManager) {
this.resourceManager = resourceManager;
return this;
}

/**
* Add implementation-specific configuration, changing the type of {@link PoolConfig}
* passed to the {@link Pool} factory in {@link #build(Function)}.
Expand Down Expand Up @@ -508,7 +515,8 @@ CONF buildConfig() {
acquisitionScheduler,
metricsRecorder,
clock,
idleLruOrder);
idleLruOrder,
resourceManager);

return this.configModifier.apply(baseConfig);
}
Expand All @@ -531,4 +539,6 @@ static <T> BiPredicate<T, PooledRefMetadata> idlePredicate(Duration maxIdleTime)
static final BiPredicate<?, ?> NEVER_PREDICATE = (ignored1, ignored2) -> false;
static final BiFunction<Runnable, Duration, Disposable> DEFAULT_PENDING_ACQUIRE_TIMER = (r, d) -> Schedulers.parallel().schedule(r, d.toNanos(), TimeUnit.NANOSECONDS);
static final int DEFAULT_WARMUP_PARALLELISM = 1;

static final ResourceManager DEFAULT_RESOURCE_MANAGER = () -> {};
}
6 changes: 5 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/PoolConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -141,4 +141,8 @@ default BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer() {
return PoolBuilder.DEFAULT_PENDING_ACQUIRE_TIMER;
}

default ResourceManager resourceManager() {
return PoolBuilder.DEFAULT_RESOURCE_MANAGER;
}

}
39 changes: 39 additions & 0 deletions reactor-pool/src/main/java/reactor/pool/PoolScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.pool;

import java.util.List;

/**
* A Pool that can schedule resource acquisition among multiple sub pools,
* each managing a portion of resources. Resource acquisitions will
* be concurrently distributed across sub pools using sub pool executors, in a work stealing style.
*/
public interface PoolScheduler<T> extends InstrumentedPool<T> {
/**
* Get the number of borrowers steal count (only if the Scheduler supports work stealing).
*
* @return the number of Pool steal count, or -1
*/
long stealCount();

/**
* Returns the number of sub pools managed this this scheduler.
* @return the number of sub pools managed this this scheduler
*/
List<InstrumentedPool<T>> getPools();
}
30 changes: 30 additions & 0 deletions reactor-pool/src/main/java/reactor/pool/ResourceManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.pool;

/**
* A resource manager utilized by concrete Pool implementations. This manager enables Pools to interact
* with its pool scheduler, if there is one enabled.
* Pools can access their resource manager via the {@link PoolConfig#resourceManager()} method.
*/
public interface ResourceManager {
/**
* Notifies the pool scheduler that some resources can be acquired from the current pool because either certain
* resources are currently estimated to be idle or available for allocation.
*/
void resourceAvailable();
}
Loading
Loading