Skip to content

Commit

Permalink
Implement very first version of seekable stream (awslabs#15)
Browse files Browse the repository at this point in the history
Implement very first version of seekable stream

In this commit we are adding the first version of seekable stream that calls
the object client under the hood. For now, seeking is extremely trivial and
reading data amounts to proxying down read calls to the GetObject stream.

The testing part is more important. On top of unit tests we set up the
ability to test against a mocked version of S3. In the test scope we also
add an in-memory stream backed by a byte array. Reading out of this stream
and seeking in this stream is very easy to implement correctly and this can
provide a good base for ensuring correctness in the future.

We also add Checkstyle to enforce Javadoc comments.
  • Loading branch information
CsengerG authored Apr 23, 2024
1 parent b5b0348 commit 0a277c4
Show file tree
Hide file tree
Showing 16 changed files with 573 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ plugins {

// Formatting
id("com.diffplug.spotless")
checkstyle
}

jacoco {
Expand Down
20 changes: 20 additions & 0 deletions config/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">

<module name="Checker">
<module name="SuppressionFilter">
<property name="file" value="${config_loc}/suppression.xml"/>
</module>

<module name="TreeWalker">
<module name="MissingJavadocMethod">
<property name="scope" value="public"/>
<property name="allowedAnnotations" value="Override,BeforeAll,Before,Test"/>
</module>
<module name="MissingJavadocPackage"/>
<module name="MissingJavadocType">
<property name="scope" value="public"/>
</module>
</module>
</module>
9 changes: 9 additions & 0 deletions config/checkstyle/suppression.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?xml version="1.0"?>

<!DOCTYPE suppressions PUBLIC
"-//Checkstyle//DTD SuppressionFilter Configuration 1.0//EN"
"https://checkstyle.org/dtds/suppressions_1_0.dtd">

<suppressions>
<suppress checks="MissingJavadocType" files=".*/src/test/java/.*"/>
</suppressions>
22 changes: 18 additions & 4 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,28 @@
junit = "5.10.1"
mockito = "4.11.0"
spotless = "6.13.0"
guava = "32.1.2-jre"
s3 = "2.25.31"
s3mock = "3.6.0"
testcontainers = "1.19.7"
crt = "0.29.10"
lombok = "1.18.32"

[libraries]
# S3-related dependencies
s3 = { module = "software.amazon.awssdk:s3", version.ref = "s3" }
sdk-url-connection-client = { module = "software.amazon.awssdk:url-connection-client", version.ref = "s3" }
sdk-bom = { group = "software.amazon.awssdk", name = "bom", version.ref = "s3" }
crt = { module = "software.amazon.awssdk.crt:aws-crt", version.ref = "crt" }
netty-nio-client = { module = "software.amazon.awssdk:netty-nio-client", version.ref = "s3" }

guava = { module = "com.google.guava:guava", version.ref = "guava" }
junit-jupiter = { module = "org.junit.jupiter:junit-jupiter", version.ref = "junit" }
junit-jupiter-launcher = { module = "org.junit.platform:junit-platform-launcher" }
mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" }
mockito-inline = { module = "org.mockito:mockito-inline", version.ref = "mockito" }
mockito-junit-jupiter = { module = "org.mockito:mockito-junit-jupiter", version.ref = "mockito" }
spotless = { module = "com.diffplug.spotless:spotless-plugin-gradle", version.ref = "spotless" }




s3mock-testcontainers = { module = "com.adobe.testing:s3mock-testcontainers", version.ref = "s3mock" }
testcontainers-junit-jupiter = { module = "org.testcontainers:junit-jupiter", version.ref = "testcontainers" }
lombok = { module = "org.projectlombok:lombok", version.ref = "lombok" }
28 changes: 28 additions & 0 deletions input-stream/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,42 @@

plugins {
id("buildlogic.java-library-conventions")
id("io.freefair.lombok") version "8.6"
}

dependencies {
api(project(":object-client"))

implementation(libs.guava)
implementation(libs.s3)
testImplementation(libs.junit.jupiter)
testImplementation(libs.mockito.core)
testImplementation(libs.mockito.inline)
testImplementation(libs.mockito.junit.jupiter)
testImplementation(libs.s3mock.testcontainers)
testImplementation(libs.sdk.url.connection.client)
testImplementation(libs.testcontainers.junit.jupiter)
testImplementation(libs.netty.nio.client)
testRuntimeOnly(libs.junit.jupiter.launcher)
}

tasks.withType<JavaCompile>().configureEach {
}

tasks.compileJava {
javaCompiler = javaToolchains.compilerFor {
languageVersion = JavaLanguageVersion.of(8)
}
}

tasks.compileTestJava {
javaCompiler = javaToolchains.compilerFor {
languageVersion = JavaLanguageVersion.of(17)
}
}

tasks.test {
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.of(17)
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.amazon.connector.s3;

import com.amazon.connector.s3.util.S3URI;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

/**
* High throughput seekable stream used to read data from Amazon S3.
*
* <p>Don't share between threads. The current implementation is not thread safe in that calling
* {@link #seek(long) seek} will modify the position of the stream and the behaviour of calling
* {@link #seek(long) seek} and {@link #read() read} concurrently from two different threads is
* undefined.
*/
public class S3SeekableInputStream extends SeekableInputStream {
private final ObjectClient objectClient;
private final S3URI uri;

private long position;
private InputStream stream;

/**
* Creates a new instance of {@link S3SeekableInputStream}.
*
* @param objectClient an instance of {@link ObjectClient}.
* @param uri location of the S3 object this stream is fetching data from
*/
public S3SeekableInputStream(ObjectClient objectClient, S3URI uri) throws IOException {
Preconditions.checkNotNull(objectClient, "objectClient must not be null");
Preconditions.checkNotNull(uri, "S3 URI must not be null");

this.objectClient = objectClient;
this.uri = uri;

this.position = 0;
requestBytes(position);
}

@Override
public int read() throws IOException {
int byteRead = stream.read();

if (byteRead < 0) {
return -1;
}

this.position++;
return byteRead;
}

@Override
public void seek(long pos) throws IOException {
try {
requestBytes(pos);
this.position = pos;
} catch (Exception e) {
throw new IOException(String.format("Unable to seek to position %s", pos));
}
}

@Override
public long getPos() {
return this.position;
}

@Override
public void close() throws IOException {
super.close();
this.stream.close();
}

private void requestBytes(long pos) throws IOException {
if (Objects.nonNull(this.stream)) {
this.stream.close();
}

this.stream =
this.objectClient.getObject(
GetObjectRequest.builder()
.bucket(uri.getBucket())
.key(uri.getKey())
.range(String.format("bytes=%s-", pos))
.build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.amazon.connector.s3;

import java.io.IOException;
import java.io.InputStream;

/**
* A SeekableInputStream is like a conventional InputStream but equipped with two additional
* operations: {@link #seek(long) seek} and {@link #getPos() getPos}. Typically, seekable streams
* are used for random data access (i.e, data access that is not strictly sequential or requires
* backwards seeks).
*
* <p>Implementations should implement {@link #close() close} to release resources.
*/
public abstract class SeekableInputStream extends InputStream {

/**
* Seeks (jumps) to a position inside the stream.
*
* @param pos The position to jump to in the stream given in bytes (zero-indexed).
* @throws IOException
*/
public abstract void seek(long pos) throws IOException;

/**
* Returns the current position in the stream.
*
* @return the position in the stream
*/
public abstract long getPos();
}
25 changes: 25 additions & 0 deletions input-stream/src/main/java/com/amazon/connector/s3/util/S3URI.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.amazon.connector.s3.util;

import com.google.common.base.Preconditions;
import lombok.Data;

/** Container for representing an 's3://' or 's3a://'-style S3 location. */
@Data
public class S3URI {

private final String bucket;
private final String key;

private S3URI(String bucket, String key) {
this.bucket = bucket;
this.key = key;
}

/** Given a bucket and a key, creates an S3URI object. */
public static S3URI of(String bucket, String key) {
Preconditions.checkNotNull(bucket, "bucket must be non-null");
Preconditions.checkNotNull(key, "key must be non-null");

return new S3URI(bucket, key);
}
}

This file was deleted.

Loading

0 comments on commit 0a277c4

Please sign in to comment.