Skip to content

Commit

Permalink
async/split process
Browse files Browse the repository at this point in the history
Signed-off-by: Maxence Lange <maxence@artificial-owl.com>
  • Loading branch information
ArtificialOwl committed May 30, 2022
1 parent 6f46d3f commit aa7a8a4
Show file tree
Hide file tree
Showing 34 changed files with 1,324 additions and 244 deletions.
3 changes: 2 additions & 1 deletion appinfo/routes.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@
],

'routes' => [
['name' => 'EventWrapper#asyncBroadcast', 'url' => '/async/{token}/', 'verb' => 'POST'],
['name' => 'EventWrapper#asyncBroadcast', 'url' => '/async/broadcast/{token}', 'verb' => 'POST'],
['name' => 'EventWrapper#asyncInternal', 'url' => '/async/internal/{token}', 'verb' => 'POST'],

['name' => 'Remote#appService', 'url' => '/', 'verb' => 'GET'],
['name' => 'Remote#test', 'url' => '/test', 'verb' => 'GET'],
Expand Down
10 changes: 8 additions & 2 deletions lib/CircleSharesManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
use Exception;
use OCA\Circles\Exceptions\CircleSharesManagerException;
use OCA\Circles\Model\Probes\CircleProbe;
use OCA\Circles\Model\SyncedItemLock;
use OCA\Circles\Service\CircleService;
use OCA\Circles\Service\ConfigService;
use OCA\Circles\Service\DebugService;
Expand Down Expand Up @@ -214,7 +215,10 @@ public function deleteShare(string $itemId, string $circleId): void {
*/
public function updateItem(
string $itemId,
array $extraData = []
string $updateType = '',
string $updateTypeId = '',
array $extraData = [],
bool $sumCheck = true
): void {
$this->mustHaveOrigin();

Expand All @@ -226,12 +230,13 @@ public function updateItem(
'appId' => $this->originAppId,
'itemType' => $this->originItemType,
'itemId' => $itemId,
'updateType' => $updateType,
'updateTypeId' => $updateTypeId,
'extraData' => $extraData
]
);

try {
// $this->mustHaveOrigin();

// // TODO: verify rules that apply when sharing to a circle
// $probe = new CircleProbe();
Expand Down Expand Up @@ -260,6 +265,7 @@ public function updateItem(
$this->federatedSyncItemService->requestSyncedItemUpdate(
$this->federatedUserService->getCurrentEntity(),
$syncedItem,
new SyncedItemLock($updateType, $updateTypeId, $sumCheck),
$extraData
);
} catch (Exception $e) {
Expand Down
46 changes: 37 additions & 9 deletions lib/Controller/EventWrapperController.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
namespace OCA\Circles\Controller;

use OCA\Circles\AppInfo\Application;
use OCA\Circles\Exceptions\EventWrapperNotFoundException;
use OCA\Circles\Service\AsyncService;
use OCA\Circles\Service\ConfigService;
use OCA\Circles\Service\EventWrapperService;
use OCA\Circles\Service\FederatedEventService;
Expand Down Expand Up @@ -66,6 +68,8 @@ class EventWrapperController extends Controller {
/** @var RemoteDownstreamService */
private $remoteDownstreamService;

private AsyncService $asyncService;

/** @var ConfigService */
private $configService;

Expand All @@ -79,6 +83,7 @@ class EventWrapperController extends Controller {
* @param FederatedEventService $federatedEventService
* @param RemoteUpstreamService $remoteUpstreamService
* @param RemoteDownstreamService $remoteDownstreamService
* @param AsyncService $asyncService
* @param ConfigService $configService
*/
public function __construct(
Expand All @@ -88,13 +93,15 @@ public function __construct(
FederatedEventService $federatedEventService,
RemoteUpstreamService $remoteUpstreamService,
RemoteDownstreamService $remoteDownstreamService,
AsyncService $asyncService,
ConfigService $configService
) {
parent::__construct($appName, $request);
$this->eventWrapperService = $eventWrapperService;
$this->federatedEventService = $federatedEventService;
$this->remoteUpstreamService = $remoteUpstreamService;
$this->remoteDownstreamService = $remoteDownstreamService;
$this->asyncService = $asyncService;
$this->configService = $configService;

$this->setup('app', Application::APP_ID);
Expand All @@ -105,7 +112,7 @@ public function __construct(
/**
* Called locally.
*
* Async process and broadcast the event to every instances of GS
* Async process and broadcast the event to every instance of GS
* This should be initiated by the instance that owns the Circles.
*
* @PublicPage
Expand All @@ -116,19 +123,15 @@ public function __construct(
* @return DataResponse
*/
public function asyncBroadcast(string $token): DataResponse {
$wrappers = $this->remoteUpstreamService->getEventsByToken($token);
$wrappers = $this->eventWrapperService->getBroadcastByToken($token);
if (empty($wrappers) && $token !== 'test-dummy-token') {
return new DataResponse([], Http::STATUS_OK);
}

// closing socket, keep current process running.
$this->async();

foreach ($wrappers as $wrapper) {
$this->eventWrapperService->manageWrapper($wrapper);
}

$this->eventWrapperService->confirmStatus($token);
$this->asyncService->setSplittable(true);
$this->asyncService->split();
$this->eventWrapperService->performBroadcast($token, $wrappers);

// so circles:check can check async is fine
if ($token === 'test-dummy-token') {
Expand All @@ -140,6 +143,31 @@ public function asyncBroadcast(string $token): DataResponse {
}


/**
* Called locally.
*
* Async process and continue using IInternalAsync
*
* @PublicPage
* @NoCSRFRequired
*
* @param string $token
*
* @return DataResponse
* @throws EventWrapperNotFoundException
*/
public function asyncInternal(string $token): DataResponse {
$this->asyncService->setSplittable(true);
$this->asyncService->split();

$this->eventWrapperService->performInternal($token);

// exit() or useless log will be generated
exit();
}



// /**
// * Status Event. This is an event to check status of items between instances.
// *
Expand Down
8 changes: 5 additions & 3 deletions lib/Controller/SyncController.php
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,15 @@ public function updateSyncedItem(): DataResponse {
]
);

$updated = $this->federatedSyncItemService->requestSyncedItemUpdate(
$this->federatedSyncItemService->requestSyncedItemUpdate(
$wrapper->getFederatedUser(),
$local,
$wrapper->getExtraData()
$wrapper->getLock(),
$wrapper->getExtraData(),
true
);

return new DataResponse($updated);
return new DataResponse([]);
} catch (Exception $e) {
$this->e($e);

Expand Down
54 changes: 27 additions & 27 deletions lib/Db/CoreQueryBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,33 @@ class CoreQueryBuilder extends ExtendedQueryBuilder {
use TArrayTools;


public const SINGLE = 'cs';
public const CIRCLE = 'cc';
public const MEMBER = 'mm';
public const OWNER = 'wn';
public const FEDERATED_EVENT = 'ev';
public const REMOTE = 'rm';
public const BASED_ON = 'on';
public const INITIATOR = 'in';
public const DIRECT_INITIATOR = 'di';
public const MEMBERSHIPS = 'ms';
public const CONFIG = 'cf';
public const UPSTREAM_MEMBERSHIPS = 'up';
public const INHERITANCE_FROM = 'ih';
public const INHERITED_BY = 'by';
public const INVITED_BY = 'nv';
public const MOUNT = 'mo';
public const MOUNTPOINT = 'mp';
public const SHARE = 'sh';
public const FILE_CACHE = 'fc';
public const STORAGES = 'st';
public const TOKEN = 'tk';
public const OPTIONS = 'pt';
public const HELPER = 'hp';
public const SYNC_ITEM = 'si';
public const SYNC_SHARE = 'ss';
public const SYNC_LOCK = 'sl';
public const DEBUG = 'bg';
public const SINGLE = 'ca';
public const CIRCLE = 'cb';
public const MEMBER = 'cc';
public const OWNER = 'cd';
public const FEDERATED_EVENT = 'ce';
public const REMOTE = 'cf';
public const BASED_ON = 'cg';
public const INITIATOR = 'ch';
public const DIRECT_INITIATOR = 'ci';
public const MEMBERSHIPS = 'cj';
public const CONFIG = 'ck';
public const UPSTREAM_MEMBERSHIPS = 'cl';
public const INHERITANCE_FROM = 'cm';
public const INHERITED_BY = 'cn';
public const INVITED_BY = 'co';
public const MOUNT = 'cp';
public const MOUNTPOINT = 'cq';
public const SHARE = 'cr';
public const FILE_CACHE = 'cs';
public const STORAGES = 'ct';
public const TOKEN = 'cu';
public const OPTIONS = 'cv';
public const HELPER = 'cw';
public const SYNC_ITEM = 'cx';
public const SYNC_SHARE = 'cy';
public const SYNC_LOCK = 'cz';
public const DEBUG = 'c0';


public static $SQL_PATH = [
Expand Down
2 changes: 2 additions & 0 deletions lib/Db/CoreRequestBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ class CoreRequestBuilder {
self::TABLE_EVENT => [
'token',
'event',
'store',
'event_type',
'result',
'instance',
'interface',
Expand Down
36 changes: 30 additions & 6 deletions lib/Db/EventWrapperRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

namespace OCA\Circles\Db;

use OCA\Circles\Exceptions\EventWrapperNotFoundException;
use OCA\Circles\Model\Federated\EventWrapper;

/**
Expand All @@ -43,15 +44,16 @@ class EventWrapperRequest extends EventWrapperRequestBuilder {

/**
* @param EventWrapper $wrapper
*
* @throws \OCP\DB\Exception
*/
public function save(EventWrapper $wrapper): void {
$qb = $this->getEventWrapperInsertSql();
$qb->setValue('token', $qb->createNamedParameter($wrapper->getToken()))
->setValue('event_type', $qb->createNamedParameter($wrapper->getEventType()))
->setValue(
'event', $qb->createNamedParameter(json_encode($wrapper->getEvent(), JSON_UNESCAPED_SLASHES))
)
->setValue(
'result', $qb->createNamedParameter(json_encode($wrapper->getResult(), JSON_UNESCAPED_SLASHES))
'result',
$qb->createNamedParameter(json_encode($wrapper->getResult(), JSON_UNESCAPED_SLASHES))
)
->setValue('instance', $qb->createNamedParameter($wrapper->getInstance()))
->setValue('interface', $qb->createNamedParameter($wrapper->getInterface()))
Expand All @@ -60,7 +62,13 @@ public function save(EventWrapper $wrapper): void {
->setValue('status', $qb->createNamedParameter($wrapper->getStatus()))
->setValue('creation', $qb->createNamedParameter($wrapper->getCreation()));

$qb->execute();
$event = ($wrapper->hasEvent()) ? json_encode($wrapper->getEvent(), JSON_UNESCAPED_SLASHES) : '';
$qb->setValue('event', $qb->createNamedParameter($event));

$store = ($wrapper->hasStore()) ? json_encode($wrapper->getStore(), JSON_UNESCAPED_SLASHES) : '';
$qb->setValue('store', $qb->createNamedParameter($store));

$qb->executeStatement();
}

/**
Expand Down Expand Up @@ -113,10 +121,26 @@ public function getFailedEvents(array $retryRange): array {
*
* @return EventWrapper[]
*/
public function getByToken(string $token): array {
public function getBroadcastByToken(string $token): array {
$qb = $this->getEventWrapperSelectSql();
$qb->limitToToken($token);
$qb->limit('event_type', EventWrapper::TYPE_BROADCAST);

return $this->getItemsFromRequest($qb);
}


/**
* @param string $token
*
* @return EventWrapper
* @throws EventWrapperNotFoundException
*/
public function getInternalByToken(string $token): EventWrapper {
$qb = $this->getEventWrapperSelectSql();
$qb->limitToToken($token);
$qb->limit('event_type', EventWrapper::TYPE_INTERNAL);

return $this->getItemFromRequest($qb);
}
}
48 changes: 45 additions & 3 deletions lib/Db/SyncedItemLockRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
namespace OCA\Circles\Db;

use OCA\Circles\Exceptions\InvalidIdException;
use OCA\Circles\Exceptions\SyncedItemNotFoundException;
use OCA\Circles\Model\SyncedItemLock;
use OCA\Circles\Tools\Exceptions\InvalidItemException;

/**
* Class SyncedItemLockRequest
Expand All @@ -51,11 +53,51 @@ public function save(SyncedItemLock $lock): void {
$this->confirmValidIds([$lock->getSingleId()]);

$qb = $this->getSyncedItemLockInsertSql();
$qb->setValue('single_id', $qb->createNamedParameter($lock->getSingleId()))
->setValue('update_type', $qb->createNamedParameter($lock->getUpdateType()))
$qb->setValue('update_type', $qb->createNamedParameter($lock->getUpdateType()))
->setValue('update_type_id', $qb->createNamedParameter($lock->getUpdateTypeId()))
->setValue('time', $qb->createNamedParameter($lock->getTime()));
->setValue('time', $qb->createNamedParameter(time()));

$qb->execute();
}


/**
* @param SyncedItemLock $syncedLock
*/
public function remove(SyncedItemLock $syncedLock): void {
$qb = $this->getSyncedItemLockDeleteSql();

$qb->limit('update_type', $syncedLock->getUpdateType());
$qb->limit('update_type_id', $syncedLock->getUpdateTypeId());

$qb->executeStatement();
}

/**
* @param SyncedItemLock $syncedLock
*
* @return SyncedItemLock
* @throws SyncedItemNotFoundException
* @throws InvalidItemException
*/
public function getSyncedItemLock(SyncedItemLock $syncedLock): SyncedItemLock {
$qb = $this->getSyncedItemLockSelectSql();

$qb->limit('update_type', $syncedLock->getUpdateType());
$qb->limit('update_type_id', $syncedLock->getUpdateTypeId());

return $this->getItemFromRequest($qb);
}


/**
* @param int $time
*/
public function clean(int $time = 10): void {
$qb = $this->getSyncedItemLockSelectSql();
$qb->lt('time', (time() - $time));

$qb->executeStatement();
}

}
Loading

0 comments on commit aa7a8a4

Please sign in to comment.