From d5631c961c1a026d7deef2c5405b1456f0dbfd77 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 30 Jun 2023 16:54:17 +0800 Subject: [PATCH] [fix][client] Make the whole grabCnx() progress atomic (#20595) (cherry picked from commit 2bede012c73c301b73c079aaeb122cbb472728c1) --- .../client/impl/ConnectionHandlerTest.java | 154 ++++++++++++++++++ .../pulsar/client/impl/ConnectionHandler.java | 49 ++++-- .../pulsar/client/impl/ConsumerImpl.java | 26 +-- .../pulsar/client/impl/ProducerImpl.java | 29 ++-- .../impl/TransactionMetaStoreHandler.java | 13 +- 5 files changed, 231 insertions(+), 40 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java new file mode 100644 index 0000000000000..d93d87c91374e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-impl") +public class ConnectionHandlerTest extends ProducerConsumerBase { + + private static final Backoff BACKOFF = new BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS) + .setMandatoryStop(1, TimeUnit.SECONDS) + .setMax(3, TimeUnit.SECONDS).create(); + private final ExecutorService executor = Executors.newFixedThreadPool(4); + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + executor.shutdown(); + } + + @Test(timeOut = 30000) + public void testSynchronousGrabCnx() { + for (int i = 0; i < 10; i++) { + final CompletableFuture future = new CompletableFuture<>(); + final int index = i; + final ConnectionHandler handler = new ConnectionHandler( + new MockedHandlerState((PulsarClientImpl) pulsarClient, "my-topic"), BACKOFF, + cnx -> { + future.complete(index); + return CompletableFuture.completedFuture(null); + }); + handler.grabCnx(); + Assert.assertEquals(future.join().intValue(), i); + } + } + + @Test + public void testConcurrentGrabCnx() { + final AtomicInteger cnt = new AtomicInteger(0); + final ConnectionHandler handler = new ConnectionHandler( + new MockedHandlerState((PulsarClientImpl) pulsarClient, "my-topic"), BACKOFF, + cnx -> { + cnt.incrementAndGet(); + return CompletableFuture.completedFuture(null); + }); + final int numGrab = 10; + for (int i = 0; i < numGrab; i++) { + handler.grabCnx(); + } + Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> cnt.get() > 0); + Assert.assertThrows(ConditionTimeoutException.class, + () -> Awaitility.await().atMost(Duration.ofMillis(500)).until(() -> cnt.get() == numGrab)); + Assert.assertEquals(cnt.get(), 1); + } + + @Test + public void testDuringConnectInvokeCount() throws IllegalAccessException { + // 1. connectionOpened completes with null + final AtomicBoolean duringConnect = spy(new AtomicBoolean()); + final ConnectionHandler handler1 = new ConnectionHandler( + new MockedHandlerState((PulsarClientImpl) pulsarClient, "my-topic"), BACKOFF, + cnx -> CompletableFuture.completedFuture(null)); + FieldUtils.writeField(handler1, "duringConnect", duringConnect, true); + handler1.grabCnx(); + Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> !duringConnect.get()); + verify(duringConnect, times(1)).compareAndSet(false, true); + verify(duringConnect, times(1)).set(false); + + // 2. connectionFailed is called + final ConnectionHandler handler2 = new ConnectionHandler( + new MockedHandlerState((PulsarClientImpl) pulsarClient, null), new MockedBackoff(), + cnx -> CompletableFuture.completedFuture(null)); + FieldUtils.writeField(handler2, "duringConnect", duringConnect, true); + handler2.grabCnx(); + Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> !duringConnect.get()); + verify(duringConnect, times(2)).compareAndSet(false, true); + verify(duringConnect, times(2)).set(false); + + // 3. connectionOpened completes exceptionally + final ConnectionHandler handler3 = new ConnectionHandler( + new MockedHandlerState((PulsarClientImpl) pulsarClient, "my-topic"), new MockedBackoff(), + cnx -> FutureUtil.failedFuture(new RuntimeException("fail"))); + FieldUtils.writeField(handler3, "duringConnect", duringConnect, true); + handler3.grabCnx(); + Awaitility.await().atMost(Duration.ofSeconds(3)).until(() -> !duringConnect.get()); + verify(duringConnect, times(3)).compareAndSet(false, true); + verify(duringConnect, times(3)).set(false); + } + + private static class MockedHandlerState extends HandlerState { + + public MockedHandlerState(PulsarClientImpl client, String topic) { + super(client, topic); + } + + @Override + String getHandlerName() { + return "mocked"; + } + } + + private static class MockedBackoff extends Backoff { + + // Set a large backoff so that reconnection won't happen in tests + public MockedBackoff() { + super(1, TimeUnit.HOURS, 2, TimeUnit.HOURS, 1, TimeUnit.HOURS); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index a951d7b2cb836..1d9c28d90ea23 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.impl; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.pulsar.client.api.PulsarClientException; @@ -39,10 +41,16 @@ public class ConnectionHandler { // Start with -1L because it gets incremented before sending on the first connection private volatile long epoch = -1L; protected volatile long lastConnectionClosedTimestamp = 0L; + private final AtomicBoolean duringConnect = new AtomicBoolean(false); interface Connection { - void connectionFailed(PulsarClientException exception); - void connectionOpened(ClientCnx cnx); + + /** + * @apiNote If the returned future is completed exceptionally, reconnectLater will be called. + */ + CompletableFuture connectionOpened(ClientCnx cnx); + default void connectionFailed(PulsarClientException e) { + } } protected Connection connection; @@ -67,10 +75,16 @@ protected void grabCnx() { state.topic, state.getHandlerName(), state.getState()); return; } + if (!duringConnect.compareAndSet(false, true)) { + log.info("[{}] [{}] Skip grabbing the connection since there is a pending connection", + state.topic, state.getHandlerName()); + return; + } try { state.client.getConnection(state.topic) // .thenAccept(cnx -> connection.connectionOpened(cnx)) // + .thenAccept(__ -> duringConnect.set(false)) .exceptionally(this::handleConnectionError); } catch (Throwable t) { log.warn("[{}] [{}] Exception thrown while getting connection: ", state.topic, state.getHandlerName(), t); @@ -79,25 +93,27 @@ protected void grabCnx() { } private Void handleConnectionError(Throwable exception) { - log.warn("[{}] [{}] Error connecting to broker: {}", - state.topic, state.getHandlerName(), exception.getMessage()); - if (exception instanceof PulsarClientException) { - connection.connectionFailed((PulsarClientException) exception); - } else if (exception.getCause() instanceof PulsarClientException) { - connection.connectionFailed((PulsarClientException) exception.getCause()); - } else { - connection.connectionFailed(new PulsarClientException(exception)); - } - - State state = this.state.getState(); - if (state == State.Uninitialized || state == State.Connecting || state == State.Ready) { - reconnectLater(exception); + try { + log.warn("[{}] [{}] Error connecting to broker: {}", + state.topic, state.getHandlerName(), exception.getMessage()); + if (exception instanceof PulsarClientException) { + connection.connectionFailed((PulsarClientException) exception); + } else if (exception.getCause() instanceof PulsarClientException) { + connection.connectionFailed((PulsarClientException) exception.getCause()); + } else { + connection.connectionFailed(new PulsarClientException(exception)); + } + } catch (Throwable throwable) { + log.error("[{}] [{}] Unexpected exception after the connection", + state.topic, state.getHandlerName(), throwable); } + reconnectLater(exception); return null; } - protected void reconnectLater(Throwable exception) { + void reconnectLater(Throwable exception) { + duringConnect.set(false); CLIENT_CNX_UPDATER.set(this, null); if (!isValidStateForReconnection()) { log.info("[{}] [{}] Ignoring reconnection request (state: {})", @@ -121,6 +137,7 @@ protected void reconnectLater(Throwable exception) { public void connectionClosed(ClientCnx cnx) { lastConnectionClosedTimestamp = System.currentTimeMillis(); + duringConnect.set(false); state.client.getCnxPool().releaseConnection(cnx); if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) { if (!isValidStateForReconnection()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 453fd52843218..4a7fef11af49b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -765,16 +765,17 @@ public void negativeAcknowledge(Message message) { } @Override - public void connectionOpened(final ClientCnx cnx) { + public CompletableFuture connectionOpened(final ClientCnx cnx) { previousExceptions.clear(); - if (getState() == State.Closing || getState() == State.Closed) { + final State state = getState(); + if (state == State.Closing || state == State.Closed) { setState(State.Closed); closeConsumerTasks(); deregisterFromClientCnx(); client.cleanupConsumer(this); clearReceiverQueue(); - return; + return CompletableFuture.completedFuture(null); } log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}", @@ -823,6 +824,7 @@ public void connectionOpened(final ClientCnx cnx) { && startMessageId.equals(initialStartMessageId)) ? startMessageRollbackDurationInSec : 0; // synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them + final CompletableFuture future = new CompletableFuture<>(); synchronized (this) { setClientCnx(cnx); ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), @@ -844,6 +846,7 @@ public void connectionOpened(final ClientCnx cnx) { deregisterFromClientCnx(); client.cleanupConsumer(this); cnx.channel().close(); + future.complete(null); return; } } @@ -856,12 +859,14 @@ public void connectionOpened(final ClientCnx cnx) { if (!(firstTimeConnect && hasParentConsumer) && conf.getReceiverQueueSize() != 0) { increaseAvailablePermits(cnx, conf.getReceiverQueueSize()); } + future.complete(null); }).exceptionally((e) -> { deregisterFromClientCnx(); if (getState() == State.Closing || getState() == State.Closed) { // Consumer was closed while reconnecting, close the connection to make sure the broker // drops the consumer on its side cnx.channel().close(); + future.complete(null); return null; } log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, @@ -879,7 +884,7 @@ public void connectionOpened(final ClientCnx cnx) { if (e.getCause() instanceof PulsarClientException && PulsarClientException.isRetriableError(e.getCause()) && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) { - reconnectLater(e.getCause()); + future.completeExceptionally(e.getCause()); } else if (!subscribeFuture.isDone()) { // unable to create new consumer, fail operation setState(State.Failed); @@ -903,11 +908,16 @@ public void connectionOpened(final ClientCnx cnx) { topic, subscription, cnx.channel().remoteAddress()); } else { // consumer was subscribed and connected but we got some error, keep trying - reconnectLater(e.getCause()); + future.completeExceptionally(e.getCause()); + } + + if (!future.isDone()) { + future.complete(null); } return null; }); } + return future; } protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize) { @@ -992,7 +1002,7 @@ public void connectionFailed(PulsarClientException exception) { setState(State.Failed); if (nonRetriableError) { log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}", - topic, consumerId, exception); + topic, consumerId, exception.getMessage()); } else { log.info("[{}] Consumer creation failed for consumer {} after timeout", topic, consumerId); } @@ -2580,10 +2590,6 @@ void deregisterFromClientCnx() { setClientCnx(null); } - void reconnectLater(Throwable exception) { - this.connectionHandler.reconnectLater(exception); - } - void grabCnx() { this.connectionHandler.grabCnx(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index f2334650ad8e1..a8ef3236f5677 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1536,8 +1536,9 @@ public Iterator iterator() { } } + @Override - public void connectionOpened(final ClientCnx cnx) { + public CompletableFuture connectionOpened(final ClientCnx cnx) { previousExceptions.clear(); final long epoch; @@ -1545,7 +1546,7 @@ public void connectionOpened(final ClientCnx cnx) { // Because the state could have been updated while retrieving the connection, we set it back to connecting, // as long as the change from current state to connecting is a valid state change. if (!changeToConnecting()) { - return; + return CompletableFuture.completedFuture(null); } // We set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating // the producer, it will try to grab a new cnx. We also increment and get the epoch value for the producer. @@ -1585,6 +1586,7 @@ public void connectionOpened(final ClientCnx cnx) { } } + final CompletableFuture future = new CompletableFuture<>(); cnx.sendRequestWithId( Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata, schemaInfo, epoch, userProvidedProducerName, @@ -1599,11 +1601,13 @@ public void connectionOpened(final ClientCnx cnx) { // We are now reconnected to broker and clear to send messages. Re-send all pending messages and // set the cnx pointer so that new messages will be sent immediately synchronized (ProducerImpl.this) { - if (getState() == State.Closing || getState() == State.Closed) { + State state = getState(); + if (state == State.Closing || state == State.Closed) { // Producer was closed while reconnecting, close the connection to make sure the broker // drops the producer on its side cnx.removeProducer(producerId); cnx.channel().close(); + future.complete(null); return; } resetBackoff(); @@ -1653,13 +1657,16 @@ public void connectionOpened(final ClientCnx cnx) { } resendMessages(cnx, epoch); } + future.complete(null); }).exceptionally((e) -> { Throwable cause = e.getCause(); cnx.removeProducer(producerId); - if (getState() == State.Closing || getState() == State.Closed) { + State state = getState(); + if (state == State.Closing || state == State.Closed) { // Producer was closed while reconnecting, close the connection to make sure the broker // drops the producer on its side cnx.channel().close(); + future.complete(null); return null; } @@ -1688,6 +1695,7 @@ public void connectionOpened(final ClientCnx cnx) { } producerCreatedFuture.completeExceptionally(cause); }); + future.complete(null); return null; } if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) { @@ -1731,7 +1739,7 @@ public void connectionOpened(final ClientCnx cnx) { && System.currentTimeMillis() < PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this))) { // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are // still within the initial timeout budget and we are dealing with a retriable error - reconnectLater(cause); + future.completeExceptionally(cause); } else { setState(State.Failed); producerCreatedFuture.completeExceptionally(cause); @@ -1743,9 +1751,12 @@ public void connectionOpened(final ClientCnx cnx) { sendTimeout = null; } } - + if (!future.isDone()) { + future.complete(null); + } return null; }); + return future; } @Override @@ -1757,7 +1768,7 @@ public void connectionFailed(PulsarClientException exception) { if (producerCreatedFuture.completeExceptionally(exception)) { if (nonRetriableError) { log.info("[{}] Producer creation failed for producer {} with unretriableError = {}", - topic, producerId, exception); + topic, producerId, exception.getMessage()); } else { log.info("[{}] Producer creation failed for producer {} after producerTimeout", topic, producerId); } @@ -2183,10 +2194,6 @@ void setClientCnx(ClientCnx clientCnx) { this.connectionHandler.setClientCnx(clientCnx); } - void reconnectLater(Throwable exception) { - this.connectionHandler.reconnectLater(exception); - } - void grabCnx() { this.connectionHandler.grabCnx(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index baff5deb3f19a..56443709bf354 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -123,14 +123,17 @@ public void connectionFailed(PulsarClientException exception) { } @Override - public void connectionOpened(ClientCnx cnx) { + public CompletableFuture connectionOpened(ClientCnx cnx) { + final CompletableFuture future = new CompletableFuture<>(); internalPinnedExecutor.execute(() -> { LOG.info("Transaction meta handler with transaction coordinator id {} connection opened.", transactionCoordinatorId); - if (getState() == State.Closing || getState() == State.Closed) { + State state = getState(); + if (state == State.Closing || state == State.Closed) { setState(State.Closed); failPendingRequest(); + future.complete(null); return; } @@ -146,6 +149,7 @@ public void connectionOpened(ClientCnx cnx) { this.connectionHandler.resetBackoff(); pendingRequests.forEach((requestID, opBase) -> checkStateAndSendRequest(opBase)); } + future.complete(null); }); }).exceptionally((e) -> { internalPinnedExecutor.execute(() -> { @@ -155,16 +159,19 @@ public void connectionOpened(ClientCnx cnx) { || e.getCause() instanceof PulsarClientException.NotAllowedException) { setState(State.Closed); cnx.channel().close(); + future.complete(null); } else { - connectionHandler.reconnectLater(e.getCause()); + future.completeExceptionally(e.getCause()); } }); return null; }); } else { registerToConnection(cnx); + future.complete(null); } }); + return future; } private boolean registerToConnection(ClientCnx cnx) {