From 4c24d3662b3529b5498d967efcdbdbefaa1e6fdc Mon Sep 17 00:00:00 2001 From: Tyler Gregg Date: Thu, 5 Dec 2024 13:04:52 -0800 Subject: [PATCH] Adds support for detection of interceptors on the classpath; renames StreamInterceptor to InputStreamInterceptor. --- .../ion/impl/_Private_IonReaderBuilder.java | 22 +- .../amazon/ion/system/IonReaderBuilder.java | 84 ++++- .../ion/{ => util}/GZIPStreamInterceptor.java | 6 +- .../InputStreamInterceptor.java} | 15 +- .../ion/system/IonReaderBuilderTest.java | 6 +- .../ion/system/ZstdStreamInterceptorTest.java | 153 --------- .../ion/util/ZstdStreamInterceptorTest.java | 290 ++++++++++++++++++ 7 files changed, 398 insertions(+), 178 deletions(-) rename src/main/java/com/amazon/ion/{ => util}/GZIPStreamInterceptor.java (88%) rename src/main/java/com/amazon/ion/{StreamInterceptor.java => util/InputStreamInterceptor.java} (76%) delete mode 100644 src/test/java/com/amazon/ion/system/ZstdStreamInterceptorTest.java create mode 100644 src/test/java/com/amazon/ion/util/ZstdStreamInterceptorTest.java diff --git a/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java b/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java index 96acca6e50..23ea7d6435 100644 --- a/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java +++ b/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java @@ -7,7 +7,7 @@ import com.amazon.ion.IonReader; import com.amazon.ion.IonTextReader; import com.amazon.ion.IonValue; -import com.amazon.ion.StreamInterceptor; +import com.amazon.ion.util.InputStreamInterceptor; import com.amazon.ion.system.IonReaderBuilder; import com.amazon.ion.util.IonStreamUtils; @@ -66,6 +66,20 @@ public void setLstFactory(_Private_LocalSymbolTableFactory factory) { } } + /** + * Configures the builder to use a custom {@link ClassLoader} for loading resources, such as + * {@link InputStreamInterceptor} instances. This method is internal only; users should configure + * stream interceptors using either {@link #addInputStreamInterceptor(InputStreamInterceptor)} or by + * making instances available for discovery by the default ClassLoader. + * @param customClassLoader the ClassLoader to use when loading resources. + * @return this builder instance, if mutable; otherwise, a mutable copy of this builder. + */ + public IonReaderBuilder withCustomClassLoader(ClassLoader customClassLoader) { + _Private_IonReaderBuilder b = (_Private_IonReaderBuilder) mutable(); + b.customClassLoader = customClassLoader; + return b; + } + public static class Mutable extends _Private_IonReaderBuilder { public Mutable() { @@ -199,7 +213,7 @@ static IonReader buildReader( IonReaderFromBytesFactoryBinary binary, IonReaderFromBytesFactoryText text ) { - for (StreamInterceptor streamInterceptor : builder.getStreamInterceptors()) { + for (InputStreamInterceptor streamInterceptor : builder.getInputStreamInterceptors()) { if (streamInterceptor.matchesHeader(ionData, offset, length)) { try { return buildReader( @@ -269,7 +283,7 @@ static IonReader buildReader( } int maxHeaderLength = Math.max( _Private_IonConstants.BINARY_VERSION_MARKER_SIZE, - builder.getStreamInterceptors().stream().mapToInt(StreamInterceptor::headerLength).max().orElse(0) + builder.getInputStreamInterceptors().stream().mapToInt(InputStreamInterceptor::headerMatchLength).max().orElse(0) ); // Note: this can create a lot of layers of InputStream wrappers. For example, if this method is called // from build(byte[]) and the bytes contain GZIP, the chain will be SequenceInputStream(ByteArrayInputStream, @@ -292,7 +306,7 @@ static IonReader buildReader( // stream will always be empty (in which case it doesn't matter whether a text or binary reader is used) // or it's a binary stream (in which case the correct reader was created) or it's a growing text stream // (which has always been unsupported). - for (StreamInterceptor streamInterceptor : builder.getStreamInterceptors()) { + for (InputStreamInterceptor streamInterceptor : builder.getInputStreamInterceptors()) { if (streamInterceptor.matchesHeader(possibleIVM, 0, bytesRead)) { try { ionData = streamInterceptor.newInputStream( diff --git a/src/main/java/com/amazon/ion/system/IonReaderBuilder.java b/src/main/java/com/amazon/ion/system/IonReaderBuilder.java index b143eb6380..08e75f7d46 100644 --- a/src/main/java/com/amazon/ion/system/IonReaderBuilder.java +++ b/src/main/java/com/amazon/ion/system/IonReaderBuilder.java @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 package com.amazon.ion.system; -import com.amazon.ion.GZIPStreamInterceptor; +import com.amazon.ion.util.GZIPStreamInterceptor; import com.amazon.ion.IonBufferConfiguration; import com.amazon.ion.IonCatalog; import com.amazon.ion.IonException; @@ -11,7 +11,7 @@ import com.amazon.ion.IonSystem; import com.amazon.ion.IonTextReader; import com.amazon.ion.IonValue; -import com.amazon.ion.StreamInterceptor; +import com.amazon.ion.util.InputStreamInterceptor; import com.amazon.ion.impl._Private_IonReaderBuilder; import java.io.IOException; @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.ServiceLoader; /** * Build a new {@link IonReader} from the given {@link IonCatalog} and data @@ -34,10 +35,20 @@ public abstract class IonReaderBuilder { + // Default stream interceptors, which always begin with the GZIP interceptor. + private static final List DEFAULT_STREAM_INTERCEPTORS = Collections.singletonList(GZIPStreamInterceptor.INSTANCE); + + // Detected stream interceptors. Each thread may have its own list because each thread may have a different + // context class loader. This list could be an instance variable, but since there may be use cases that require + // creating many IonReaderBuilder instances per thread, we prefer to inspect the classpath once per thread instead + // of once per instance. + private static final ThreadLocal> DETECTED_STREAM_INTERCEPTORS = new ThreadLocal<>(); + private IonCatalog catalog = null; private boolean isIncrementalReadingEnabled = false; private IonBufferConfiguration bufferConfiguration = IonBufferConfiguration.DEFAULT; - private List streamInterceptors = new ArrayList<>(Collections.singletonList(GZIPStreamInterceptor.INSTANCE)); + private List streamInterceptors = null; + protected ClassLoader customClassLoader = null; protected IonReaderBuilder() { @@ -48,7 +59,8 @@ protected IonReaderBuilder(IonReaderBuilder that) this.catalog = that.catalog; this.isIncrementalReadingEnabled = that.isIncrementalReadingEnabled; this.bufferConfiguration = that.bufferConfiguration; - this.streamInterceptors = new ArrayList<>(that.streamInterceptors); + this.streamInterceptors = that.streamInterceptors == null ? null : new ArrayList<>(that.streamInterceptors); + this.customClassLoader = that.customClassLoader == null ? null : that.customClassLoader; } /** @@ -258,26 +270,80 @@ public IonBufferConfiguration getBufferConfiguration() { } /** - * Adds a {@link StreamInterceptor} to the end of the list that the builder will apply - * in order to each stream before creating {@link IonReader} instances over that stream. + * Adds an {@link InputStreamInterceptor} to the end of the list that the builder will apply + * to each stream before creating {@link IonReader} instances over that stream. * {@link GZIPStreamInterceptor} is always consulted first, and need not be added. + *

+ * As an alternative to adding stream interceptors manually using this method, users + * may register implementations as service providers on the classpath. + * See {@link ServiceLoader} for details about how to do this. + *

+ * The list of stream interceptors available to the reader always begins with + * {@link GZIPStreamInterceptor} and is followed by either: + *

    + *
  1. any stream interceptor(s) added by calling this method, if this method was + * called at least once on this builder instance, OR
  2. + *
  3. any stream interceptors detected on the classpath using + * {@link ServiceLoader#load(Class)}, if this method was not called on this builder + * instance.
  4. + *
* * @param streamInterceptor the stream interceptor to add. * * @return this builder instance, if mutable; * otherwise a mutable copy of this builder. */ - public IonReaderBuilder addStreamInterceptor(StreamInterceptor streamInterceptor) { + public IonReaderBuilder addInputStreamInterceptor(InputStreamInterceptor streamInterceptor) { IonReaderBuilder b = mutable(); + if (b.streamInterceptors == null) { + b.streamInterceptors = new ArrayList<>(DEFAULT_STREAM_INTERCEPTORS); + } b.streamInterceptors.add(streamInterceptor); return b; } /** - * @see #addStreamInterceptor(StreamInterceptor) + * Detects implementations of {@link InputStreamInterceptor} using the given {@link ClassLoader}, appending any + * implementations found to the list of stream interceptors enabled by default. + * @param classLoader the ClassLoader to use to locate stream interceptor instances. + * @return the stream interceptors. + */ + private static List detectStreamInterceptorsOnClasspath(ClassLoader classLoader) { + List interceptorsOnClasspath = new ArrayList<>(4); // 4 is arbitrary, but more would be very rare. + interceptorsOnClasspath.addAll(DEFAULT_STREAM_INTERCEPTORS); + ServiceLoader.load(InputStreamInterceptor.class, classLoader).iterator().forEachRemaining(interceptorsOnClasspath::add); + return Collections.unmodifiableList(interceptorsOnClasspath); + } + + /** + * Detects implementations of {@link InputStreamInterceptor} using the default {@link ClassLoader}, appending any + * implementations found to the list of stream interceptors enabled by default. + * @return the stream interceptors. + */ + private static List detectStreamInterceptorsOnDefaultClasspath() { + List detectedStreamInterceptors = DETECTED_STREAM_INTERCEPTORS.get(); + if (detectedStreamInterceptors == null) { + detectedStreamInterceptors = detectStreamInterceptorsOnClasspath(Thread.currentThread().getContextClassLoader()); + DETECTED_STREAM_INTERCEPTORS.set(detectedStreamInterceptors); + } + return detectedStreamInterceptors; + } + + /** + * Gets the {@link InputStreamInterceptor} instances available to this builder. If any instances were added using + * {@link #addInputStreamInterceptor(InputStreamInterceptor)}, then the returned list will be the default stream + * interceptor, which detects GZIP, followed by the stream interceptor(s) manually added. If no instances were + * manually added, the returned list will be the default stream interceptor followed by any stream interceptor(s) + * detected on the classpath by {@link ServiceLoader#load(Class)}. + * @see #addInputStreamInterceptor(InputStreamInterceptor) * @return an unmodifiable view of the stream interceptors currently configured. */ - public List getStreamInterceptors() { + public List getInputStreamInterceptors() { + if (streamInterceptors == null) { + return customClassLoader == null + ? detectStreamInterceptorsOnDefaultClasspath() + : detectStreamInterceptorsOnClasspath(customClassLoader); + } return Collections.unmodifiableList(streamInterceptors); } diff --git a/src/main/java/com/amazon/ion/GZIPStreamInterceptor.java b/src/main/java/com/amazon/ion/util/GZIPStreamInterceptor.java similarity index 88% rename from src/main/java/com/amazon/ion/GZIPStreamInterceptor.java rename to src/main/java/com/amazon/ion/util/GZIPStreamInterceptor.java index a3706ca741..7484384c92 100644 --- a/src/main/java/com/amazon/ion/GZIPStreamInterceptor.java +++ b/src/main/java/com/amazon/ion/util/GZIPStreamInterceptor.java @@ -1,6 +1,6 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -package com.amazon.ion; +package com.amazon.ion.util; import java.io.IOException; import java.io.InputStream; @@ -9,7 +9,7 @@ /** * The interceptor for GZIP streams. */ -public enum GZIPStreamInterceptor implements StreamInterceptor { +public enum GZIPStreamInterceptor implements InputStreamInterceptor { INSTANCE; @@ -21,7 +21,7 @@ public String formatName() { } @Override - public int headerLength() { + public int headerMatchLength() { return GZIP_HEADER.length; } diff --git a/src/main/java/com/amazon/ion/StreamInterceptor.java b/src/main/java/com/amazon/ion/util/InputStreamInterceptor.java similarity index 76% rename from src/main/java/com/amazon/ion/StreamInterceptor.java rename to src/main/java/com/amazon/ion/util/InputStreamInterceptor.java index 9e771d17ab..e9a4cc540f 100644 --- a/src/main/java/com/amazon/ion/StreamInterceptor.java +++ b/src/main/java/com/amazon/ion/util/InputStreamInterceptor.java @@ -1,7 +1,8 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -package com.amazon.ion; +package com.amazon.ion.util; +import com.amazon.ion.IonReader; import com.amazon.ion.system.IonReaderBuilder; import java.io.IOException; @@ -12,10 +13,10 @@ * {@link IonReader} over a user-provided stream. This allows users to configure a sequence of interceptors * to allow transformation of the stream's raw bytes into valid text or binary Ion. * - * @see com.amazon.ion.system.IonReaderBuilder#addStreamInterceptor(StreamInterceptor) + * @see com.amazon.ion.system.IonReaderBuilder#addInputStreamInterceptor(InputStreamInterceptor) * @see com.amazon.ion.system.IonSystemBuilder#withReaderBuilder(IonReaderBuilder) */ -public interface StreamInterceptor { +public interface InputStreamInterceptor { /** * The name of the format the interceptor recognizes. @@ -24,16 +25,18 @@ public interface StreamInterceptor { String formatName(); /** - * The length of the byte header that identifies streams in this format. + * The number of bytes required to be read from the beginning of the stream in order to determine whether + * it matches this format. * @return the length in bytes. */ - int headerLength(); + int headerMatchLength(); /** * Determines whether the given candidate byte sequence matches this format. * @param candidate the candidate byte sequence. * @param offset the offset into the candidate bytes to begin matching. - * @param length the number of bytes (beginning at 'offset') in the candidate byte sequence. + * @param length the number of bytes (beginning at 'offset') in `candidate`. If this is less than + * {@link #headerMatchLength()}, then the candidate cannot be a match. * @return true if the candidate byte sequence matches; otherwise, false. */ boolean matchesHeader(byte[] candidate, int offset, int length); diff --git a/src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java b/src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java index 7c20416bdc..3b52703d1d 100644 --- a/src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java +++ b/src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java @@ -13,14 +13,14 @@ import static org.junit.Assert.fail; import com.amazon.ion.BitUtils; -import com.amazon.ion.GZIPStreamInterceptor; +import com.amazon.ion.util.GZIPStreamInterceptor; import com.amazon.ion.IonBufferConfiguration; import com.amazon.ion.IonCatalog; import com.amazon.ion.IonException; import com.amazon.ion.IonReader; import com.amazon.ion.IonType; import com.amazon.ion.IonWriter; -import com.amazon.ion.StreamInterceptor; +import com.amazon.ion.util.InputStreamInterceptor; import com.amazon.ion.impl.ResizingPipedInputStream; import com.amazon.ion.impl._Private_IonBinaryWriterBuilder; @@ -227,7 +227,7 @@ public void incompleteIvmFailsCleanly(boolean isIncremental) throws Exception { @Test public void gzipInterceptorEnabledByDefault() { IonReaderBuilder builder = IonReaderBuilder.standard(); - List interceptors = builder.getStreamInterceptors(); + List interceptors = builder.getInputStreamInterceptors(); assertEquals(1, interceptors.size()); assertEquals(GZIPStreamInterceptor.INSTANCE.formatName(), interceptors.get(0).formatName()); // The list returned from IonReaderBuilder.getStreamInterceptors() is unmodifiable. diff --git a/src/test/java/com/amazon/ion/system/ZstdStreamInterceptorTest.java b/src/test/java/com/amazon/ion/system/ZstdStreamInterceptorTest.java deleted file mode 100644 index d2a90955b4..0000000000 --- a/src/test/java/com/amazon/ion/system/ZstdStreamInterceptorTest.java +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package com.amazon.ion.system; - -import com.amazon.ion.IonReader; -import com.amazon.ion.IonType; -import com.amazon.ion.IonWriter; -import com.amazon.ion.StreamInterceptor; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; - -import com.github.luben.zstd.ZstdInputStream; -import com.github.luben.zstd.ZstdOutputStream; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -/** - * Demonstrates how a StreamInterceptor that recognizes Zstd streams can be plugged into the IonReaderBuilder and - * IonSystem. - */ -public class ZstdStreamInterceptorTest { - - enum ZstdStreamInterceptor implements StreamInterceptor { - INSTANCE; - - private static final byte[] ZSTD_HEADER = {(byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD}; - - @Override - public String formatName() { - return "Zstd"; - } - - @Override - public int headerLength() { - return ZSTD_HEADER.length; - } - - @Override - public boolean matchesHeader(byte[] candidate, int offset, int length) { - if (candidate == null || length < ZSTD_HEADER.length) { - return false; - } - - for (int i = 0; i < ZSTD_HEADER.length; i++) { - if (ZSTD_HEADER[i] != candidate[offset + i]) { - return false; - } - } - return true; - } - - @Override - public InputStream newInputStream(InputStream interceptedStream) throws IOException { - return new ZstdInputStream(interceptedStream).setContinuous(true); - } - } - - public enum ZstdStream { - BINARY_STREAM_READER { - @Override - IonReader newReader(IonReaderBuilder builder) { - return builder.build(new ByteArrayInputStream(BINARY_BYTES)); - } - }, - TEXT_STREAM_READER { - @Override - IonReader newReader(IonReaderBuilder builder) { - return builder.build(new ByteArrayInputStream(TEXT_BYTES)); - } - }, - BINARY_BYTES_READER { - @Override - IonReader newReader(IonReaderBuilder builder) { - return builder.build(BINARY_BYTES); - } - }, - TEXT_BYTES_READER { - @Override - IonReader newReader(IonReaderBuilder builder) { - return builder.build(TEXT_BYTES); - } - }, - BINARY_STREAM_SYSTEM { - @Override - IonReader newReader(IonReaderBuilder builder) { - return IonSystemBuilder.standard() - .withReaderBuilder(builder) - .build() - .newReader(new ByteArrayInputStream(BINARY_BYTES)); - } - }, - TEXT_STREAM_SYSTEM { - @Override - IonReader newReader(IonReaderBuilder builder) { - return IonSystemBuilder.standard() - .withReaderBuilder(builder) - .build() - .newReader(new ByteArrayInputStream(TEXT_BYTES)); - } - }, - BINARY_BYTES_SYSTEM { - @Override - IonReader newReader(IonReaderBuilder builder) { - return IonSystemBuilder.standard() - .withReaderBuilder(builder) - .build() - .newReader(BINARY_BYTES); - } - }, - TEXT_BYTES_SYSTEM { - @Override - IonReader newReader(IonReaderBuilder builder) { - return IonSystemBuilder.standard() - .withReaderBuilder(builder) - .build() - .newReader(TEXT_BYTES); - } - }; - - abstract IonReader newReader(IonReaderBuilder builder); - - private static byte[] writeCompressedStream(boolean isText) { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - try (IonWriter writer = isText - ? IonTextWriterBuilder.standard().build(new ZstdOutputStream(bytes)) - : IonBinaryWriterBuilder.standard().build(new ZstdOutputStream(bytes)) - ) { - writer.writeInt(123); - } catch (IOException e) { - throw new IllegalStateException(e); - } - return bytes.toByteArray(); - } - - private static final byte[] TEXT_BYTES = writeCompressedStream(true); - private static final byte[] BINARY_BYTES = writeCompressedStream(false); - } - - @ParameterizedTest - @EnumSource(ZstdStream.class) - public void interceptedViaIonReader(ZstdStream stream) throws IOException { - IonReaderBuilder builder = IonReaderBuilder.standard().addStreamInterceptor(ZstdStreamInterceptor.INSTANCE); - try (IonReader reader = stream.newReader(builder)) { - assertEquals(IonType.INT, reader.next()); - assertEquals(123, reader.intValue()); - } - } -} diff --git a/src/test/java/com/amazon/ion/util/ZstdStreamInterceptorTest.java b/src/test/java/com/amazon/ion/util/ZstdStreamInterceptorTest.java new file mode 100644 index 0000000000..4c964cca4b --- /dev/null +++ b/src/test/java/com/amazon/ion/util/ZstdStreamInterceptorTest.java @@ -0,0 +1,290 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package com.amazon.ion.util; + +import com.amazon.ion.system.IonBinaryWriterBuilder; +import com.amazon.ion.system.IonReaderBuilder; +import com.amazon.ion.system.IonSystemBuilder; +import com.amazon.ion.system.IonTextWriterBuilder; +import com.amazon.ion.IonReader; +import com.amazon.ion.IonType; +import com.amazon.ion.IonWriter; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLClassLoader; +import java.net.URLConnection; +import java.net.URLStreamHandler; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Enumeration; +import java.util.List; + +import com.amazon.ion.impl._Private_IonReaderBuilder; +import com.github.luben.zstd.ZstdInputStream; +import com.github.luben.zstd.ZstdOutputStream; +import org.junit.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import static org.junit.Assert.assertSame; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Demonstrates how a StreamInterceptor that recognizes Zstd streams can be plugged into the IonReaderBuilder and + * IonSystem. + */ +public class ZstdStreamInterceptorTest { + + private static final byte[] ZSTD_HEADER = {(byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD}; + + public static class ZstdStreamInterceptor implements InputStreamInterceptor { + + @Override + public String formatName() { + return "Zstd"; + } + + @Override + public int headerMatchLength() { + return ZSTD_HEADER.length; + } + + @Override + public boolean matchesHeader(byte[] candidate, int offset, int length) { + if (candidate == null || length < ZSTD_HEADER.length) { + return false; + } + + for (int i = 0; i < ZSTD_HEADER.length; i++) { + if (ZSTD_HEADER[i] != candidate[offset + i]) { + return false; + } + } + return true; + } + + @Override + public InputStream newInputStream(InputStream interceptedStream) throws IOException { + return new ZstdInputStream(interceptedStream).setContinuous(true); + } + } + + public enum ZstdStream { + BINARY_STREAM_READER { + @Override + IonReader newReader(IonReaderBuilder builder) { + return builder.build(new ByteArrayInputStream(BINARY_BYTES)); + } + }, + TEXT_STREAM_READER { + @Override + IonReader newReader(IonReaderBuilder builder) { + return builder.build(new ByteArrayInputStream(TEXT_BYTES)); + } + }, + BINARY_BYTES_READER { + @Override + IonReader newReader(IonReaderBuilder builder) { + return builder.build(BINARY_BYTES); + } + }, + TEXT_BYTES_READER { + @Override + IonReader newReader(IonReaderBuilder builder) { + return builder.build(TEXT_BYTES); + } + }, + BINARY_STREAM_SYSTEM { + @Override + IonReader newReader(IonReaderBuilder builder) { + return IonSystemBuilder.standard() + .withReaderBuilder(builder) + .build() + .newReader(new ByteArrayInputStream(BINARY_BYTES)); + } + }, + TEXT_STREAM_SYSTEM { + @Override + IonReader newReader(IonReaderBuilder builder) { + return IonSystemBuilder.standard() + .withReaderBuilder(builder) + .build() + .newReader(new ByteArrayInputStream(TEXT_BYTES)); + } + }, + BINARY_BYTES_SYSTEM { + @Override + IonReader newReader(IonReaderBuilder builder) { + return IonSystemBuilder.standard() + .withReaderBuilder(builder) + .build() + .newReader(BINARY_BYTES); + } + }, + TEXT_BYTES_SYSTEM { + @Override + IonReader newReader(IonReaderBuilder builder) { + return IonSystemBuilder.standard() + .withReaderBuilder(builder) + .build() + .newReader(TEXT_BYTES); + } + }; + + abstract IonReader newReader(IonReaderBuilder builder); + + private static byte[] writeCompressedStream(boolean isText) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (IonWriter writer = isText + ? IonTextWriterBuilder.standard().build(new ZstdOutputStream(bytes)) + : IonBinaryWriterBuilder.standard().build(new ZstdOutputStream(bytes)) + ) { + writer.writeInt(123); + } catch (IOException e) { + throw new IllegalStateException(e); + } + return bytes.toByteArray(); + } + + private static final byte[] TEXT_BYTES = writeCompressedStream(true); + private static final byte[] BINARY_BYTES = writeCompressedStream(false); + } + + @ParameterizedTest + @EnumSource(ZstdStream.class) + public void interceptedWhenAddedManually(ZstdStream stream) throws IOException { + IonReaderBuilder builder = IonReaderBuilder.standard().addInputStreamInterceptor(new ZstdStreamInterceptor()); + try (IonReader reader = stream.newReader(builder)) { + assertEquals(IonType.INT, reader.next()); + assertEquals(123, reader.intValue()); + } + } + + public static class CustomInterceptorClassLoader extends URLClassLoader { + + public CustomInterceptorClassLoader() { + super(new URL[0], InputStreamInterceptor.class.getClassLoader()); + } + + @Override + public Enumeration getResources(String name) throws IOException { + if (name.equals("META-INF/services/" + InputStreamInterceptor.class.getName())) { + URL dummyUrl = new URL("unused", "unused", 42, "unused", new URLStreamHandler() { + @Override + protected URLConnection openConnection(URL url) { + return new URLConnection(url) { + @Override + public void connect() { + // Nothing to do. + } + + @Override + public InputStream getInputStream() { + return new ByteArrayInputStream(ZstdStreamInterceptor.class.getName().getBytes(StandardCharsets.UTF_8)); + } + }; + } + }); + return Collections.enumeration(Collections.singletonList(dummyUrl)); + } + return super.getResources(name); + } + } + + @ParameterizedTest + @EnumSource(ZstdStream.class) + public void interceptedWhenDetectedOnClasspath(ZstdStream stream) throws IOException { + IonReaderBuilder builder = ((_Private_IonReaderBuilder) IonReaderBuilder.standard()) + .withCustomClassLoader(new CustomInterceptorClassLoader()); + try (IonReader reader = stream.newReader(builder)) { + assertEquals(IonType.INT, reader.next()); + assertEquals(123, reader.intValue()); + } + } + + private static void assertGzipThen(IonReaderBuilder readerBuilder, Class interceptorType) { + List streamInterceptorsAddedManually = readerBuilder.getInputStreamInterceptors(); + assertEquals(2, streamInterceptorsAddedManually.size()); + assertSame(GZIPStreamInterceptor.INSTANCE, streamInterceptorsAddedManually.get(0)); + assertTrue(streamInterceptorsAddedManually.get(1).getClass().isAssignableFrom(interceptorType)); + } + + @Test + public void gzipAlwaysAvailableWhenZstdAddedManually() { + IonReaderBuilder builder = IonReaderBuilder.standard().addInputStreamInterceptor(new ZstdStreamInterceptor()); + assertGzipThen(builder, ZstdStreamInterceptor.class); + } + + @Test + public void gzipAlwaysAvailableWhenZstdDetectedOnClasspath() { + IonReaderBuilder builder = ((_Private_IonReaderBuilder) IonReaderBuilder.standard()) + .withCustomClassLoader(new CustomInterceptorClassLoader()); + assertGzipThen(builder, ZstdStreamInterceptor.class); + } + + private static class DummyInterceptor implements InputStreamInterceptor { + + @Override + public String formatName() { + return null; + } + + @Override + public int headerMatchLength() { + return 0; + } + + @Override + public boolean matchesHeader(byte[] candidate, int offset, int length) { + return false; + } + + @Override + public InputStream newInputStream(InputStream interceptedStream){ + return null; + } + } + + @Test + public void addingManuallyTakesPrecedenceOverClasspath() { + IonReaderBuilder builder = ((_Private_IonReaderBuilder) IonReaderBuilder.standard()) + .withCustomClassLoader(new CustomInterceptorClassLoader()) // This would add Zstd if classpath detection were used. + .addInputStreamInterceptor(new DummyInterceptor()); + assertGzipThen(builder, DummyInterceptor.class); + } + + @Test + public void differentClassLoadersInEachThread() throws Exception { + Thread withDefaultClassLoader = new Thread(() -> { + IonReaderBuilder readerBuilder = IonReaderBuilder.standard(); + List streamInterceptors = readerBuilder.getInputStreamInterceptors(); + assertEquals(1, streamInterceptors.size()); + assertSame(GZIPStreamInterceptor.INSTANCE, streamInterceptors.get(0)); + }); + Thread withCustomClassLoader = new Thread(() -> { + IonReaderBuilder readerBuilder = IonReaderBuilder.standard(); + List streamInterceptors = readerBuilder.getInputStreamInterceptors(); + assertGzipThen(readerBuilder, ZstdStreamInterceptor.class); + // Verify that a new IonReaderBuilder instance does not re-detect the interceptors applicable to this + // thread. + readerBuilder = IonReaderBuilder.standard(); + assertSame(streamInterceptors, readerBuilder.getInputStreamInterceptors()); + }); + withCustomClassLoader.setContextClassLoader(new CustomInterceptorClassLoader()); + + withDefaultClassLoader.start(); + withCustomClassLoader.start(); + + // While the spawned threads are working, verify they do not affect the parent thread. + IonReaderBuilder builder = IonReaderBuilder.standard().addInputStreamInterceptor(new DummyInterceptor()); + assertGzipThen(builder, DummyInterceptor.class); + + withDefaultClassLoader.join(); + withCustomClassLoader.join(); + } +}