diff --git a/CHANGELOG.md b/CHANGELOG.md index be4c0d77a2633..9fb3d1c56e68e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,6 +80,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129)) - Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177)) - Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412)) +- Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility ([#9262](https://github.com/opensearch-project/OpenSearch/pull/9262)) ### Deprecated diff --git a/gradle/missing-javadoc.gradle b/gradle/missing-javadoc.gradle index e006b4309deea..ab2eddf16eacf 100644 --- a/gradle/missing-javadoc.gradle +++ b/gradle/missing-javadoc.gradle @@ -180,6 +180,7 @@ configure([ configure([ project(":libs:opensearch-common"), project(":libs:opensearch-core"), + project(":libs:opensearch-compress"), project(":server") ]) { project.tasks.withType(MissingJavadocTask) { diff --git a/libs/compress/build.gradle b/libs/compress/build.gradle new file mode 100644 index 0000000000000..7a5bc2f573dea --- /dev/null +++ b/libs/compress/build.gradle @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +apply plugin: 'opensearch.build' +apply plugin: 'opensearch.publish' + +base { + archivesName = 'opensearch-compress' +} + +dependencies { + api project(':libs:opensearch-common') + api project(':libs:opensearch-core') + + //zstd + api "com.github.luben:zstd-jni:${versions.zstd}" + + testImplementation(project(":test:framework")) { + // tests use the locally compiled version of server + exclude group: 'org.opensearch', module: 'opensearch-compress' + } +} + +tasks.named('forbiddenApisMain').configure { + // :libs:opensearch-compress does not depend on server + // TODO: Need to decide how we want to handle for forbidden signatures with the changes to server + replaceSignatureFiles 'jdk-signatures' +} + +jarHell.enabled = false diff --git a/server/licenses/zstd-jni-1.5.5-5.jar.sha1 b/libs/compress/licenses/zstd-jni-1.5.5-5.jar.sha1 similarity index 100% rename from server/licenses/zstd-jni-1.5.5-5.jar.sha1 rename to libs/compress/licenses/zstd-jni-1.5.5-5.jar.sha1 diff --git a/server/licenses/zstd-jni-LICENSE.txt b/libs/compress/licenses/zstd-jni-LICENSE.txt similarity index 100% rename from server/licenses/zstd-jni-LICENSE.txt rename to libs/compress/licenses/zstd-jni-LICENSE.txt diff --git a/server/licenses/zstd-jni-NOTICE.txt b/libs/compress/licenses/zstd-jni-NOTICE.txt similarity index 100% rename from server/licenses/zstd-jni-NOTICE.txt rename to libs/compress/licenses/zstd-jni-NOTICE.txt diff --git a/server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java b/libs/compress/src/main/java/org/opensearch/compress/ZstdCompressor.java similarity index 86% rename from server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java rename to libs/compress/src/main/java/org/opensearch/compress/ZstdCompressor.java index 672c66eb2909f..01afc368fb120 100644 --- a/server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java +++ b/libs/compress/src/main/java/org/opensearch/compress/ZstdCompressor.java @@ -6,14 +6,15 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.compress; import com.github.luben.zstd.RecyclingBufferPool; import com.github.luben.zstd.ZstdInputStreamNoFinalizer; import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -25,7 +26,8 @@ /** * {@link Compressor} implementation based on the ZSTD compression algorithm. * - * @opensearch.internal + * @opensearch.api - registered name requires BWC support + * @opensearch.experimental - class methods might change */ public class ZstdCompressor implements Compressor { // An arbitrary header that we use to identify compressed streams @@ -34,6 +36,14 @@ public class ZstdCompressor implements Compressor { // a XContent private static final byte[] HEADER = new byte[] { 'Z', 'S', 'T', 'D', '\0' }; + /** + * The name to register the compressor by + * + * @opensearch.api - requires BWC support + */ + @PublicApi(since = "2.10.0") + public static final String NAME = "ZSTD"; + private static final int LEVEL = 3; private static final int BUFFER_SIZE = 4096; diff --git a/libs/core/src/main/java/org/opensearch/core/common/compress/package-info.java b/libs/compress/src/main/java/org/opensearch/compress/package-info.java similarity index 63% rename from libs/core/src/main/java/org/opensearch/core/common/compress/package-info.java rename to libs/compress/src/main/java/org/opensearch/compress/package-info.java index 99459f99c42d8..3ffa53079fa69 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/compress/package-info.java +++ b/libs/compress/src/main/java/org/opensearch/compress/package-info.java @@ -6,5 +6,7 @@ * compatible open source license. */ -/** Classes for core compress module */ -package org.opensearch.core.common.compress; +/** + * Concrete {@link org.opensearch.core.compress.Compressor} implementations + */ +package org.opensearch.compress; diff --git a/libs/compress/src/main/java/org/opensearch/compress/spi/CompressionProvider.java b/libs/compress/src/main/java/org/opensearch/compress/spi/CompressionProvider.java new file mode 100644 index 0000000000000..58bf24a210bae --- /dev/null +++ b/libs/compress/src/main/java/org/opensearch/compress/spi/CompressionProvider.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.compress.spi; + +import org.opensearch.compress.ZstdCompressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.spi.CompressorProvider; + +import java.util.AbstractMap.SimpleEntry; +import java.util.List; +import java.util.Map.Entry; + +/** + * Additional "optional" compressor implementations provided by the opensearch compress library + * + * @opensearch.internal + */ +public class CompressionProvider implements CompressorProvider { + + /** Returns the concrete {@link Compressor}s provided by the compress library */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public List> getCompressors() { + return List.of(new SimpleEntry<>(ZstdCompressor.NAME, new ZstdCompressor())); + } +} diff --git a/libs/compress/src/main/java/org/opensearch/compress/spi/package-info.java b/libs/compress/src/main/java/org/opensearch/compress/spi/package-info.java new file mode 100644 index 0000000000000..47d982a7ca2f9 --- /dev/null +++ b/libs/compress/src/main/java/org/opensearch/compress/spi/package-info.java @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Service Provider Interface for registering concrete {@link org.opensearch.core.compress.Compressor} + * implementations. + * + * See {@link org.opensearch.compress.ZstdCompressor} + */ +package org.opensearch.compress.spi; diff --git a/libs/compress/src/main/java/org/opensearch/package-info.java b/libs/compress/src/main/java/org/opensearch/package-info.java new file mode 100644 index 0000000000000..264680e9cb271 --- /dev/null +++ b/libs/compress/src/main/java/org/opensearch/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This is the compress library for registering optional + * {@link org.opensearch.core.compress.Compressor} implementations + */ +package org.opensearch; diff --git a/libs/compress/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider b/libs/compress/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider new file mode 100644 index 0000000000000..a9ea063e24436 --- /dev/null +++ b/libs/compress/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider @@ -0,0 +1,9 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +org.opensearch.compress.spi.CompressionProvider diff --git a/server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java b/libs/compress/src/test/java/org/opensearch/compress/ZstdCompressTests.java similarity index 58% rename from server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java rename to libs/compress/src/test/java/org/opensearch/compress/ZstdCompressTests.java index 9def702792ffc..54864054a0e02 100644 --- a/server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java +++ b/libs/compress/src/test/java/org/opensearch/compress/ZstdCompressTests.java @@ -6,19 +6,20 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.compress; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.test.core.compress.AbstractCompressorTestCase; /** * Test streaming compression */ -public class ZstdCompressTests extends AbstractCompressorTests { +public class ZstdCompressTests extends AbstractCompressorTestCase { private final Compressor compressor = new ZstdCompressor(); @Override - Compressor compressor() { + protected Compressor compressor() { return compressor; } } diff --git a/libs/core/src/main/java/org/opensearch/core/common/compress/Compressor.java b/libs/core/src/main/java/org/opensearch/core/compress/Compressor.java similarity index 77% rename from libs/core/src/main/java/org/opensearch/core/common/compress/Compressor.java rename to libs/core/src/main/java/org/opensearch/core/compress/Compressor.java index 88b6c9f85f225..27d5b5dfdfa15 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/compress/Compressor.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/Compressor.java @@ -30,8 +30,10 @@ * GitHub history for details. */ -package org.opensearch.core.common.compress; +package org.opensearch.core.compress; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.bytes.BytesReference; import java.io.IOException; @@ -39,10 +41,18 @@ import java.io.OutputStream; /** - * Compressor interface + * Compressor interface used for compressing {@link org.opensearch.core.xcontent.MediaType} and + * {@code org.opensearch.repositories.blobstore.BlobStoreRepository} implementations. * - * @opensearch.internal + * This is not to be confused with {@link org.apache.lucene.codecs.compressing.Compressor} which is used + * for codec implementations such as {@code org.opensearch.index.codec.customcodecs.Lucene95CustomCodec} + * for compressing {@link org.apache.lucene.document.StoredField}s + * + * @opensearch.api - intended to be extended + * @opensearch.experimental - however, bwc is not guaranteed at this time */ +@ExperimentalApi +@PublicApi(since = "2.10.0") public interface Compressor { boolean isCompressed(BytesReference bytes); diff --git a/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java b/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java new file mode 100644 index 0000000000000..9290254c30d8d --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.compress; + +import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.InternalApi; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.spi.CompressorProvider; +import org.opensearch.core.xcontent.MediaTypeRegistry; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.ServiceLoader; +import java.util.stream.Collectors; + +/** + * A registry that wraps a static Map singleton which holds a mapping of unique String names (typically the + * compressor header as a string) to registerd {@link Compressor} implementations. + * + * This enables plugins, modules, extensions to register their own compression implementations through SPI + * + * @opensearch.experimental + * @opensearch.internal + */ +@InternalApi +public final class CompressorRegistry { + + // the backing registry map + private static final Map registeredCompressors = ServiceLoader.load( + CompressorProvider.class, + CompressorProvider.class.getClassLoader() + ) + .stream() + .flatMap(p -> p.get().getCompressors().stream()) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); + + // no instance: + private CompressorRegistry() {} + + /** + * Returns the default compressor + */ + public static Compressor defaultCompressor() { + return registeredCompressors.get("DEFLATE"); + } + + public static Compressor none() { + return registeredCompressors.get(NoneCompressor.NAME); + } + + public static boolean isCompressed(BytesReference bytes) { + return compressor(bytes) != null; + } + + @Nullable + public static Compressor compressor(final BytesReference bytes) { + for (Compressor compressor : registeredCompressors.values()) { + if (compressor.isCompressed(bytes) == true) { + // bytes should be either detected as compressed or as xcontent, + // if we have bytes that can be either detected as compressed or + // as a xcontent, we have a problem + assert MediaTypeRegistry.xContentType(bytes) == null; + return compressor; + } + } + + if (MediaTypeRegistry.xContentType(bytes) == null) { + throw new NotXContentException("Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"); + } + + return null; + } + + /** Decompress the provided {@link BytesReference}. */ + public static BytesReference uncompress(BytesReference bytes) throws IOException { + Compressor compressor = compressor(bytes); + if (compressor == null) { + throw new NotCompressedException(); + } + return compressor.uncompress(bytes); + } + + /** + * Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}. + */ + public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException { + Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null")); + return compressor == null ? bytes : compressor.uncompress(bytes); + } + + /** Returns a registered compressor by its registered name */ + public static Compressor getCompressor(final String name) { + if (registeredCompressors.containsKey(name)) { + return registeredCompressors.get(name); + } + throw new IllegalArgumentException("No registered compressor found by name [" + name + "]"); + } + + /** + * Returns the registered compressors as an Immutable collection + * + * note: used for testing + */ + public static Map registeredCompressors() { + // no destructive danger as backing map is immutable + return registeredCompressors; + } +} diff --git a/server/src/main/java/org/opensearch/common/compress/NoneCompressor.java b/libs/core/src/main/java/org/opensearch/core/compress/NoneCompressor.java similarity index 74% rename from server/src/main/java/org/opensearch/common/compress/NoneCompressor.java rename to libs/core/src/main/java/org/opensearch/core/compress/NoneCompressor.java index f820a3351bc80..6e607ed701633 100644 --- a/server/src/main/java/org/opensearch/common/compress/NoneCompressor.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/NoneCompressor.java @@ -6,10 +6,10 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.core.compress; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; import java.io.IOException; import java.io.InputStream; @@ -18,9 +18,18 @@ /** * {@link Compressor} no compressor implementation. * - * @opensearch.internal + * @opensearch.api - registered name requires BWC support + * @opensearch.experimental - class methods might change */ public class NoneCompressor implements Compressor { + /** + * The name to register the compressor by + * + * @opensearch.api - requires BWC support + */ + @PublicApi(since = "2.10.0") + public static final String NAME = "NONE"; + @Override public boolean isCompressed(BytesReference bytes) { return false; diff --git a/server/src/main/java/org/opensearch/common/compress/NotCompressedException.java b/libs/core/src/main/java/org/opensearch/core/compress/NotCompressedException.java similarity index 97% rename from server/src/main/java/org/opensearch/common/compress/NotCompressedException.java rename to libs/core/src/main/java/org/opensearch/core/compress/NotCompressedException.java index 7f070e0b499d8..91d6bc57f1cd6 100644 --- a/server/src/main/java/org/opensearch/common/compress/NotCompressedException.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/NotCompressedException.java @@ -30,7 +30,7 @@ * GitHub history for details. */ -package org.opensearch.common.compress; +package org.opensearch.core.compress; /** * Exception indicating that we were expecting something compressed, which diff --git a/libs/core/src/main/java/org/opensearch/core/common/compress/NotXContentException.java b/libs/core/src/main/java/org/opensearch/core/compress/NotXContentException.java similarity index 96% rename from libs/core/src/main/java/org/opensearch/core/common/compress/NotXContentException.java rename to libs/core/src/main/java/org/opensearch/core/compress/NotXContentException.java index d1a3e7709a7d0..99337d5a26025 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/compress/NotXContentException.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/NotXContentException.java @@ -30,7 +30,7 @@ * GitHub history for details. */ -package org.opensearch.core.common.compress; +package org.opensearch.core.compress; import org.opensearch.core.xcontent.XContent; diff --git a/libs/core/src/main/java/org/opensearch/core/compress/package-info.java b/libs/core/src/main/java/org/opensearch/core/compress/package-info.java new file mode 100644 index 0000000000000..c0365e45702bc --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/package-info.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Concrete {@link org.opensearch.core.compress.Compressor} implementations provided by the core library + * + * See {@link org.opensearch.core.compress.NoneCompressor} + */ +package org.opensearch.core.compress; diff --git a/libs/core/src/main/java/org/opensearch/core/compress/spi/CompressorProvider.java b/libs/core/src/main/java/org/opensearch/core/compress/spi/CompressorProvider.java new file mode 100644 index 0000000000000..019e282444d64 --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/spi/CompressorProvider.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.compress.spi; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.compress.Compressor; + +import java.util.List; +import java.util.Map; + +/** + * Service Provider Interface for plugins, modules, extensions providing custom + * compression algorithms + * + * see {@link Compressor} for implementing methods + * and {@link org.opensearch.core.compress.CompressorRegistry} for the registration of custom + * Compressors + * + * @opensearch.experimental + * @opensearch.api + */ +@ExperimentalApi +@PublicApi(since = "2.10.0") +public interface CompressorProvider { + /** Extensions that implement their own concrete {@link Compressor}s provide them through this interface method*/ + List> getCompressors(); +} diff --git a/libs/core/src/main/java/org/opensearch/core/compress/spi/DefaultCompressorProvider.java b/libs/core/src/main/java/org/opensearch/core/compress/spi/DefaultCompressorProvider.java new file mode 100644 index 0000000000000..3ca10b564ef68 --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/spi/DefaultCompressorProvider.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.compress.spi; + +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; + +import java.util.AbstractMap.SimpleEntry; +import java.util.List; +import java.util.Map.Entry; + +/** + * Default {@link Compressor} implementations provided by the + * opensearch core library + * + * @opensearch.internal + */ +public class DefaultCompressorProvider implements CompressorProvider { + /** Returns the default {@link Compressor}s provided by the core library */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public List> getCompressors() { + return List.of(new SimpleEntry(NoneCompressor.NAME, new NoneCompressor())); + } +} diff --git a/libs/core/src/main/java/org/opensearch/core/compress/spi/package-info.java b/libs/core/src/main/java/org/opensearch/core/compress/spi/package-info.java new file mode 100644 index 0000000000000..6e33cc8fb63d3 --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/spi/package-info.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * The Service Provider Interface implementation for registering {@link org.opensearch.core.compress.Compressor} + * with the {@link org.opensearch.core.compress.CompressorRegistry} + * + * See {@link org.opensearch.core.compress.spi.DefaultCompressorProvider} as an example of registering the core + * {@link org.opensearch.core.compress.NoneCompressor} + */ +package org.opensearch.core.compress.spi; diff --git a/libs/core/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider b/libs/core/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider new file mode 100644 index 0000000000000..181b802952c60 --- /dev/null +++ b/libs/core/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider @@ -0,0 +1,9 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +org.opensearch.core.compress.spi.DefaultCompressorProvider diff --git a/server/build.gradle b/server/build.gradle index c608c5ff86f06..9c409d77363cb 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -101,6 +101,7 @@ dependencies { api project(':libs:opensearch-common') api project(':libs:opensearch-core') + api project(":libs:opensearch-compress") api project(':libs:opensearch-secure-sm') api project(':libs:opensearch-x-content') api project(":libs:opensearch-geo") @@ -157,9 +158,6 @@ dependencies { api "com.google.protobuf:protobuf-java:${versions.protobuf}" api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}" - //zstd - api "com.github.luben:zstd-jni:${versions.zstd}" - testImplementation(project(":test:framework")) { // tests use the locally compiled version of server exclude group: 'org.opensearch', module: 'server' diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java index 32d3757e7a0cc..dc7b203eb7c4b 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java @@ -12,16 +12,16 @@ import org.apache.logging.log4j.Logger; import org.opensearch.Version; import org.opensearch.common.CheckedConsumer; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.transport.BytesTransportRequest; import java.io.IOException; @@ -37,7 +37,7 @@ public final class CompressedStreamUtils { public static BytesReference createCompressedStream(Version version, CheckedConsumer outputConsumer) throws IOException { final BytesStreamOutput bStream = new BytesStreamOutput(); - try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.defaultCompressor().threadLocalOutputStream(bStream))) { + try (StreamOutput stream = new OutputStreamStreamOutput(CompressorRegistry.defaultCompressor().threadLocalOutputStream(bStream))) { stream.setVersion(version); outputConsumer.accept(stream); } @@ -48,7 +48,7 @@ public static BytesReference createCompressedStream(Version version, CheckedCons public static StreamInput decompressBytes(BytesTransportRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException { - final Compressor compressor = CompressorFactory.compressor(request.bytes()); + final Compressor compressor = CompressorRegistry.compressor(request.bytes()); final StreamInput in; if (compressor != null) { in = new InputStreamStreamInput(compressor.threadLocalInputStream(request.bytes().streamInput())); diff --git a/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java b/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java index fefdf0e7dfdf3..550bc5b3fd524 100644 --- a/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java +++ b/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java @@ -37,9 +37,10 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; @@ -86,7 +87,7 @@ private CompressedXContent(byte[] compressed, int crc32) { */ public CompressedXContent(ToXContent xcontent, ToXContent.Params params) throws IOException { BytesStreamOutput bStream = new BytesStreamOutput(); - OutputStream compressedStream = CompressorFactory.defaultCompressor().threadLocalOutputStream(bStream); + OutputStream compressedStream = CompressorRegistry.defaultCompressor().threadLocalOutputStream(bStream); CRC32 crc32 = new CRC32(); OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32); try (XContentBuilder builder = XContentFactory.jsonBuilder(checkedStream)) { @@ -108,20 +109,20 @@ public CompressedXContent(ToXContent xcontent, ToXContent.Params params) throws * that may already be compressed. */ public CompressedXContent(BytesReference data) throws IOException { - Compressor compressor = CompressorFactory.compressor(data); + Compressor compressor = CompressorRegistry.compressor(data); if (compressor != null) { // already compressed... this.bytes = BytesReference.toBytes(data); this.crc32 = crc32(uncompressed()); } else { - this.bytes = BytesReference.toBytes(CompressorFactory.defaultCompressor().compress(data)); + this.bytes = BytesReference.toBytes(CompressorRegistry.defaultCompressor().compress(data)); this.crc32 = crc32(data); } assertConsistent(); } private void assertConsistent() { - assert CompressorFactory.compressor(new BytesArray(bytes)) != null; + assert CompressorRegistry.compressor(new BytesArray(bytes)) != null; assert this.crc32 == crc32(uncompressed()); } @@ -146,7 +147,7 @@ public BytesReference compressedReference() { /** Return the uncompressed bytes. */ public BytesReference uncompressed() { try { - return CompressorFactory.uncompress(new BytesArray(bytes)); + return CompressorRegistry.uncompress(new BytesArray(bytes)); } catch (IOException e) { throw new IllegalStateException("Cannot decompress compressed string", e); } diff --git a/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java b/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java deleted file mode 100644 index e40dd89abab54..0000000000000 --- a/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.common.compress; - -import org.opensearch.common.Nullable; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; -import org.opensearch.core.common.compress.NotXContentException; -import org.opensearch.core.xcontent.MediaTypeRegistry; - -import java.io.IOException; -import java.util.Objects; - -/** - * Factory to create a compressor instance. - * - * @opensearch.internal - */ -public class CompressorFactory { - - public static final Compressor DEFLATE_COMPRESSOR = new DeflateCompressor(); - - public static final Compressor ZSTD_COMPRESSOR = new ZstdCompressor(); - - public static final Compressor NONE_COMPRESSOR = new NoneCompressor(); - - public static boolean isCompressed(BytesReference bytes) { - return compressor(bytes) != null; - } - - public static Compressor defaultCompressor() { - return DEFLATE_COMPRESSOR; - } - - @Nullable - public static Compressor compressor(BytesReference bytes) { - if (DEFLATE_COMPRESSOR.isCompressed(bytes)) { - // bytes should be either detected as compressed or as xcontent, - // if we have bytes that can be either detected as compressed or - // as a xcontent, we have a problem - assert MediaTypeRegistry.xContentType(bytes) == null; - return DEFLATE_COMPRESSOR; - } else if (ZSTD_COMPRESSOR.isCompressed(bytes)) { - assert MediaTypeRegistry.xContentType(bytes) == null; - return ZSTD_COMPRESSOR; - } - - if (MediaTypeRegistry.xContentType(bytes) == null) { - throw new NotXContentException("Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"); - } - - return null; - } - - /** - * Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}. - */ - public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException { - Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null")); - return compressor == null ? bytes : compressor.uncompress(bytes); - } - - /** Decompress the provided {@link BytesReference}. */ - public static BytesReference uncompress(BytesReference bytes) throws IOException { - Compressor compressor = compressor(bytes); - if (compressor == null) { - throw new NotCompressedException(); - } - return compressor.uncompress(bytes); - } -} diff --git a/server/src/main/java/org/opensearch/common/compress/CompressorType.java b/server/src/main/java/org/opensearch/common/compress/CompressorType.java deleted file mode 100644 index bc688bab57c37..0000000000000 --- a/server/src/main/java/org/opensearch/common/compress/CompressorType.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.compress; - -import org.opensearch.core.common.compress.Compressor; - -/** - * Supported compression types - * - * @opensearch.internal - */ -public enum CompressorType { - - DEFLATE { - @Override - public Compressor compressor() { - return CompressorFactory.DEFLATE_COMPRESSOR; - } - }, - - ZSTD { - @Override - public Compressor compressor() { - return CompressorFactory.ZSTD_COMPRESSOR; - } - }, - - NONE { - @Override - public Compressor compressor() { - return CompressorFactory.NONE_COMPRESSOR; - } - }; - - public abstract Compressor compressor(); -} diff --git a/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java b/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java index ed741b4899ae7..3ccac1a941741 100644 --- a/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java +++ b/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java @@ -32,11 +32,12 @@ package org.opensearch.common.compress; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.core.Assertions; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -53,7 +54,8 @@ /** * {@link Compressor} implementation based on the DEFLATE compression algorithm. * - * @opensearch.internal + * @opensearch.api - registered name requires BWC support + * @opensearch.experimental - class methods might change */ public class DeflateCompressor implements Compressor { @@ -62,6 +64,15 @@ public class DeflateCompressor implements Compressor { // enough so that no stream starting with these bytes could be detected as // a XContent private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' }; + + /** + * The name to register the compressor by + * + * @opensearch.api - requires BWC support + */ + @PublicApi(since = "2.10.0") + public static String NAME = "DEFLATE"; + // 3 is a good trade-off between speed and compression ratio private static final int LEVEL = 3; // We use buffering on the input and output of in/def-laters in order to diff --git a/server/src/main/java/org/opensearch/common/compress/spi/ServerCompressorProvider.java b/server/src/main/java/org/opensearch/common/compress/spi/ServerCompressorProvider.java new file mode 100644 index 0000000000000..42036f8d88610 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/compress/spi/ServerCompressorProvider.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.compress.spi; + +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.spi.CompressorProvider; + +import java.util.AbstractMap.SimpleEntry; +import java.util.List; +import java.util.Map.Entry; + +/** + * Default {@link Compressor} implementations provided by the + * opensearch core library + * + * @opensearch.internal + * + * @deprecated This class is deprecated and will be removed when the {@link DeflateCompressor} is moved to the compress + * library as a default compression option + */ +@Deprecated +public class ServerCompressorProvider implements CompressorProvider { + /** Returns the concrete {@link Compressor}s provided by the server module */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public List> getCompressors() { + return List.of(new SimpleEntry(DeflateCompressor.NAME, new DeflateCompressor())); + } +} diff --git a/server/src/main/java/org/opensearch/common/compress/spi/package-info.java b/server/src/main/java/org/opensearch/common/compress/spi/package-info.java new file mode 100644 index 0000000000000..a8019b23c7d90 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/compress/spi/package-info.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Service Provider Interface for registering the{@link org.opensearch.common.compress.DeflateCompressor} with the + * {@link org.opensearch.core.compress.CompressorRegistry}. + * + * Note: this will be refactored to the {@code :libs:opensearch-compress} library after other dependency classes are + * refactored. + */ +package org.opensearch.common.compress.spi; diff --git a/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java b/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java index 3100d597abf55..798a58551457f 100644 --- a/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java +++ b/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java @@ -34,10 +34,10 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.common.collect.Tuple; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -77,7 +77,7 @@ public static XContentParser createParser( DeprecationHandler deprecationHandler, BytesReference bytes ) throws IOException { - Compressor compressor = CompressorFactory.compressor(bytes); + Compressor compressor = CompressorRegistry.compressor(bytes); if (compressor != null) { InputStream compressedInput = null; try { @@ -106,7 +106,7 @@ public static XContentParser createParser( MediaType mediaType ) throws IOException { Objects.requireNonNull(mediaType); - Compressor compressor = CompressorFactory.compressor(bytes); + Compressor compressor = CompressorRegistry.compressor(bytes); if (compressor != null) { InputStream compressedInput = null; try { @@ -163,7 +163,7 @@ public static Tuple> convertToMap(Bytes try { final MediaType contentType; InputStream input; - Compressor compressor = CompressorFactory.compressor(bytes); + Compressor compressor = CompressorRegistry.compressor(bytes); if (compressor != null) { InputStream compressedStreamInput = compressor.threadLocalInputStream(bytes.streamInput()); if (compressedStreamInput.markSupported() == false) { @@ -451,7 +451,7 @@ private static boolean allListValuesAreMapsOfOne(List list) { */ @Deprecated public static void writeRawField(String field, BytesReference source, XContentBuilder builder, Params params) throws IOException { - Compressor compressor = CompressorFactory.compressor(source); + Compressor compressor = CompressorRegistry.compressor(source); if (compressor != null) { try (InputStream compressedStreamInput = compressor.threadLocalInputStream(source.streamInput())) { builder.rawField(field, compressedStreamInput); @@ -470,7 +470,7 @@ public static void writeRawField(String field, BytesReference source, XContentBu public static void writeRawField(String field, BytesReference source, XContentType xContentType, XContentBuilder builder, Params params) throws IOException { Objects.requireNonNull(xContentType); - Compressor compressor = CompressorFactory.compressor(source); + Compressor compressor = CompressorRegistry.compressor(source); if (compressor != null) { try (InputStream compressedStreamInput = compressor.threadLocalInputStream(source.streamInput())) { builder.rawField(field, compressedStreamInput, xContentType); diff --git a/server/src/main/java/org/opensearch/index/get/GetResult.java b/server/src/main/java/org/opensearch/index/get/GetResult.java index 6445434764fb4..7f87f9c61c93e 100644 --- a/server/src/main/java/org/opensearch/index/get/GetResult.java +++ b/server/src/main/java/org/opensearch/index/get/GetResult.java @@ -35,7 +35,6 @@ import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchParseException; import org.opensearch.Version; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.document.DocumentField; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.common.Strings; @@ -43,6 +42,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; @@ -219,7 +219,7 @@ public BytesReference sourceRef() { } try { - this.source = CompressorFactory.uncompressIfNeeded(this.source); + this.source = CompressorRegistry.uncompressIfNeeded(this.source); return this.source; } catch (IOException e) { throw new OpenSearchParseException("failed to decompress source", e); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index f6900a7dd1801..da02fa81925db 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -74,8 +74,7 @@ import org.opensearch.common.blobstore.DeleteResult; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.collect.Tuple; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.common.compress.CompressorType; +import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; @@ -93,10 +92,11 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; -import org.opensearch.core.common.compress.NotXContentException; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; +import org.opensearch.core.compress.NotXContentException; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.index.snapshots.IndexShardSnapshotFailedException; import org.opensearch.core.util.BytesRefUtils; @@ -265,10 +265,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final Setting COMPRESS_SETTING = Setting.boolSetting("compress", false, Setting.Property.NodeScope); - public static final Setting COMPRESSION_TYPE_SETTING = new Setting<>( + public static final Setting COMPRESSION_TYPE_SETTING = new Setting<>( "compression_type", - CompressorType.DEFLATE.name().toLowerCase(Locale.ROOT), - s -> CompressorType.valueOf(s.toUpperCase(Locale.ROOT)), + DeflateCompressor.NAME.toLowerCase(Locale.ROOT), + s -> CompressorRegistry.getCompressor(s.toUpperCase(Locale.ROOT)), Setting.Property.NodeScope ); @@ -405,7 +405,7 @@ protected BlobStoreRepository( cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes()); maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings()); - this.compressor = compress ? COMPRESSION_TYPE_SETTING.get(metadata.settings()).compressor() : CompressorFactory.NONE_COMPRESSOR; + this.compressor = compress ? COMPRESSION_TYPE_SETTING.get(metadata.settings()) : CompressorRegistry.none(); } @Override @@ -774,7 +774,7 @@ public BlobStore blobStore() { * @return true if compression is needed */ protected final boolean isCompress() { - return compressor != CompressorFactory.NONE_COMPRESSOR; + return compressor != CompressorRegistry.none(); } /** @@ -2002,7 +2002,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) { if (cacheRepositoryData && bestEffortConsistency == false) { final BytesReference serialized; try { - serialized = CompressorFactory.defaultCompressor().compress(updated); + serialized = CompressorRegistry.defaultCompressor().compress(updated); final int len = serialized.length(); if (len > ByteSizeUnit.KB.toBytes(500)) { logger.debug( @@ -2038,7 +2038,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) { } private RepositoryData repositoryDataFromCachedEntry(Tuple cacheEntry) throws IOException { - try (InputStream input = CompressorFactory.defaultCompressor().threadLocalInputStream(cacheEntry.v2().streamInput())) { + try (InputStream input = CompressorRegistry.defaultCompressor().threadLocalInputStream(cacheEntry.v2().streamInput())) { return RepositoryData.snapshotsFromXContent( MediaTypeRegistry.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, input), cacheEntry.v1(), diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 01f76f9d889b9..9048757405108 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -51,7 +51,7 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; diff --git a/server/src/main/java/org/opensearch/search/SearchHit.java b/server/src/main/java/org/opensearch/search/SearchHit.java index b6061c18eb629..fab9c1b773d0b 100644 --- a/server/src/main/java/org/opensearch/search/SearchHit.java +++ b/server/src/main/java/org/opensearch/search/SearchHit.java @@ -38,7 +38,6 @@ import org.opensearch.Version; import org.opensearch.action.OriginalIndices; import org.opensearch.common.Nullable; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.document.DocumentField; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.ParseField; @@ -49,6 +48,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.text.Text; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.ConstructingObjectParser; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -400,7 +400,7 @@ public BytesReference getSourceRef() { } try { - this.source = CompressorFactory.uncompressIfNeeded(this.source); + this.source = CompressorRegistry.uncompressIfNeeded(this.source); return this.source; } catch (IOException e) { throw new OpenSearchParseException("failed to decompress source", e); diff --git a/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java b/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java index e3fe3c4e37006..5cb169439a14d 100644 --- a/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java +++ b/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java @@ -32,12 +32,12 @@ package org.opensearch.transport; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.Streams; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStream; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.CompressorRegistry; import java.io.IOException; import java.io.OutputStream; @@ -68,7 +68,7 @@ final class CompressibleBytesOutputStream extends StreamOutput { this.bytesStreamOutput = bytesStreamOutput; this.shouldCompress = shouldCompress; if (shouldCompress) { - this.stream = CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput)); + this.stream = CompressorRegistry.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput)); } else { this.stream = bytesStreamOutput; } diff --git a/server/src/main/java/org/opensearch/transport/TransportDecompressor.java b/server/src/main/java/org/opensearch/transport/TransportDecompressor.java index 721085c611ad7..8fbc3b7ce6803 100644 --- a/server/src/main/java/org/opensearch/transport/TransportDecompressor.java +++ b/server/src/main/java/org/opensearch/transport/TransportDecompressor.java @@ -35,12 +35,12 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; import org.opensearch.common.bytes.ReleasableBytesReference; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.recycler.Recycler; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; import java.io.Closeable; import java.io.IOException; @@ -70,7 +70,7 @@ public TransportDecompressor(PageCacheRecycler recycler) { public int decompress(BytesReference bytesReference) throws IOException { int bytesConsumed = 0; if (hasReadHeader == false) { - final Compressor compressor = CompressorFactory.defaultCompressor(); + final Compressor compressor = CompressorRegistry.defaultCompressor(); if (compressor.isCompressed(bytesReference) == false) { int maxToRead = Math.min(bytesReference.length(), 10); StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead) @@ -137,7 +137,7 @@ public int decompress(BytesReference bytesReference) throws IOException { } public boolean canDecompress(int bytesAvailable) { - return hasReadHeader || bytesAvailable >= CompressorFactory.defaultCompressor().headerLength(); + return hasReadHeader || bytesAvailable >= CompressorRegistry.defaultCompressor().headerLength(); } public boolean isEOS() { diff --git a/server/src/main/java/org/opensearch/transport/TransportLogger.java b/server/src/main/java/org/opensearch/transport/TransportLogger.java index 24a8f886be4ef..1876164c52e58 100644 --- a/server/src/main/java/org/opensearch/transport/TransportLogger.java +++ b/server/src/main/java/org/opensearch/transport/TransportLogger.java @@ -34,12 +34,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.Version; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.CompressorRegistry; import java.io.IOException; @@ -185,7 +185,7 @@ private static String format(TcpChannel channel, InboundMessage message, String private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException { if (TransportStatus.isCompress(status) && streamInput.available() > 0) { try { - return new InputStreamStreamInput(CompressorFactory.defaultCompressor().threadLocalInputStream(streamInput)); + return new InputStreamStreamInput(CompressorRegistry.defaultCompressor().threadLocalInputStream(streamInput)); } catch (IllegalArgumentException e) { throw new IllegalStateException("stream marked as compressed, but is missing deflate header"); } diff --git a/server/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider b/server/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider new file mode 100644 index 0000000000000..8d93d45035f3f --- /dev/null +++ b/server/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider @@ -0,0 +1,9 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +org.opensearch.common.compress.spi.ServerCompressorProvider diff --git a/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java b/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java index 8c7ab05addc4c..262a7ec40a8f0 100644 --- a/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java +++ b/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java @@ -32,17 +32,18 @@ package org.opensearch.common.compress; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.test.core.compress.AbstractCompressorTestCase; /** * Test streaming compression (e.g. used for recovery) */ -public class DeflateCompressTests extends AbstractCompressorTests { +public class DeflateCompressTests extends AbstractCompressorTestCase { private final Compressor compressor = new DeflateCompressor(); @Override - Compressor compressor() { + protected Compressor compressor() { return compressor; } } diff --git a/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java b/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java index 7583e7bd371c3..5c9353d15e24a 100644 --- a/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java +++ b/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java @@ -35,7 +35,7 @@ import org.apache.lucene.tests.util.TestUtil; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import org.opensearch.test.OpenSearchTestCase; import org.junit.Assert; diff --git a/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java b/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java index 8c01bdb2e7056..87b5ad3434944 100644 --- a/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java +++ b/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java @@ -33,10 +33,10 @@ package org.opensearch.index.mapper; import org.apache.lucene.util.BytesRef; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.XContentBuilder; import java.io.IOException; @@ -119,11 +119,11 @@ public void testStoredValue() throws IOException { // case 2: a value that looks compressed: this used to fail in 1.x BytesStreamOutput out = new BytesStreamOutput(); - try (OutputStream compressed = CompressorFactory.defaultCompressor().threadLocalOutputStream(out)) { + try (OutputStream compressed = CompressorRegistry.defaultCompressor().threadLocalOutputStream(out)) { new BytesArray(binaryValue1).writeTo(compressed); } final byte[] binaryValue2 = BytesReference.toBytes(out.bytes()); - assertTrue(CompressorFactory.isCompressed(new BytesArray(binaryValue2))); + assertTrue(CompressorRegistry.isCompressed(new BytesArray(binaryValue2))); for (byte[] value : Arrays.asList(binaryValue1, binaryValue2)) { ParsedDocument doc = mapperService.documentMapper().parse(source(b -> b.field("field", value))); diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index fd99983a0c791..93be194b2d112 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -39,13 +39,12 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobStore; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.common.compress.CompressorType; +import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.compress.Compressor; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; @@ -57,7 +56,6 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.util.Arrays; import java.util.Map; import static org.hamcrest.Matchers.containsString; @@ -122,12 +120,12 @@ public void testBlobStoreOperations() throws IOException { ChecksumBlobStoreFormat checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); // Write blobs in different formats - checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", CompressorType.NONE.compressor()); + checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", CompressorRegistry.none()); checksumSMILE.write( new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", - CompressorFactory.DEFLATE_COMPRESSOR + CompressorRegistry.getCompressor(DeflateCompressor.NAME) ); // Assert that all checksum blobs can be read @@ -144,8 +142,8 @@ public void testCompressionIsApplied() throws IOException { } ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); BlobObj blobObj = new BlobObj(veryRedundantText.toString()); - checksumFormat.write(blobObj, blobContainer, "blob-comp", CompressorType.DEFLATE.compressor()); - checksumFormat.write(blobObj, blobContainer, "blob-not-comp", CompressorType.NONE.compressor()); + checksumFormat.write(blobObj, blobContainer, "blob-comp", CompressorRegistry.getCompressor(DeflateCompressor.NAME)); + checksumFormat.write(blobObj, blobContainer, "blob-not-comp", CompressorRegistry.none()); Map blobs = blobContainer.listBlobsByPrefix("blob-"); assertEquals(blobs.size(), 2); assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length())); @@ -157,12 +155,7 @@ public void testBlobCorruption() throws IOException { String testString = randomAlphaOfLength(randomInt(10000)); BlobObj blobObj = new BlobObj(testString); ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); - checksumFormat.write( - blobObj, - blobContainer, - "test-path", - randomFrom(Arrays.stream(CompressorType.values()).map(CompressorType::compressor).toArray(Compressor[]::new)) - ); + checksumFormat.write(blobObj, blobContainer, "test-path", randomFrom(CompressorRegistry.registeredCompressors().values())); assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry()).getText(), testString); randomCorruption(blobContainer, "test-path"); try { diff --git a/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java b/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java index c244cba513982..89018b7353e7c 100644 --- a/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java +++ b/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java @@ -32,12 +32,12 @@ package org.opensearch.transport; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStream; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.test.OpenSearchTestCase; import java.io.EOFException; @@ -56,7 +56,7 @@ public void testStreamWithoutCompression() throws IOException { // Closing compression stream does not close underlying stream stream.close(); - assertFalse(CompressorFactory.defaultCompressor().isCompressed(bytesRef)); + assertFalse(CompressorRegistry.defaultCompressor().isCompressed(bytesRef)); StreamInput streamInput = bytesRef.streamInput(); byte[] actualBytes = new byte[expectedBytes.length]; @@ -83,10 +83,10 @@ public void testStreamWithCompression() throws IOException { BytesReference bytesRef = stream.materializeBytes(); stream.close(); - assertTrue(CompressorFactory.defaultCompressor().isCompressed(bytesRef)); + assertTrue(CompressorRegistry.defaultCompressor().isCompressed(bytesRef)); StreamInput streamInput = new InputStreamStreamInput( - CompressorFactory.defaultCompressor().threadLocalInputStream(bytesRef.streamInput()) + CompressorRegistry.defaultCompressor().threadLocalInputStream(bytesRef.streamInput()) ); byte[] actualBytes = new byte[expectedBytes.length]; streamInput.readBytes(actualBytes, 0, expectedBytes.length); @@ -110,7 +110,7 @@ public void testCompressionWithCallingMaterializeFails() throws IOException { stream.write(expectedBytes); StreamInput streamInput = new InputStreamStreamInput( - CompressorFactory.defaultCompressor().threadLocalInputStream(bStream.bytes().streamInput()) + CompressorRegistry.defaultCompressor().threadLocalInputStream(bStream.bytes().streamInput()) ); byte[] actualBytes = new byte[expectedBytes.length]; EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length)); diff --git a/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java b/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java index 49d22b2221828..9811c0f690800 100644 --- a/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java +++ b/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java @@ -33,7 +33,6 @@ package org.opensearch.transport; import org.opensearch.common.bytes.ReleasableBytesReference; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasables; @@ -43,6 +42,7 @@ import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -54,7 +54,7 @@ public void testSimpleCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { byte randomByte = randomByte(); try ( - OutputStream deflateStream = CompressorFactory.defaultCompressor() + OutputStream deflateStream = CompressorRegistry.defaultCompressor() .threadLocalOutputStream(Streams.flushOnCloseStream(output)) ) { deflateStream.write(randomByte); @@ -77,7 +77,7 @@ public void testMultiPageCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { try ( StreamOutput deflateStream = new OutputStreamStreamOutput( - CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) + CompressorRegistry.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) ) ) { for (int i = 0; i < 10000; ++i) { @@ -109,7 +109,7 @@ public void testIncrementalMultiPageCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { try ( StreamOutput deflateStream = new OutputStreamStreamOutput( - CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) + CompressorRegistry.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) ) ) { for (int i = 0; i < 10000; ++i) { diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java index 5e1278302500a..4d23e2aecc118 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java @@ -46,11 +46,11 @@ import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.compress.CompressorType; import org.opensearch.common.io.Streams; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -97,7 +97,7 @@ protected Settings repositorySettings() { final Settings.Builder builder = Settings.builder(); builder.put("compress", compress); if (compress) { - builder.put("compression_type", randomFrom(CompressorType.values())); + builder.put("compression_type", randomFrom(CompressorRegistry.registeredCompressors().keySet())); } return builder.build(); } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 55b393ab4c577..b04e71d0fca52 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -52,7 +52,6 @@ import org.opensearch.common.action.ActionFuture; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.compress.CompressorType; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentFactory; @@ -60,6 +59,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; @@ -417,7 +417,7 @@ protected Settings.Builder randomRepositorySettings() { final boolean compress = randomBoolean(); settings.put("location", randomRepoPath()).put("compress", compress); if (compress) { - settings.put("compression_type", randomFrom(CompressorType.values())); + settings.put("compression_type", randomFrom(CompressorRegistry.registeredCompressors().keySet())); } if (rarely()) { settings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES); diff --git a/server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java b/test/framework/src/main/java/org/opensearch/test/core/compress/AbstractCompressorTestCase.java similarity index 98% rename from server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java rename to test/framework/src/main/java/org/opensearch/test/core/compress/AbstractCompressorTestCase.java index a2a54f444ad9d..be53e46122157 100644 --- a/server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java +++ b/test/framework/src/main/java/org/opensearch/test/core/compress/AbstractCompressorTestCase.java @@ -6,11 +6,11 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.test.core.compress; import org.apache.lucene.tests.util.LineFileDocs; import org.apache.lucene.tests.util.TestUtil; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; @@ -22,7 +22,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; -abstract class AbstractCompressorTests extends OpenSearchTestCase { +public abstract class AbstractCompressorTestCase extends OpenSearchTestCase { public void testRandom() throws IOException { Random r = random(); @@ -404,6 +404,6 @@ private void doTest(byte bytes[]) throws IOException { assertArrayEquals(bytes, uncompressedOut.toByteArray()); } - abstract Compressor compressor(); + protected abstract Compressor compressor(); }