diff --git a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java
index 5cf3345bf8..d33970df1d 100644
--- a/reactor-netty-core/src/main/java/reactor/netty/Metrics.java
+++ b/reactor-netty-core/src/main/java/reactor/netty/Metrics.java
@@ -204,6 +204,10 @@ public class Metrics {
 	 */
 	public static final String PENDING_STREAMS = ".pending.streams";
 
+	/**
+	 * The number of HTTP/2 stream acquisitions steal count.
+	 */
+	public static final String STEAL_STREAMS = ".steal.streams";
 
 	// ByteBufAllocator Metrics
 	/**
diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java
index b97af6b532..44aa18d979 100644
--- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java
+++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java
@@ -527,6 +527,20 @@ public InstrumentedPool<T> newPool(
 			return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory);
 		}
 
+		public InstrumentedPool<T> newPool(
+		        PoolBuilder<T, PoolConfig<T>> poolBuilder,
+				int maxConnections,
+				@Nullable AllocationStrategy<?> allocationStrategy,
+				Function<T, Publisher<Void>> destroyHandler,
+				BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
+				Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
+			if (disposeTimeout != null) {
+				return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null)
+					.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
+			}
+			return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null).build(poolFactory);
+		}
+
 		public InstrumentedPool<T> newPool(
 				Publisher<T> allocator,
 				Function<T, Publisher<Void>> destroyHandler,
@@ -540,6 +554,21 @@ public InstrumentedPool<T> newPool(
 			return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory);
 		}
 
+		public InstrumentedPool<T> newPool(
+				PoolBuilder<T, PoolConfig<T>> poolBuilder,
+				int maxConnections,
+				@Nullable AllocationStrategy<?> allocationStrategy,
+				Function<T, Publisher<Void>> destroyHandler,
+				BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
+				PoolMetricsRecorder poolMetricsRecorder,
+				Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
+			if (disposeTimeout != null) {
+				return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder)
+						.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
+			}
+			return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory);
+		}
+
 		PoolBuilder<T, PoolConfig<T>> newPoolInternal(
 				Publisher<T> allocator,
 				Function<T, Publisher<Void>> destroyHandler,
@@ -552,11 +581,22 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
 				Function<T, Publisher<Void>> destroyHandler,
 				BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
 				@Nullable PoolMetricsRecorder poolMetricsRecorder) {
-			PoolBuilder<T, PoolConfig<T>> poolBuilder =
-					PoolBuilder.from(allocator)
-					           .destroyHandler(destroyHandler)
-					           .maxPendingAcquire(pendingAcquireMaxCount)
-					           .evictInBackground(evictionInterval);
+			return newPoolInternal(PoolBuilder.from(allocator), -1, null, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder);
+		}
+
+		PoolBuilder<T, PoolConfig<T>> newPoolInternal(
+				PoolBuilder<T, PoolConfig<T>> poolBuilder,
+				int maxConnections,
+				@Nullable AllocationStrategy<?> allocationStrategy,
+				Function<T, Publisher<Void>> destroyHandler,
+				BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
+				@Nullable PoolMetricsRecorder poolMetricsRecorder) {
+			maxConnections = (maxConnections == -1) ? this.maxConnections : maxConnections;
+			allocationStrategy = (allocationStrategy == null) ? this.allocationStrategy : allocationStrategy;
+			poolBuilder = poolBuilder
+							.destroyHandler(destroyHandler)
+							.maxPendingAcquire(pendingAcquireMaxCount)
+							.evictInBackground(evictionInterval);
 
 			if (this.evictionPredicate != null) {
 				poolBuilder = poolBuilder.evictionPredicate(
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java
index c6d3ba29d7..790913109a 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -66,6 +66,23 @@ public interface Builder {
 		 * @return {@code this}
 		 */
 		Builder minConnections(int minConnections);
+
+		/**
+		 * Enables or disables work stealing mode for managing HTTP2 Connection Pools.
+		 * <p>
+		 * By default, a single Connection Pool is used by multiple Netty event loop threads.
+		 * When work stealing is enabled, each Netty event loop will maintain its own
+		 * HTTP2 Connection Pool, and HTTP2 streams allocation will be distributed over all available
+		 * pools using a work stealing strategy. This approach maximizes throughput and
+		 * resource utilization in a multithreaded environment.
+		 *
+		 * @param progressive true if the HTTP2 Connection pools should be enabled gradually (when the nth pool becomes
+		 *                    is starting to get some pendingg acquisitions request, then enable one more
+		 *                    pool until all available pools are enabled).
+		 *
+		 * @return {@code this}
+		 */
+		Builder enableWorkStealing(boolean progressive);
 	}
 
 	/**
@@ -77,6 +94,18 @@ public static Http2AllocationStrategy.Builder builder() {
 		return new Http2AllocationStrategy.Build();
 	}
 
+	/**
+	 * Creates a builder for {@link Http2AllocationStrategy} and initialize it
+	 * with an existing strategy. This method can be used to create a mutated version
+	 * of an existing strategy.
+	 *
+	 * @return a new {@link Http2AllocationStrategy.Builder} initialized with an existing http2
+	 * allocation strategy.
+	 */
+	public static Http2AllocationStrategy.Builder builder(Http2AllocationStrategy existing) {
+		return new Http2AllocationStrategy.Build(existing);
+	}
+
 	@Override
 	public Http2AllocationStrategy copy() {
 		return new Http2AllocationStrategy(this);
@@ -141,9 +170,14 @@ public void returnPermits(int returned) {
 		}
 	}
 
+	public boolean enableWorkStealing() {
+		return enableWorkStealing;
+	}
+
 	final long maxConcurrentStreams;
 	final int maxConnections;
 	final int minConnections;
+	final boolean enableWorkStealing;
 
 	volatile int permits;
 	static final AtomicIntegerFieldUpdater<Http2AllocationStrategy> PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits");
@@ -152,6 +186,7 @@ public void returnPermits(int returned) {
 		this.maxConcurrentStreams = build.maxConcurrentStreams;
 		this.maxConnections = build.maxConnections;
 		this.minConnections = build.minConnections;
+		this.enableWorkStealing = build.enableWorkStealing;
 		PERMITS.lazySet(this, this.maxConnections);
 	}
 
@@ -159,6 +194,7 @@ public void returnPermits(int returned) {
 		this.maxConcurrentStreams = copy.maxConcurrentStreams;
 		this.maxConnections = copy.maxConnections;
 		this.minConnections = copy.minConnections;
+		this.enableWorkStealing = copy.enableWorkStealing;
 		PERMITS.lazySet(this, this.maxConnections);
 	}
 
@@ -170,6 +206,17 @@ static final class Build implements Builder {
 		long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
 		int maxConnections = DEFAULT_MAX_CONNECTIONS;
 		int minConnections = DEFAULT_MIN_CONNECTIONS;
+		boolean enableWorkStealing = Boolean.getBoolean("reactor.netty.pool.h2.enableworkstealing");
+
+		Build() {
+		}
+
+		Build(Http2AllocationStrategy existing) {
+			this.maxConcurrentStreams = existing.maxConcurrentStreams;
+			this.minConnections = existing.minConnections;
+			this.maxConnections = existing.maxConnections;
+			this.enableWorkStealing = existing.enableWorkStealing;
+		}
 
 		@Override
 		public Http2AllocationStrategy build() {
@@ -206,5 +253,11 @@ public Builder minConnections(int minConnections) {
 			this.minConnections = minConnections;
 			return this;
 		}
+
+		@Override
+		public Builder enableWorkStealing(boolean progressive) {
+			this.enableWorkStealing = true;
+			return this;
+		}
 	}
 }
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
index 071a37d0b7..057562b843 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
@@ -39,6 +39,9 @@
 import reactor.netty.ConnectionObserver;
 import reactor.netty.channel.ChannelMetricsRecorder;
 import reactor.netty.channel.ChannelOperations;
+import reactor.netty.internal.shaded.reactor.pool.PoolBuilder;
+import reactor.netty.internal.shaded.reactor.pool.PoolConfig;
+import reactor.netty.internal.shaded.reactor.pool.decorators.InstrumentedPoolDecorators;
 import reactor.netty.resources.ConnectionProvider;
 import reactor.netty.resources.PooledConnectionProvider;
 import reactor.netty.transport.ClientTransportConfig;
@@ -51,13 +54,20 @@
 import reactor.util.annotation.Nullable;
 import reactor.util.concurrent.Queues;
 import reactor.util.context.Context;
+import reactor.util.function.Tuples;
 
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static reactor.netty.ReactorNetty.format;
 import static reactor.netty.ReactorNetty.getChannelContext;
@@ -565,12 +575,56 @@ static final class PooledConnectionAllocator {
 			this.config = (HttpClientConfig) config;
 			this.remoteAddress = remoteAddress;
 			this.resolver = resolver;
-			this.pool = id == null ?
-					poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
-							poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
-					poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
-							new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
-							poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
+
+			Http2AllocationStrategy http2Strategy = poolFactory.allocationStrategy() instanceof Http2AllocationStrategy ?
+					(Http2AllocationStrategy) poolFactory.allocationStrategy() : null;
+
+			if (http2Strategy == null || !http2Strategy.enableWorkStealing) {
+				this.pool = id == null ?
+						poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
+								poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
+						poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
+								new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
+								poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
+			}
+			else {
+				// Create one connection allocator (it will be shared by all Http2Pool instances)
+				Publisher<Connection> allocator = connectChannel();
+
+				List<Executor> execs =  StreamSupport.stream(config.loopResources().onClient(true).spliterator(), false)
+						.limit(http2Strategy.maxConnections)
+						.collect(Collectors.toList());
+				Iterator<Executor> execsIter = execs.iterator();
+
+				MicrometerPoolMetricsRecorder micrometerRecorder = id == null ? null : new MicrometerPoolMetricsRecorder(id, name, remoteAddress);
+				AtomicInteger subPoolIndex = new AtomicInteger();
+
+				this.pool = InstrumentedPoolDecorators.concurrentPools(execs.size(), allocator,
+						(PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder) -> {
+					int index = subPoolIndex.getAndIncrement();
+					int minDiv = http2Strategy.minConnections / execs.size();
+					int minMod = http2Strategy.minConnections % execs.size();
+					int maxDiv = http2Strategy.maxConnections / execs.size();
+					int maxMod = http2Strategy.maxConnections % execs.size();
+
+					int minConn = index < minMod ? minDiv + 1 : minDiv;
+					int maxConn = index < maxMod ? maxDiv + 1 : maxDiv;
+
+					Http2AllocationStrategy adaptedH2Strategy = Http2AllocationStrategy.builder(http2Strategy)
+							.minConnections(minConn)
+							.maxConnections(maxConn)
+							.build();
+
+					InstrumentedPool<Connection> pool =
+							id == null ?
+									poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
+											poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy)) :
+									poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
+											micrometerRecorder,
+											poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy));
+					return Tuples.of(pool, execsIter.next());
+				});
+			}
 		}
 
 		Publisher<Connection> connectChannel() {
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java
index 72e3bd986c..970b5f6a77 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -67,6 +67,26 @@ public Meter.Type getType() {
 		}
 	},
 
+	/**
+	 * The number of HTTP/2 stream acquisition steal count.
+	 */
+	STEAL_STREAMS {
+		@Override
+		public String getName() {
+			return "reactor.netty.connection.provider.steal.streams";
+		}
+
+		@Override
+		public KeyName[] getKeyNames() {
+			return Http2ConnectionProviderMetersTags.values();
+		}
+
+		@Override
+		public Meter.Type getType() {
+			return Meter.Type.COUNTER;
+		}
+	},
+
 	/**
 	 * The number of the idle connections in the connection pool.
 	 */
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
index 7a149a9468..dc30127a1c 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Function;
 
@@ -120,6 +121,10 @@ class Http2Pool implements InstrumentedPool<Connection>, InstrumentedPool.PoolMe
 	static final AtomicReferenceFieldUpdater<Http2Pool, ConcurrentLinkedQueue> CONNECTIONS =
 			AtomicReferenceFieldUpdater.newUpdater(Http2Pool.class, ConcurrentLinkedQueue.class, "connections");
 
+	volatile int connectionsCount;
+	static final AtomicIntegerFieldUpdater<Http2Pool> CONNECTIONS_COUNT =
+			AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "connectionsCount");
+
 	volatile int idleSize;
 	private static final AtomicIntegerFieldUpdater<Http2Pool> IDLE_SIZE =
 			AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "idleSize");
@@ -157,6 +162,7 @@ class Http2Pool implements InstrumentedPool<Connection>, InstrumentedPool.PoolMe
 	final Long maxConcurrentStreams;
 	final int minConnections;
 	final PoolConfig<Connection> poolConfig;
+	final boolean workStealingEnabled;
 
 	long lastInteractionTimestamp;
 
@@ -166,12 +172,14 @@ class Http2Pool implements InstrumentedPool<Connection>, InstrumentedPool.PoolMe
 		this.clock = poolConfig.clock();
 		this.connections = new ConcurrentLinkedQueue<>();
 		this.lastInteractionTimestamp = clock.millis();
-		this.maxConcurrentStreams = allocationStrategy instanceof Http2AllocationStrategy ?
+		Http2AllocationStrategy http2Strategy = allocationStrategy instanceof Http2AllocationStrategy ? (Http2AllocationStrategy) allocationStrategy : null;
+
+		this.maxConcurrentStreams = http2Strategy != null ?
 				((Http2AllocationStrategy) allocationStrategy).maxConcurrentStreams() : -1;
 		this.minConnections = allocationStrategy == null ? 0 : allocationStrategy.permitMinimum();
 		this.pending = new ConcurrentLinkedDeque<>();
 		this.poolConfig = poolConfig;
-
+		this.workStealingEnabled = http2Strategy != null && http2Strategy.enableWorkStealing;
 		recordInteractionTimestamp();
 		scheduleEviction();
 	}
@@ -186,6 +194,41 @@ public Mono<PooledRef<Connection>> acquire(Duration timeout) {
 		return new BorrowerMono(this, timeout);
 	}
 
+	@Override
+	public boolean hasAvailableResources() {
+		long totalMaxConcurrentStreams = this.totalMaxConcurrentStreams;
+		long estimateStreamsCount = totalMaxConcurrentStreams - acquired;
+		int permits = poolConfig.allocationStrategy().estimatePermitCount();
+		if ((estimateStreamsCount + permits) - pendingSize <= 0) {
+			// no more idle streams
+			if (connectionsCount < poolConfig.allocationStrategy().permitMaximum()) {
+				// but we know we can allocate more connections, which will most likely be able to allocate many streams
+				return true;
+			}
+			// we can't acquire streams anymore (all streams are used and all connections are established)
+			return false;
+
+		}
+		return true;
+	}
+
+	@Override
+	public boolean transferBorrowersFrom(InstrumentedPool<Connection> pool) {
+		Http2Pool other = (Http2Pool) pool;
+
+		if (!other.isDisposed()) {
+			ConcurrentLinkedDeque<Borrower> q = other.pending;
+			Borrower b = other.pollPending(q, false);
+			if (b != null && !b.get()) {
+				b.setPool(this);
+				doAcquire(b);
+				return true;
+			}
+		}
+
+		return false;
+	}
+
 	@Override
 	public int acquiredSize() {
 		return allocatedSize() - idleSize();
@@ -348,10 +391,12 @@ void doAcquire(Borrower borrower) {
 		drain();
 	}
 
-	void drain() {
+	boolean drain() {
 		if (WIP.getAndIncrement(this) == 0) {
 			drainLoop();
+			return true;
 		}
+		return false;
 	}
 
 	void drainLoop() {
@@ -390,10 +435,19 @@ void drainLoop() {
 						log.debug(format(slot.connection.channel(), "Channel activated"));
 					}
 					ACQUIRED.incrementAndGet(this);
-					slot.connection.channel().eventLoop().execute(() -> {
+					if (!workStealingEnabled) {
+						slot.connection.channel().eventLoop().execute(() -> {
+							borrower.deliver(new Http2PooledRef(slot)); // will insert the connection slot into CONNECTIONS
+							drain();
+						});
+					}
+					else {
+						// WHen using the reactor work-stealing pool, we are already executing from one of the pools' executor,
+						// so, we can safely deliver the borrower concurrently, all the borrowers are distributed across
+						// all sub pools, so we won't be in a situation where the current thread will run the drainloop
+						// for ever under heavy requests load, so no need to reschedule.
 						borrower.deliver(new Http2PooledRef(slot));
-						drain();
-					});
+					}
 				}
 				else {
 					int resourcesCount = idleSize;
@@ -748,7 +802,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip
 
 		final Duration acquireTimeout;
 		final CoreSubscriber<? super Http2PooledRef> actual;
-		final Http2Pool pool;
+		final AtomicReference<Http2Pool> pool;
 
 		long pendingAcquireStart;
 
@@ -757,7 +811,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip
 		Borrower(CoreSubscriber<? super Http2PooledRef> actual, Http2Pool pool, Duration acquireTimeout) {
 			this.acquireTimeout = acquireTimeout;
 			this.actual = actual;
-			this.pool = pool;
+			this.pool = new AtomicReference<>(pool);
 			this.timeoutTask = TIMEOUT_DISPOSED;
 		}
 
@@ -765,7 +819,7 @@ static final class Borrower extends AtomicBoolean implements Scannable, Subscrip
 		public void cancel() {
 			stopPendingCountdown(true); // this is not failure, the subscription was canceled
 			if (compareAndSet(false, true)) {
-				pool.cancelAcquire(this);
+				pool().cancelAcquire(this);
 			}
 		}
 
@@ -776,7 +830,7 @@ Context currentContext() {
 		@Override
 		public void request(long n) {
 			if (Operators.validate(n)) {
-				pool.doAcquire(this);
+				pool().doAcquire(this);
 			}
 		}
 
@@ -785,7 +839,7 @@ public void run() {
 			if (compareAndSet(false, true)) {
 				// this is failure, a timeout was observed
 				stopPendingCountdown(false);
-				pool.cancelAcquire(Http2Pool.Borrower.this);
+				pool().cancelAcquire(Http2Pool.Borrower.this);
 				actual.onError(new PoolAcquireTimeoutException(acquireTimeout));
 			}
 		}
@@ -813,7 +867,10 @@ public String toString() {
 		}
 
 		void deliver(Http2PooledRef poolSlot) {
-			assert poolSlot.slot.connection.channel().eventLoop().inEventLoop();
+			if (!pool().workStealingEnabled) {
+				// TODO can we do this check even when workstealing is enabled ?
+				assert poolSlot.slot.connection.channel().eventLoop().inEventLoop();
+			}
 			poolSlot.slot.incrementConcurrencyAndGet();
 			poolSlot.slot.deactivate();
 			if (get()) {
@@ -835,6 +892,7 @@ void fail(Throwable error) {
 
 		void stopPendingCountdown(boolean success) {
 			if (pendingAcquireStart > 0) {
+				Http2Pool pool = pool();
 				if (success) {
 					pool.poolConfig.metricsRecorder().recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart);
 				}
@@ -846,6 +904,14 @@ void stopPendingCountdown(boolean success) {
 			}
 			timeoutTask.dispose();
 		}
+
+		Http2Pool pool() {
+			return pool.get();
+		}
+
+		public void setPool(Http2Pool replace) {
+			pool.set(replace);
+		}
 	}
 
 	static final class BorrowerMono extends Mono<PooledRef<Connection>> {
@@ -896,7 +962,11 @@ public Mono<Void> invalidate() {
 			return Mono.defer(() -> {
 				if (compareAndSet(false, true)) {
 					ACQUIRED.decrementAndGet(slot.pool);
-					return slot.pool.destroyPoolable(this).doFinally(st -> slot.pool.drain());
+					return slot.pool.destroyPoolable(this).doFinally(st -> {
+						if (slot.pool.drain() && slot.pool.hasAvailableResources()) {
+							slot.pool.config().resourceManager().resourceAvailable();
+						}
+					});
 				}
 				else {
 					return Mono.empty();
@@ -967,6 +1037,8 @@ static class Slot extends AtomicBoolean implements PooledRefMetadata {
 			}
 			initMaxConcurrentStreams();
 			TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, this.maxConcurrentStreams);
+			CONNECTIONS_COUNT.incrementAndGet(this.pool);
+			pool.config().resourceManager().resourceAvailable();
 		}
 
 		void initMaxConcurrentStreams() {
@@ -1066,6 +1138,7 @@ void invalidate() {
 				}
 				pool.poolConfig.allocationStrategy().returnPermits(1);
 				TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, -maxConcurrentStreams);
+				CONNECTIONS_COUNT.decrementAndGet(this.pool);
 			}
 		}
 
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java
index 75bf6cd939..b81a075074 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2021-2024 VMware, Inc. or its affiliates, All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,8 +19,10 @@
 import io.micrometer.core.instrument.Meter;
 import io.micrometer.core.instrument.Tags;
 import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
+import reactor.netty.internal.shaded.reactor.pool.PoolScheduler;
 
 import java.net.SocketAddress;
+import java.util.List;
 
 import static reactor.netty.Metrics.REGISTRY;
 import static reactor.netty.http.client.Http2ConnectionProviderMeters.ACTIVE_CONNECTIONS;
@@ -31,6 +33,7 @@
 import static reactor.netty.http.client.Http2ConnectionProviderMeters.IDLE_CONNECTIONS;
 import static reactor.netty.http.client.Http2ConnectionProviderMeters.PENDING_STREAMS;
 import static reactor.netty.Metrics.formatSocketAddress;
+import static reactor.netty.http.client.Http2ConnectionProviderMeters.STEAL_STREAMS;
 
 final class MicrometerHttp2ConnectionProviderMeterRegistrar {
 
@@ -48,9 +51,20 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In
 		     .tags(tags)
 		     .register(REGISTRY);
 
-		Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams())
-		     .tags(tags)
-		     .register(REGISTRY);
+		if (metrics instanceof Http2Pool) {
+			Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams())
+					.tags(tags)
+					.register(REGISTRY);
+		}
+		else if (metrics instanceof PoolScheduler<?>) {
+			Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> getActiveStreams(((PoolScheduler<?>) metrics).getPools()))
+					.tags(tags)
+					.register(REGISTRY);
+
+			Gauge.builder(STEAL_STREAMS.getName(), metrics, poolMetrics -> ((PoolScheduler<?>) metrics).stealCount())
+					.tags(tags)
+					.register(REGISTRY);
+		}
 
 		Gauge.builder(IDLE_CONNECTIONS.getName(), metrics, InstrumentedPool.PoolMetrics::idleSize)
 		     .tags(tags)
@@ -70,4 +84,10 @@ void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress)
 		REGISTRY.remove(new Meter.Id(IDLE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE));
 		REGISTRY.remove(new Meter.Id(PENDING_STREAMS.getName(), tags, null, null, Meter.Type.GAUGE));
 	}
-}
\ No newline at end of file
+
+	int getActiveStreams(List<? extends InstrumentedPool<?>> pools) {
+		return pools.stream()
+				.mapToInt(pool -> ((Http2Pool) pool).activeStreams())
+				.sum();
+	}
+}
diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java
index b08280fb28..1ccd2a2dea 100644
--- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java
+++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java
@@ -95,6 +95,7 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -3427,18 +3428,58 @@ void testIssue3285(String serverResponse, @Nullable Class<? extends Throwable> e
 	static void testIssue3285SendRequest(HttpClient client, @Nullable Class<? extends Throwable> exception) {
 		Mono<String> response =
 				client.get()
-				      .uri("/")
-				      .responseSingle((res, bytes) -> bytes.asString());
+						.uri("/")
+						.responseSingle((res, bytes) -> bytes.asString());
 		if (exception != null) {
 			response.as(StepVerifier::create)
-			        .expectError(exception)
-			        .verify(Duration.ofSeconds(5));
+					.expectError(exception)
+					.verify(Duration.ofSeconds(5));
 		}
 		else {
 			response.as(StepVerifier::create)
-			        .expectNext("test")
-			        .expectComplete()
-			        .verify(Duration.ofSeconds(5));
+					.expectNext("test")
+					.expectComplete()
+					.verify(Duration.ofSeconds(5));
+		}
+	}
+
+    @RepeatedTest(10)
+	void testHttp2ClientWithWorkStealing() {
+		disposableServer =
+				HttpServer.create()
+						.protocol(HttpProtocol.H2C)
+						.port(0)
+						.handle((req, res) ->
+								res.sendString(Mono.just("Welcome")))
+						.bindNow();
+
+		ConnectionProvider provider = ConnectionProvider
+				.builder("http")
+				.allocationStrategy(Http2AllocationStrategy.builder()
+						.maxConcurrentStreams(100)
+						.minConnections(1)
+						.maxConnections(Runtime.getRuntime().availableProcessors())
+						.enableWorkStealing(true)
+						.build())
+				.build();
+
+		try {
+			HttpClient client = HttpClient.create(provider)
+					.protocol(HttpProtocol.H2C)
+					.port(disposableServer.port())
+					.wiretap(true);
+
+			StepVerifier.create(client
+							.headers(hdr -> hdr.set("Content-Type", "text/plain"))
+							.get()
+							.uri("/payload-size")
+							.response((r, buf) -> buf.aggregate().asString().zipWith(Mono.just(r))))
+					.expectNextMatches(tuple -> "Welcome".equals(tuple.getT1()) && tuple.getT2().status().equals(HttpResponseStatus.OK))
+					.expectComplete()
+					.verify(Duration.ofSeconds(30));
+		}
+		finally {
+			provider.disposeLater().block();
 		}
 	}
 }