Fixing a bug when reading files during a put job with the streamer strategy.
Fixing some fractured English in error messages.
A few cleanup details like transforming anonymous class implementations to lambdas.
Updating the Kotlin version.
GraciesPadre committed Oct 16, 2017
1 parent 98afeb2 commit 0ec884d
Showing 9 changed files with 166 additions and 33 deletions.
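For context, the bug being fixed shows up when a single file is written as several blobs over one streamed channel. The sketch below models that scenario on the new integration test further down; the helper instance, bucket name, and file name are placeholders, and the SDK imports are assumed to match the test's.

// Sketch only: "helpers" is an existing Ds3ClientHelpers, "bucketName" an existing
// bucket, and "payload.bin" a hypothetical local file larger than one chunk.
void putFileAsMultipleBlobs(final Ds3ClientHelpers helpers, final String bucketName) throws Exception {
    final Ds3Object ds3Object = new Ds3Object();
    ds3Object.setName("payload.bin");
    ds3Object.setSize(Files.size(Paths.get("payload.bin")));

    // Cap the blob size so this one file is split into more than one blob.
    final WriteJobOptions writeJobOptions = WriteJobOptions.create();
    writeJobOptions.withMaxUploadSize(MIN_UPLOAD_SIZE_IN_BYTES);

    // The streamed behavior re-uses a single channel for all of the file's blobs;
    // the SeekableByteChannelDecorator changes further down bound each blob's reads
    // to its own offset and length on that shared channel.
    final Ds3ClientHelpers.Job writeJob = helpers.startWriteJobUsingStreamedBehavior(
            bucketName, Collections.singletonList(ds3Object), writeJobOptions);
    writeJob.transfer(new FileObjectPutter(Paths.get(".")));
}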
2 changes: 1 addition & 1 deletion build.gradle
@@ -14,7 +14,7 @@
*/

buildscript {
- ext.kotlin_version = '1.1.4-3'
ext.kotlin_version = '1.1.51'

repositories {
mavenCentral()
@@ -52,7 +52,9 @@
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
@@ -66,9 +68,12 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static com.spectralogic.ds3client.commands.spectrads3.PutBulkJobSpectraS3Request.MIN_UPLOAD_SIZE_IN_BYTES;
import static com.spectralogic.ds3client.integration.Util.RESOURCE_BASE_NAME;
import static com.spectralogic.ds3client.integration.Util.deleteAllContents;
import static org.hamcrest.Matchers.*;
@@ -2059,4 +2064,71 @@ public void testThatNonExistentFileDoesNotStopPutJob() throws IOException {
assertTrue(caughtNoSuchFileException.get());
assertTrue(getJobRan.get());
}

@Test
public void testStreamedPutJobWithBlobbedFile() throws Exception {
final int chunkSize = MIN_UPLOAD_SIZE_IN_BYTES;
final long biggerThanAChunkSize = chunkSize * 2L + 1024;

final int numIntsInBiggerThanAChunkSize = (int)biggerThanAChunkSize / 4;

final String originalFileName = "Gracie.bin";
final String movedFileName = "Gracie.bak";

try {
final DataOutputStream originalFileStream = new DataOutputStream(new FileOutputStream(originalFileName));

byte[] bytes = new byte[4];

for (int i = 0; i < numIntsInBiggerThanAChunkSize; ++i) {
bytes[0] = (byte)i;
bytes[1] = (byte)(i >> 8);
bytes[2] = (byte)(i >> 16);
bytes[3] = (byte)(i >> 24);
originalFileStream.write(bytes);
}

originalFileStream.close();

final Ds3Object ds3Object = new Ds3Object();
ds3Object.setName(originalFileName);
ds3Object.setSize(biggerThanAChunkSize);

final AtomicLong numBytesTransferred = new AtomicLong(0);

final WriteJobOptions writeJobOptions = WriteJobOptions.create();
writeJobOptions.withMaxUploadSize(chunkSize);

final Ds3ClientHelpers.Job writeJob = HELPERS.startWriteJobUsingStreamedBehavior(BUCKET_NAME, Collections.singletonList(ds3Object), writeJobOptions);
writeJob.attachDataTransferredListener(numBytesTransferred::addAndGet);

final CountDownLatch writeCountDownLatch = new CountDownLatch(1);

writeJob.attachObjectCompletedListener(name -> writeCountDownLatch.countDown());

writeJob.transfer(new FileObjectPutter(Paths.get(".")));

writeCountDownLatch.await();

assertEquals(biggerThanAChunkSize, numBytesTransferred.get());

Files.move(Paths.get(originalFileName), Paths.get(movedFileName));

final CountDownLatch readCountdownLatch = new CountDownLatch(1);

final Ds3ClientHelpers.Job readJob = HELPERS.startReadJob(BUCKET_NAME, Collections.singletonList(ds3Object));
readJob.withMaxParallelRequests(1);
readJob.attachObjectCompletedListener(name -> readCountdownLatch.countDown());
readJob.transfer(new FileObjectGetter(Paths.get(".")));

readCountdownLatch.await();

assertTrue(FileUtils.contentEquals(new File(movedFileName), new File(originalFileName)));
} finally {
deleteAllContents(client, BUCKET_NAME);

Files.deleteIfExists(Paths.get(originalFileName));
Files.deleteIfExists(Paths.get(movedFileName));
}
}
}
@@ -22,7 +22,7 @@ public class ContentLengthNotMatchException extends IOException {
private final long contentLength;
private final long totalBytes;
public ContentLengthNotMatchException(final String fileName, final long contentLength, final long totalBytes) {
super(String.format("The Content length for %s (%d) not match the number of byte read (%d)", fileName, contentLength, totalBytes));
super(String.format("The Content length for %s (%d) does not match the number of bytes read (%d)", fileName, contentLength, totalBytes));

this.fileName = fileName;
this.contentLength = contentLength;
@@ -57,19 +57,16 @@ public PutSequentialBlobStrategy(final Ds3Client client,
public Iterable<JobPart> getWork() throws IOException, InterruptedException {
final Objects nextChunk = allocateChunk(chunksThatContainBlobs.next());

LOG.debug("Allocating chunk: {}", nextChunk.getChunkId().toString());
LOG.debug("==> Allocating chunk: {}", nextChunk.getChunkId().toString());
return FluentIterable.from(nextChunk.getObjects())
- .filter(new Predicate<BulkObject>() {
- @Override
- public boolean apply(@Nullable final BulkObject input) {
- return !input.getInCache();
- }
- })
.filter(input -> !input.getInCache())
.transform(new Function<BulkObject, JobPart>() {
@Nullable
@Override
public JobPart apply(@Nullable final BulkObject blob) {
- return new JobPart(client(), blob);
final JobPart jobPart = new JobPart(client(), blob);
LOG.debug("==> JobPart: {}", jobPart);
return jobPart;

// TODO: When we get to the point where BP enables clustering, we'll want to be able to get the
// client connection info correct for the server on which a chunk resides. StrategyUtils.getClient
@@ -21,56 +21,118 @@
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An instance of {@link SeekableByteChannel} used to decorate another SeekableByteChannel in the
* situation where we re-use the same channel for more than 1 blob. This subclass prevents closing
* a channel when there are other blobs still referencing the shared channel.
*/
class SeekableByteChannelDecorator implements SeekableByteChannel {
- private final SeekableByteChannel seekableByteChannel;
private static final Logger LOG = LoggerFactory.getLogger(SeekableByteChannelDecorator.class);

- SeekableByteChannelDecorator(final SeekableByteChannel seekableByteChannel) {
- Preconditions.checkNotNull(seekableByteChannel, "seekableByteChannel may not be null");
private final Object lock = new Object();

private final SeekableByteChannel seekableByteChannel;
private final long initialOffset;
private final long length;
private long position;

SeekableByteChannelDecorator(final SeekableByteChannel seekableByteChannel, final long initialOffset, final long length) {
Preconditions.checkNotNull(seekableByteChannel, "seekableByteChannel may not be null.");
Preconditions.checkArgument(initialOffset >= 0, "initialOffset must be >= 0.");
Preconditions.checkArgument(length >= 0, "length must be >= 0.");
this.seekableByteChannel = seekableByteChannel;
this.initialOffset = initialOffset;
this.position = initialOffset;
this.length = length;

LOG.debug("==> initialOffset: {}, position: {}, length: {}", initialOffset, position, length);
}

- protected SeekableByteChannel wrappedSeekableByteChannel() {
SeekableByteChannel wrappedSeekableByteChannel() {
return seekableByteChannel;
}

@Override
public int read(final ByteBuffer dst) throws IOException {
- return seekableByteChannel.read(dst);
synchronized (lock) {
final long remainingInChannel = length - position;
LOG.debug("==> remainingInChannel: {},", remainingInChannel);
final long numBytesWeCanRead = Math.min(dst.remaining(), remainingInChannel);
LOG.debug("==> numBytesWeCanRead: {},", numBytesWeCanRead);

if (numBytesWeCanRead <= 0) {
return 0;
}

final int numBytesRead;

if (numBytesWeCanRead != dst.remaining()) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[(int) numBytesWeCanRead]);
numBytesRead = seekableByteChannel.read(byteBuffer);
byteBuffer.flip();
dst.put(byteBuffer);
} else {
numBytesRead = seekableByteChannel.read(dst);
}

position += numBytesRead;

LOG.debug("==> numBytesRead: {}, position: {}", numBytesRead, position);

return numBytesRead;
}
}

@Override
public int write(final ByteBuffer src) throws IOException {
- return seekableByteChannel.write(src);
synchronized (lock) {
return seekableByteChannel.write(src);
}
}

@Override
public long position() throws IOException {
- return seekableByteChannel.position();
synchronized (lock) {
LOG.debug("get position: {}, seekableByteChannel.position: {}", position, seekableByteChannel.position());
return seekableByteChannel.position();
}
}

@Override
public SeekableByteChannel position(final long newPosition) throws IOException {
- return seekableByteChannel.position(newPosition);
synchronized (lock) {
final long lastPossiblePosition = length - 1;
position = Math.min(newPosition, lastPossiblePosition);
seekableByteChannel.position(initialOffset + position);

LOG.debug("==> set position: {}, seekableByteChannel.position: {}", position, seekableByteChannel.position());

return this;
}
}

@Override
public long size() throws IOException {
- return seekableByteChannel.size();
synchronized (lock) {
return seekableByteChannel.size();
}
}

@Override
public SeekableByteChannel truncate(final long size) throws IOException {
- return seekableByteChannel.truncate(size);
synchronized (lock) {
return seekableByteChannel.truncate(size);
}
}

@Override
public boolean isOpen() {
- return seekableByteChannel.isOpen();
synchronized (lock) {
return seekableByteChannel.isOpen();
}
}

@Override
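A rough illustration of the bounded-read behavior the decorator now provides, written as a same-package JUnit sketch (the class is package-private, and the test class, file contents, offset, and length below are assumptions for illustration, not part of this commit): wrapping a shared channel with a blob's offset and length confines that blob's reads to its own byte range.

import static org.junit.Assert.assertEquals;

import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import org.junit.Test;

public class SeekableByteChannelDecoratorSketch_Test {

    @Test
    public void readStaysInsideTheBlobRange() throws Exception {
        // A 100-byte file standing in for a local file that was split into blobs.
        final Path path = Files.createTempFile("blobbed", ".bin");
        try {
            final byte[] data = new byte[100];
            for (int i = 0; i < data.length; ++i) {
                data[i] = (byte) i;
            }
            Files.write(path, data);

            try (final SeekableByteChannel shared = Files.newByteChannel(path, StandardOpenOption.READ)) {
                // Decorate the shared channel for a blob at offset 20 with length 30.
                final SeekableByteChannelDecorator blobChannel = new SeekableByteChannelDecorator(shared, 20, 30);
                blobChannel.position(0); // seeks the underlying channel to the blob's first byte

                final ByteBuffer buffer = ByteBuffer.allocate(100);
                final int bytesRead = blobChannel.read(buffer);

                // The read is clamped to the blob's 30 bytes rather than the buffer's capacity,
                // and the first byte returned is the byte at the blob's offset in the file.
                assertEquals(30, bytesRead);
                assertEquals((byte) 20, buffer.get(0));
            }
        } finally {
            Files.deleteIfExists(path);
        }
    }
}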
@@ -88,7 +88,7 @@ private SeekableByteChannel makeNewChannel(final BulkObject blob) throws IOExcep
channelPreparer.prepareChannel(blob.getName(), objectChannelBuilder);

final SeekableByteChannel seekableByteChannel = channelStrategyDelegate.acquireChannelForBlob(blob);
- final SeekableByteChannelDecorator seekableByteChannelDecorator = new SeekableByteChannelDecorator(seekableByteChannel);
final SeekableByteChannelDecorator seekableByteChannelDecorator = new SeekableByteChannelDecorator(seekableByteChannel, blob.getOffset(), blob.getLength());

blobNameChannelMap.put(blob.getName(), seekableByteChannelDecorator);
return seekableByteChannelDecorator;
@@ -29,10 +29,15 @@
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The HTTP GET implementation class that retrieves a blob from a Black Pearl.
*/
public class GetJobTransferMethod implements TransferMethod {
private static final Logger LOG = LoggerFactory.getLogger(GetJobTransferMethod.class);

private final ChannelStrategy channelStrategy;
private final String bucketName;
private final String jobId;
@@ -67,6 +72,8 @@ public void transferJobPart(final JobPart jobPart) throws IOException {

final BulkObject blob = jobPart.getBlob();

LOG.debug("==> transferJobPart: {}", blob);

channelStrategy.releaseChannelForBlob(seekableByteChannel, blob);

eventDispatcher.emitChecksumEvent(blob, getObjectResponse.getChecksumType(), getObjectResponse.getChecksum());
@@ -73,6 +73,8 @@ public PutJobTransferMethod(final ChannelStrategy channelStrategy,
public void transferJobPart(final JobPart jobPart) throws IOException {
final BulkObject blob = jobPart.getBlob();

LOG.debug("==> Transferring: {}", blob);

final SeekableByteChannel seekableByteChannel = channelStrategy.acquireChannelForBlob(blob);

jobPart.getClient().putObject(makePutObjectRequest(seekableByteChannel, jobPart));
@@ -59,8 +59,6 @@
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SignatureException;
- import java.security.cert.CertificateException;
- import java.security.cert.X509Certificate;
import java.util.Map;

import static com.spectralogic.ds3client.utils.Signature.canonicalizeAmzHeaders;
Expand Down Expand Up @@ -93,7 +91,7 @@ public NetworkClientImpl(final ConnectionDetails connectionDetails) {
this(connectionDetails, createDefaultClient(connectionDetails));
}

- public NetworkClientImpl(final ConnectionDetails connectionDetails, final CloseableHttpClient client) {
private NetworkClientImpl(final ConnectionDetails connectionDetails, final CloseableHttpClient client) {
if (connectionDetails == null) throw new AssertionError("ConnectionDetails cannot be null");
if (client == null) throw new AssertionError("CloseableHttpClient cannot be null");
try {
@@ -125,12 +123,7 @@ private static CloseableHttpClient createDefaultClient(final ConnectionDetails c
private static CloseableHttpClient createInsecureSslHttpClient() throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException {
final SSLContext sslContext = new SSLContextBuilder()
.useProtocol(INSECURE_SSL_PROTOCOL)
- .loadTrustMaterial(null, new TrustStrategy() {
- @Override
- public boolean isTrusted(final X509Certificate[] chain, final String authType) throws CertificateException {
- return true;
- }
- }).build();
.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true).build();
final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new NoopHostnameVerifier());

final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
@@ -215,7 +208,7 @@ private class RequestExecutor implements Closeable {
private final ChecksumType.Type checksumType;
private final CloseableHttpClient client;

- public RequestExecutor(final CloseableHttpClient client, final HttpHost host, final Ds3Request ds3Request) throws IOException {
RequestExecutor(final CloseableHttpClient client, final HttpHost host, final Ds3Request ds3Request) throws IOException {
this.client = client;
this.ds3Request = ds3Request;
this.host = host;
@@ -230,7 +223,7 @@ public RequestExecutor(final CloseableHttpClient client, final HttpHost host, fi
this.hash = this.buildHash();
}

- public CloseableHttpResponse execute() throws IOException {
CloseableHttpResponse execute() throws IOException {
if (this.content != null) {
this.content.reset();
}