Skip to content

Commit

Permalink
Cfapi: Make sure no data is transfered after cancellation
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Weilbach <felix.weilbach@nextcloud.com>
  • Loading branch information
Felix Weilbach authored and Felix Weilbach (Rebase PR Action) committed Jun 17, 2021
1 parent e6e4bfb commit c799dbd
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 85 deletions.
10 changes: 6 additions & 4 deletions src/libsync/propagatedownload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,13 @@ void GETFileJob::slotReadyRead()

void GETFileJob::cancel()
{
if (reply()->isRunning()) {
reply()->abort();
const auto networkReply = reply();
if (networkReply && networkReply->isRunning()) {
networkReply->abort();
}
if (_device && _device->isOpen()) {
_device->close();
}

emit canceled();
}

void GETFileJob::onTimedOut()
Expand Down
1 change: 0 additions & 1 deletion src/libsync/propagatedownload.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ class GETFileJob : public AbstractNetworkJob
void setExpectedContentLength(qint64 size) { _expectedContentLength = size; }

signals:
void canceled();
void finishedSignal();
void downloadProgress(qint64, qint64);
private slots:
Expand Down
48 changes: 38 additions & 10 deletions src/libsync/vfs/cfapi/cfapiwrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,22 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const
return;
}

QLocalSocket signalSocket;
const QString signalSocketName = requestId + ":cancellation";
signalSocket.connectToServer(signalSocketName);
const auto cancellationSocketConnectResult = signalSocket.waitForConnected();
if (!cancellationSocketConnectResult) {
qCWarning(lcCfApiWrapper) << "Couldn't connect the socket" << signalSocketName
<< signalSocket.error() << signalSocket.errorString();
sendTransferError();
return;
}

auto hydrationRequestCancelled = false;
QObject::connect(&signalSocket, &QLocalSocket::readyRead, &loop, [&] {
hydrationRequestCancelled = true;
});

// CFAPI expects sent blocks to be of a multiple of a block size.
// Only the last sent block is allowed to be of a different size than
// a multiple of a block size
Expand All @@ -169,6 +185,11 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const
};

QObject::connect(&socket, &QLocalSocket::readyRead, &loop, [&] {
if (hydrationRequestCancelled) {
qCDebug(lcCfApiWrapper) << "Don't transfer data because request" << requestId << "was cancelled";
return;
}

const auto receivedData = socket.readAll();
if (receivedData.isEmpty()) {
qCWarning(lcCfApiWrapper) << "Unexpected empty data received" << requestId;
Expand All @@ -180,24 +201,34 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const
alignAndSendData(receivedData);
});

QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestFinished, &loop, [&](const QString &id, int s) {
QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestFinished, &loop, [&](const QString &id) {
qDebug(lcCfApiWrapper) << "Hydration finished for request" << id;
if (requestId == id) {
const auto status = static_cast<OCC::HydrationJob::Status>(s);
qCInfo(lcCfApiWrapper) << "Hydration done for" << path << requestId << status;
if (status != OCC::HydrationJob::Success) {
sendTransferError();
}
socket.close();
signalSocket.close();
loop.quit();
}
});

loop.exec();

if (!protrudingData.isEmpty()) {
if (!hydrationRequestCancelled && !protrudingData.isEmpty()) {
qDebug(lcCfApiWrapper) << "Send remaining protruding data. Size:" << protrudingData.size();
sendTransferInfo(protrudingData, dataOffset);
}

int hydrationJobResult = OCC::HydrationJob::Status::Error;
const auto invokeFinalizeResult = QMetaObject::invokeMethod(
vfs, [=] { vfs->finalizeHydrationJob(requestId); }, Qt::BlockingQueuedConnection,
&hydrationJobResult);
if (!invokeFinalizeResult) {
qCritical(lcCfApiWrapper) << "Failed to finalize hydration job for" << path << requestId;
}

if (static_cast<OCC::HydrationJob::Status>(hydrationJobResult) == OCC::HydrationJob::Success) {
sendTransferError();
}
}
}

void CALLBACK cfApiCancelFetchData(const CF_CALLBACK_INFO *callbackInfo, const CF_CALLBACK_PARAMETERS * /*callbackParameters*/)
Expand All @@ -217,7 +248,6 @@ void CALLBACK cfApiCancelFetchData(const CF_CALLBACK_INFO *callbackInfo, const C
}
}


CF_CALLBACK_REGISTRATION cfApiCallbacks[] = {
{ CF_CALLBACK_TYPE_FETCH_DATA, cfApiFetchDataCallback },
{ CF_CALLBACK_TYPE_CANCEL_FETCH_DATA, cfApiCancelFetchData },
Expand Down Expand Up @@ -292,8 +322,6 @@ CF_SET_PIN_FLAGS pinRecurseModeToCfSetPinFlags(OCC::CfApiWrapper::SetPinRecurseM
}
}

}

OCC::CfApiWrapper::ConnectionKey::ConnectionKey()
: _data(new CF_CONNECTION_KEY, [](void *p) { delete reinterpret_cast<CF_CONNECTION_KEY *>(p); })
{
Expand Down
121 changes: 81 additions & 40 deletions src/libsync/vfs/cfapi/hydrationjob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "common/syncjournaldb.h"
#include "propagatedownload.h"
#include "vfs/cfapi/vfs_cfapi.h"

#include <QLocalServer>
#include <QLocalSocket>
Expand All @@ -25,7 +26,6 @@ Q_LOGGING_CATEGORY(lcHydration, "nextcloud.sync.vfs.hydrationjob", QtInfoMsg)
OCC::HydrationJob::HydrationJob(QObject *parent)
: QObject(parent)
{
connect(this, &HydrationJob::finished, this, &HydrationJob::deleteLater);
}

OCC::AccountPtr OCC::HydrationJob::account() const
Expand Down Expand Up @@ -104,90 +104,131 @@ void OCC::HydrationJob::start()
Q_ASSERT(_localPath.endsWith('/'));
Q_ASSERT(!_folderPath.startsWith('/'));

_server = new QLocalServer(this);
const auto listenResult = _server->listen(_requestId);
if (!listenResult) {
qCCritical(lcHydration) << "Couldn't get server to listen" << _requestId << _localPath << _folderPath;
emitFinished(Error);
const auto startServer = [this](const QString &serverName) -> QLocalServer * {
const auto server = new QLocalServer(this);
const auto listenResult = server->listen(serverName);
if (!listenResult) {
qCCritical(lcHydration) << "Couldn't get server to listen" << serverName
<< _localPath << _folderPath;
if (!_isCancelled) {
emitFinished(Error);
}
return nullptr;
}
qCInfo(lcHydration) << "Server ready, waiting for connections" << serverName
<< _localPath << _folderPath;
return server;
};

// Start cancellation server
_signalServer = startServer(_requestId + ":cancellation");
Q_ASSERT(_signalServer);
if (!_signalServer) {
return;
}
connect(_signalServer, &QLocalServer::newConnection, this, &HydrationJob::onCancellationServerNewConnection);

qCInfo(lcHydration) << "Server ready, waiting for connections" << _requestId << _localPath << _folderPath;
connect(_server, &QLocalServer::newConnection, this, &HydrationJob::onNewConnection);
// Start transfer data server
_transferDataServer = startServer(_requestId);
Q_ASSERT(_transferDataServer);
if (!_transferDataServer) {
return;
}
connect(_transferDataServer, &QLocalServer::newConnection, this, &HydrationJob::onNewConnection);
}

void OCC::HydrationJob::cancel()
{
if (!_job) {
return;
Q_ASSERT(_signalSocket);

_isCancelled = true;
if (_job) {
_job->cancel();
}

_job->cancel();
_signalSocket->write("cancelled");

emitFinished(Cancelled);
}

void OCC::HydrationJob::emitFinished(Status status)
{
_status = status;
_signalSocket->close();

if (status == Success) {
connect(_socket, &QLocalSocket::disconnected, this, [=]{
_socket->close();
connect(_transferDataSocket, &QLocalSocket::disconnected, this, [=] {
_transferDataSocket->close();
emit finished(this);
});
_socket->disconnectFromServer();
} else {
_socket->close();
emit finished(this);
_transferDataSocket->disconnectFromServer();
return;
}
_transferDataSocket->close();

emit finished(this);
}

void OCC::HydrationJob::emitCanceled()
void OCC::HydrationJob::onCancellationServerNewConnection()
{
connect(_socket, &QLocalSocket::disconnected, this, [=] {
_socket->close();
});
_socket->disconnectFromServer();
Q_ASSERT(!_signalSocket);

emit canceled(this);
qCInfo(lcHydration) << "Got new connection on cancellation server" << _requestId << _folderPath;
_signalSocket = _signalServer->nextPendingConnection();
}

void OCC::HydrationJob::onNewConnection()
{
Q_ASSERT(!_socket);
Q_ASSERT(!_transferDataSocket);
Q_ASSERT(!_job);

qCInfo(lcHydration) << "Got new connection starting GETFileJob" << _requestId << _folderPath;
_socket = _server->nextPendingConnection();
_job = new GETFileJob(_account, _remotePath + _folderPath, _socket, {}, {}, 0, this);
_transferDataSocket = _transferDataServer->nextPendingConnection();
_job = new GETFileJob(_account, _remotePath + _folderPath, _transferDataSocket, {}, {}, 0, this);
connect(_job, &GETFileJob::finishedSignal, this, &HydrationJob::onGetFinished);
connect(_job, &GETFileJob::canceled, this, &HydrationJob::onGetCanceled);
_job->start();
}

void OCC::HydrationJob::onGetCanceled()
void OCC::HydrationJob::finalize(OCC::VfsCfApi *vfs)
{
qCInfo(lcHydration) << "GETFileJob canceled" << _requestId << _folderPath << _job->reply()->error();
emitCanceled();
// Mark the file as hydrated in the sync journal
SyncJournalFileRecord record;
_journal->getFileRecord(_folderPath, &record);
Q_ASSERT(record.isValid());
if (!record.isValid()) {
qCWarning(lcHydration) << "Couldn't find record to update after hydration" << _requestId << _folderPath;
// emitFinished(Error);
return;
}

if (_isCancelled) {
// Remove placeholder file because there might be already pumped
// some data into it
QFile::remove(_localPath + _folderPath);
// Create a new placeholder file
const auto item = SyncFileItem::fromSyncJournalFileRecord(record);
vfs->createPlaceholder(*item);
return;
}

record._type = ItemTypeFile;
_journal->setFileRecord(record);
}

void OCC::HydrationJob::onGetFinished()
{
qCInfo(lcHydration) << "GETFileJob finished" << _requestId << _folderPath << _job->reply()->error();

if (_job->reply()->error()) {
emitFinished(Error);
const auto isGetJobResultError = _job->reply()->error();
// GETFileJob deletes itself after this signal was handled
_job = nullptr;
if (_isCancelled) {
return;
}

SyncJournalFileRecord record;
_journal->getFileRecord(_folderPath, &record);
Q_ASSERT(record.isValid());
if (!record.isValid()) {
qCWarning(lcHydration) << "Couldn't find record to update after hydration" << _requestId << _folderPath;
if (isGetJobResultError) {
emitFinished(Error);
return;
}

record._type = ItemTypeFile;
_journal->setFileRecord(record);
emitFinished(Success);
}
14 changes: 9 additions & 5 deletions src/libsync/vfs/cfapi/hydrationjob.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class QLocalSocket;
namespace OCC {
class GETFileJob;
class SyncJournalDb;
class VfsCfApi;

class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject
{
Expand All @@ -31,6 +32,7 @@ class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject
enum Status {
Success = 0,
Error,
Cancelled,
};
Q_ENUM(Status)

Expand Down Expand Up @@ -58,29 +60,31 @@ class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject

void start();
void cancel();
void finalize(OCC::VfsCfApi *vfs);

signals:
void finished(HydrationJob *job);
void canceled(HydrationJob *job);

private:
void emitFinished(Status status);
void emitCanceled();

void onNewConnection();
void onCancellationServerNewConnection();
void onGetFinished();
void onGetCanceled();

AccountPtr _account;
QString _remotePath;
QString _localPath;
SyncJournalDb *_journal = nullptr;
bool _isCancelled = false;

QString _requestId;
QString _folderPath;

QLocalServer *_server = nullptr;
QLocalSocket *_socket = nullptr;
QLocalServer *_transferDataServer = nullptr;
QLocalServer *_signalServer = nullptr;
QLocalSocket *_transferDataSocket = nullptr;
QLocalSocket *_signalSocket = nullptr;
GETFileJob *_job = nullptr;
Status _status = Success;
};
Expand Down
Loading

0 comments on commit c799dbd

Please sign in to comment.