Skip to content

Commit

Permalink
Add integration test and make it pass
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianaromagnoli committed Oct 28, 2024
1 parent 4e0dd40 commit bfe5ed8
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 32 deletions.
36 changes: 17 additions & 19 deletions src/Service/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -219,29 +219,22 @@ private function processBatch(): \Generator
{
$this->logger->debug('Processing batch');
$result = yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube());
$successfullyIndexedJobs = $this->batch;
$notIndexedJobs = [];

if ($result['errors'] === true) {
$successfullyIndexedJobs = array_filter($this->batch, function ($job) use ($result) {
return $result['items'][$job->getUuid()]['index']['status'] === 201;
});

$notIndexedJobs = array_filter($this->batch, function ($job) use ($result) {
return $result['items'][$job->getUuid()]['index']['status'] !== 201;
});
}

foreach ($notIndexedJobs as $singleJob) {
$this->logger->error(
sprintf(
'Job with UUID "%s" could not be indexed in ElasticSearch',
$singleJob->getUuid()
)
);
foreach ($result['items'] as $item) {
// TODO: Handle missing keys in result array
if (!$this->isSuccessfulStatusCode($item['index']['status'])) {
$uuid = $item['index']['_id'];
unset($this->batch[$uuid]);
$this->logger->error(
'Job could not be indexed in ElasticSearch',
['bulk_index_response_item' => $item]
);
}
}
}

foreach ($successfullyIndexedJobs as $singleJob) {
foreach ($this->batch as $singleJob) {
yield $this->beanstalkClient->put(
$singleJob->getUuid(),
$singleJob->getTimeout(),
Expand Down Expand Up @@ -275,4 +268,9 @@ private function getJobBeanstalkId(JobInterface $job): int

throw new \RuntimeException("Unknown Beanstalk id for job {$uuid}");
}

public function isSuccessfulStatusCode(int $statusCode): bool
{
return $statusCode >= 200 && $statusCode < 300;
}
}
7 changes: 6 additions & 1 deletion tests/DummyFilesystemRepeatProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,12 @@ public function produce($data = null): Iterator
continue;
}
yield $this->longRunningOperation();
yield $emit(new Job(['file' => $file, 'data' => (yield File\read($file))]));
$fileContent = yield File\read($file);
$fileContentAsArray = json_decode($fileContent, true);
$payloadData = is_array($fileContentAsArray) ?
$fileContentAsArray :
['file' => $file, 'data' => $fileContent];
yield $emit(new Job($payloadData));
yield \Amp\File\deleteFile($file);
}
});
Expand Down
14 changes: 2 additions & 12 deletions tests/Integration/ElasticSearchIndexingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@
use Webgriffe\Esb\KernelTestCase;
use Webgriffe\Esb\Model\ErroredJobEvent;
use Webgriffe\Esb\Model\Job;
use Webgriffe\Esb\Model\JobEventInterface;
use Webgriffe\Esb\Model\ProducedJobEvent;
use Webgriffe\Esb\Model\ReservedJobEvent;
use Webgriffe\Esb\Model\WorkedJobEvent;
use Webgriffe\Esb\TestUtils;
use Webgriffe\Esb\Unit\Model\DummyJobEvent;
use function Amp\Http\formatDateHeader;

class ElasticSearchIndexingTest extends KernelTestCase
{
Expand Down Expand Up @@ -199,16 +196,9 @@ public function itLogsAndSkipsJobsThatCouldNotBeIndexedOntoElasticSearchWithAllE
Loop::delay(
200,
function () use ($producerDir) {
touch($producerDir . DIRECTORY_SEPARATOR . 'job1');
// TODO: It needs to become a document with more than 1000 fields
$veryLargeDocument = 'TODO';
$veryLargeDocument = json_encode(array_fill_keys(range(1, 1001), 'value'));
file_put_contents($producerDir . DIRECTORY_SEPARATOR . 'job1', $veryLargeDocument);
Loop::delay(
200,
function () use ($producerDir) {
touch($producerDir . DIRECTORY_SEPARATOR . 'job2');
}
);
touch($producerDir . DIRECTORY_SEPARATOR . 'job2');
}
);
$this->stopWhen(function () {
Expand Down

0 comments on commit bfe5ed8

Please sign in to comment.