-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract remote "sniffing" to connection strategy #47253
Merged
Tim-Brooks
merged 16 commits into
elastic:master
from
Tim-Brooks:remote_connection_strategy
Sep 30, 2019
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
b0942e9
WIP
Tim-Brooks 4fe35ed
Merge remote-tracking branch 'upstream/master' into remote_connection…
Tim-Brooks d6393ce
WIP
Tim-Brooks 63a7c0b
Changes
Tim-Brooks 5d41424
Merge remote-tracking branch 'upstream/master' into remote_connection…
Tim-Brooks e61b3ca
WIP
Tim-Brooks a82e098
WIP
Tim-Brooks 60ccefb
Changes
Tim-Brooks 0d5d06d
Merge remote-tracking branch 'upstream/master' into remote_connection…
Tim-Brooks bcae80f
More testing
Tim-Brooks 690dca6
Work on tests
Tim-Brooks 72d9ddd
Merge remote-tracking branch 'upstream/master' into remote_connection…
Tim-Brooks 7b79c8e
WIP
Tim-Brooks cb94cd0
Checkstyle
Tim-Brooks a6cc494
review comments
Tim-Brooks ca40fd5
Remove listener in strategy
Tim-Brooks File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
378 changes: 19 additions & 359 deletions
378
server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
164 changes: 164 additions & 0 deletions
164
server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.transport; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.apache.lucene.store.AlreadyClosedException; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.support.ContextPreservingActionListener; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
|
||
import java.io.Closeable; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.RejectedExecutionException; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable { | ||
|
||
protected static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class); | ||
|
||
private static final int MAX_LISTENERS = 100; | ||
private final AtomicBoolean closed = new AtomicBoolean(false); | ||
private final Object mutex = new Object(); | ||
private final ThreadPool threadPool; | ||
protected final RemoteConnectionManager connectionManager; | ||
private List<ActionListener<Void>> listeners = new ArrayList<>(); | ||
|
||
RemoteConnectionStrategy(ThreadPool threadPool, RemoteConnectionManager connectionManager) { | ||
this.threadPool = threadPool; | ||
this.connectionManager = connectionManager; | ||
connectionManager.getConnectionManager().addListener(this); | ||
} | ||
|
||
/** | ||
* Triggers a connect round unless there is one running already. If there is a connect round running, the listener will either | ||
* be queued or rejected and failed. | ||
*/ | ||
void connect(ActionListener<Void> connectListener) { | ||
boolean runConnect = false; | ||
final ActionListener<Void> listener = | ||
ContextPreservingActionListener.wrapPreservingContext(connectListener, threadPool.getThreadContext()); | ||
boolean closed; | ||
synchronized (mutex) { | ||
closed = this.closed.get(); | ||
if (closed) { | ||
assert listeners.isEmpty(); | ||
} else { | ||
if (listeners.size() >= MAX_LISTENERS) { | ||
assert listeners.size() == MAX_LISTENERS; | ||
listener.onFailure(new RejectedExecutionException("connect listener queue is full")); | ||
return; | ||
} else { | ||
listeners.add(listener); | ||
} | ||
runConnect = listeners.size() == 1; | ||
} | ||
} | ||
if (closed) { | ||
connectListener.onFailure(new AlreadyClosedException("connect handler is already closed")); | ||
return; | ||
} | ||
if (runConnect) { | ||
ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT); | ||
executor.submit(new AbstractRunnable() { | ||
@Override | ||
public void onFailure(Exception e) { | ||
ActionListener.onFailure(getAndClearListeners(), e); | ||
} | ||
|
||
@Override | ||
protected void doRun() { | ||
connectImpl(new ActionListener<>() { | ||
@Override | ||
public void onResponse(Void aVoid) { | ||
ActionListener.onResponse(getAndClearListeners(), aVoid); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
ActionListener.onFailure(getAndClearListeners(), e); | ||
} | ||
}); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
@Override | ||
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { | ||
if (shouldOpenMoreConnections()) { | ||
// try to reconnect and fill up the slot of the disconnected node | ||
connect(ActionListener.wrap( | ||
ignore -> logger.trace("successfully connected after disconnect of {}", node), | ||
e -> logger.trace(() -> new ParameterizedMessage("failed to connect after disconnect of {}", node), e))); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
final List<ActionListener<Void>> toNotify; | ||
synchronized (mutex) { | ||
if (closed.compareAndSet(false, true)) { | ||
connectionManager.getConnectionManager().removeListener(this); | ||
toNotify = listeners; | ||
listeners = Collections.emptyList(); | ||
} else { | ||
toNotify = Collections.emptyList(); | ||
} | ||
} | ||
ActionListener.onFailure(toNotify, new AlreadyClosedException("connect handler is already closed")); | ||
} | ||
|
||
public boolean isClosed() { | ||
return closed.get(); | ||
} | ||
|
||
// for testing only | ||
boolean assertNoRunningConnections() { | ||
synchronized (mutex) { | ||
assert listeners.isEmpty(); | ||
} | ||
return true; | ||
} | ||
|
||
protected abstract boolean shouldOpenMoreConnections(); | ||
|
||
protected abstract void connectImpl(ActionListener<Void> listener); | ||
|
||
private List<ActionListener<Void>> getAndClearListeners() { | ||
final List<ActionListener<Void>> result; | ||
synchronized (mutex) { | ||
if (listeners.isEmpty()) { | ||
result = Collections.emptyList(); | ||
} else { | ||
result = listeners; | ||
listeners = new ArrayList<>(); | ||
} | ||
} | ||
return result; | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we not doing this already in the constructor of RemoteClusterConnection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you're adding the listener, it would be best to remove the listener in this same class, and not in RemoteClusterConnection