Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility #9262

Merged
merged 8 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gradle/missing-javadoc.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ configure([
configure([
project(":libs:opensearch-common"),
project(":libs:opensearch-core"),
project(":libs:opensearch-compress"),
project(":plugins:events-correlation-engine"),
project(":server")
]) {
Expand Down
38 changes: 38 additions & 0 deletions libs/compress/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.build'
apply plugin: 'opensearch.publish'

base {
nknize marked this conversation as resolved.
Show resolved Hide resolved
archivesName = 'opensearch-compress'
}

dependencies {
api project(':libs:opensearch-common')
api project(':libs:opensearch-core')

//zstd
api "com.github.luben:zstd-jni:${versions.zstd}"

testImplementation(project(":test:framework")) {
// tests use the locally compiled version of server
exclude group: 'org.opensearch', module: 'opensearch-compress'
}
}

tasks.named('forbiddenApisMain').configure {
// :libs:opensearch-compress does not depend on server
// TODO: Need to decide how we want to handle for forbidden signatures with the changes to server
replaceSignatureFiles 'jdk-signatures'
}

jarHell.enabled = false
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
* compatible open source license.
*/

package org.opensearch.common.compress;
package org.opensearch.compress;

import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.compress.Compressor;
import org.opensearch.core.compress.Compressor;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
Expand All @@ -24,7 +25,8 @@
/**
* {@link Compressor} implementation based on the ZSTD compression algorithm.
*
* @opensearch.internal
* @opensearch.api - registered name requires BWC support
nknize marked this conversation as resolved.
Show resolved Hide resolved
* @opensearch.experimental - class methods might change
*/
public class ZstdCompressor implements Compressor {
// An arbitrary header that we use to identify compressed streams
Expand All @@ -33,6 +35,14 @@ public class ZstdCompressor implements Compressor {
// a XContent
private static final byte[] HEADER = new byte[] { 'Z', 'S', 'T', 'D', '\0' };

/**
* The name to register the compressor by
*
* @opensearch.api - requires BWC support
*/
@PublicApi(since = "2.10.0")
nknize marked this conversation as resolved.
Show resolved Hide resolved
public static final String NAME = "ZSTD";

private static final int LEVEL = 3;

private static final int BUFFER_SIZE = 4096;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@
* compatible open source license.
*/

/** Classes for core compress module */
package org.opensearch.core.common.compress;
/**
* Concrete {@link org.opensearch.core.compress.Compressor} implementations
*/
package org.opensearch.compress;
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.compress.spi;

import org.opensearch.compress.ZstdCompressor;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.spi.CompressorProvider;

import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;
import java.util.List;

/**
* Additional "optional" compressor implementations provided by the opensearch compress library
*
* @opensearch.internal
*/
public class CompressionProvider implements CompressorProvider {
andrross marked this conversation as resolved.
Show resolved Hide resolved

/** Returns the concrete {@link Compressor}s provided by the compress library */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public List<Entry<String, Compressor>> getCompressors() {
return List.of(new SimpleEntry<>(ZstdCompressor.NAME, new ZstdCompressor()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Service Provider Interface for registering concrete {@link org.opensearch.core.compress.Compressor}
* implementations.
*
* See {@link org.opensearch.compress.ZstdCompressor}
*/
package org.opensearch.compress.spi;
13 changes: 13 additions & 0 deletions libs/compress/src/main/java/org/opensearch/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This is the compress library for registering optional
* {@link org.opensearch.core.compress.Compressor} implementations
*/
package org.opensearch;
nknize marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

org.opensearch.compress.spi.CompressionProvider
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
* compatible open source license.
*/

package org.opensearch.common.compress;
package org.opensearch.compress;

import org.opensearch.core.common.compress.Compressor;
import org.opensearch.core.compress.Compressor;
import org.opensearch.test.core.compress.AbstractCompressorTestCase;

/**
* Test streaming compression
*/
public class ZstdCompressTests extends AbstractCompressorTests {
public class ZstdCompressTests extends AbstractCompressorTestCase {

private final Compressor compressor = new ZstdCompressor();

@Override
Compressor compressor() {
protected Compressor compressor() {
return compressor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.Nullable;
import org.opensearch.core.action.ShardOperationFailedException;
import org.opensearch.core.common.compress.NotXContentException;
import org.opensearch.core.compress.NotXContentException;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.Index;
import org.opensearch.core.rest.RestStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,29 @@
* GitHub history for details.
*/

package org.opensearch.core.common.compress;
package org.opensearch.core.compress;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* Compressor interface
* Compressor interface used for compressing {@link org.opensearch.core.xcontent.MediaType} and
* {@code org.opensearch.repositories.blobstore.BlobStoreRepository} implementations.
*
* @opensearch.internal
* This is not to be confused with {@link org.apache.lucene.codecs.compressing.Compressor} which is used
* for codec implementations such as {@code org.opensearch.index.codec.customcodecs.Lucene95CustomCodec}
* for compressing {@link org.apache.lucene.document.StoredField}s
*
* @opensearch.api - intended to be extended
* @opensearch.experimental - however, bwc is not guaranteed at this time
*/
@ExperimentalApi
@PublicApi(since = "2.10.0")
public interface Compressor {

boolean isCompressed(BytesReference bytes);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.compress;

import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.compress.spi.CompressorProvider;
import org.opensearch.core.xcontent.MediaTypeRegistry;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.stream.Collectors;

/**
* A registry that wraps a static Map singleton which holds a mapping of unique String names (typically the
* compressor header as a string) to registerd {@link Compressor} implementations.
*
* This enables plugins, modules, extensions to register their own compression implementations through SPI
*
* @opensearch.experimental
* @opensearch.internal
*/
@InternalApi
public final class CompressorRegistry {

// the backing registry map
private static final Map<String, Compressor> registeredCompressors = ServiceLoader.load(
CompressorProvider.class,
CompressorProvider.class.getClassLoader()
)
.stream()
.flatMap(p -> p.get().getCompressors().stream())
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));

// no instance:
private CompressorRegistry() {}

/**
* Returns the default compressor
*/
public static Compressor defaultCompressor() {
return registeredCompressors.get("DEFLATE");
nknize marked this conversation as resolved.
Show resolved Hide resolved
}

public static Compressor none() {
return registeredCompressors.get(NoneCompressor.NAME);
}

public static boolean isCompressed(BytesReference bytes) {
return compressor(bytes) != null;
}

@Nullable
public static Compressor compressor(final BytesReference bytes) {
for (Compressor compressor : registeredCompressors.values()) {
if (compressor.isCompressed(bytes) == true) {
// bytes should be either detected as compressed or as xcontent,
// if we have bytes that can be either detected as compressed or
// as a xcontent, we have a problem
assert MediaTypeRegistry.xContentType(bytes) == null;
return compressor;
}
}

if (MediaTypeRegistry.xContentType(bytes) == null) {
throw new NotXContentException("Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes");
}

return null;
}

/** Decompress the provided {@link BytesReference}. */
public static BytesReference uncompress(BytesReference bytes) throws IOException {
Compressor compressor = compressor(bytes);
if (compressor == null) {
throw new NotCompressedException();

Check warning on line 85 in libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java#L85

Added line #L85 was not covered by tests
}
return compressor.uncompress(bytes);
}

/**
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}.
*/
public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException {
Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null"));
return compressor == null ? bytes : compressor.uncompress(bytes);
}

/** Returns a registered compressor by its registered name */
public static Compressor getCompressor(final String name) {
if (registeredCompressors.containsKey(name)) {
return registeredCompressors.get(name);
}
throw new IllegalArgumentException("No registered compressor found by name [" + name + "]");

Check warning on line 103 in libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java#L103

Added line #L103 was not covered by tests
}

/**
* Returns the registered compressors as an Immutable collection
*
* note: used for testing
*/
public static Map<String, Compressor> registeredCompressors() {
// no destructive danger as backing map is immutable
return registeredCompressors;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
* compatible open source license.
*/

package org.opensearch.common.compress;
package org.opensearch.core.compress;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.compress.Compressor;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -18,9 +18,18 @@
/**
* {@link Compressor} no compressor implementation.
*
* @opensearch.internal
* @opensearch.api - registered name requires BWC support
* @opensearch.experimental - class methods might change
*/
public class NoneCompressor implements Compressor {
/**
* The name to register the compressor by
*
* @opensearch.api - requires BWC support
*/
@PublicApi(since = "2.10.0")
public static final String NAME = "NONE";

@Override
public boolean isCompressed(BytesReference bytes) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* GitHub history for details.
*/

package org.opensearch.common.compress;
package org.opensearch.core.compress;

/**
* Exception indicating that we were expecting something compressed, which
Expand Down
Loading
Loading