Skip to content

Commit

Permalink
Merge pull request #7 from pgomulka/fix/observer-logging
Browse files Browse the repository at this point in the history
Refactor to use setOnce  and observable
  • Loading branch information
pgomulka authored Jan 23, 2019
2 parents 53ead59 + 72bd776 commit ee3322c
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import org.apache.logging.log4j.core.pattern.ConverterKeys;
import org.apache.logging.log4j.core.pattern.LogEventPatternConverter;
import org.apache.logging.log4j.core.pattern.PatternConverter;
import org.apache.lucene.util.SetOnce;

import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference;

/**
* Pattern converter to format the node_and_cluster_id variable into JSON fields <code>node.id</code> and <code>cluster.uuid</code>.
Expand All @@ -35,7 +35,7 @@
@Plugin(category = PatternConverter.CATEGORY, name = "NodeAndClusterIdConverter")
@ConverterKeys({"node_and_cluster_id"})
public final class NodeAndClusterIdConverter extends LogEventPatternConverter {
private static final AtomicReference<String> nodeAndClusterId = new AtomicReference<>();
private static final SetOnce<String> nodeAndClusterId = new SetOnce<>();

/**
* Called by log4j2 to initialize this converter.
Expand All @@ -49,14 +49,14 @@ public NodeAndClusterIdConverter() {
}

/**
* Updates only once the clusterID and nodeId
* Updates only once the clusterID and nodeId.
* Subsequent executions will throw {@link org.apache.lucene.util.SetOnce.AlreadySetException}.
*
* @param nodeId a nodeId received from cluster state update
* @param clusterUUID a clusterId received from cluster state update
* @return true if the update was for the first time (successful) or false if for another calls (does not updates)
*/
public static boolean setOnce(String nodeId, String clusterUUID) {
return nodeAndClusterId.compareAndSet(null, formatIds(clusterUUID, nodeId));
public static void setNodeIdAndClusterId(String nodeId, String clusterUUID) {
nodeAndClusterId.set(formatIds(clusterUUID, nodeId));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,55 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.unit.TimeValue;

/**
* The {@link NodeAndClusterIdStateListener} listens to cluster state changes and ONLY when receives the first update
* it sets the clusterUUID and nodeID in log4j pattern converter {@link NodeAndClusterIdConverter}
* The {@link NodeAndClusterIdStateListener} listens to cluster state changes and ONLY when receives the first update
* it sets the clusterUUID and nodeID in log4j pattern converter {@link NodeAndClusterIdConverter}.
* Once the first update is received, it will automatically be de-registered from subsequent updates.
*/
public class NodeAndClusterIdStateListener implements ClusterStateListener {
public class NodeAndClusterIdStateListener implements ClusterStateObserver.Listener {
private final Logger logger = LogManager.getLogger(NodeAndClusterIdStateListener.class);

@Override
public void clusterChanged(ClusterChangedEvent event) {
DiscoveryNode localNode = event.state().getNodes().getLocalNode();
String clusterUUID = event.state().getMetaData().clusterUUID();
String nodeId = localNode.getId();
private NodeAndClusterIdStateListener() {}

/**
* Subscribes for the first cluster state update where nodeId and clusterId is present
* and sets these values in {@link NodeAndClusterIdConverter}.
* @param observer - the observer that the listener subscribes for an update.
*/
public static void getAndSetNodeIdAndClusterId(ClusterStateObserver observer) {
observer.waitForNextChange(new NodeAndClusterIdStateListener(), NodeAndClusterIdStateListener::isNodeAndClusterIdPresent);
}

private static boolean isNodeAndClusterIdPresent(ClusterState clusterState) {
return getNodeId(clusterState) != null && getClusterUUID(clusterState) != null;
}

private static String getClusterUUID(ClusterState state) {
return state.getMetaData().clusterUUID();
}

boolean wasSet = NodeAndClusterIdConverter.setOnce(nodeId, clusterUUID);
private static String getNodeId(ClusterState state) {
DiscoveryNode localNode = state.getNodes().getLocalNode();
return localNode.getId();
}

@Override
public void onNewClusterState(ClusterState state) {
String nodeId = getNodeId(state);
String clusterUUID = getClusterUUID(state);

if (wasSet) {
logger.debug("Received first cluster state update. Setting nodeId=[{}] and clusterUuid=[{}]", nodeId, clusterUUID);
}
logger.debug("Received cluster state update. Setting nodeId=[{}] and clusterUuid=[{}]", nodeId, clusterUUID);
NodeAndClusterIdConverter.setNodeIdAndClusterId(nodeId, clusterUUID);
}

@Override
public void onClusterServiceClose() {}

@Override
public void onTimeout(TimeValue timeout) {}
}
5 changes: 2 additions & 3 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,6 @@ protected Node(
namedWriteableRegistry).stream())
.collect(Collectors.toList());

NodeAndClusterIdStateListener nodeAndClusterIdConverter = new NodeAndClusterIdStateListener();
clusterService.addListener(nodeAndClusterIdConverter);

ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
Expand Down Expand Up @@ -717,7 +714,9 @@ public void onTimeout(TimeValue timeout) {
} catch (InterruptedException e) {
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
}

}
NodeAndClusterIdStateListener.getAndSetNodeIdAndClusterId(observer);
}

injector.getInstance(HttpServerTransport.class).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public abstract class JsonLogsIntegTestCase extends ESRestTestCase {

public void testElementsPresentOnAllLinesOfLog() throws IOException {
JsonLogLine firstLine = findFirstLine();
assertNotNull(firstLine);

try (Stream<JsonLogLine> stream = JsonLogsStream.from(openReader(getLogFile()))) {
stream.limit(LINES_TO_CHECK)
Expand Down Expand Up @@ -102,6 +103,7 @@ public void testNodeIdAndClusterIdConsistentOnceAvailable() throws IOException {
firstLine = jsonLogLine;
}
}
assertNotNull(firstLine);

//once the nodeId and clusterId are received, they should be the same on remaining lines

Expand Down

0 comments on commit ee3322c

Please sign in to comment.