From 56800274572e1c48df51e5d482aa62c80a029463 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Thu, 31 Oct 2024 14:48:59 -0500 Subject: [PATCH 1/2] Start compression, but curator requires commands set it explicitly. --- .../org/apache/solr/cloud/ZkShardTerms.java | 13 +++-- .../solr/common/cloud/SolrZkClient.java | 53 ++---------------- .../cloud/SolrZkCompressionProvider.java | 54 +++++++++++++++++++ 3 files changed, 63 insertions(+), 57 deletions(-) create mode 100644 solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkCompressionProvider.java diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java index 83f8885325b..508d2bb0114 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java @@ -406,21 +406,20 @@ private void registerWatcher() throws KeeperException { Watcher watcher = event -> { // Don't do anything if we are closed - if (isClosed.get()) { + if (isClosed.get() || !event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) { return; } // session events are not change events, and do not remove the watcher if (Watcher.Event.EventType.None == event.getType()) { return; } + // Some events may be missed during registering a watcher, so it is safer to refresh terms + // after registering watcher retryRegisterWatcher(); - // if term node is deleted, refresh cannot possibly succeed - if (Watcher.Event.EventType.NodeDeleted == event.getType()) { - return; + // Only refresh the data if the node was created or its data changed. + if (Watcher.Event.EventType.NodeCreated == event.getType() || Watcher.Event.EventType.NodeDataChanged == event.getType()) { + refreshTerms(); } - // Some events may be missed during register a watcher, so it is safer to refresh terms - // after registering watcher - refreshTerms(); }; try { // exists operation is faster than getData operation diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java index 9ae720cb180..ffbbdb155ef 100644 --- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java @@ -58,7 +58,6 @@ import org.apache.solr.common.util.ReflectMapWriter; import org.apache.solr.common.util.SolrNamedThreadFactory; import org.apache.solr.common.util.StrUtils; -import org.apache.solr.common.util.ZLibCompressor; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -86,9 +85,6 @@ public class SolrZkClient implements Closeable { private final ZkMetrics metrics = new ZkMetrics(); - private Compressor compressor; - private int minStateByteLenForCompression; - public MapWriter getMetrics() { return metrics::writeMap; } @@ -199,6 +195,7 @@ private SolrZkClient( .authorization(zkCredentialsProvider.getCredentials()) .retryPolicy(retryPolicy) .runSafeService(curatorSafeServiceExecutor) + .compressionProvider(new SolrZkCompressionProvider(compressor, minStateByteLenForCompression)) .build(); if (onReconnect != null) { client @@ -228,18 +225,6 @@ private SolrZkClient( } assert ObjectReleaseTracker.track(this); - if (aclProvider == null) { - this.aclProvider = useDefaultCredsAndACLs ? createACLProvider() : new DefaultZkACLProvider(); - } else { - this.aclProvider = aclProvider; - } - - if (compressor == null) { - this.compressor = new ZLibCompressor(); - } else { - this.compressor = compressor; - } - this.minStateByteLenForCompression = minStateByteLenForCompression; } public CuratorFramework getCuratorFramework() { @@ -444,18 +429,6 @@ public byte[] getData( .storingStatIn(stat) .usingWatcher(wrapWatcher(watcher)) .forPath(path)); - if (compressor.isCompressedBytes(result)) { - log.debug("Zookeeper data at path {} is compressed", path); - try { - result = compressor.decompressBytes(result); - } catch (Exception e) { - throw new SolrException( - SolrException.ErrorCode.SERVER_ERROR, - String.format( - Locale.ROOT, "Unable to decompress data at path: %s from zookeeper", path), - e); - } - } metrics.reads.increment(); if (result != null) { metrics.bytesRead.add(result.length); @@ -470,16 +443,11 @@ public NodeData getNode(final String path, Watcher watcher, boolean retryOnConnL } /** Returns node's state */ - public Stat setData(final String path, byte data[], final int version, boolean retryOnConnLoss) + public Stat setData(final String path, final byte[] data, final int version, boolean retryOnConnLoss) throws KeeperException, InterruptedException { - if (SolrZkClient.shouldCompressData(data, path, minStateByteLenForCompression)) { - // state.json should be compressed before being put to ZK - data = compressor.compressBytes(data); - } - final byte[] finalData = data; Stat result = runWithCorrectThrows( - "setting data", () -> client.setData().withVersion(version).forPath(path, finalData)); + "setting data", () -> client.setData().withVersion(version).forPath(path, data)); metrics.writes.increment(); if (data != null) { metrics.bytesWritten.add(data.length); @@ -675,11 +643,6 @@ public void makePath( createBuilder.orSetData(); } - if (SolrZkClient.shouldCompressData(data, path, minStateByteLenForCompression)) { - // state.json should be compressed before being put to ZK - data = compressor.compressBytes(data); - } - metrics.writes.increment(); if (data != null) { metrics.bytesWritten.add(data.length); @@ -1353,14 +1316,4 @@ public SolrZkClient build() { return new SolrZkClient(this); } } - - static boolean shouldCompressData(byte[] data, String path, int minStateByteLenForCompression) { - if (path.endsWith("state.json") - && minStateByteLenForCompression > -1 - && data.length > minStateByteLenForCompression) { - // state.json should be compressed before being put to ZK - return true; - } - return false; - } } diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkCompressionProvider.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkCompressionProvider.java new file mode 100644 index 00000000000..67317224b98 --- /dev/null +++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkCompressionProvider.java @@ -0,0 +1,54 @@ +package org.apache.solr.common.cloud; + +import org.apache.curator.framework.api.CompressionProvider; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.util.Compressor; +import org.apache.solr.common.util.ZLibCompressor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.Locale; + +public class SolrZkCompressionProvider implements CompressionProvider { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final Compressor compressor; + private final int minStateByteLenForCompression; + + public SolrZkCompressionProvider(Compressor compressor, int minStateByteLenForCompression) { + this.compressor = compressor != null ? compressor : new ZLibCompressor(); + this.minStateByteLenForCompression = minStateByteLenForCompression; + } + + @Override + public byte[] compress(String path, byte[] data) throws Exception { + if (path.endsWith("state.json") + && minStateByteLenForCompression > -1 + && data.length >= minStateByteLenForCompression) { + // state.json should be compressed before being put to ZK + return compressor.compressBytes(data); + } else { + return data; + } + } + + @Override + public byte[] decompress(String path, byte[] compressedData) throws SolrException { + if (compressor.isCompressedBytes(compressedData)) { + log.info("Zookeeper data at path {} is compressed", path); + try { + return compressor.decompressBytes(compressedData); + } catch (Exception e) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + String.format( + Locale.ROOT, "Unable to decompress data at path: %s from zookeeper", path), + e); + } + } else { + return compressedData; + } + } +} From 8aa3d02856c7e141abb80f422b51b00a91463b2f Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Wed, 6 Nov 2024 10:25:51 -0600 Subject: [PATCH 2/2] Remove zkSHardTerms stuff --- .../java/org/apache/solr/cloud/ZkShardTerms.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java index 508d2bb0114..83f8885325b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java @@ -406,20 +406,21 @@ private void registerWatcher() throws KeeperException { Watcher watcher = event -> { // Don't do anything if we are closed - if (isClosed.get() || !event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) { + if (isClosed.get()) { return; } // session events are not change events, and do not remove the watcher if (Watcher.Event.EventType.None == event.getType()) { return; } - // Some events may be missed during registering a watcher, so it is safer to refresh terms - // after registering watcher retryRegisterWatcher(); - // Only refresh the data if the node was created or its data changed. - if (Watcher.Event.EventType.NodeCreated == event.getType() || Watcher.Event.EventType.NodeDataChanged == event.getType()) { - refreshTerms(); + // if term node is deleted, refresh cannot possibly succeed + if (Watcher.Event.EventType.NodeDeleted == event.getType()) { + return; } + // Some events may be missed during register a watcher, so it is safer to refresh terms + // after registering watcher + refreshTerms(); }; try { // exists operation is faster than getData operation