Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23529 Flaky test ThinClientPartitionAwarenessUnstableTopologyTest #11626

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,70 +62,108 @@
* Communication channel with failover and partition awareness.
*/
final class ReliableChannel implements AutoCloseable {
/** Channel factory. */
/**
* Channel factory.
*/
private final BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory;

/** Client channel holders for each configured address. */
/**
* Client channel holders for each configured address.
*/
private volatile List<ClientChannelHolder> channels;

/** Limit of attempts to execute each service. */
/**
* Limit of attempts to execute each service.
*/
private volatile int attemptsLimit;

/** Index of the current channel. */
/**
* Index of the current channel.
*/
private volatile int curChIdx = -1;

/** Partition awareness enabled. */
/**
* Partition awareness enabled.
*/
final boolean partitionAwarenessEnabled;

/** Cache partition awareness context. */
/**
* Cache partition awareness context.
*/
private final ClientCacheAffinityContext affinityCtx;

/** Nodes discovery context. */
/**
* Nodes discovery context.
*/
private final ClientDiscoveryContext discoveryCtx;

/** Client configuration. */
/**
* Client configuration.
*/
private final ClientConfiguration clientCfg;

/** Logger. */
/**
* Logger.
*/
private final IgniteLogger log;

/** Node channels. */
/**
* Node channels.
*/
private final Map<UUID, ClientChannelHolder> nodeChannels = new ConcurrentHashMap<>();

/** Channels reinit was scheduled. */
/**
* Channels reinit was scheduled.
*/
private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();

/** Timestamp of start of channels reinitialization. */
/**
* Timestamp of start of channels reinitialization.
*/
private volatile long startChannelsReInit;

/** Timestamp of finish of channels reinitialization. */
/**
* Timestamp of finish of channels reinitialization.
*/
private volatile long finishChannelsReInit;

/** Affinity map update is in progress. */
/**
* Affinity map update is in progress.
*/
private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();

/** Channel is closed. */
/**
* Channel is closed.
*/
private volatile boolean closed;

/** Fail (disconnect) listeners. */
/**
* Fail (disconnect) listeners.
*/
private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();

/** Guard channels and curChIdx together. */
/**
* Guard channels and curChIdx together.
*/
private final ReadWriteLock curChannelsGuard = new ReentrantReadWriteLock();

/** Connection manager. */
/**
* Connection manager.
*/
private final ClientConnectionMultiplexer connMgr;

/** Open channels counter. */
/**
* Open channels counter.
*/
private final AtomicInteger channelsCnt = new AtomicInteger();

/**
* Constructor.
*/
ReliableChannel(
BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
ClientConfiguration clientCfg,
IgniteBinary binary
BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
ClientConfiguration clientCfg,
IgniteBinary binary
) {
if (chFactory == null)
throw new NullPointerException("chFactory");
Expand Down Expand Up @@ -154,7 +192,9 @@ final class ReliableChannel implements AutoCloseable {
log.debug("ReliableChannel created");
}

/** {@inheritDoc} */
/**
* {@inheritDoc}
*/
@Override public synchronized void close() {
if (log.isDebugEnabled())
log.debug("ReliableChannel stopping");
Expand All @@ -166,7 +206,7 @@ final class ReliableChannel implements AutoCloseable {
List<ClientChannelHolder> holders = channels;

if (holders != null) {
for (ClientChannelHolder hld: holders)
for (ClientChannelHolder hld : holders)
hld.close();
}

Expand All @@ -177,11 +217,11 @@ final class ReliableChannel implements AutoCloseable {
/**
* Send request and handle response.
*
* @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
* @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
* @throws ClientAuthenticationException When user name or password is invalid.
* @throws ClientAuthorizationException When user has no permission to perform operation.
* @throws ClientProtocolError When failed to handshake with server.
* @throws ClientServerError When failed to process request on server.
* @throws ClientAuthorizationException When user has no permission to perform operation.
* @throws ClientProtocolError When failed to handshake with server.
* @throws ClientServerError When failed to process request on server.
*/
public <T> T service(
ClientOperation op,
Expand All @@ -194,11 +234,11 @@ public <T> T service(
/**
* Send request to one of the passed nodes and handle response.
*
* @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
* @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
* @throws ClientAuthenticationException When user name or password is invalid.
* @throws ClientAuthorizationException When user has no permission to perform operation.
* @throws ClientProtocolError When failed to handshake with server.
* @throws ClientServerError When failed to process request on server.
* @throws ClientAuthorizationException When user has no permission to perform operation.
* @throws ClientProtocolError When failed to handshake with server.
* @throws ClientServerError When failed to process request on server.
*/
public <T> T service(
ClientOperation op,
Expand All @@ -220,9 +260,9 @@ public <T> T service(
* Send request and handle response asynchronously.
*/
public <T> IgniteClientFuture<T> serviceAsync(
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError {
CompletableFuture<T> fut = new CompletableFuture<>();

Expand Down Expand Up @@ -254,7 +294,9 @@ private <T> void handleServiceAsync(
}
}

/** */
/**
*
*/
private <T> Object applyOnClientChannelAsync(
final CompletableFuture<T> fut,
ClientChannel ch,
Expand Down Expand Up @@ -306,7 +348,7 @@ private <T> Object applyOnClientChannelAsync(
* Send request without payload and handle response.
*/
public <T> T service(ClientOperation op, Function<PayloadInputChannel, T> payloadReader)
throws ClientException, ClientError {
throws ClientException, ClientError {
return service(op, null, payloadReader);
}

Expand Down Expand Up @@ -451,9 +493,9 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
return false;

Boolean result = applyOnNodeChannel(nodeId, channel ->
channel.service(ClientOperation.CACHE_PARTITIONS,
affinityCtx::writePartitionsUpdateRequest,
affinityCtx::readPartitionsUpdateResponse),
channel.service(ClientOperation.CACHE_PARTITIONS,
affinityCtx::writePartitionsUpdateRequest,
affinityCtx::readPartitionsUpdateResponse),
failures
);

Expand Down Expand Up @@ -804,7 +846,9 @@ private <T> T applyOnNodeChannel(
return null;
}

/** */
/**
*
*/
<T> T applyOnDefaultChannel(Function<ClientChannel, T> function, ClientOperation op) {
return applyOnDefaultChannel(function, op, null);
}
Expand Down Expand Up @@ -885,7 +929,9 @@ private <T> T applyOnDefaultChannel(
throw composeException(failures);
}

/** */
/**
*
*/
private ClientConnectionException composeException(List<ClientConnectionException> failures) {
if (F.isEmpty(failures))
return null;
Expand All @@ -901,7 +947,8 @@ private ClientConnectionException composeException(List<ClientConnectionExceptio
* Try apply specified {@code function} on a channel corresponding to {@code tryNodeId}.
* If failed then apply the function on any available channel.
*/
private <T> T applyOnNodeChannelWithFallback(UUID tryNodeId, Function<ClientChannel, T> function, ClientOperation op) {
private <T> T applyOnNodeChannelWithFallback(UUID tryNodeId, Function<ClientChannel, T> function,
ClientOperation op) {
ClientChannelHolder hld = nodeChannels.get(tryNodeId);

List<ClientConnectionException> failures = null;
Expand All @@ -928,7 +975,9 @@ private <T> T applyOnNodeChannelWithFallback(UUID tryNodeId, Function<ClientChan
return applyOnDefaultChannel(function, op, failures);
}

/** Get retry limit. */
/**
* Get retry limit.
*/
private int getRetryLimit() {
List<ClientChannelHolder> holders = channels;

Expand All @@ -940,7 +989,9 @@ private int getRetryLimit() {
return clientCfg.getRetryLimit() > 0 ? Math.min(clientCfg.getRetryLimit(), size) : size;
}

/** Determines whether specified operation should be retried. */
/**
* Determines whether specified operation should be retried.
*/
private boolean shouldRetry(ClientOperation op, int iteration, ClientConnectionException exception) {
ClientOperationType opType = op.toPublicOperationType();

Expand Down Expand Up @@ -979,7 +1030,9 @@ ClientCacheAffinityContext affinityContext() {
return affinityCtx;
}

/** */
/**
*
*/
private boolean isConnectionEstablished(UUID node) {
ClientChannelHolder chHolder = nodeChannels.get(node);

Expand All @@ -996,19 +1049,29 @@ private boolean isConnectionEstablished(UUID node) {
*/
@SuppressWarnings("PackageVisibleInnerClass") // Visible for tests.
class ClientChannelHolder {
/** Channel configuration. */
/**
* Channel configuration.
*/
private volatile ClientChannelConfiguration chCfg;

/** Channel. */
/**
* Channel.
*/
private volatile ClientChannel ch;

/** ID of the last server node that {@link #ch} is or was connected to. */
/**
* ID of the last server node that {@link #ch} is or was connected to.
*/
private volatile UUID serverNodeId;

/** Address that holder is bind to (chCfg.addr) is not in use now. So close the holder. */
/**
* Address that holder is bind to (chCfg.addr) is not in use now. So close the holder.
*/
private volatile boolean close;

/** Timestamps of reconnect retries. */
/**
* Timestamps of reconnect retries.
*/
private final long[] reconnectRetries;

/**
Expand Down Expand Up @@ -1058,13 +1121,13 @@ private ClientChannel getOrCreateChannel(boolean ignoreThrottling)
throw new ClientConnectionException("Channel is closed [addresses=" + getAddresses() + ']');

if (ch == null) {
if (ch != null)
return ch;

synchronized (this) {
if (close)
throw new ClientConnectionException("Channel is closed [addresses=" + getAddresses() + ']');

if (ch != null)
return ch;

if (!ignoreThrottling && applyReconnectionThrottling())
throw new ClientConnectionException("Reconnect is not allowed due to applied throttling" +
" [addresses=" + getAddresses() + ']');
Expand All @@ -1087,7 +1150,6 @@ private ClientChannel getOrCreateChannel(boolean ignoreThrottling)
nodeChannels.putIfAbsent(channel.serverNodeId(), this);
}
}

ch = channel;

channelsCnt.incrementAndGet();
Expand Down