Skip to content

Commit

Permalink
Merge pull request #6812 from nextcloud/enh/process-imip-in-background
Browse files Browse the repository at this point in the history
Add imip processing in the background
  • Loading branch information
ChristophWurst authored Sep 9, 2022
2 parents 1c2a5d0 + 2dcf8b9 commit 5670311
Show file tree
Hide file tree
Showing 18 changed files with 1,425 additions and 10 deletions.
1 change: 1 addition & 0 deletions appinfo/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<background-jobs>
<job>OCA\Mail\BackgroundJob\CleanupJob</job>
<job>OCA\Mail\BackgroundJob\OutboxWorkerJob</job>
<job>OCA\Mail\BackgroundJob\IMipMessageJob</job>
</background-jobs>
<repair-steps>
<post-migration>
Expand Down
47 changes: 47 additions & 0 deletions lib/BackgroundJob/IMipMessageJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

declare(strict_types=1);

/*
* @copyright 2022 Anna Larch <anna.larch@gmx.net>
*
* @author 2022 Anna Larch <anna.larch@gmx.net>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

namespace OCA\Mail\BackgroundJob;

use OCA\Mail\Service\IMipService;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\TimedJob;

class IMipMessageJob extends TimedJob {
private IMipService $iMipService;

public function __construct(ITimeFactory $time,
IMipService $iMipService) {
parent::__construct($time);

// Run once per hour
$this->setInterval(60 * 60);
$this->iMipService = $iMipService;
}

protected function run($argument): void {
$this->iMipService->process();
}
}
96 changes: 96 additions & 0 deletions lib/BackgroundJob/PreviewEnhancementProcessingJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?php

declare(strict_types=1);
/**
* @copyright Anna Larch <anna.larch@gmx.net>
*
* @author Anna Larch <anna.larch@gmx.net>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

namespace OCA\Mail\BackgroundJob;

use OCA\Mail\Service\AccountService;
use OCA\Mail\Service\PreprocessingService;
use OCP\AppFramework\Db\DoesNotExistException;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\IJobList;
use OCP\BackgroundJob\TimedJob;
use OCP\IUserManager;
use Psr\Log\LoggerInterface;
use function sprintf;

class PreviewEnhancementProcessingJob extends TimedJob {
private IUserManager $userManager;
private AccountService $accountService;
private LoggerInterface $logger;
private IJobList $jobList;
private PreprocessingService $preprocessingService;

public function __construct(ITimeFactory $time,
IUserManager $userManager,
AccountService $accountService,
PreprocessingService $preprocessingService,
LoggerInterface $logger,
IJobList $jobList) {
parent::__construct($time);

$this->userManager = $userManager;
$this->accountService = $accountService;
$this->logger = $logger;
$this->jobList = $jobList;
$this->preprocessingService = $preprocessingService;

$this->setInterval(3600);
$this->setTimeSensitivity(self::TIME_SENSITIVE);
}

/**
* @return void
*/
public function run($argument) {
$accountId = (int)$argument['accountId'];

try {
$account = $this->accountService->findById($accountId);
} catch (DoesNotExistException $e) {
$this->logger->debug('Could not find account <' . $accountId . '> removing from jobs');
$this->jobList->remove(self::class, $argument);
return;
}

$user = $this->userManager->get($account->getUserId());
if ($user === null || !$user->isEnabled()) {
$this->logger->debug(sprintf(
'Account %d of user %s could not be found or was disabled, skipping preprocessing of messages',
$account->getId(),
$account->getUserId()
));
return;
}

$dbAccount = $account->getMailAccount();
if (!is_null($dbAccount->getProvisioningId()) && $dbAccount->getInboundPassword() === null) {
$this->logger->info("Ignoring preprocessing job for provisioned account that has no password set yet");
return;
}

$limitTimestamp = $this->time->getTime() - (60 * 60 * 24 * 14); // Two weeks into the past
$this->preprocessingService->process($limitTimestamp, $account);
}
}
18 changes: 18 additions & 0 deletions lib/Db/MailboxMapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
use OCP\AppFramework\Db\MultipleObjectsReturnedException;
use OCP\AppFramework\Db\QBMapper;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\DB\Exception;
use OCP\DB\QueryBuilder\IQueryBuilder;
use OCP\DB\QueryBuilder\IQueryFunction;
use OCP\IDBConnection;
Expand Down Expand Up @@ -128,6 +129,23 @@ public function findById(int $id): Mailbox {
}
}

/**
* @return Mailbox[]
*
* @throws Exception
*/
public function findByIds(array $ids): array {
$qb = $this->db->getQueryBuilder();

$select = $qb->select('*')
->from($this->getTableName())
->where(
$qb->expr()->in('id', $qb->createNamedParameter($ids, IQueryBuilder::PARAM_INT_ARRAY), IQueryBuilder::PARAM_INT_ARRAY)
);
return $this->findEntities($select);
}


/**
* @param int $id
* @param string $uid
Expand Down
13 changes: 13 additions & 0 deletions lib/Db/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@
* @method null|string getPreviewText()
* @method void setUpdatedAt(int $time)
* @method int getUpdatedAt()
* @method bool isImipMessage()
* @method void setImipMessage(bool $imipMessage)
* @method bool isImipProcessed()
* @method void setImipProcessed(bool $imipProcessed)
* @method bool isImipError()
* @method void setImipError(bool $imipError)
*/
class Message extends Entity implements JsonSerializable {
private const MUTABLE_FLAGS = [
Expand Down Expand Up @@ -114,6 +120,9 @@ class Message extends Entity implements JsonSerializable {
protected $flagImportant = false;
protected $flagMdnsent;
protected $previewText;
protected $imipMessage = false;
protected $imipProcessed = false;
protected $imipError = false;

/** @var AddressList */
private $from;
Expand Down Expand Up @@ -152,6 +161,9 @@ public function __construct() {
$this->addType('flagImportant', 'boolean');
$this->addType('flagMdnsent', 'boolean');
$this->addType('updatedAt', 'integer');
$this->addType('imipMessage', 'boolean');
$this->addType('imipProcessed', 'boolean');
$this->addType('imipError', 'boolean');
}

/**
Expand Down Expand Up @@ -316,6 +328,7 @@ function (Tag $tag) {
'inReplyTo' => $this->getInReplyTo(),
'references' => empty($this->getReferences()) ? null: json_decode($this->getReferences(), true),
'threadRootId' => $this->getThreadRootId(),
'imipMessage' => $this->isImipMessage(),
'previewText' => $this->getPreviewText(),
];
}
Expand Down
88 changes: 87 additions & 1 deletion lib/Db/MessageMapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,6 @@ public function insertBulk(Account $account, Message ...$messages): void {
$qb1->setParameter('flag_notjunk', $message->getFlagNotjunk(), IQueryBuilder::PARAM_BOOL);
$qb1->setParameter('flag_important', $message->getFlagImportant(), IQueryBuilder::PARAM_BOOL);
$qb1->setParameter('flag_mdnsent', $message->getFlagMdnsent(), IQueryBuilder::PARAM_BOOL);

$qb1->execute();

$messageId = $qb1->getLastInsertId();
Expand Down Expand Up @@ -482,6 +481,7 @@ public function updatePreviewDataBulk(Message ...$messages): array {
->set('preview_text', $query->createParameter('preview_text'))
->set('structure_analyzed', $query->createNamedParameter(true, IQueryBuilder::PARAM_BOOL))
->set('updated_at', $query->createNamedParameter($this->timeFactory->getTime(), IQueryBuilder::PARAM_INT))
->set('imip_message', $query->createParameter('imip_message'))
->where($query->expr()->andX(
$query->expr()->eq('uid', $query->createParameter('uid')),
$query->expr()->eq('mailbox_id', $query->createParameter('mailbox_id'))
Expand All @@ -505,6 +505,7 @@ public function updatePreviewDataBulk(Message ...$messages): array {
$previewText,
$previewText === null ? IQueryBuilder::PARAM_NULL : IQueryBuilder::PARAM_STR
);
$query->setParameter('imip_message', $message->isImipMessage(), IQueryBuilder::PARAM_BOOL);

$query->execute();
}
Expand All @@ -520,6 +521,50 @@ public function updatePreviewDataBulk(Message ...$messages): array {
return $messages;
}

/**
* @param Message ...$messages
*
* @return Message[]
*/
public function updateImipData(Message ...$messages): array {
$this->db->beginTransaction();

try {
$query = $this->db->getQueryBuilder();
$query->update($this->getTableName())
->set('imip_message', $query->createParameter('imip_message'))
->set('imip_error', $query->createParameter('imip_error'))
->set('imip_processed', $query->createParameter('imip_processed'))
->where($query->expr()->andX(
$query->expr()->eq('uid', $query->createParameter('uid')),
$query->expr()->eq('mailbox_id', $query->createParameter('mailbox_id'))
));

foreach ($messages as $message) {
if (empty($message->getUpdatedFields())) {
// Micro optimization
continue;
}

$query->setParameter('uid', $message->getUid(), IQueryBuilder::PARAM_INT);
$query->setParameter('mailbox_id', $message->getMailboxId(), IQueryBuilder::PARAM_INT);
$query->setParameter('imip_message', $message->isImipMessage(), IQueryBuilder::PARAM_BOOL);
$query->setParameter('imip_error', $message->isImipError(), IQueryBuilder::PARAM_BOOL);
$query->setParameter('imip_processed', $message->isImipProcessed(), IQueryBuilder::PARAM_BOOL);
$query->execute();
}

$this->db->commit();
} catch (Throwable $e) {
// Make sure to always roll back, otherwise the outer code runs in a failed transaction
$this->db->rollBack();

throw $e;
}

return $messages;
}

public function resetPreviewDataFlag(): void {
$qb = $this->db->getQueryBuilder();
$update = $qb->update($this->getTableName())
Expand Down Expand Up @@ -1232,4 +1277,45 @@ public function resetInReplyTo(): int {
);
return $update->execute();
}

/**
* Get all iMIP messages from the last two weeks
* that haven't been processed yet
* @return Message[]
*/
public function findIMipMessagesAscending(): array {
$time = $this->timeFactory->getTime() - 60 * 60 * 24 * 14;
$qb = $this->db->getQueryBuilder();

$select = $qb->select('*')
->from($this->getTableName())
->where(
$qb->expr()->eq('imip_message', $qb->createNamedParameter(true, IQueryBuilder::PARAM_BOOL), IQueryBuilder::PARAM_BOOL),
$qb->expr()->eq('imip_processed', $qb->createNamedParameter(false, IQueryBuilder::PARAM_BOOL), IQueryBuilder::PARAM_BOOL),
$qb->expr()->eq('imip_error', $qb->createNamedParameter(false, IQueryBuilder::PARAM_BOOL), IQueryBuilder::PARAM_BOOL),
$qb->expr()->eq('flag_junk', $qb->createNamedParameter(false, IQueryBuilder::PARAM_BOOL), IQueryBuilder::PARAM_BOOL),
$qb->expr()->gt('sent_at', $qb->createNamedParameter($time, IQueryBuilder::PARAM_INT)),
)->orderBy('sent_at', 'ASC'); // make sure we don't process newer messages first

return $this->findEntities($select);
}

/**
* @return Message[]
*
* @throws \OCP\DB\Exception
*/
public function getUnanalyzed(int $lastRun, array $mailboxIds): array {
$qb = $this->db->getQueryBuilder();

$select = $qb->select('*')
->from($this->getTableName())
->where(
$qb->expr()->lte('sent_at', $qb->createNamedParameter($lastRun. IQueryBuilder::PARAM_INT), IQueryBuilder::PARAM_INT),
$qb->expr()->eq('structure_analyzed', $qb->createNamedParameter(false, IQueryBuilder::PARAM_BOOL), IQueryBuilder::PARAM_BOOL),
$qb->expr()->in('mailbox_id', $qb->createNamedParameter($mailboxIds, IQueryBuilder::PARAM_INT_ARRAY), IQueryBuilder::PARAM_INT_ARRAY),
)->orderBy('sent_at', 'ASC');

return $this->findEntities($select);
}
}
Loading

0 comments on commit 5670311

Please sign in to comment.