diff --git a/src/Knet.Kudu.Client/KuduClient.cs b/src/Knet.Kudu.Client/KuduClient.cs index 833a2303..1f7db769 100644 --- a/src/Knet.Kudu.Client/KuduClient.cs +++ b/src/Knet.Kudu.Client/KuduClient.cs @@ -1223,8 +1223,15 @@ private ValueTask GetMasterLeaderInfoAsync( return new ValueTask(masterLeaderInfo); } - var task = ConnectToClusterAsync(cancellationToken); + var task = ConnectAsync(cancellationToken); return new ValueTask(task); + + async Task ConnectAsync(CancellationToken cancellationToken) + { + var rpc = new ConnectToMasterRequest(); + await SendRpcAsync(rpc, cancellationToken).ConfigureAwait(false); + return _masterLeaderInfo; + } } /// @@ -1431,7 +1438,7 @@ internal async Task SendRpcAsync( { KuduMasterRpc masterRpc => SendRpcToMasterAsync(masterRpc, token), KuduTabletRpc tabletRpc => SendRpcToTabletAsync(tabletRpc, token), - KuduTxnRpc txnRpc => SendRpcToTxnAsync(txnRpc, token), + KuduTxnRpc txnRpc => SendRpcToTxnManagerAsync(txnRpc, token), _ => throw new NotSupportedException() }; @@ -1485,28 +1492,50 @@ private void CompleteTrackedRpc(KuduRpc rpc) } } - private async Task SendRpcToMasterAsync( + private Task SendRpcToMasterAsync( KuduMasterRpc rpc, CancellationToken cancellationToken) { - var masterLeaderInfo = await GetMasterLeaderInfoAsync(cancellationToken) - .ConfigureAwait(false); + var masterLeaderInfo = _masterLeaderInfo; + if (masterLeaderInfo is not null) + { + var serverInfo = masterLeaderInfo.ServerInfo; + return SendRpcToMasterAsync(rpc, serverInfo, cancellationToken); + } - var serverInfo = masterLeaderInfo.ServerInfo; + return SendAsync(rpc, cancellationToken); - return await SendRpcToMasterAsync(rpc, serverInfo, cancellationToken) - .ConfigureAwait(false); + async Task SendAsync(KuduMasterRpc rpc, CancellationToken cancellationToken) + { + var masterLeaderInfo = await ConnectToClusterAsync(cancellationToken) + .ConfigureAwait(false); + + var serverInfo = masterLeaderInfo.ServerInfo; + return await SendRpcToMasterAsync(rpc, serverInfo, cancellationToken) + .ConfigureAwait(false); + } } - private async Task SendRpcToTxnAsync( + private Task SendRpcToTxnManagerAsync( KuduTxnRpc rpc, CancellationToken cancellationToken) { - var masterLeaderInfo = await GetMasterLeaderInfoAsync(cancellationToken) - .ConfigureAwait(false); + var masterLeaderInfo = _masterLeaderInfo; + if (masterLeaderInfo is not null) + { + var serverInfo = masterLeaderInfo.ServerInfo; + return SendRpcToTxnManagerAsync(rpc, serverInfo, cancellationToken); + } - var serverInfo = masterLeaderInfo.ServerInfo; + return SendAsync(rpc, cancellationToken); - return await SendRpcToTxnAsync(rpc, serverInfo, cancellationToken) - .ConfigureAwait(false); + async Task SendAsync(KuduTxnRpc rpc, CancellationToken cancellationToken) + { + var masterLeaderInfo = await ConnectToClusterAsync(cancellationToken) + .ConfigureAwait(false); + + var serverInfo = masterLeaderInfo.ServerInfo; + return await SendRpcToTxnManagerAsync(rpc, serverInfo, cancellationToken) + .ConfigureAwait(false); + } } /// @@ -1617,7 +1646,7 @@ await SendRpcToServerGenericAsync(rpc, serverInfo, cancellationToken) return rpc.Output; } - private async Task SendRpcToTxnAsync( + private async Task SendRpcToTxnManagerAsync( KuduTxnRpc rpc, ServerInfo serverInfo, CancellationToken cancellationToken)