Skip to content

Commit

Permalink
[Transaction] Transaction coordinator client connect add response com…
Browse files Browse the repository at this point in the history
…mand. (#12301)

## Motivation
now #11357 has merged, #11357 use `SUCCESS` command handle tcClientConnectRequest, This is not conducive to later expansion. So add a individual response for `CommandTcClientConnectRequest`

## implement
add command 
```
message CommandTcClientConnectResponse {
    required uint64 request_id = 1;
    optional ServerError error  = 2;
    optional string message     = 3;
}
```

In order to ensure that the new client is compatible with the old broker, I update protocol version to 19.

```
    v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse
```
if broker protocol version > 18 we should send TcClientConnectCommand
if broker protocol version <= 18 we don't need to send TcClientConnectCommand
  • Loading branch information
congbobo184 authored Oct 9, 2021
1 parent 44dcc04 commit e29f720
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ Future<Void> sendMessagesToConsumer(long consumerId, String topicName, Subscript
int partitionIdx, List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
RedeliveryTracker redeliveryTracker);

void sendTcClientConnectResponse(long requestId, ServerError error, String message);

void sendTcClientConnectResponse(long requestId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,19 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
return writePromise;
}

@Override
public void sendTcClientConnectResponse(long requestId, ServerError error, String message) {
BaseCommand command = Commands.newTcClientConnectResponse(requestId, error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf);
}

@Override
public void sendTcClientConnectResponse(long requestId) {
sendTcClientConnectResponse(requestId, null, null);
}

private void safeIntercept(BaseCommand command, ServerCnx cnx) {
try {
this.interceptor.onPulsarCommand(command, cnx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTcClientConnect;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
Expand Down Expand Up @@ -1934,7 +1934,7 @@ protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCrea
}

@Override
protected void handleTcClientConnect(CommandTcClientConnect command) {
protected void handleTcClientConnectRequest(CommandTcClientConnectRequest command) {
final long requestId = command.getRequestId();
final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
if (log.isDebugEnabled()) {
Expand All @@ -1954,11 +1954,12 @@ protected void handleTcClientConnect(CommandTcClientConnect command) {
log.debug("Handle tc client connect request {} to transaction meta store {} from {} success.",
requestId, tcId, remoteAddress);
}
commandSender.sendSuccessResponse(requestId);
commandSender.sendTcClientConnectResponse(requestId);
}).exceptionally(e -> {
log.error("Handle tc client connect request {} to transaction meta store {} from {} fail.",
requestId, tcId, remoteAddress, e.getCause());
commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
commandSender.sendTcClientConnectResponse(requestId,
BrokerServiceException.getClientErrorCode(e), e.getMessage());
return null;
});
}
Expand Down
6 changes: 4 additions & 2 deletions pulsar-client-cpp/lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -644,8 +644,10 @@ std::string Commands::messageType(BaseCommand_Type type) {
case BaseCommand::END_TXN_ON_SUBSCRIPTION_RESPONSE:
return "END_TXN_ON_SUBSCRIPTION_RESPONSE";
break;
case BaseCommand::TC_CLIENT_CONNECT:
return "TC_CLIENT_CONNECT";
case BaseCommand::TC_CLIENT_CONNECT_REQUEST:
return "TC_CLIENT_CONNECT_REQUEST";
case BaseCommand::TC_CLIENT_CONNECT_RESPONSE:
return "TC_CLIENT_CONNECT_RESPONSE";
break;
};
BOOST_THROW_EXCEPTION(std::logic_error("Invalid BaseCommand enumeration value"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.client.impl.TransactionMetaStoreHandler.getExceptionByServerError;

import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -55,8 +57,11 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.ConnectException;
import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
import org.apache.pulsar.common.tls.TlsHostnameVerifier;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.client.util.TimedCompletableFuture;
Expand Down Expand Up @@ -957,7 +962,6 @@ protected void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnRes

@Override
protected void handleEndTxnOnPartitionResponse(CommandEndTxnOnPartitionResponse command) {
log.info("handleEndTxnOnPartitionResponse");
TransactionBufferHandler handler = checkAndGetTransactionBufferHandler();
if (handler != null) {
handler.handleEndTxnOnTopicResponse(command.getRequestId(), command);
Expand All @@ -980,6 +984,32 @@ protected void handleEndTxnResponse(CommandEndTxnResponse command) {
}
}

@Override
protected void handleTcClientConnectResponse(CommandTcClientConnectResponse response) {
checkArgument(state == State.Ready);

if (log.isDebugEnabled()) {
log.debug("{} Received tc client connect response "
+ "from server: {}", ctx.channel(), response.getRequestId());
}
long requestId = response.getRequestId();
CompletableFuture<?> requestFuture = pendingRequests.remove(requestId);

if (requestFuture != null && !requestFuture.isDone()) {
if (!response.hasError()) {
requestFuture.complete(null);
} else {
ServerError error = response.getError();
log.error("Got tc client connect response for request: {}, error: {}, errorMessage: {}",
response.getRequestId(), response.getError(), response.getMessage());
requestFuture.completeExceptionally(getExceptionByServerError(error, response.getMessage()));
}
} else {
log.warn("Tc client connect command has been completed and get response for request: {}",
response.getRequestId());
}
}

private TransactionMetaStoreHandler checkAndGetTransactionMetaStoreHandler(long tcId) {
TransactionMetaStoreHandler handler = transactionMetaStoreHandlers.get(tcId);
if (handler == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.CommandNewTxnResponse;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.api.proto.TxnAction;
Expand Down Expand Up @@ -119,31 +120,41 @@ public void connectionOpened(ClientCnx cnx) {

connectionHandler.setClientCnx(cnx);
cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this);
long requestId = client.newRequestId();
ByteBuf request = Commands.newTcClientConnect(transactionCoordinatorId, requestId);

cnx.sendRequestWithId(request, requestId).thenRun(() -> {
LOG.info("Transaction coordinator client connect success! tcId : {}", transactionCoordinatorId);
if (!changeToReadyState()) {
setState(State.Closed);
cnx.channel().close();
}
// if broker protocol version < 19, don't send TcClientConnectRequest to broker.
if (cnx.getRemoteEndpointProtocolVersion() > ProtocolVersion.v18.getValue()) {
long requestId = client.newRequestId();
ByteBuf request = Commands.newTcClientConnectRequest(transactionCoordinatorId, requestId);

if (!this.connectFuture.isDone()) {
this.connectFuture.complete(null);
}
this.connectionHandler.resetBackoff();
}).exceptionally((e) -> {
LOG.error("Transaction coordinator client connect fail! tcId : {}", transactionCoordinatorId, e.getCause());
if (getState() == State.Closing || getState() == State.Closed
|| e.getCause() instanceof PulsarClientException.NotAllowedException) {
setState(State.Closed);
cnx.sendRequestWithId(request, requestId).thenRun(() -> {
LOG.info("Transaction coordinator client connect success! tcId : {}", transactionCoordinatorId);
if (!changeToReadyState()) {
setState(State.Closed);
cnx.channel().close();
}

if (!this.connectFuture.isDone()) {
this.connectFuture.complete(null);
}
this.connectionHandler.resetBackoff();
}).exceptionally((e) -> {
LOG.error("Transaction coordinator client connect fail! tcId : {}",
transactionCoordinatorId, e.getCause());
if (getState() == State.Closing || getState() == State.Closed
|| e.getCause() instanceof PulsarClientException.NotAllowedException) {
setState(State.Closed);
cnx.channel().close();
} else {
connectionHandler.reconnectLater(e.getCause());
}
return null;
});
} else {
if (!changeToReadyState()) {
cnx.channel().close();
} else {
connectionHandler.reconnectLater(e.getCause());
}
return null;
});
this.connectFuture.complete(null);
}
}

private void failPendingRequest() {
Expand Down Expand Up @@ -401,7 +412,7 @@ protected OpForVoidCallBack newObject(Handle<OpForVoidCallBack> handle) {
};
}

private TransactionCoordinatorClientException getExceptionByServerError(ServerError serverError, String msg) {
public static TransactionCoordinatorClientException getExceptionByServerError(ServerError serverError, String msg) {
switch (serverError) {
case TransactionCoordinatorNotFound:
return new TransactionCoordinatorClientException.CoordinatorNotFoundException(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
Expand Down Expand Up @@ -594,12 +595,28 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
return serializeWithSize(cmd);
}

public static ByteBuf newTcClientConnect(long tcId, long requestId) {
BaseCommand cmd = localCmd(Type.TC_CLIENT_CONNECT);
cmd.setTcClientConnect().setTcId(tcId).setRequestId(requestId);
public static ByteBuf newTcClientConnectRequest(long tcId, long requestId) {
BaseCommand cmd = localCmd(Type.TC_CLIENT_CONNECT_REQUEST);
cmd.setTcClientConnectRequest().setTcId(tcId).setRequestId(requestId);
return serializeWithSize(cmd);
}

public static BaseCommand newTcClientConnectResponse(long requestId, ServerError error, String message) {
BaseCommand cmd = localCmd(Type.TC_CLIENT_CONNECT_RESPONSE);

CommandTcClientConnectResponse response = cmd.setTcClientConnectResponse()
.setRequestId(requestId);

if (error != null) {
response.setError(error);
}

if (message != null) {
response.setMessage(message);
}

return cmd;
}

private static KeySharedMode convertKeySharedMode(org.apache.pulsar.client.api.KeySharedMode mode) {
switch (mode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSuccess;
import org.apache.pulsar.common.api.proto.CommandTcClientConnect;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.intercept.InterceptException;
Expand Down Expand Up @@ -361,9 +362,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
handleAuthResponse(cmd.getAuthResponse());
break;

case TC_CLIENT_CONNECT:
checkArgument(cmd.hasTcClientConnect());
handleTcClientConnect(cmd.getTcClientConnect());
case TC_CLIENT_CONNECT_REQUEST:
checkArgument(cmd.hasTcClientConnectRequest());
handleTcClientConnectRequest(cmd.getTcClientConnectRequest());
break;

case TC_CLIENT_CONNECT_RESPONSE:
checkArgument(cmd.hasTcClientConnectResponse());
handleTcClientConnectResponse(cmd.getTcClientConnectResponse());
break;

case NEW_TXN:
Expand Down Expand Up @@ -607,7 +613,11 @@ protected void handleAuthChallenge(CommandAuthChallenge commandAuthChallenge) {
throw new UnsupportedOperationException();
}

protected void handleTcClientConnect(CommandTcClientConnect tcClientConnect) {
protected void handleTcClientConnectRequest(CommandTcClientConnectRequest tcClientConnectRequest) {
throw new UnsupportedOperationException();
}

protected void handleTcClientConnectResponse(CommandTcClientConnectResponse tcClientConnectResponse) {
throw new UnsupportedOperationException();
}

Expand Down
15 changes: 12 additions & 3 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ enum ProtocolVersion {
v16 = 16; // Add support for broker entry metadata
v17 = 17; // Added support ack receipt
v18 = 18; // Add client support for broker entry metadata
v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse
}

message CommandConnect {
Expand Down Expand Up @@ -762,11 +763,17 @@ enum TxnAction {
ABORT = 1;
}

message CommandTcClientConnect {
message CommandTcClientConnectRequest {
required uint64 request_id = 1;
required uint64 tc_id = 2 [default = 0];
}

message CommandTcClientConnectResponse {
required uint64 request_id = 1;
optional ServerError error = 2;
optional string message = 3;
}

message CommandNewTxn {
required uint64 request_id = 1;
optional uint64 txn_ttl_seconds = 2 [default = 0];
Expand Down Expand Up @@ -946,7 +953,8 @@ message BaseCommand {

END_TXN_ON_SUBSCRIPTION = 60;
END_TXN_ON_SUBSCRIPTION_RESPONSE = 61;
TC_CLIENT_CONNECT = 62;
TC_CLIENT_CONNECT_REQUEST = 62;
TC_CLIENT_CONNECT_RESPONSE = 63;

}

Expand Down Expand Up @@ -1022,5 +1030,6 @@ message BaseCommand {
optional CommandEndTxnOnPartitionResponse endTxnOnPartitionResponse = 59;
optional CommandEndTxnOnSubscription endTxnOnSubscription = 60;
optional CommandEndTxnOnSubscriptionResponse endTxnOnSubscriptionResponse = 61;
optional CommandTcClientConnect tcClientConnect = 62;
optional CommandTcClientConnectRequest tcClientConnectRequest = 62;
optional CommandTcClientConnectResponse tcClientConnectResponse = 63;
}

0 comments on commit e29f720

Please sign in to comment.