Skip to content

Commit

Permalink
Revert of Revert of Drive: Let DriveUploader use batch request API. (…
Browse files Browse the repository at this point in the history
…patchset #1 id:1 of https://codereview.chromium.org/1135353004/)

Reason for revert:
The failure is not caused by the patch. MigrationSingleClientTest.AllTypesIndividually looks flaky.

Original issue's description:
> Revert of Drive: Let DriveUploader use batch request API. (patchset #7 id:120001 of https://codereview.chromium.org/1134633003/)
>
> Reason for revert:
> MigrationSingleClientTest.AllTypesIndividually is broken by this change on Linux GN:
>
> http://build.chromium.org/p/chromium.linux/builders/Linux%20GN/builds/28327
>
> Original issue's description:
> > Drive: Let DriveUploader use batch request API.
> >
> > The CL lets DriveUploader use batch request API. DriveUploader does asynchronous
> > preparation before calling API on service interface. To hold batch request
> > during the asynchronous preparation, the CL introduce refcounted hepler class,
> > which manages life time of batch request configurator.
> >
> > BUG=451917
> > TEST=DriveUploaderTest
> >
> > Committed: https://crrev.com/9cf453b74c8790f6d7a91f958ec998a9fa5a1fa0
> > Cr-Commit-Position: refs/heads/master@{#329599}
>
> TBR=kinaba@chromium.org,tzik@chromium.org,hirono@chromium.org
> NOPRESUBMIT=true
> NOTREECHECKS=true
> NOTRY=true
> BUG=451917
>
> Committed: https://crrev.com/e244812ae9a09d001f24f9aaaf780c80f976d790
> Cr-Commit-Position: refs/heads/master@{#329610}

TBR=kinaba@chromium.org,tzik@chromium.org,sergeyv@chromium.org
NOPRESUBMIT=true
NOTREECHECKS=true
NOTRY=true
BUG=451917

Review URL: https://codereview.chromium.org/1138883004

Cr-Commit-Position: refs/heads/master@{#329613}
  • Loading branch information
hirono-chromium authored and Commit bot committed May 13, 2015
1 parent e52bf69 commit 09b621e
Show file tree
Hide file tree
Showing 11 changed files with 353 additions and 74 deletions.
26 changes: 11 additions & 15 deletions chrome/browser/chromeos/drive/job_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,18 @@ void JobQueue::PopForRun(int accepted_priority, std::vector<JobID>* jobs) {
return;

// Looks up the queue in the order of priority upto |accepted_priority|.
bool processing_batch_request = false;
int64 total_size = 0;
uint64 total_size = 0;
bool batchable = true;
for (int priority = 0; priority <= accepted_priority; ++priority) {
auto it = queue_[priority].begin();
while (it != queue_[priority].end()) {
if (!processing_batch_request ||
(it->batchable && total_size + it->size <= max_batch_size_)) {
total_size += it->size;
processing_batch_request = it->batchable;
jobs->push_back(it->id);
running_.insert(it->id);
it = queue_[priority].erase(it);
if (processing_batch_request)
continue;
}
return;
while (!queue_[priority].empty()) {
const auto& item = queue_[priority].front();
total_size += item.size;
batchable = batchable && item.batchable && total_size <= max_batch_size_;
if (!(jobs->empty() || batchable))
return;
jobs->push_back(item.id);
running_.insert(item.id);
queue_[priority].pop_front();
}
}
}
Expand Down
39 changes: 23 additions & 16 deletions chrome/browser/chromeos/drive/job_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "chrome/browser/chromeos/drive/job_scheduler.h"

#include <algorithm>

#include "base/files/file_util.h"
#include "base/message_loop/message_loop.h"
#include "base/metrics/histogram.h"
Expand Down Expand Up @@ -742,7 +744,9 @@ void JobScheduler::QueueJob(JobID job_id) {
const JobInfo& job_info = job_entry->job_info;

const QueueType queue_type = GetJobQueueType(job_info.job_type);
queue_[queue_type]->Push(job_id, job_entry->context.type, false,
const bool batchable = job_info.job_type == TYPE_UPLOAD_EXISTING_FILE ||
job_info.job_type == TYPE_UPLOAD_NEW_FILE;
queue_[queue_type]->Push(job_id, job_entry->context.type, batchable,
job_info.num_total_bytes);

// Temporary histogram for crbug.com/229650.
Expand Down Expand Up @@ -802,25 +806,28 @@ void JobScheduler::DoJobLoop(QueueType queue_type) {
if (job_ids.empty())
return;

// TODO(hirono): Currently all requests are not batchable. So the queue always
// return just 1 job.
DCHECK_EQ(1u, job_ids.size());
JobEntry* entry = job_map_.Lookup(job_ids.front());
DCHECK(entry);
if (job_ids.size() > 1)
uploader_->StartBatchProcessing();

JobInfo* job_info = &entry->job_info;
job_info->state = STATE_RUNNING;
job_info->start_time = now;
NotifyJobUpdated(*job_info);
for (JobID job_id : job_ids) {
JobEntry* entry = job_map_.Lookup(job_id);
DCHECK(entry);

entry->cancel_callback = entry->task.Run();
JobInfo* job_info = &entry->job_info;
job_info->state = STATE_RUNNING;
job_info->start_time = now;
NotifyJobUpdated(*job_info);

UpdateWait();
entry->cancel_callback = entry->task.Run();
logger_->Log(logging::LOG_INFO, "Job started: %s - %s",
job_info->ToString().c_str(),
GetQueueInfo(queue_type).c_str());
}

logger_->Log(logging::LOG_INFO,
"Job started: %s - %s",
job_info->ToString().c_str(),
GetQueueInfo(queue_type).c_str());
if (job_ids.size() > 1)
uploader_->StopBatchProcessing();

UpdateWait();
}

int JobScheduler::GetCurrentAcceptedPriority(QueueType queue_type) {
Expand Down
56 changes: 50 additions & 6 deletions chrome/browser/drive/drive_uploader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,30 @@ const int64 kUploadChunkSize = (1LL << 30); // 1GB
const int64 kMaxMultipartUploadSize = (1LL << 20); // 1MB
} // namespace

// Refcounted helper class to manage batch request. DriveUploader uses the class
// for keeping the BatchRequestConfigurator instance while it prepares upload
// file information asynchronously. DriveUploader discard the reference after
// getting file information and the instance will be destroyed after all
// preparations complete. At that time, the helper instance commits owned batch
// request at the destrutor.
class DriveUploader::RefCountedBatchRequest
: public base::RefCounted<RefCountedBatchRequest> {
public:
RefCountedBatchRequest(
scoped_ptr<BatchRequestConfiguratorInterface> configurator)
: configurator_(configurator.Pass()) {}

// Gets pointer of BatchRequestConfiguratorInterface owned by the instance.
BatchRequestConfiguratorInterface* configurator() const {
return configurator_.get();
}

private:
friend class base::RefCounted<RefCountedBatchRequest>;
~RefCountedBatchRequest() { configurator_->Commit(); }
scoped_ptr<BatchRequestConfiguratorInterface> configurator_;
};

// Structure containing current upload information of file, passed between
// DriveServiceInterface methods and callbacks.
struct DriveUploader::UploadFileInfo {
Expand Down Expand Up @@ -155,7 +179,17 @@ CancelCallback DriveUploader::UploadNewFile(
local_file_path, content_type, callback, progress_callback)),
base::Bind(&DriveUploader::CallUploadServiceAPINewFile,
weak_ptr_factory_.GetWeakPtr(), parent_resource_id, title,
options));
options, current_batch_request_));
}

void DriveUploader::StartBatchProcessing() {
DCHECK(current_batch_request_ == nullptr);
current_batch_request_ =
new RefCountedBatchRequest(drive_service_->StartBatchRequest().Pass());
}

void DriveUploader::StopBatchProcessing() {
current_batch_request_ = nullptr;
}

CancelCallback DriveUploader::UploadExistingFile(
Expand All @@ -175,7 +209,8 @@ CancelCallback DriveUploader::UploadExistingFile(
scoped_ptr<UploadFileInfo>(new UploadFileInfo(
local_file_path, content_type, callback, progress_callback)),
base::Bind(&DriveUploader::CallUploadServiceAPIExistingFile,
weak_ptr_factory_.GetWeakPtr(), resource_id, options));
weak_ptr_factory_.GetWeakPtr(), resource_id, options,
current_batch_request_));
}

CancelCallback DriveUploader::ResumeUploadFile(
Expand All @@ -190,8 +225,7 @@ CancelCallback DriveUploader::ResumeUploadFile(
DCHECK(!callback.is_null());

scoped_ptr<UploadFileInfo> upload_file_info(new UploadFileInfo(
local_file_path, content_type,
callback, progress_callback));
local_file_path, content_type, callback, progress_callback));
upload_file_info->upload_location = upload_location;

return StartUploadFile(
Expand Down Expand Up @@ -243,12 +277,17 @@ void DriveUploader::CallUploadServiceAPINewFile(
const std::string& parent_resource_id,
const std::string& title,
const UploadNewFileOptions& options,
const scoped_refptr<RefCountedBatchRequest>& batch_request,
scoped_ptr<UploadFileInfo> upload_file_info) {
DCHECK(thread_checker_.CalledOnValidThread());

UploadFileInfo* const info_ptr = upload_file_info.get();
if (info_ptr->content_length <= kMaxMultipartUploadSize) {
info_ptr->cancel_callback = drive_service_->MultipartUploadNewFile(
DriveServiceBatchOperationsInterface* service = drive_service_;
// If this is a batched request, calls the API on the request instead.
if (batch_request.get())
service = batch_request->configurator();
info_ptr->cancel_callback = service->MultipartUploadNewFile(
info_ptr->content_type, info_ptr->content_length, parent_resource_id,
title, info_ptr->file_path, options,
base::Bind(&DriveUploader::OnMultipartUploadComplete,
Expand All @@ -267,12 +306,17 @@ void DriveUploader::CallUploadServiceAPINewFile(
void DriveUploader::CallUploadServiceAPIExistingFile(
const std::string& resource_id,
const UploadExistingFileOptions& options,
const scoped_refptr<RefCountedBatchRequest>& batch_request,
scoped_ptr<UploadFileInfo> upload_file_info) {
DCHECK(thread_checker_.CalledOnValidThread());

UploadFileInfo* const info_ptr = upload_file_info.get();
if (info_ptr->content_length <= kMaxMultipartUploadSize) {
info_ptr->cancel_callback = drive_service_->MultipartUploadExistingFile(
DriveServiceBatchOperationsInterface* service = drive_service_;
// If this is a batched request, calls the API on the request instead.
if (batch_request.get())
service = batch_request->configurator();
info_ptr->cancel_callback = service->MultipartUploadExistingFile(
info_ptr->content_type, info_ptr->content_length, resource_id,
info_ptr->file_path, options,
base::Bind(&DriveUploader::OnMultipartUploadComplete,
Expand Down
29 changes: 25 additions & 4 deletions chrome/browser/drive/drive_uploader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ class DriveUploaderInterface {
public:
virtual ~DriveUploaderInterface() {}

// Starts batch processing for upload requests. All requests which upload
// small files (less than kMaxMultipartUploadSize) between
// |StartBatchProcessing| and |StopBatchProcessing| are sent as a single batch
// request.
virtual void StartBatchProcessing() = 0;

// Stops batch processing. Must be called after calling |StartBatchProcessing|
// to commit requests.
virtual void StopBatchProcessing() = 0;

// Uploads a new file to a directory specified by |upload_location|.
// Returns a callback for cancelling the uploading job.
//
Expand Down Expand Up @@ -114,6 +124,8 @@ class DriveUploader : public DriveUploaderInterface {
~DriveUploader() override;

// DriveUploaderInterface overrides.
void StartBatchProcessing() override;
void StopBatchProcessing() override;
google_apis::CancelCallback UploadNewFile(
const std::string& parent_resource_id,
const base::FilePath& local_file_path,
Expand All @@ -137,6 +149,7 @@ class DriveUploader : public DriveUploaderInterface {
const google_apis::ProgressCallback& progress_callback) override;

private:
class RefCountedBatchRequest;
struct UploadFileInfo;
typedef base::Callback<void(scoped_ptr<UploadFileInfo> upload_file_info)>
StartInitiateUploadCallback;
Expand All @@ -153,18 +166,25 @@ class DriveUploader : public DriveUploaderInterface {
// Checks file size and call InitiateUploadNewFile or MultipartUploadNewFile
// API. Upon completion, OnUploadLocationReceived (for InitiateUploadNewFile)
// or OnMultipartUploadComplete (for MultipartUploadNewFile) should be called.
void CallUploadServiceAPINewFile(const std::string& parent_resource_id,
const std::string& title,
const UploadNewFileOptions& options,
scoped_ptr<UploadFileInfo> upload_file_info);
// If |batch_request| is non-null, it calls the API function on the batch
// request.
void CallUploadServiceAPINewFile(
const std::string& parent_resource_id,
const std::string& title,
const UploadNewFileOptions& options,
const scoped_refptr<RefCountedBatchRequest>& batch_request,
scoped_ptr<UploadFileInfo> upload_file_info);

// Checks file size and call InitiateUploadExistingFile or
// MultipartUploadExistingFile API. Upon completion, OnUploadLocationReceived
// (for InitiateUploadExistingFile) or OnMultipartUploadComplete (for
// MultipartUploadExistingFile) should be called.
// If |batch_request| is non-null, it calls the API function on the batch
// request.
void CallUploadServiceAPIExistingFile(
const std::string& resource_id,
const UploadExistingFileOptions& options,
const scoped_refptr<RefCountedBatchRequest>& batch_request,
scoped_ptr<UploadFileInfo> upload_file_info);

// DriveService callback for InitiateUpload.
Expand Down Expand Up @@ -207,6 +227,7 @@ class DriveUploader : public DriveUploaderInterface {
DriveServiceInterface* drive_service_; // Not owned by this class.

scoped_refptr<base::TaskRunner> blocking_task_runner_;
scoped_refptr<RefCountedBatchRequest> current_batch_request_;

// Note: This should remain the last member so it'll be destroyed and
// invalidate its weak pointers before any other members are destroyed.
Expand Down
Loading

0 comments on commit 09b621e

Please sign in to comment.