From c799dbdd610618c42976ba5ea359282b4fddf9a7 Mon Sep 17 00:00:00 2001 From: Felix Weilbach Date: Wed, 9 Jun 2021 13:40:15 +0200 Subject: [PATCH] Cfapi: Make sure no data is transfered after cancellation Signed-off-by: Felix Weilbach --- src/libsync/propagatedownload.cpp | 10 +- src/libsync/propagatedownload.h | 1 - src/libsync/vfs/cfapi/cfapiwrapper.cpp | 48 ++++++++-- src/libsync/vfs/cfapi/hydrationjob.cpp | 121 +++++++++++++++++-------- src/libsync/vfs/cfapi/hydrationjob.h | 14 ++- src/libsync/vfs/cfapi/vfs_cfapi.cpp | 54 ++++++----- src/libsync/vfs/cfapi/vfs_cfapi.h | 6 +- 7 files changed, 169 insertions(+), 85 deletions(-) diff --git a/src/libsync/propagatedownload.cpp b/src/libsync/propagatedownload.cpp index d37fbeaf35974..54f01b5f51fed 100644 --- a/src/libsync/propagatedownload.cpp +++ b/src/libsync/propagatedownload.cpp @@ -351,11 +351,13 @@ void GETFileJob::slotReadyRead() void GETFileJob::cancel() { - if (reply()->isRunning()) { - reply()->abort(); + const auto networkReply = reply(); + if (networkReply && networkReply->isRunning()) { + networkReply->abort(); + } + if (_device && _device->isOpen()) { + _device->close(); } - - emit canceled(); } void GETFileJob::onTimedOut() diff --git a/src/libsync/propagatedownload.h b/src/libsync/propagatedownload.h index 769d28eb151bd..3b568a329eb8d 100644 --- a/src/libsync/propagatedownload.h +++ b/src/libsync/propagatedownload.h @@ -110,7 +110,6 @@ class GETFileJob : public AbstractNetworkJob void setExpectedContentLength(qint64 size) { _expectedContentLength = size; } signals: - void canceled(); void finishedSignal(); void downloadProgress(qint64, qint64); private slots: diff --git a/src/libsync/vfs/cfapi/cfapiwrapper.cpp b/src/libsync/vfs/cfapi/cfapiwrapper.cpp index a1c881369a7cb..e6e0f8da82131 100644 --- a/src/libsync/vfs/cfapi/cfapiwrapper.cpp +++ b/src/libsync/vfs/cfapi/cfapiwrapper.cpp @@ -146,6 +146,22 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const return; } + QLocalSocket signalSocket; + const QString signalSocketName = requestId + ":cancellation"; + signalSocket.connectToServer(signalSocketName); + const auto cancellationSocketConnectResult = signalSocket.waitForConnected(); + if (!cancellationSocketConnectResult) { + qCWarning(lcCfApiWrapper) << "Couldn't connect the socket" << signalSocketName + << signalSocket.error() << signalSocket.errorString(); + sendTransferError(); + return; + } + + auto hydrationRequestCancelled = false; + QObject::connect(&signalSocket, &QLocalSocket::readyRead, &loop, [&] { + hydrationRequestCancelled = true; + }); + // CFAPI expects sent blocks to be of a multiple of a block size. // Only the last sent block is allowed to be of a different size than // a multiple of a block size @@ -169,6 +185,11 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const }; QObject::connect(&socket, &QLocalSocket::readyRead, &loop, [&] { + if (hydrationRequestCancelled) { + qCDebug(lcCfApiWrapper) << "Don't transfer data because request" << requestId << "was cancelled"; + return; + } + const auto receivedData = socket.readAll(); if (receivedData.isEmpty()) { qCWarning(lcCfApiWrapper) << "Unexpected empty data received" << requestId; @@ -180,24 +201,34 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const alignAndSendData(receivedData); }); - QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestFinished, &loop, [&](const QString &id, int s) { + QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestFinished, &loop, [&](const QString &id) { qDebug(lcCfApiWrapper) << "Hydration finished for request" << id; if (requestId == id) { - const auto status = static_cast(s); - qCInfo(lcCfApiWrapper) << "Hydration done for" << path << requestId << status; - if (status != OCC::HydrationJob::Success) { - sendTransferError(); - } + socket.close(); + signalSocket.close(); loop.quit(); } }); loop.exec(); - if (!protrudingData.isEmpty()) { + if (!hydrationRequestCancelled && !protrudingData.isEmpty()) { qDebug(lcCfApiWrapper) << "Send remaining protruding data. Size:" << protrudingData.size(); sendTransferInfo(protrudingData, dataOffset); } + + int hydrationJobResult = OCC::HydrationJob::Status::Error; + const auto invokeFinalizeResult = QMetaObject::invokeMethod( + vfs, [=] { vfs->finalizeHydrationJob(requestId); }, Qt::BlockingQueuedConnection, + &hydrationJobResult); + if (!invokeFinalizeResult) { + qCritical(lcCfApiWrapper) << "Failed to finalize hydration job for" << path << requestId; + } + + if (static_cast(hydrationJobResult) == OCC::HydrationJob::Success) { + sendTransferError(); + } +} } void CALLBACK cfApiCancelFetchData(const CF_CALLBACK_INFO *callbackInfo, const CF_CALLBACK_PARAMETERS * /*callbackParameters*/) @@ -217,7 +248,6 @@ void CALLBACK cfApiCancelFetchData(const CF_CALLBACK_INFO *callbackInfo, const C } } - CF_CALLBACK_REGISTRATION cfApiCallbacks[] = { { CF_CALLBACK_TYPE_FETCH_DATA, cfApiFetchDataCallback }, { CF_CALLBACK_TYPE_CANCEL_FETCH_DATA, cfApiCancelFetchData }, @@ -292,8 +322,6 @@ CF_SET_PIN_FLAGS pinRecurseModeToCfSetPinFlags(OCC::CfApiWrapper::SetPinRecurseM } } -} - OCC::CfApiWrapper::ConnectionKey::ConnectionKey() : _data(new CF_CONNECTION_KEY, [](void *p) { delete reinterpret_cast(p); }) { diff --git a/src/libsync/vfs/cfapi/hydrationjob.cpp b/src/libsync/vfs/cfapi/hydrationjob.cpp index d71751fda1d3e..56cb880fda208 100644 --- a/src/libsync/vfs/cfapi/hydrationjob.cpp +++ b/src/libsync/vfs/cfapi/hydrationjob.cpp @@ -16,6 +16,7 @@ #include "common/syncjournaldb.h" #include "propagatedownload.h" +#include "vfs/cfapi/vfs_cfapi.h" #include #include @@ -25,7 +26,6 @@ Q_LOGGING_CATEGORY(lcHydration, "nextcloud.sync.vfs.hydrationjob", QtInfoMsg) OCC::HydrationJob::HydrationJob(QObject *parent) : QObject(parent) { - connect(this, &HydrationJob::finished, this, &HydrationJob::deleteLater); } OCC::AccountPtr OCC::HydrationJob::account() const @@ -104,90 +104,131 @@ void OCC::HydrationJob::start() Q_ASSERT(_localPath.endsWith('/')); Q_ASSERT(!_folderPath.startsWith('/')); - _server = new QLocalServer(this); - const auto listenResult = _server->listen(_requestId); - if (!listenResult) { - qCCritical(lcHydration) << "Couldn't get server to listen" << _requestId << _localPath << _folderPath; - emitFinished(Error); + const auto startServer = [this](const QString &serverName) -> QLocalServer * { + const auto server = new QLocalServer(this); + const auto listenResult = server->listen(serverName); + if (!listenResult) { + qCCritical(lcHydration) << "Couldn't get server to listen" << serverName + << _localPath << _folderPath; + if (!_isCancelled) { + emitFinished(Error); + } + return nullptr; + } + qCInfo(lcHydration) << "Server ready, waiting for connections" << serverName + << _localPath << _folderPath; + return server; + }; + + // Start cancellation server + _signalServer = startServer(_requestId + ":cancellation"); + Q_ASSERT(_signalServer); + if (!_signalServer) { return; } + connect(_signalServer, &QLocalServer::newConnection, this, &HydrationJob::onCancellationServerNewConnection); - qCInfo(lcHydration) << "Server ready, waiting for connections" << _requestId << _localPath << _folderPath; - connect(_server, &QLocalServer::newConnection, this, &HydrationJob::onNewConnection); + // Start transfer data server + _transferDataServer = startServer(_requestId); + Q_ASSERT(_transferDataServer); + if (!_transferDataServer) { + return; + } + connect(_transferDataServer, &QLocalServer::newConnection, this, &HydrationJob::onNewConnection); } void OCC::HydrationJob::cancel() { - if (!_job) { - return; + Q_ASSERT(_signalSocket); + + _isCancelled = true; + if (_job) { + _job->cancel(); } - _job->cancel(); + _signalSocket->write("cancelled"); + + emitFinished(Cancelled); } void OCC::HydrationJob::emitFinished(Status status) { _status = status; + _signalSocket->close(); + if (status == Success) { - connect(_socket, &QLocalSocket::disconnected, this, [=]{ - _socket->close(); + connect(_transferDataSocket, &QLocalSocket::disconnected, this, [=] { + _transferDataSocket->close(); emit finished(this); }); - _socket->disconnectFromServer(); - } else { - _socket->close(); - emit finished(this); + _transferDataSocket->disconnectFromServer(); + return; } + _transferDataSocket->close(); + + emit finished(this); } -void OCC::HydrationJob::emitCanceled() +void OCC::HydrationJob::onCancellationServerNewConnection() { - connect(_socket, &QLocalSocket::disconnected, this, [=] { - _socket->close(); - }); - _socket->disconnectFromServer(); + Q_ASSERT(!_signalSocket); - emit canceled(this); + qCInfo(lcHydration) << "Got new connection on cancellation server" << _requestId << _folderPath; + _signalSocket = _signalServer->nextPendingConnection(); } void OCC::HydrationJob::onNewConnection() { - Q_ASSERT(!_socket); + Q_ASSERT(!_transferDataSocket); Q_ASSERT(!_job); qCInfo(lcHydration) << "Got new connection starting GETFileJob" << _requestId << _folderPath; - _socket = _server->nextPendingConnection(); - _job = new GETFileJob(_account, _remotePath + _folderPath, _socket, {}, {}, 0, this); + _transferDataSocket = _transferDataServer->nextPendingConnection(); + _job = new GETFileJob(_account, _remotePath + _folderPath, _transferDataSocket, {}, {}, 0, this); connect(_job, &GETFileJob::finishedSignal, this, &HydrationJob::onGetFinished); - connect(_job, &GETFileJob::canceled, this, &HydrationJob::onGetCanceled); _job->start(); } -void OCC::HydrationJob::onGetCanceled() +void OCC::HydrationJob::finalize(OCC::VfsCfApi *vfs) { - qCInfo(lcHydration) << "GETFileJob canceled" << _requestId << _folderPath << _job->reply()->error(); - emitCanceled(); + // Mark the file as hydrated in the sync journal + SyncJournalFileRecord record; + _journal->getFileRecord(_folderPath, &record); + Q_ASSERT(record.isValid()); + if (!record.isValid()) { + qCWarning(lcHydration) << "Couldn't find record to update after hydration" << _requestId << _folderPath; + // emitFinished(Error); + return; + } + + if (_isCancelled) { + // Remove placeholder file because there might be already pumped + // some data into it + QFile::remove(_localPath + _folderPath); + // Create a new placeholder file + const auto item = SyncFileItem::fromSyncJournalFileRecord(record); + vfs->createPlaceholder(*item); + return; + } + + record._type = ItemTypeFile; + _journal->setFileRecord(record); } void OCC::HydrationJob::onGetFinished() { qCInfo(lcHydration) << "GETFileJob finished" << _requestId << _folderPath << _job->reply()->error(); - if (_job->reply()->error()) { - emitFinished(Error); + const auto isGetJobResultError = _job->reply()->error(); + // GETFileJob deletes itself after this signal was handled + _job = nullptr; + if (_isCancelled) { return; } - SyncJournalFileRecord record; - _journal->getFileRecord(_folderPath, &record); - Q_ASSERT(record.isValid()); - if (!record.isValid()) { - qCWarning(lcHydration) << "Couldn't find record to update after hydration" << _requestId << _folderPath; + if (isGetJobResultError) { emitFinished(Error); return; } - - record._type = ItemTypeFile; - _journal->setFileRecord(record); emitFinished(Success); } diff --git a/src/libsync/vfs/cfapi/hydrationjob.h b/src/libsync/vfs/cfapi/hydrationjob.h index 47a12475152a1..0bfc516ac1ccd 100644 --- a/src/libsync/vfs/cfapi/hydrationjob.h +++ b/src/libsync/vfs/cfapi/hydrationjob.h @@ -23,6 +23,7 @@ class QLocalSocket; namespace OCC { class GETFileJob; class SyncJournalDb; +class VfsCfApi; class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject { @@ -31,6 +32,7 @@ class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject enum Status { Success = 0, Error, + Cancelled, }; Q_ENUM(Status) @@ -58,29 +60,31 @@ class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject void start(); void cancel(); + void finalize(OCC::VfsCfApi *vfs); signals: void finished(HydrationJob *job); - void canceled(HydrationJob *job); private: void emitFinished(Status status); - void emitCanceled(); void onNewConnection(); + void onCancellationServerNewConnection(); void onGetFinished(); - void onGetCanceled(); AccountPtr _account; QString _remotePath; QString _localPath; SyncJournalDb *_journal = nullptr; + bool _isCancelled = false; QString _requestId; QString _folderPath; - QLocalServer *_server = nullptr; - QLocalSocket *_socket = nullptr; + QLocalServer *_transferDataServer = nullptr; + QLocalServer *_signalServer = nullptr; + QLocalSocket *_transferDataSocket = nullptr; + QLocalSocket *_signalSocket = nullptr; GETFileJob *_job = nullptr; Status _status = Success; }; diff --git a/src/libsync/vfs/cfapi/vfs_cfapi.cpp b/src/libsync/vfs/cfapi/vfs_cfapi.cpp index 5db9c5d03ce4a..d7c6fdd866b8f 100644 --- a/src/libsync/vfs/cfapi/vfs_cfapi.cpp +++ b/src/libsync/vfs/cfapi/vfs_cfapi.cpp @@ -269,16 +269,28 @@ Vfs::AvailabilityResult VfsCfApi::availability(const QString &folderPath) return AvailabilityError::NoSuchItem; } -void VfsCfApi::cancelHydration(const QString &requestId, const QString & /*path*/) +HydrationJob *VfsCfApi::findHydrationJob(const QString &requestId) const { // Find matching hydration job for request id const auto hydrationJobsIter = std::find_if(d->hydrationJobs.cbegin(), d->hydrationJobs.cend(), [&](const HydrationJob *job) { return job->requestId() == requestId; }); - // If found, cancel it if (hydrationJobsIter != d->hydrationJobs.cend()) { - (*hydrationJobsIter)->cancel(); + return *hydrationJobsIter; + } + + return nullptr; +} + +void VfsCfApi::cancelHydration(const QString &requestId, const QString & /*path*/) +{ + // Find matching hydration job for request id + const auto hydrationJob = findHydrationJob(requestId); + // If found, cancel it + if (hydrationJob) { + qCInfo(lcCfApi) << "Cancel hydration"; + hydrationJob->cancel(); } } @@ -360,7 +372,6 @@ void VfsCfApi::scheduleHydrationJob(const QString &requestId, const QString &fol job->setRequestId(requestId); job->setFolderPath(folderPath); connect(job, &HydrationJob::finished, this, &VfsCfApi::onHydrationJobFinished); - connect(job, &HydrationJob::canceled, this, &VfsCfApi::onHydrationJobCanceled); d->hydrationJobs << job; job->start(); emit hydrationRequestReady(requestId); @@ -370,30 +381,27 @@ void VfsCfApi::onHydrationJobFinished(HydrationJob *job) { Q_ASSERT(d->hydrationJobs.contains(job)); qCInfo(lcCfApi) << "Hydration job finished" << job->requestId() << job->folderPath() << job->status(); - emit hydrationRequestFinished(job->requestId(), job->status()); - d->hydrationJobs.removeAll(job); - if (d->hydrationJobs.isEmpty()) { - emit doneHydrating(); - } + emit hydrationRequestFinished(job->requestId()); } -void VfsCfApi::onHydrationJobCanceled(HydrationJob *job) +int VfsCfApi::finalizeHydrationJob(const QString &requestId) { - const auto folderRelativePath = job->folderPath(); - SyncJournalFileRecord record; - if (!params().journal->getFileRecord(folderRelativePath, &record)) { - qCWarning(lcCfApi) << "Could not read file record from journal for canceled hydration request."; - return; + qCDebug(lcCfApi) << "Finalize hydration job" << requestId; + // Find matching hydration job for request id + const auto hydrationJob = findHydrationJob(requestId); + + // If found, finalize it + if (hydrationJob) { + hydrationJob->finalize(this); + d->hydrationJobs.removeAll(hydrationJob); + hydrationJob->deleteLater(); + if (d->hydrationJobs.isEmpty()) { + emit doneHydrating(); + } + return hydrationJob->status(); } - // Remove placeholder file because there might be already pumped - // some data into it - const auto folderPath = job->localPath(); - QFile::remove(folderPath + folderRelativePath); - - // Create a new placeholder file - const auto item = SyncFileItem::fromSyncJournalFileRecord(record); - createPlaceholder(*item); + return HydrationJob::Status::Error; } VfsCfApi::HydratationAndPinStates VfsCfApi::computeRecursiveHydrationAndPinStates(const QString &folderPath, const Optional &basePinState) diff --git a/src/libsync/vfs/cfapi/vfs_cfapi.h b/src/libsync/vfs/cfapi/vfs_cfapi.h index 90511a48a8511..1ec6826d163d7 100644 --- a/src/libsync/vfs/cfapi/vfs_cfapi.h +++ b/src/libsync/vfs/cfapi/vfs_cfapi.h @@ -55,6 +55,8 @@ class VfsCfApi : public Vfs void cancelHydration(const QString &requestId, const QString &path); + int finalizeHydrationJob(const QString &requestId); + public slots: void requestHydration(const QString &requestId, const QString &path); void fileStatusChanged(const QString &systemFileName, SyncFileStatus fileStatus) override; @@ -62,7 +64,7 @@ public slots: signals: void hydrationRequestReady(const QString &requestId); void hydrationRequestFailed(const QString &requestId); - void hydrationRequestFinished(const QString &requestId, int status); + void hydrationRequestFinished(const QString &requestId); protected: void startImpl(const VfsSetupParams ¶ms) override; @@ -70,7 +72,7 @@ public slots: private: void scheduleHydrationJob(const QString &requestId, const QString &folderPath); void onHydrationJobFinished(HydrationJob *job); - void onHydrationJobCanceled(HydrationJob *job); + HydrationJob *findHydrationJob(const QString &requestId) const; struct HasHydratedDehydrated { bool hasHydrated = false;