-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor to use setOnce #7
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ | |
import org.apache.logging.log4j.core.pattern.ConverterKeys; | ||
import org.apache.logging.log4j.core.pattern.LogEventPatternConverter; | ||
import org.apache.logging.log4j.core.pattern.PatternConverter; | ||
import org.apache.lucene.util.SetOnce; | ||
|
||
import java.util.Locale; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
@@ -35,7 +36,7 @@ | |
@Plugin(category = PatternConverter.CATEGORY, name = "NodeAndClusterIdConverter") | ||
@ConverterKeys({"node_and_cluster_id"}) | ||
public final class NodeAndClusterIdConverter extends LogEventPatternConverter { | ||
private static final AtomicReference<String> nodeAndClusterId = new AtomicReference<>(); | ||
private static final SetOnce<String> nodeAndClusterId = new SetOnce<>(); | ||
|
||
/** | ||
* Called by log4j2 to initialize this converter. | ||
|
@@ -49,14 +50,14 @@ public NodeAndClusterIdConverter() { | |
} | ||
|
||
/** | ||
* Updates only once the clusterID and nodeId | ||
* Updates only once the clusterID and nodeId. | ||
* Note: Should only be called once. Subsequent executions will throw {@link org.apache.lucene.util.SetOnce.AlreadySetException}. | ||
* | ||
* @param nodeId a nodeId received from cluster state update | ||
* @param clusterUUID a clusterId received from cluster state update | ||
* @return true if the update was for the first time (successful) or false if for another calls (does not updates) | ||
*/ | ||
public static boolean setOnce(String nodeId, String clusterUUID) { | ||
return nodeAndClusterId.compareAndSet(null, formatIds(clusterUUID, nodeId)); | ||
public static void setOnce(String nodeId, String clusterUUID) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I still dont' like this being "setOnce", I think it should be parallel to the node name listener? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as this is almost the same as NodeNameConverter I think you are right this method should be name in similar manner. Will rename to |
||
nodeAndClusterId.set(formatIds(clusterUUID, nodeId)); | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,27 +21,53 @@ | |
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.cluster.ClusterChangedEvent; | ||
import org.elasticsearch.cluster.ClusterStateListener; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.ClusterStateObserver; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
|
||
/** | ||
* The {@link NodeAndClusterIdStateListener} listens to cluster state changes and ONLY when receives the first update | ||
* it sets the clusterUUID and nodeID in log4j pattern converter {@link NodeAndClusterIdConverter} | ||
* The {@link NodeAndClusterIdStateListener} listens to cluster state changes and ONLY when receives the first update | ||
* it sets the clusterUUID and nodeID in log4j pattern converter {@link NodeAndClusterIdConverter} | ||
* Once the first update is received, it will automatically be de-registered from subsequent updates | ||
*/ | ||
public class NodeAndClusterIdStateListener implements ClusterStateListener { | ||
public class NodeAndClusterIdStateListener implements ClusterStateObserver.Listener { | ||
private final Logger logger = LogManager.getLogger(NodeAndClusterIdStateListener.class); | ||
|
||
@Override | ||
public void clusterChanged(ClusterChangedEvent event) { | ||
DiscoveryNode localNode = event.state().getNodes().getLocalNode(); | ||
String clusterUUID = event.state().getMetaData().clusterUUID(); | ||
String nodeId = localNode.getId(); | ||
private NodeAndClusterIdStateListener() {} | ||
|
||
/** | ||
* Subscribes for the first cluster state update where nodeId and clusterId is set. | ||
*/ | ||
public static void subscribeTo(ClusterStateObserver observer) { | ||
observer.waitForNextChange(new NodeAndClusterIdStateListener(), NodeAndClusterIdStateListener::nodeIdAndClusterIdSet); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this have a timeout? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's ok not to specify a timeout here. |
||
} | ||
|
||
private static boolean nodeIdAndClusterIdSet(ClusterState clusterState) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "set" here threw me off at first (sounds like a setter). Maybe have this be more a boolean name, like "isNodeAndClusterIdReady"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe |
||
return getNodeId(clusterState) != null && getClusterUUID(clusterState) != null; | ||
} | ||
|
||
private static String getClusterUUID(ClusterState state) { | ||
return state.getMetaData().clusterUUID(); | ||
} | ||
|
||
boolean wasSet = NodeAndClusterIdConverter.setOnce(nodeId, clusterUUID); | ||
private static String getNodeId(ClusterState state) { | ||
DiscoveryNode localNode = state.getNodes().getLocalNode(); | ||
return localNode.getId(); | ||
} | ||
|
||
@Override | ||
public void onNewClusterState(ClusterState state) { | ||
String nodeId = getNodeId(state); | ||
String clusterUUID = getClusterUUID(state); | ||
|
||
if (wasSet) { | ||
logger.debug("Received first cluster state update. Setting nodeId=[{}] and clusterUuid=[{}]", nodeId, clusterUUID); | ||
} | ||
NodeAndClusterIdConverter.setOnce(nodeId, clusterUUID); | ||
logger.debug("Received first cluster state update. Setting nodeId=[{}] and clusterUuid=[{}]", nodeId, clusterUUID); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer to move this up before we set it because this statement is never executed if there is a second attempt and we're failing. If it is one line up it would provide at least some ability to see what's going on. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good pint. will remove the |
||
} | ||
|
||
@Override | ||
public void onClusterServiceClose() {} | ||
|
||
@Override | ||
public void onTimeout(TimeValue timeout) {} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -438,9 +438,6 @@ protected Node( | |
namedWriteableRegistry).stream()) | ||
.collect(Collectors.toList()); | ||
|
||
NodeAndClusterIdStateListener nodeAndClusterIdConverter = new NodeAndClusterIdStateListener(); | ||
clusterService.addListener(nodeAndClusterIdConverter); | ||
|
||
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(), | ||
settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), | ||
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService); | ||
|
@@ -717,6 +714,8 @@ public void onTimeout(TimeValue timeout) { | |
} catch (InterruptedException e) { | ||
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state"); | ||
} | ||
|
||
NodeAndClusterIdStateListener.subscribeTo(observer); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The name subscribeTo looks odd here, since this is a one time use. Perhaps the name should reflect that, like "getAndSetClusterIds"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, note that this is inside a block which will not run if the join has already happened. I think this needs to be moved outside this block. |
||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Scratch "
Note: Should only be called once.
"?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, the previous line should make it clear that it should only be called once