Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multipart upload pause #1536

Merged
merged 3 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/**
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazonaws.mobileconnectors.s3.transferutility;

import android.content.Context;
import android.support.test.InstrumentationRegistry;

import com.amazonaws.services.s3.S3IntegrationTestBase;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public final class PauseTransferIntegrationTest extends S3IntegrationTestBase {

private static final String bucketName = "amazon-transfer-util-integ-test-" + new Date().getTime();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we name it like BUCKET_NAME to be consistent with convention and rest of the codebase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea oops. A mistake that was copy + pasted from my previous mistake. will fix them

private static TransferUtility util;
private static Context context = InstrumentationRegistry.getContext();

private File file;
private CountDownLatch started;
private CountDownLatch paused;
private CountDownLatch resumed;
private CountDownLatch completed;

/**
* Creates and initializes all the test resources needed for these tests.
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
setUp();
TransferNetworkLossHandler.getInstance(context);
util = TransferUtility.builder()
.context(context)
.s3Client(s3)
.build();

try {
s3.createBucket(bucketName);
waitForBucketCreation(bucketName);
} catch (final Exception e) {
System.out.println("Error in creating the bucket. "
+ "Please manually create the bucket " + bucketName);
}
}

@AfterClass
public static void tearDown() {
try {
deleteBucketAndAllContents(bucketName);
} catch (final Exception e) {
System.out.println("Error in deleting the bucket. "
+ "Please manually delete the bucket " + bucketName);
e.printStackTrace();
}
}

@Before
public void setUpLatches() {
started = new CountDownLatch(1);
paused = new CountDownLatch(1);
resumed = new CountDownLatch(1);
completed = new CountDownLatch(1);
}

@Test
public void testSinglePartUploadPause() throws Exception {
// Small (1KB) file upload
file = getRandomTempFile("small", 1000L);
testUploadPause();
}

@Test
public void testMultiPartUploadPause() throws Exception {
// Large (10MB) file upload
file = getRandomSparseFile("large", 10L * 1024 * 1024);
testUploadPause();
}

private void testUploadPause() throws Exception {
// start transfer and wait for progress
TransferObserver observer = util.upload(bucketName, file.getName(), file);
observer.setTransferListener(new TestListener());
started.await(100, TimeUnit.MILLISECONDS);

// pause and wait
util.pause(observer.getId());
paused.await(100, TimeUnit.MILLISECONDS);
Thread.sleep(1000); // throws if progress is made

// resume if pause was properly executed
util.resume(observer.getId());
resumed.await(100, TimeUnit.MILLISECONDS);

// cancel early to avoid having to wait for completion
util.cancel(observer.getId());
completed.await(100, TimeUnit.MILLISECONDS);
}

private final class TestListener implements TransferListener {
@Override
public void onStateChanged(int id, TransferState state) {
switch (state) {
case CANCELED:
case COMPLETED:
completed.countDown();
break;
case PAUSED:
paused.countDown();
break;
case IN_PROGRESS:
if (paused.getCount() == 0) {
// Post-pause
resumed.countDown();
} else {
// Pre-pause
started.countDown();
}
break;
case FAILED:
throw new RuntimeException("Failed transfer.");
}
}

@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
if (paused.getCount() == 0 && resumed.getCount() > 0) {
throw new RuntimeException("Progress made even while paused.");
}
}

@Override
public void onError(int id, Exception ex) {
throw new RuntimeException(ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,16 @@ public Boolean call() {
return true;
} catch (final Exception e) {
// No need to update the progress listener.
if (TransferState.CANCELED.equals(download.state)) {
LOGGER.info("Transfer is " + download.state);
if (TransferState.PENDING_CANCEL.equals(download.state)) {
updater.updateState(download.id, TransferState.CANCELED);
LOGGER.info("Transfer is " + TransferState.CANCELED);
return false;
}

// Reset the progress when the transfer is paused.
if (TransferState.PAUSED.equals(download.state)) {
LOGGER.info("Transfer is " + download.state);
ProgressEvent resetEvent = new ProgressEvent(0);
resetEvent.setEventCode(ProgressEvent.RESET_EVENT_CODE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we not resetting on pause on anymore? If so, do we need to update documentation?

Copy link
Contributor Author

@raphkim raphkim Mar 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you look closely, the lines i removed were noops. Im not sure if the original intent was to pass in a new ProgressEvent(0) with reset flag, but this resetEvent object never gets used and a new progress event without flag is reinstantiated instead. Perhaps a different bug, but I didn't want to introduce yet another behavior change in this PR.

if (TransferState.PENDING_PAUSE.equals(download.state)) {
updater.updateState(download.id, TransferState.PAUSED);
LOGGER.info("Transfer is " + TransferState.PAUSED);
progressListener.progressChanged(new ProgressEvent(0));
return false;
}
Expand All @@ -144,8 +144,6 @@ public Boolean call() {
*/
updater.updateState(download.id, TransferState.WAITING_FOR_NETWORK);
LOGGER.debug("Network Connection Interrupted: " + "Moving the TransferState to WAITING_FOR_NETWORK");
ProgressEvent resetEvent = new ProgressEvent(0);
resetEvent.setEventCode(ProgressEvent.RESET_EVENT_CODE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we be resetting the progress upon loss of network and then pass reserEvent to the progressListener?

progressListener.progressChanged(new ProgressEvent(0));
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,12 @@ public boolean start(final AmazonS3 s3,
* @return true if the transfer is running and is paused successfully, false
* otherwise
*/
public boolean pause(final AmazonS3 s3,
public boolean pause(final AmazonS3 s3,
final TransferStatusUpdater updater) {
if (!isFinalState(state) &&
!TransferState.PAUSED.equals(state)) {
updater.updateState(id, TransferState.PAUSED);
if (!isFinalState(state)
&& !TransferState.PAUSED.equals(state)
&& !TransferState.PENDING_PAUSE.equals(state)) {
updater.updateState(id, TransferState.PENDING_PAUSE);
if (isRunning()) {
submittedTask.cancel(true);
}
Expand Down Expand Up @@ -249,10 +250,11 @@ boolean pauseIfRequiredForNetworkInterruption(final AmazonS3 s3,
public boolean cancel(final AmazonS3 s3,
final TransferStatusUpdater updater) {
if (!isFinalState(state)) {
// Update the state to CANCELED in the TransferStatusUpdater and
// TransferDBUtil and involes the onStateChanged callback.
updater.updateState(id, TransferState.CANCELED);
// Update the state to PENDING_CANCEL in the TransferStatusUpdater
// and TransferDBUtil and involves the onStateChanged callback.
updater.updateState(id, TransferState.PENDING_CANCEL);
if (isRunning()) {
// State will update to CANCELED upon encountering S3 exception
submittedTask.cancel(true);
}
// additional cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public TransferProgressListener(TransferRecord transfer) {
public synchronized void progressChanged(ProgressEvent progressEvent) {
if (ProgressEvent.RESET_EVENT_CODE == progressEvent.getEventCode()) {
// Reset will discard what's been transferred
LOGGER.info("Reset Event triggerred. Resetting the bytesCurrent to 0.");
LOGGER.info("Reset Event triggered. Resetting the bytesCurrent to 0.");
// Reset the local counter to 0.
bytesTransferredSoFar = 0;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package com.amazonaws.mobileconnectors.s3.transferutility;


import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.AmazonS3;
Expand Down Expand Up @@ -112,7 +111,7 @@ public UploadPartTaskProgressListener(UploadTask.UploadTaskProgressListener prog
public void progressChanged(ProgressEvent progressEvent) {
if (ProgressEvent.RESET_EVENT_CODE == progressEvent.getEventCode()) {
// Reset will discard what's been transferred
LOGGER.info("Reset Event triggerred. Resetting the bytesCurrent to 0.");
LOGGER.info("Reset Event triggered. Resetting the bytesCurrent to 0.");
// Reset the local counter to 0.
bytesTransferredSoFar = 0;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.amazonaws.services.s3.model.ObjectTagging;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.Tag;
import com.amazonaws.services.s3.model.UploadPartRequest;
Expand Down Expand Up @@ -193,15 +192,6 @@ private Boolean uploadMultipartAndWaitForCompletion() throws ExecutionException
} catch (final Exception e) {
LOGGER.error("Upload resulted in an exception. " + e);

// If the thread that is executing the transfer is interrupted
// because of a user initiated pause or cancel operation,
// do not throw exception or set the state to FAILED.
if (TransferState.CANCELED.equals(upload.state) ||
TransferState.PAUSED.equals(upload.state)) {
LOGGER.info("Transfer is " + upload.state);
return false;
}

/*
* Future.get() will catch InterruptedException, but it's not a
* failure, it may be caused by a pause operation from applications.
Expand All @@ -211,6 +201,21 @@ private Boolean uploadMultipartAndWaitForCompletion() throws ExecutionException
task.uploadPartTask.cancel(true);
}

// If the thread that is executing the transfer is interrupted
// because of a user initiated pause or cancel operation,
// do not throw exception or set the state to FAILED.
if (TransferState.PENDING_CANCEL.equals(upload.state)) {
updater.updateState(upload.id, TransferState.CANCELED);
LOGGER.info("Transfer is " + TransferState.CANCELED);
return false;
}

if (TransferState.PENDING_PAUSE.equals(upload.state)) {
updater.updateState(upload.id, TransferState.PAUSED);
LOGGER.info("Transfer is " + TransferState.PAUSED);
return false;
}

// interrupted due to network. Set the TransferState to
// WAITING_FOR_NETWORK if the individual parts were waiting for network
for (final UploadPartTaskMetadata task : uploadPartTasks.values()) {
Expand Down Expand Up @@ -274,22 +279,22 @@ private Boolean uploadSinglePartAndWaitForCompletion() {
putObjectRequest.setGeneralProgressListener(progressListener);

try {
PutObjectResult putObjectResult = s3.putObject(putObjectRequest);
s3.putObject(putObjectRequest);
updater.updateProgress(upload.id, length, length, true);
updater.updateState(upload.id, TransferState.COMPLETED);
return true;
} catch (final Exception e) {
// we dont need to update progress listener
if (TransferState.CANCELED.equals(upload.state)) {
LOGGER.info("Transfer is " + upload.state);
// we dont need to update progress listener
if (TransferState.PENDING_CANCEL.equals(upload.state)) {
updater.updateState(upload.id, TransferState.CANCELED);
LOGGER.info("Transfer is " + TransferState.CANCELED);
return false;
}

// pause
if (TransferState.PAUSED.equals(upload.state)) {
LOGGER.info("Transfer is " + upload.state);
ProgressEvent resetEvent = new ProgressEvent(0);
resetEvent.setEventCode(ProgressEvent.RESET_EVENT_CODE);
if (TransferState.PENDING_PAUSE.equals(upload.state)) {
updater.updateState(upload.id, TransferState.PAUSED);
LOGGER.info("Transfer is " + TransferState.PAUSED);
progressListener.progressChanged(new ProgressEvent(0));
return false;
}
Expand All @@ -310,8 +315,6 @@ private Boolean uploadSinglePartAndWaitForCompletion() {
*/
updater.updateState(upload.id, TransferState.WAITING_FOR_NETWORK);
LOGGER.debug("Network Connection Interrupted: " + "Moving the TransferState to WAITING_FOR_NETWORK");
ProgressEvent resetEvent = new ProgressEvent(0);
resetEvent.setEventCode(ProgressEvent.RESET_EVENT_CODE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. If eventCode is not being used anywhere should we consider removing it altogether from the ProgressEvent class?

progressListener.progressChanged(new ProgressEvent(0));
return false;
}
Expand Down