Skip to content

Commit

Permalink
Adds support for detection of interceptors on the classpath; renames …
Browse files Browse the repository at this point in the history
…StreamInterceptor to InputStreamInterceptor.
  • Loading branch information
tgregg committed Dec 5, 2024
1 parent 4436c60 commit 4c24d36
Show file tree
Hide file tree
Showing 7 changed files with 398 additions and 178 deletions.
22 changes: 18 additions & 4 deletions src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.amazon.ion.IonReader;
import com.amazon.ion.IonTextReader;
import com.amazon.ion.IonValue;
import com.amazon.ion.StreamInterceptor;
import com.amazon.ion.util.InputStreamInterceptor;
import com.amazon.ion.system.IonReaderBuilder;
import com.amazon.ion.util.IonStreamUtils;

Expand Down Expand Up @@ -66,6 +66,20 @@ public void setLstFactory(_Private_LocalSymbolTableFactory factory) {
}
}

/**
* Configures the builder to use a custom {@link ClassLoader} for loading resources, such as
* {@link InputStreamInterceptor} instances. This method is internal only; users should configure
* stream interceptors using either {@link #addInputStreamInterceptor(InputStreamInterceptor)} or by
* making instances available for discovery by the default ClassLoader.
* @param customClassLoader the ClassLoader to use when loading resources.
* @return this builder instance, if mutable; otherwise, a mutable copy of this builder.
*/
public IonReaderBuilder withCustomClassLoader(ClassLoader customClassLoader) {
_Private_IonReaderBuilder b = (_Private_IonReaderBuilder) mutable();
b.customClassLoader = customClassLoader;
return b;
}

public static class Mutable extends _Private_IonReaderBuilder {

public Mutable() {
Expand Down Expand Up @@ -199,7 +213,7 @@ static IonReader buildReader(
IonReaderFromBytesFactoryBinary binary,
IonReaderFromBytesFactoryText text
) {
for (StreamInterceptor streamInterceptor : builder.getStreamInterceptors()) {
for (InputStreamInterceptor streamInterceptor : builder.getInputStreamInterceptors()) {
if (streamInterceptor.matchesHeader(ionData, offset, length)) {
try {
return buildReader(
Expand Down Expand Up @@ -269,7 +283,7 @@ static IonReader buildReader(
}
int maxHeaderLength = Math.max(
_Private_IonConstants.BINARY_VERSION_MARKER_SIZE,
builder.getStreamInterceptors().stream().mapToInt(StreamInterceptor::headerLength).max().orElse(0)
builder.getInputStreamInterceptors().stream().mapToInt(InputStreamInterceptor::headerMatchLength).max().orElse(0)
);
// Note: this can create a lot of layers of InputStream wrappers. For example, if this method is called
// from build(byte[]) and the bytes contain GZIP, the chain will be SequenceInputStream(ByteArrayInputStream,
Expand All @@ -292,7 +306,7 @@ static IonReader buildReader(
// stream will always be empty (in which case it doesn't matter whether a text or binary reader is used)
// or it's a binary stream (in which case the correct reader was created) or it's a growing text stream
// (which has always been unsupported).
for (StreamInterceptor streamInterceptor : builder.getStreamInterceptors()) {
for (InputStreamInterceptor streamInterceptor : builder.getInputStreamInterceptors()) {
if (streamInterceptor.matchesHeader(possibleIVM, 0, bytesRead)) {
try {
ionData = streamInterceptor.newInputStream(
Expand Down
84 changes: 75 additions & 9 deletions src/main/java/com/amazon/ion/system/IonReaderBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
package com.amazon.ion.system;

import com.amazon.ion.GZIPStreamInterceptor;
import com.amazon.ion.util.GZIPStreamInterceptor;
import com.amazon.ion.IonBufferConfiguration;
import com.amazon.ion.IonCatalog;
import com.amazon.ion.IonException;
Expand All @@ -11,7 +11,7 @@
import com.amazon.ion.IonSystem;
import com.amazon.ion.IonTextReader;
import com.amazon.ion.IonValue;
import com.amazon.ion.StreamInterceptor;
import com.amazon.ion.util.InputStreamInterceptor;
import com.amazon.ion.impl._Private_IonReaderBuilder;

import java.io.IOException;
Expand All @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ServiceLoader;

/**
* Build a new {@link IonReader} from the given {@link IonCatalog} and data
Expand All @@ -34,10 +35,20 @@
public abstract class IonReaderBuilder
{

// Default stream interceptors, which always begin with the GZIP interceptor.
private static final List<InputStreamInterceptor> DEFAULT_STREAM_INTERCEPTORS = Collections.singletonList(GZIPStreamInterceptor.INSTANCE);

// Detected stream interceptors. Each thread may have its own list because each thread may have a different
// context class loader. This list could be an instance variable, but since there may be use cases that require
// creating many IonReaderBuilder instances per thread, we prefer to inspect the classpath once per thread instead
// of once per instance.
private static final ThreadLocal<List<InputStreamInterceptor>> DETECTED_STREAM_INTERCEPTORS = new ThreadLocal<>();

private IonCatalog catalog = null;
private boolean isIncrementalReadingEnabled = false;
private IonBufferConfiguration bufferConfiguration = IonBufferConfiguration.DEFAULT;
private List<StreamInterceptor> streamInterceptors = new ArrayList<>(Collections.singletonList(GZIPStreamInterceptor.INSTANCE));
private List<InputStreamInterceptor> streamInterceptors = null;
protected ClassLoader customClassLoader = null;

protected IonReaderBuilder()
{
Expand All @@ -48,7 +59,8 @@ protected IonReaderBuilder(IonReaderBuilder that)
this.catalog = that.catalog;
this.isIncrementalReadingEnabled = that.isIncrementalReadingEnabled;
this.bufferConfiguration = that.bufferConfiguration;
this.streamInterceptors = new ArrayList<>(that.streamInterceptors);
this.streamInterceptors = that.streamInterceptors == null ? null : new ArrayList<>(that.streamInterceptors);
this.customClassLoader = that.customClassLoader == null ? null : that.customClassLoader;
}

/**
Expand Down Expand Up @@ -258,26 +270,80 @@ public IonBufferConfiguration getBufferConfiguration() {
}

/**
* Adds a {@link StreamInterceptor} to the end of the list that the builder will apply
* in order to each stream before creating {@link IonReader} instances over that stream.
* Adds an {@link InputStreamInterceptor} to the end of the list that the builder will apply
* to each stream before creating {@link IonReader} instances over that stream.
* {@link GZIPStreamInterceptor} is always consulted first, and need not be added.
* <p>
* As an alternative to adding stream interceptors manually using this method, users
* may register implementations as service providers on the classpath.
* See {@link ServiceLoader} for details about how to do this.
* <p>
* The list of stream interceptors available to the reader always begins with
* {@link GZIPStreamInterceptor} and is followed by either:
* <ol>
* <li>any stream interceptor(s) added by calling this method, if this method was
* called at least once on this builder instance, OR</li>
* <li>any stream interceptors detected on the classpath using
* {@link ServiceLoader#load(Class)}, if this method was not called on this builder
* instance.</li>
* </ol>
*
* @param streamInterceptor the stream interceptor to add.
*
* @return this builder instance, if mutable;
* otherwise a mutable copy of this builder.
*/
public IonReaderBuilder addStreamInterceptor(StreamInterceptor streamInterceptor) {
public IonReaderBuilder addInputStreamInterceptor(InputStreamInterceptor streamInterceptor) {
IonReaderBuilder b = mutable();
if (b.streamInterceptors == null) {
b.streamInterceptors = new ArrayList<>(DEFAULT_STREAM_INTERCEPTORS);
}
b.streamInterceptors.add(streamInterceptor);
return b;
}

/**
* @see #addStreamInterceptor(StreamInterceptor)
* Detects implementations of {@link InputStreamInterceptor} using the given {@link ClassLoader}, appending any
* implementations found to the list of stream interceptors enabled by default.
* @param classLoader the ClassLoader to use to locate stream interceptor instances.
* @return the stream interceptors.
*/
private static List<InputStreamInterceptor> detectStreamInterceptorsOnClasspath(ClassLoader classLoader) {
List<InputStreamInterceptor> interceptorsOnClasspath = new ArrayList<>(4); // 4 is arbitrary, but more would be very rare.
interceptorsOnClasspath.addAll(DEFAULT_STREAM_INTERCEPTORS);
ServiceLoader.load(InputStreamInterceptor.class, classLoader).iterator().forEachRemaining(interceptorsOnClasspath::add);
return Collections.unmodifiableList(interceptorsOnClasspath);
}

/**
* Detects implementations of {@link InputStreamInterceptor} using the default {@link ClassLoader}, appending any
* implementations found to the list of stream interceptors enabled by default.
* @return the stream interceptors.
*/
private static List<InputStreamInterceptor> detectStreamInterceptorsOnDefaultClasspath() {
List<InputStreamInterceptor> detectedStreamInterceptors = DETECTED_STREAM_INTERCEPTORS.get();
if (detectedStreamInterceptors == null) {
detectedStreamInterceptors = detectStreamInterceptorsOnClasspath(Thread.currentThread().getContextClassLoader());
DETECTED_STREAM_INTERCEPTORS.set(detectedStreamInterceptors);
}
return detectedStreamInterceptors;
}

/**
* Gets the {@link InputStreamInterceptor} instances available to this builder. If any instances were added using
* {@link #addInputStreamInterceptor(InputStreamInterceptor)}, then the returned list will be the default stream
* interceptor, which detects GZIP, followed by the stream interceptor(s) manually added. If no instances were
* manually added, the returned list will be the default stream interceptor followed by any stream interceptor(s)
* detected on the classpath by {@link ServiceLoader#load(Class)}.
* @see #addInputStreamInterceptor(InputStreamInterceptor)
* @return an unmodifiable view of the stream interceptors currently configured.
*/
public List<StreamInterceptor> getStreamInterceptors() {
public List<InputStreamInterceptor> getInputStreamInterceptors() {
if (streamInterceptors == null) {
return customClassLoader == null
? detectStreamInterceptorsOnDefaultClasspath()
: detectStreamInterceptorsOnClasspath(customClassLoader);
}
return Collections.unmodifiableList(streamInterceptors);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazon.ion;
package com.amazon.ion.util;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -9,7 +9,7 @@
/**
* The interceptor for GZIP streams.
*/
public enum GZIPStreamInterceptor implements StreamInterceptor {
public enum GZIPStreamInterceptor implements InputStreamInterceptor {

INSTANCE;

Expand All @@ -21,7 +21,7 @@ public String formatName() {
}

@Override
public int headerLength() {
public int headerMatchLength() {
return GZIP_HEADER.length;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazon.ion;
package com.amazon.ion.util;

import com.amazon.ion.IonReader;
import com.amazon.ion.system.IonReaderBuilder;

import java.io.IOException;
Expand All @@ -12,10 +13,10 @@
* {@link IonReader} over a user-provided stream. This allows users to configure a sequence of interceptors
* to allow transformation of the stream's raw bytes into valid text or binary Ion.
*
* @see com.amazon.ion.system.IonReaderBuilder#addStreamInterceptor(StreamInterceptor)
* @see com.amazon.ion.system.IonReaderBuilder#addInputStreamInterceptor(InputStreamInterceptor)
* @see com.amazon.ion.system.IonSystemBuilder#withReaderBuilder(IonReaderBuilder)
*/
public interface StreamInterceptor {
public interface InputStreamInterceptor {

/**
* The name of the format the interceptor recognizes.
Expand All @@ -24,16 +25,18 @@ public interface StreamInterceptor {
String formatName();

/**
* The length of the byte header that identifies streams in this format.
* The number of bytes required to be read from the beginning of the stream in order to determine whether
* it matches this format.
* @return the length in bytes.
*/
int headerLength();
int headerMatchLength();

/**
* Determines whether the given candidate byte sequence matches this format.
* @param candidate the candidate byte sequence.
* @param offset the offset into the candidate bytes to begin matching.
* @param length the number of bytes (beginning at 'offset') in the candidate byte sequence.
* @param length the number of bytes (beginning at 'offset') in `candidate`. If this is less than
* {@link #headerMatchLength()}, then the candidate cannot be a match.
* @return true if the candidate byte sequence matches; otherwise, false.
*/
boolean matchesHeader(byte[] candidate, int offset, int length);
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
import static org.junit.Assert.fail;

import com.amazon.ion.BitUtils;
import com.amazon.ion.GZIPStreamInterceptor;
import com.amazon.ion.util.GZIPStreamInterceptor;
import com.amazon.ion.IonBufferConfiguration;
import com.amazon.ion.IonCatalog;
import com.amazon.ion.IonException;
import com.amazon.ion.IonReader;
import com.amazon.ion.IonType;
import com.amazon.ion.IonWriter;
import com.amazon.ion.StreamInterceptor;
import com.amazon.ion.util.InputStreamInterceptor;
import com.amazon.ion.impl.ResizingPipedInputStream;
import com.amazon.ion.impl._Private_IonBinaryWriterBuilder;

Expand Down Expand Up @@ -227,7 +227,7 @@ public void incompleteIvmFailsCleanly(boolean isIncremental) throws Exception {
@Test
public void gzipInterceptorEnabledByDefault() {
IonReaderBuilder builder = IonReaderBuilder.standard();
List<StreamInterceptor> interceptors = builder.getStreamInterceptors();
List<InputStreamInterceptor> interceptors = builder.getInputStreamInterceptors();
assertEquals(1, interceptors.size());
assertEquals(GZIPStreamInterceptor.INSTANCE.formatName(), interceptors.get(0).formatName());
// The list returned from IonReaderBuilder.getStreamInterceptors() is unmodifiable.
Expand Down
Loading

0 comments on commit 4c24d36

Please sign in to comment.