Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not enqueue jobs that could not be indexed #36

Merged
merged 3 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Service/ElasticSearch.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public function indexJob(JobInterface $job, string $indexName): Amp\Promise
/**
 * Indexes the given jobs in bulk into the given index.
 *
 * Runs doBulkIndexJobs() inside Amp\call() and hands the resulting promise back to the caller.
 *
 * @param array $jobs Jobs to index.
 * @param string $indexName Name of the index the jobs are written to.
 * @return Amp\Promise Promise produced by Amp\call() for the wrapped generator.
 */
public function bulkIndexJobs(array $jobs, string $indexName): Amp\Promise
{
return Amp\call(function () use ($jobs, $indexName) {
// NOTE(review): this text comes from a rendered diff whose +/- markers were lost (the file header
// reports 2 additions & 2 deletions over two hunks). The bare `yield from` below is presumably the
// removed statement and the `return yield from` its replacement — confirm against the real file,
// because taken literally the bulk request would be issued twice.
yield from $this->doBulkIndexJobs($jobs, $indexName);
// Returns whatever doBulkIndexJobs() returns, so the generator passed to Amp\call() ends with that value.
return yield from $this->doBulkIndexJobs($jobs, $indexName);
});
}

Expand Down Expand Up @@ -136,7 +136,7 @@ private function doBulkIndexJobs(array $jobs, string $indexName): Generator
$body[] = ['index' => ['_id' => $job->getUuid()]];
$body[] = (array)$this->normalizer->normalize($job, 'json');
}
yield $this->client->bulk($body, $indexName);
return yield $this->client->bulk($body, $indexName);
}

/**
Expand Down
25 changes: 23 additions & 2 deletions src/Service/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,30 @@ private function jobExists(string $jobUuid): Promise
private function processBatch(): \Generator
{
$this->logger->debug('Processing batch');
yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube());
$result = yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube());
$successfullyIndexedJobs = $this->batch;
$notIndexedJobs = [];

if ($result['errors'] === true) {
$successfullyIndexedJobs = array_filter($this->batch, function ($job) use ($result) {
return $result['items'][$job->getUuid()]['index']['status'] === 201;
fabianaromagnoli marked this conversation as resolved.
Show resolved Hide resolved
});

$notIndexedJobs = array_filter($this->batch, function ($job) use ($result) {
return $result['items'][$job->getUuid()]['index']['status'] !== 201;
fabianaromagnoli marked this conversation as resolved.
Show resolved Hide resolved
});
}

foreach ($notIndexedJobs as $singleJob) {
$this->logger->error(
sprintf(
'Job with UUID "%s" could not be indexed in ElasticSearch',
$singleJob->getUuid()
)
);
}

foreach ($this->batch as $singleJob) {
foreach ($successfullyIndexedJobs as $singleJob) {
yield $this->beanstalkClient->put(
fabianaromagnoli marked this conversation as resolved.
Show resolved Hide resolved
$singleJob->getUuid(),
$singleJob->getTimeout(),
Expand Down
56 changes: 56 additions & 0 deletions tests/Integration/ElasticSearchIndexingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
use Webgriffe\Esb\KernelTestCase;
use Webgriffe\Esb\Model\ErroredJobEvent;
use Webgriffe\Esb\Model\Job;
use Webgriffe\Esb\Model\JobEventInterface;
use Webgriffe\Esb\Model\ProducedJobEvent;
use Webgriffe\Esb\Model\ReservedJobEvent;
use Webgriffe\Esb\Model\WorkedJobEvent;
use Webgriffe\Esb\TestUtils;
use Webgriffe\Esb\Unit\Model\DummyJobEvent;
use function Amp\Http\formatDateHeader;

class ElasticSearchIndexingTest extends KernelTestCase
{
Expand Down Expand Up @@ -172,6 +175,59 @@ function (Job $job) {
);
}

/**
 * Work-in-progress integration test for jobs that cannot be indexed in ElasticSearch.
 *
 * Two producer files (job1, job2) are created on a delay; job1 is meant to carry a document that
 * ElasticSearch rejects. The test then expects exactly one document in the flow's index.
 *
 * NOTE(review): the job1 payload is still the placeholder string 'TODO' and the log assertions are
 * missing, so the test is not expected to pass yet (see the TODO markers in the body).
 *
 * @test
 */
public function itLogsAndSkipsJobsThatCouldNotBeIndexedOntoElasticSearchWithAllEvents()
{
// Virtual filesystem locations handed to the dummy producer and worker services.
$producerDir = vfsStream::url('root/producer_dir');
$workerFile = vfsStream::url('root/worker.data');
// Build a kernel with a single flow that wires the repeat producer to the filesystem worker.
self::createKernel([
'services' => [
DummyFilesystemRepeatProducer::class => ['arguments' => [$producerDir]],
DummyFilesystemWorker::class => ['arguments' => [$workerFile]],
],
'flows' => [
self::FLOW_CODE => [
'description' => 'ElasticSearch Indexing Test Repeat Flow',
'producer' => ['service' => DummyFilesystemRepeatProducer::class],
'worker' => ['service' => DummyFilesystemWorker::class],
]
]
]);
mkdir($producerDir);
// Schedule the creation of job1 on the event loop; nothing runs until the kernel is booted below.
// NOTE(review): the 200 delay is presumably milliseconds (Amp Loop::delay) — confirm.
Loop::delay(
200,
function () use ($producerDir) {
// The file is first created empty, then overwritten with the (placeholder) payload.
touch($producerDir . DIRECTORY_SEPARATOR . 'job1');
// TODO: It needs to become a document with more than 1000 fields
$veryLargeDocument = 'TODO';
file_put_contents($producerDir . DIRECTORY_SEPARATOR . 'job1', $veryLargeDocument);
// job2 is created by a second timer that is only registered once the job1 callback has run.
Loop::delay(
200,
function () use ($producerDir) {
touch($producerDir . DIRECTORY_SEPARATOR . 'job2');
}
);
}
);
// Stop condition: at least one log record whose message contains 'Successfully worked a Job'.
$this->stopWhen(function () {
$successLog = array_filter(
$this->logHandler()->getRecords(),
function ($log) {
return strpos($log['message'], 'Successfully worked a Job') !== false;
}
);
return count($successLog) >= 1;
});
// The timers and the stop condition are registered before the kernel is booted.
self::$kernel->boot();

// Refresh through the ES client before searching the flow's index.
// NOTE(review): presumably this is what makes freshly indexed documents visible to the search — confirm.
Promise\wait($this->esClient->refresh());
$search = Promise\wait($this->esClient->uriSearchOneIndex(self::FLOW_CODE, ''));
// Expect exactly one hit in the index (per the test's intent, the job that could be indexed).
$this->assertCount(1, $search['hits']['hits']); // TODO: Make it green
// TODO: Add assertions on logs
}

private function assertForEachJob(callable $callable, array $jobsData)
{
/** @var Serializer $serializer */
Expand Down