Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cfapi: Make sure no data is transfered after cancellation #3420

Merged
merged 1 commit into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/libsync/propagatedownload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,13 @@ void GETFileJob::slotReadyRead()

void GETFileJob::cancel()
{
if (reply()->isRunning()) {
reply()->abort();
const auto networkReply = reply();
FlexW marked this conversation as resolved.
Show resolved Hide resolved
if (networkReply && networkReply->isRunning()) {
networkReply->abort();
}
if (_device && _device->isOpen()) {
_device->close();
}

emit canceled();
}

void GETFileJob::onTimedOut()
Expand Down
1 change: 0 additions & 1 deletion src/libsync/propagatedownload.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ class GETFileJob : public AbstractNetworkJob
void setExpectedContentLength(qint64 size) { _expectedContentLength = size; }

signals:
void canceled();
void finishedSignal();
void downloadProgress(qint64, qint64);
private slots:
Expand Down
48 changes: 38 additions & 10 deletions src/libsync/vfs/cfapi/cfapiwrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,22 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const
return;
}

QLocalSocket signalSocket;
const QString signalSocketName = requestId + ":cancellation";
signalSocket.connectToServer(signalSocketName);
const auto cancellationSocketConnectResult = signalSocket.waitForConnected();
if (!cancellationSocketConnectResult) {
qCWarning(lcCfApiWrapper) << "Couldn't connect the socket" << signalSocketName
<< signalSocket.error() << signalSocket.errorString();
sendTransferError();
return;
}

auto hydrationRequestCancelled = false;
QObject::connect(&signalSocket, &QLocalSocket::readyRead, &loop, [&] {
hydrationRequestCancelled = true;
});

// CFAPI expects sent blocks to be of a multiple of a block size.
// Only the last sent block is allowed to be of a different size than
// a multiple of a block size
Expand All @@ -169,6 +185,11 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const
};

QObject::connect(&socket, &QLocalSocket::readyRead, &loop, [&] {
if (hydrationRequestCancelled) {
qCDebug(lcCfApiWrapper) << "Don't transfer data because request" << requestId << "was cancelled";
return;
}

const auto receivedData = socket.readAll();
if (receivedData.isEmpty()) {
qCWarning(lcCfApiWrapper) << "Unexpected empty data received" << requestId;
Expand All @@ -180,24 +201,34 @@ void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const
alignAndSendData(receivedData);
});

QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestFinished, &loop, [&](const QString &id, int s) {
QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestFinished, &loop, [&](const QString &id) {
qDebug(lcCfApiWrapper) << "Hydration finished for request" << id;
if (requestId == id) {
const auto status = static_cast<OCC::HydrationJob::Status>(s);
qCInfo(lcCfApiWrapper) << "Hydration done for" << path << requestId << status;
if (status != OCC::HydrationJob::Success) {
sendTransferError();
}
socket.close();
FlexW marked this conversation as resolved.
Show resolved Hide resolved
signalSocket.close();
loop.quit();
}
});

loop.exec();

if (!protrudingData.isEmpty()) {
if (!hydrationRequestCancelled && !protrudingData.isEmpty()) {
qDebug(lcCfApiWrapper) << "Send remaining protruding data. Size:" << protrudingData.size();
sendTransferInfo(protrudingData, dataOffset);
}

int hydrationJobResult = OCC::HydrationJob::Status::Error;
const auto invokeFinalizeResult = QMetaObject::invokeMethod(
vfs, [=] { vfs->finalizeHydrationJob(requestId); }, Qt::BlockingQueuedConnection,
&hydrationJobResult);
if (!invokeFinalizeResult) {
qCritical(lcCfApiWrapper) << "Failed to finalize hydration job for" << path << requestId;
}

if (static_cast<OCC::HydrationJob::Status>(hydrationJobResult) == OCC::HydrationJob::Success) {
sendTransferError();
}
}
}

void CALLBACK cfApiCancelFetchData(const CF_CALLBACK_INFO *callbackInfo, const CF_CALLBACK_PARAMETERS * /*callbackParameters*/)
Expand All @@ -217,7 +248,6 @@ void CALLBACK cfApiCancelFetchData(const CF_CALLBACK_INFO *callbackInfo, const C
}
}


CF_CALLBACK_REGISTRATION cfApiCallbacks[] = {
{ CF_CALLBACK_TYPE_FETCH_DATA, cfApiFetchDataCallback },
{ CF_CALLBACK_TYPE_CANCEL_FETCH_DATA, cfApiCancelFetchData },
Expand Down Expand Up @@ -292,8 +322,6 @@ CF_SET_PIN_FLAGS pinRecurseModeToCfSetPinFlags(OCC::CfApiWrapper::SetPinRecurseM
}
}

}

OCC::CfApiWrapper::ConnectionKey::ConnectionKey()
: _data(new CF_CONNECTION_KEY, [](void *p) { delete reinterpret_cast<CF_CONNECTION_KEY *>(p); })
{
Expand Down
121 changes: 81 additions & 40 deletions src/libsync/vfs/cfapi/hydrationjob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "common/syncjournaldb.h"
#include "propagatedownload.h"
#include "vfs/cfapi/vfs_cfapi.h"

#include <QLocalServer>
#include <QLocalSocket>
Expand All @@ -25,7 +26,6 @@ Q_LOGGING_CATEGORY(lcHydration, "nextcloud.sync.vfs.hydrationjob", QtInfoMsg)
OCC::HydrationJob::HydrationJob(QObject *parent)
: QObject(parent)
{
connect(this, &HydrationJob::finished, this, &HydrationJob::deleteLater);
}

OCC::AccountPtr OCC::HydrationJob::account() const
Expand Down Expand Up @@ -104,90 +104,131 @@ void OCC::HydrationJob::start()
Q_ASSERT(_localPath.endsWith('/'));
Q_ASSERT(!_folderPath.startsWith('/'));

_server = new QLocalServer(this);
const auto listenResult = _server->listen(_requestId);
if (!listenResult) {
qCCritical(lcHydration) << "Couldn't get server to listen" << _requestId << _localPath << _folderPath;
emitFinished(Error);
const auto startServer = [this](const QString &serverName) -> QLocalServer * {
const auto server = new QLocalServer(this);
const auto listenResult = server->listen(serverName);
if (!listenResult) {
qCCritical(lcHydration) << "Couldn't get server to listen" << serverName
<< _localPath << _folderPath;
if (!_isCancelled) {
emitFinished(Error);
}
return nullptr;
}
qCInfo(lcHydration) << "Server ready, waiting for connections" << serverName
<< _localPath << _folderPath;
return server;
};

// Start cancellation server
_signalServer = startServer(_requestId + ":cancellation");
Q_ASSERT(_signalServer);
if (!_signalServer) {
return;
}
connect(_signalServer, &QLocalServer::newConnection, this, &HydrationJob::onCancellationServerNewConnection);

qCInfo(lcHydration) << "Server ready, waiting for connections" << _requestId << _localPath << _folderPath;
connect(_server, &QLocalServer::newConnection, this, &HydrationJob::onNewConnection);
// Start transfer data server
_transferDataServer = startServer(_requestId);
Q_ASSERT(_transferDataServer);
if (!_transferDataServer) {
return;
}
connect(_transferDataServer, &QLocalServer::newConnection, this, &HydrationJob::onNewConnection);
}

void OCC::HydrationJob::cancel()
{
if (!_job) {
return;
Q_ASSERT(_signalSocket);

_isCancelled = true;
if (_job) {
_job->cancel();
}

_job->cancel();
_signalSocket->write("cancelled");

emitFinished(Cancelled);
}

void OCC::HydrationJob::emitFinished(Status status)
{
_status = status;
_signalSocket->close();

if (status == Success) {
connect(_socket, &QLocalSocket::disconnected, this, [=]{
_socket->close();
connect(_transferDataSocket, &QLocalSocket::disconnected, this, [=] {
_transferDataSocket->close();
emit finished(this);
});
_socket->disconnectFromServer();
} else {
_socket->close();
emit finished(this);
_transferDataSocket->disconnectFromServer();
return;
}
_transferDataSocket->close();

emit finished(this);
}

void OCC::HydrationJob::emitCanceled()
void OCC::HydrationJob::onCancellationServerNewConnection()
{
connect(_socket, &QLocalSocket::disconnected, this, [=] {
_socket->close();
});
_socket->disconnectFromServer();
Q_ASSERT(!_signalSocket);

emit canceled(this);
qCInfo(lcHydration) << "Got new connection on cancellation server" << _requestId << _folderPath;
_signalSocket = _signalServer->nextPendingConnection();
}

void OCC::HydrationJob::onNewConnection()
{
Q_ASSERT(!_socket);
Q_ASSERT(!_transferDataSocket);
Q_ASSERT(!_job);

qCInfo(lcHydration) << "Got new connection starting GETFileJob" << _requestId << _folderPath;
_socket = _server->nextPendingConnection();
_job = new GETFileJob(_account, _remotePath + _folderPath, _socket, {}, {}, 0, this);
_transferDataSocket = _transferDataServer->nextPendingConnection();
_job = new GETFileJob(_account, _remotePath + _folderPath, _transferDataSocket, {}, {}, 0, this);
connect(_job, &GETFileJob::finishedSignal, this, &HydrationJob::onGetFinished);
connect(_job, &GETFileJob::canceled, this, &HydrationJob::onGetCanceled);
_job->start();
}

void OCC::HydrationJob::onGetCanceled()
void OCC::HydrationJob::finalize(OCC::VfsCfApi *vfs)
{
qCInfo(lcHydration) << "GETFileJob canceled" << _requestId << _folderPath << _job->reply()->error();
emitCanceled();
// Mark the file as hydrated in the sync journal
SyncJournalFileRecord record;
_journal->getFileRecord(_folderPath, &record);
Q_ASSERT(record.isValid());
if (!record.isValid()) {
qCWarning(lcHydration) << "Couldn't find record to update after hydration" << _requestId << _folderPath;
// emitFinished(Error);
return;
}

if (_isCancelled) {
// Remove placeholder file because there might be already pumped
// some data into it
QFile::remove(_localPath + _folderPath);
// Create a new placeholder file
const auto item = SyncFileItem::fromSyncJournalFileRecord(record);
vfs->createPlaceholder(*item);
return;
}

record._type = ItemTypeFile;
_journal->setFileRecord(record);
}

void OCC::HydrationJob::onGetFinished()
{
qCInfo(lcHydration) << "GETFileJob finished" << _requestId << _folderPath << _job->reply()->error();

if (_job->reply()->error()) {
emitFinished(Error);
const auto isGetJobResultError = _job->reply()->error();
// GETFileJob deletes itself after this signal was handled
_job = nullptr;
if (_isCancelled) {
return;
}

SyncJournalFileRecord record;
_journal->getFileRecord(_folderPath, &record);
Q_ASSERT(record.isValid());
if (!record.isValid()) {
qCWarning(lcHydration) << "Couldn't find record to update after hydration" << _requestId << _folderPath;
if (isGetJobResultError) {
emitFinished(Error);
return;
}

record._type = ItemTypeFile;
_journal->setFileRecord(record);
emitFinished(Success);
}
14 changes: 9 additions & 5 deletions src/libsync/vfs/cfapi/hydrationjob.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class QLocalSocket;
namespace OCC {
class GETFileJob;
class SyncJournalDb;
class VfsCfApi;

class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject
{
Expand All @@ -31,6 +32,7 @@ class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject
enum Status {
Success = 0,
Error,
Cancelled,
};
Q_ENUM(Status)

Expand Down Expand Up @@ -58,29 +60,31 @@ class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject

void start();
void cancel();
void finalize(OCC::VfsCfApi *vfs);

signals:
void finished(HydrationJob *job);
void canceled(HydrationJob *job);

private:
void emitFinished(Status status);
void emitCanceled();

void onNewConnection();
void onCancellationServerNewConnection();
void onGetFinished();
void onGetCanceled();

AccountPtr _account;
QString _remotePath;
QString _localPath;
SyncJournalDb *_journal = nullptr;
bool _isCancelled = false;

QString _requestId;
QString _folderPath;

QLocalServer *_server = nullptr;
QLocalSocket *_socket = nullptr;
QLocalServer *_transferDataServer = nullptr;
QLocalServer *_signalServer = nullptr;
QLocalSocket *_transferDataSocket = nullptr;
QLocalSocket *_signalSocket = nullptr;
GETFileJob *_job = nullptr;
Status _status = Success;
};
Expand Down
Loading