Skip to content

Commit

Permalink
Replace QueueItem with ItemImport model (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
lruozzi9 committed Aug 29, 2022
1 parent 8fb93c7 commit 37d67da
Show file tree
Hide file tree
Showing 16 changed files with 62 additions and 540 deletions.
104 changes: 0 additions & 104 deletions src/Command/ConsumeCommand.php
Original file line number Diff line number Diff line change
@@ -1,104 +0,0 @@
<?php

declare(strict_types=1);

namespace Webgriffe\SyliusAkeneoPlugin\Command;

use Doctrine\ORM\EntityManagerInterface;
use Doctrine\Persistence\ManagerRegistry;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Webgriffe\SyliusAkeneoPlugin\ImporterInterface;
use Webgriffe\SyliusAkeneoPlugin\ImporterRegistryInterface;
use Webgriffe\SyliusAkeneoPlugin\Repository\QueueItemRepositoryInterface;

final class ConsumeCommand extends Command
{
use LockableTrait;

protected static $defaultName = 'webgriffe:akeneo:consume';

public function __construct(
private QueueItemRepositoryInterface $queueItemRepository,
private ImporterRegistryInterface $importerRegistry,
private ManagerRegistry $managerRegistry,
) {
parent::__construct();
}

protected function configure(): void
{
$this->setDescription('Process the Queue by calling the proper importer for each item');
}

/**
* @throws \Throwable
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
if (!$this->lock()) {
$output->writeln('The command is already running in another process.');

return 0;
}

$queueItems = $this->queueItemRepository->findAllToImport();
foreach ($queueItems as $queueItem) {
$akeneoIdentifier = $queueItem->getAkeneoIdentifier();

try {
$importer = $this->resolveImporter($queueItem->getAkeneoEntity());
$importer->import($akeneoIdentifier);
$queueItem->setImportedAt(new \DateTime());
$queueItem->setErrorMessage(null);
} catch (\Throwable $t) {
/** @var EntityManagerInterface $objectManager */
$objectManager = $this->managerRegistry->getManager();
if (!$objectManager->isOpen()) {
$this->release();

throw $t;
}
$queueItem->setErrorMessage($t->getMessage() . \PHP_EOL . $t->getTraceAsString());
$output->writeln(
sprintf(
'There has been an error importing <info>%s</info> entity with identifier <info>%s</info>. ' .
'The error was: <error>%s</error>.',
$queueItem->getAkeneoEntity(),
$akeneoIdentifier,
$t->getMessage(),
),
);
if ($output->isVeryVerbose()) {
$output->writeln((string) $t);
}
}

$this->queueItemRepository->add($queueItem);
$output->writeln(
sprintf(
'<info>%s</info> entity with identifier <info>%s</info> has been imported.',
$queueItem->getAkeneoEntity(),
$akeneoIdentifier,
),
);
}

$this->release();

return 0;
}

private function resolveImporter(string $akeneoEntity): ImporterInterface
{
foreach ($this->importerRegistry->all() as $importer) {
if ($importer->getAkeneoEntity() === $akeneoEntity) {
return $importer;
}
}

throw new \RuntimeException(sprintf('Cannot find suitable importer for entity "%s".', $akeneoEntity));
}
}
61 changes: 24 additions & 37 deletions src/Command/EnqueueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@

namespace Webgriffe\SyliusAkeneoPlugin\Command;

use Sylius\Component\Resource\Factory\FactoryInterface;
use DateTime;
use InvalidArgumentException;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Throwable;
use Webgriffe\SyliusAkeneoPlugin\DateTimeBuilderInterface;
use Webgriffe\SyliusAkeneoPlugin\Entity\QueueItemInterface;
use Webgriffe\SyliusAkeneoPlugin\ImporterInterface;
use Webgriffe\SyliusAkeneoPlugin\ImporterRegistryInterface;
use Webgriffe\SyliusAkeneoPlugin\Repository\QueueItemRepositoryInterface;
use Webgriffe\SyliusAkeneoPlugin\Message\ItemImport;
use Webmozart\Assert\Assert;

final class EnqueueCommand extends Command
Expand All @@ -32,10 +34,9 @@ final class EnqueueCommand extends Command
protected static $defaultName = 'webgriffe:akeneo:enqueue';

public function __construct(
private QueueItemRepositoryInterface $queueItemRepository,
private FactoryInterface $queueItemFactory,
private DateTimeBuilderInterface $dateTimeBuilder,
private ImporterRegistryInterface $importerRegistry,
private MessageBusInterface $messageBus,
) {
parent::__construct();
}
Expand Down Expand Up @@ -76,18 +77,18 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$sinceFilePath = null;
if ('' !== $sinceOptionValue = (string) $input->getOption(self::SINCE_OPTION_NAME)) {
try {
$sinceDate = new \DateTime($sinceOptionValue);
} catch (\Throwable) {
throw new \InvalidArgumentException(
$sinceDate = new DateTime($sinceOptionValue);
} catch (Throwable) {
throw new InvalidArgumentException(
sprintf('The "%s" argument must be a valid date', self::SINCE_OPTION_NAME),
);
}
} elseif ('' !== $sinceFilePath = (string) $input->getOption(self::SINCE_FILE_OPTION_NAME)) {
$sinceDate = $this->getSinceDateByFile($sinceFilePath);
} elseif ($input->getOption(self::ALL_OPTION_NAME) === true) {
$sinceDate = (new \DateTime())->setTimestamp(0);
$sinceDate = (new DateTime())->setTimestamp(0);
} else {
throw new \InvalidArgumentException(
throw new InvalidArgumentException(
sprintf(
'One of "--%s", "--%s" or "--%s" option must be specified',
self::SINCE_OPTION_NAME,
Expand Down Expand Up @@ -118,15 +119,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int
continue;
}
foreach ($identifiers as $identifier) {
if ($this->isEntityAlreadyQueuedToImport($importer->getAkeneoEntity(), $identifier)) {
continue;
}
$queueItem = $this->queueItemFactory->createNew();
Assert::isInstanceOf($queueItem, QueueItemInterface::class);
$queueItem->setAkeneoEntity($importer->getAkeneoEntity());
$queueItem->setAkeneoIdentifier($identifier);
$queueItem->setCreatedAt(new \DateTime());
$this->queueItemRepository->add($queueItem);
$itemImport = new ItemImport(
$importer->getAkeneoEntity(),
$identifier
);
$this->messageBus->dispatch($itemImport);
$output->writeln(
sprintf(
'<info>%s</info> entity with identifier <info>%s</info> enqueued.',
Expand All @@ -146,50 +143,40 @@ protected function execute(InputInterface $input, OutputInterface $output): int
return 0;
}

private function getSinceDateByFile(string $filepath): \DateTime
private function getSinceDateByFile(string $filepath): DateTime
{
if (!file_exists($filepath)) {
throw new \InvalidArgumentException(
throw new InvalidArgumentException(
sprintf('The file "%s" does not exists', $filepath),
);
}
if (!is_readable($filepath)) {
throw new \InvalidArgumentException(
throw new InvalidArgumentException(
sprintf('The file "%s" is not readable', $filepath),
);
}
if (!is_writable($filepath)) {
throw new \InvalidArgumentException(
throw new InvalidArgumentException(
sprintf('The file "%s" is not writable', $filepath),
);
}

try {
$content = file_get_contents($filepath);
Assert::string($content);
$sinceDate = new \DateTime(trim($content));
} catch (\Throwable $t) {
$sinceDate = new DateTime(trim($content));
} catch (Throwable $t) {
throw new \RuntimeException(sprintf('The file "%s" must contain a valid datetime', $filepath), 0, $t);
}

return $sinceDate;
}

private function writeSinceDateFile(string $filepath, \DateTime $runDate): void
private function writeSinceDateFile(string $filepath, DateTime $runDate): void
{
file_put_contents($filepath, $runDate->format('c'));
}

private function isEntityAlreadyQueuedToImport(string $akeneoEntity, string $akeneoIdentifier): bool
{
$queueItem = $this->queueItemRepository->findOneToImport($akeneoEntity, $akeneoIdentifier);
if ($queueItem !== null) {
return true;
}

return false;
}

/**
* @return ImporterInterface[]
*/
Expand Down Expand Up @@ -220,7 +207,7 @@ private function getImporters(InputInterface $input): array
$importers = [];
foreach ($importersToUse as $importerToUse) {
if (!array_key_exists($importerToUse, $allImporters)) {
throw new \InvalidArgumentException(sprintf('Importer "%s" does not exists.', $importerToUse));
throw new InvalidArgumentException(sprintf('Importer "%s" does not exists.', $importerToUse));
}
$importers[] = $allImporters[$importerToUse];
}
Expand Down
97 changes: 0 additions & 97 deletions src/Command/QueueCleanupCommand.php
Original file line number Diff line number Diff line change
@@ -1,97 +0,0 @@
<?php

declare(strict_types=1);

namespace Webgriffe\SyliusAkeneoPlugin\Command;

use DateInterval;
use DateTime;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Webgriffe\SyliusAkeneoPlugin\DateTimeBuilder;
use Webgriffe\SyliusAkeneoPlugin\Entity\QueueItem;
use Webgriffe\SyliusAkeneoPlugin\Repository\CleanableQueueItemRepositoryInterface;

final class QueueCleanupCommand extends Command
{
public const SUCCESS = 0;

public const FAILURE = 1;

private const DEFAULT_DAYS = 10;

private const DAYS_ARGUMENT_NAME = 'days';

// the name of the command (the part after "bin/console")
protected static $defaultName = 'webgriffe:akeneo:cleanup-queue';

/**
* QueueCleanupCommand constructor.
*/
public function __construct(private CleanableQueueItemRepositoryInterface $queueItemRepository)
{
parent::__construct();
}

protected function configure(): void
{
$this
->setDescription('Clean the Akeneo\'s queue of items older than N days.')
->setHelp('This command allows you to clean the Akeneo\'s queue of item older than a specificed numbers of days.')
->addArgument(
self::DAYS_ARGUMENT_NAME,
InputArgument::OPTIONAL,
'Number of days from which to purge the queue of previous items',
(string) (self::DEFAULT_DAYS),
)
;
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$numberOfDays = self::DEFAULT_DAYS;
// get the number of days from user
$numberOfDaysEntered = $input->getArgument(self::DAYS_ARGUMENT_NAME);
if ($numberOfDaysEntered !== null) {
if (!is_string($numberOfDaysEntered) || (int) $numberOfDaysEntered < 0) {
$output->writeln('Sorry, the number of days entered is not valid!');

return self::FAILURE;
}
$numberOfDays = (int) $numberOfDaysEntered;
}

// get the beginning date
$dateToDelete = $this->getPreviousDateNDays($numberOfDays);

$queueItems = $this->queueItemRepository->findToCleanup($dateToDelete);

if (count($queueItems) === 0) {
$output->writeln('There are no items to clean');

return self::SUCCESS;
}

/** @var QueueItem $queueItem */
foreach ($queueItems as $queueItem) {
$this->queueItemRepository->remove($queueItem);
}

$output->writeln(sprintf('<info>%s</info> items imported before <info>%s</info> has been deleted.', count($queueItems), $dateToDelete->format('Y-m-d H:i:s')));

return self::SUCCESS;
}

/**
* @throws \Exception
*/
private function getPreviousDateNDays(int $numberOfDays): DateTime
{
$dtBuilder = new DateTimeBuilder();
$today = $dtBuilder->build();

return $today->sub(new DateInterval(sprintf('P%dD', $numberOfDays)));
}
}
Loading

0 comments on commit 37d67da

Please sign in to comment.