Skip to content

Commit

Permalink
Merge pull request #79 from nextcloud/enh/db-schema-change
Browse files Browse the repository at this point in the history
enh: db schema change
  • Loading branch information
marcelklehr authored Dec 13, 2024
2 parents b252c8a + e5b7b89 commit 9ae31b5
Show file tree
Hide file tree
Showing 50 changed files with 1,262 additions and 609 deletions.
1 change: 1 addition & 0 deletions appinfo/routes.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php

/**
* Nextcloud - ContextChat
*
Expand Down
381 changes: 235 additions & 146 deletions composer.lock

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions lib/AppInfo/Application.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php

/**
* Nextcloud - ContextChat
*
Expand All @@ -11,6 +12,8 @@

use OCA\ContextChat\Listener\AppDisableListener;
use OCA\ContextChat\Listener\FileListener;
use OCA\ContextChat\Listener\ShareListener;
use OCA\ContextChat\Listener\UserDeletedListener;
use OCA\ContextChat\Service\ProviderConfigService;
use OCA\ContextChat\TaskProcessing\ContextChatProvider;
use OCA\ContextChat\TaskProcessing\ContextChatTaskType;
Expand All @@ -27,11 +30,12 @@
use OCP\IConfig;
use OCP\Share\Events\ShareCreatedEvent;
use OCP\Share\Events\ShareDeletedEvent;
use OCP\User\Events\UserDeletedEvent;

class Application extends App implements IBootstrap {

public const APP_ID = 'context_chat';
public const MIN_APP_API_VERSION = '2.0.3';
public const MIN_APP_API_VERSION = '3.0.0';

public const CC_DEFAULT_REQUEST_TIMEOUT = 60 * 50; // 50 mins
// max size per file + max size of the batch of files to be embedded in a single request
Expand Down Expand Up @@ -71,12 +75,13 @@ public function __construct(array $urlParams = []) {
public function register(IRegistrationContext $context): void {
$context->registerEventListener(BeforeNodeDeletedEvent::class, FileListener::class);
$context->registerEventListener(NodeCreatedEvent::class, FileListener::class);
$context->registerEventListener(ShareCreatedEvent::class, FileListener::class);
$context->registerEventListener(ShareDeletedEvent::class, FileListener::class);
$context->registerEventListener(CacheEntryInsertedEvent::class, FileListener::class);
$context->registerEventListener(NodeRemovedFromCache::class, FileListener::class);
$context->registerEventListener(NodeWrittenEvent::class, FileListener::class);
$context->registerEventListener(AppDisableEvent::class, AppDisableListener::class);
$context->registerEventListener(UserDeletedEvent::class, UserDeletedListener::class);
$context->registerEventListener(ShareCreatedEvent::class, ShareListener::class);
$context->registerEventListener(ShareDeletedEvent::class, ShareListener::class);
$context->registerTaskProcessingTaskType(ContextChatTaskType::class);
$context->registerTaskProcessingProvider(ContextChatProvider::class);

Expand Down
122 changes: 122 additions & 0 deletions lib/BackgroundJobs/ActionJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
<?php

/**
* Nextcloud - ContextChat
*
* This file is licensed under the Affero General Public License version 3 or
* later. See the COPYING file.
*
* @author Anupam Kumar <kyteinsky@gmail.com>
* @copyright Anupam Kumar 2024
*/

declare(strict_types=1);
namespace OCA\ContextChat\BackgroundJobs;

use OCA\ContextChat\Db\QueueActionMapper;
use OCA\ContextChat\Service\DiagnosticService;
use OCA\ContextChat\Service\LangRopeService;
use OCA\ContextChat\Type\ActionType;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\IJobList;
use OCP\BackgroundJob\QueuedJob;
use Psr\Log\LoggerInterface;

/**
 * Background job that drains the queued actions (source/provider/user deletions and
 * access updates) and forwards each of them to the LangRopeService.
 *
 * Up to BATCH_SIZE actions are handled per run. Whenever the queue was not empty the
 * job adds itself to the job list again so that the remaining actions get processed.
 */
class ActionJob extends QueuedJob {
	private const BATCH_SIZE = 100;

	public function __construct(
		ITimeFactory $timeFactory,
		private LangRopeService $networkService,
		private QueueActionMapper $actionMapper,
		private IJobList $jobList,
		private LoggerInterface $logger,
		private DiagnosticService $diagnosticService,
	) {
		parent::__construct($timeFactory);
	}

	/**
	 * Processes one batch of queued actions.
	 *
	 * Every action is removed from the queue right after it has been handled, so an
	 * exception thrown for a later action does not cause the already completed ones
	 * to be replayed on the next run. Actions with an invalid payload or an unknown
	 * type are logged and removed as well.
	 *
	 * @param mixed $argument unused
	 * @throws \Throwable anything thrown while handling or dequeuing an action is
	 *                    propagated after the job has been added to the job list again
	 */
	protected function run($argument): void {
		$this->diagnosticService->sendHeartbeat(static::class, $this->getId());
		// self:: and not static:: — a private constant is not visible through late static binding
		$entities = $this->actionMapper->getFromQueue(self::BATCH_SIZE);

		if (empty($entities)) {
			return;
		}

		try {
			foreach ($entities as $entity) {
				$this->diagnosticService->sendHeartbeat(static::class, $this->getId());
				$this->processAction($entity);
				$this->actionMapper->removeFromQueue($entity);
			}
		} finally {
			// add the job again on success as well as on failure, the exception (if any) still propagates
			$this->jobList->add(static::class);
		}
	}

	/**
	 * Dispatches a single queued action to the matching LangRopeService call.
	 *
	 * @param object $entity queue entry exposing getType() and getPayload()
	 */
	private function processAction(object $entity): void {
		$payload = $entity->getPayload();

		switch ($entity->getType()) {
			case ActionType::DELETE_SOURCE_IDS:
				$decoded = $this->decodePayload($payload, ['sourceIds'], 'DELETE_SOURCE_IDS');
				if ($decoded !== null) {
					$this->networkService->deleteSources($decoded['sourceIds']);
				}
				break;

			case ActionType::DELETE_PROVIDER_ID:
				$decoded = $this->decodePayload($payload, ['providerId'], 'DELETE_PROVIDER_ID');
				if ($decoded !== null) {
					$this->networkService->deleteProvider($decoded['providerId']);
				}
				break;

			case ActionType::DELETE_USER_ID:
				$decoded = $this->decodePayload($payload, ['userId'], 'DELETE_USER_ID');
				if ($decoded !== null) {
					$this->networkService->deleteUser($decoded['userId']);
				}
				break;

			case ActionType::UPDATE_ACCESS_SOURCE_ID:
				$decoded = $this->decodePayload($payload, ['op', 'userIds', 'sourceId'], 'UPDATE_ACCESS_SOURCE_ID');
				if ($decoded !== null) {
					$this->networkService->updateAccess($decoded['op'], $decoded['userIds'], $decoded['sourceId']);
				}
				break;

			case ActionType::UPDATE_ACCESS_PROVIDER_ID:
				$decoded = $this->decodePayload($payload, ['op', 'userIds', 'providerId'], 'UPDATE_ACCESS_PROVIDER_ID');
				if ($decoded !== null) {
					$this->networkService->updateAccessProvider($decoded['op'], $decoded['userIds'], $decoded['providerId']);
				}
				break;

			case ActionType::UPDATE_ACCESS_DECL_SOURCE_ID:
				$decoded = $this->decodePayload($payload, ['userIds', 'sourceId'], 'UPDATE_ACCESS_DECL_SOURCE_ID');
				if ($decoded !== null) {
					$this->networkService->updateAccessDeclarative($decoded['userIds'], $decoded['sourceId']);
				}
				break;

			default:
				$this->logger->warning('Unknown action type', ['type' => $entity->getType()]);
		}
	}

	/**
	 * Decodes the JSON payload of an action and verifies that every required key is set.
	 *
	 * @param mixed $payload raw payload as returned by the queue entry
	 * @param string[] $requiredKeys keys that have to be present with a non-null value
	 * @param string $actionName action label used in the warning message
	 * @return array|null the decoded payload, or null if it is invalid (a warning is logged)
	 */
	private function decodePayload($payload, array $requiredKeys, string $actionName): ?array {
		$decoded = json_decode($payload, true);

		$valid = is_array($decoded);
		foreach ($requiredKeys as $key) {
			if (!$valid) {
				break;
			}
			$valid = isset($decoded[$key]);
		}

		if (!$valid) {
			$this->logger->warning('Invalid payload for ' . $actionName . ' action', ['payload' => $payload]);
			return null;
		}

		return $decoded;
	}
}
74 changes: 0 additions & 74 deletions lib/BackgroundJobs/DeleteJob.php

This file was deleted.

105 changes: 70 additions & 35 deletions lib/BackgroundJobs/IndexerJob.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<?php

/*
* Copyright (c) 2022 The Recognize contributors.
* This file is licensed under the Affero General Public License version 3 or later. See the COPYING file.
Expand All @@ -7,6 +8,7 @@

namespace OCA\ContextChat\BackgroundJobs;

use OCA\ContextChat\AppInfo\Application;
use OCA\ContextChat\Db\QueueFile;
use OCA\ContextChat\Service\DiagnosticService;
use OCA\ContextChat\Service\LangRopeService;
Expand Down Expand Up @@ -46,15 +48,15 @@ class IndexerJob extends TimedJob {
public const DEFAULT_MAX_JOBS_COUNT = 3;

public function __construct(
ITimeFactory $time,
ITimeFactory $time,
private LoggerInterface $logger,
private QueueService $queue,
private QueueService $queue,
private IUserMountCache $userMountCache,
private IJobList $jobList,
private IJobList $jobList,
private LangRopeService $langRopeService,
private StorageService $storageService,
private IRootFolder $rootFolder,
private IAppConfig $appConfig,
private StorageService $storageService,
private IRootFolder $rootFolder,
private IAppConfig $appConfig,
private DiagnosticService $diagnosticService,
private IDBConnection $db,
private ITimeFactory $timeFactory,
Expand Down Expand Up @@ -178,49 +180,82 @@ protected function hasEnoughRunningJobs(): bool {
protected function index(array $files): void {
$maxTime = $this->getMaxIndexingTime();
$startTime = time();
$sources = [];
$allSourceIds = [];
$loadedSources = [];
$retryQFiles = [];
$size = 0;

foreach ($files as $queueFile) {
$this->diagnosticService->sendHeartbeat(static::class, $this->getId());
if ($startTime + $maxTime < time()) {
break;
}

$file = current($this->rootFolder->getById($queueFile->getFileId()));
if (!$file instanceof File) {
continue;
}

$file_size = $file->getSize();
if ($size + $file_size > Application::CC_MAX_SIZE || count($sources) >= Application::CC_MAX_FILES) {
$loadedSources = array_merge($loadedSources, $this->langRopeService->indexSources($sources));
$sources = [];
$size = 0;
}

$userIds = $this->storageService->getUsersForFileId($queueFile->getFileId());
foreach ($userIds as $userId) {
$this->diagnosticService->sendHeartbeat(static::class, $this->getId());
$this->diagnosticService->sendHeartbeat(static::class, $this->getId());

try {
try {
try {
$fileHandle = $file->fopen('r');
} catch (LockedException|NotPermittedException $e) {
$this->logger->error('Could not open file ' . $file->getPath() . ' for reading', ['exception' => $e]);
continue;
}
if (!is_resource($fileHandle)) {
$this->logger->warning('File handle for' . $file->getPath() . ' is not readable');
continue;
}
$source = new Source(
$userId,
ProviderConfigService::getSourceId($file->getId()),
$file->getPath(),
$fileHandle,
$file->getMtime(),
$file->getMimeType(),
ProviderConfigService::getDefaultProviderKey(),
);
} catch (InvalidPathException|NotFoundException $e) {
$this->logger->error('Could not find file ' . $file->getPath(), ['exception' => $e]);
continue 2;
$fileHandle = $file->fopen('r');
} catch (NotPermittedException $e) {
$this->logger->error('Could not open file ' . $file->getPath() . ' for reading', ['exception' => $e]);
continue;
} catch (LockedException $e) {
$retryQFiles[] = $queueFile;
$this->logger->info('File ' . $file->getPath() . ' is locked, could not read for indexing. Adding it to the next batch.');
continue;
}
$this->langRopeService->indexSources([$source]);
if (!is_resource($fileHandle)) {
$this->logger->warning('File handle for' . $file->getPath() . ' is not readable');
continue;
}

$sources[] = new Source(
$userIds,
ProviderConfigService::getSourceId($file->getId()),
substr($file->getInternalPath(), 6), // remove 'files/' prefix
$fileHandle,
$file->getMtime(),
$file->getMimeType(),
ProviderConfigService::getDefaultProviderKey(),
);
$allSourceIds[] = ProviderConfigService::getSourceId($file->getId());
} catch (InvalidPathException|NotFoundException $e) {
$this->logger->error('Could not find file ' . $file->getPath(), ['exception' => $e]);
continue;
}
try {
$this->queue->removeFromQueue($queueFile);
} catch (Exception $e) {
$this->logger->error('Could not remove file from queue', ['exception' => $e]);
}

if (count($sources) > 0) {
$loadedSources = array_merge($loadedSources, $this->langRopeService->indexSources($sources));
}

$emptyInvalidSources = array_diff($allSourceIds, $loadedSources);
if (count($emptyInvalidSources) > 0) {
$this->logger->info('Invalid or empty sources that were not indexed', ['sourceIds' => $emptyInvalidSources]);
}

try {
$this->queue->removeFromQueue($files);
// add files that were locked to the end of the queue
foreach ($retryQFiles as $queueFile) {
$this->queue->insertIntoQueue($queueFile);
}
} catch (Exception $e) {
$this->logger->error('Could not remove indexed files from queue', ['exception' => $e]);
}
}
}
Loading

0 comments on commit 9ae31b5

Please sign in to comment.