diff --git a/src/ProducerInstance.php b/src/ProducerInstance.php index fa97a17..77fd584 100644 --- a/src/ProducerInstance.php +++ b/src/ProducerInstance.php @@ -173,14 +173,6 @@ public function produceAndQueueJobs($data = null): Promise $job = $jobs->getCurrent(); $job->addEvent(new ProducedJobEvent(new \DateTime(), \get_class($this->producer))); $jobsCount += yield $this->queueManager->enqueue($job); - $this->logger->info( - 'Successfully produced a new Job', - [ - 'producer' => \get_class($this->producer), - 'job_uuid' => $job->getUuid(), - 'payload_data' => NonUtf8Cleaner::clean($job->getPayloadData()) - ] - ); } $jobsCount += yield $this->queueManager->flush(); diff --git a/src/Service/ElasticSearch.php b/src/Service/ElasticSearch.php index b614443..fcacdc8 100644 --- a/src/Service/ElasticSearch.php +++ b/src/Service/ElasticSearch.php @@ -61,7 +61,7 @@ public function indexJob(JobInterface $job, string $indexName): Amp\Promise public function bulkIndexJobs(array $jobs, string $indexName): Amp\Promise { return Amp\call(function () use ($jobs, $indexName) { - yield from $this->doBulkIndexJobs($jobs, $indexName); + return yield from $this->doBulkIndexJobs($jobs, $indexName); }); } @@ -136,7 +136,7 @@ private function doBulkIndexJobs(array $jobs, string $indexName): Generator $body[] = ['index' => ['_id' => $job->getUuid()]]; $body[] = (array)$this->normalizer->normalize($job, 'json'); } - yield $this->client->bulk($body, $indexName); + return yield $this->client->bulk($body, $indexName); } /** diff --git a/src/Service/QueueManager.php b/src/Service/QueueManager.php index f4206ec..5d8c01b 100644 --- a/src/Service/QueueManager.php +++ b/src/Service/QueueManager.php @@ -13,6 +13,7 @@ use Webgriffe\Esb\Model\FlowConfig; use Webgriffe\Esb\Model\Job; use Webgriffe\Esb\Model\JobInterface; +use Webgriffe\Esb\NonUtf8Cleaner; final class QueueManager implements ProducerQueueManagerInterface, WorkerQueueManagerInterface { @@ -96,7 +97,7 @@ public function enqueue(JobInterface $job): Promise ) ); } - $this->batch[] = $job; + $this->batch[$job->getUuid()] = $job; $count = count($this->batch); if ($count < $this->batchSize) { @@ -218,7 +219,28 @@ private function jobExists(string $jobUuid): Promise private function processBatch(): \Generator { $this->logger->debug('Processing batch'); - yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube()); + $result = yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube()); + + if ($result['errors'] === true) { + foreach ($result['items'] as $item) { + if (!array_key_exists('index', $item)) { + $this->logger->error( + 'Unexpected response item in bulk index response', + ['bulk_index_response_item' => $item] + ); + continue; + } + $itemStatusCode = $item['index']['status'] ?? null; + if (!$this->isSuccessfulStatusCode($itemStatusCode)) { + $uuid = $item['index']['_id']; + unset($this->batch[$uuid]); + $this->logger->error( + 'Job could not be indexed in ElasticSearch', + ['bulk_index_response_item' => $item] + ); + } + } + } foreach ($this->batch as $singleJob) { yield $this->beanstalkClient->put( @@ -227,6 +249,14 @@ private function processBatch(): \Generator $singleJob->getDelay(), $singleJob->getPriority() ); + $this->logger->info( + 'Successfully enqueued a new Job', + [ + 'flow_name' => $this->flowConfig->getName(), + 'job_uuid' => $singleJob->getUuid(), + 'payload_data' => NonUtf8Cleaner::clean($singleJob->getPayloadData()) + ] + ); } $this->batch = []; @@ -254,4 +284,9 @@ private function getJobBeanstalkId(JobInterface $job): int throw new \RuntimeException("Unknown Beanstalk id for job {$uuid}"); } + + public function isSuccessfulStatusCode(?int $statusCode): bool + { + return $statusCode !== null && $statusCode >= 200 && $statusCode < 300; + } } diff --git a/tests/DummyFilesystemRepeatProducer.php b/tests/DummyFilesystemRepeatProducer.php index a43f631..2672f3d 100644 --- a/tests/DummyFilesystemRepeatProducer.php +++ b/tests/DummyFilesystemRepeatProducer.php @@ -65,7 +65,12 @@ public function produce($data = null): Iterator continue; } yield $this->longRunningOperation(); - yield $emit(new Job(['file' => $file, 'data' => (yield File\read($file))])); + $fileContent = yield File\read($file); + $fileContentAsArray = json_decode($fileContent, true); + $payloadData = is_array($fileContentAsArray) ? + $fileContentAsArray : + ['file' => $file, 'data' => $fileContent]; + yield $emit(new Job($payloadData)); yield \Amp\File\deleteFile($file); } }); diff --git a/tests/Integration/ElasticSearchIndexingTest.php b/tests/Integration/ElasticSearchIndexingTest.php index ae73503..b81660e 100644 --- a/tests/Integration/ElasticSearchIndexingTest.php +++ b/tests/Integration/ElasticSearchIndexingTest.php @@ -172,6 +172,60 @@ function (Job $job) { ); } + /** + * @test + */ + public function itLogsAndSkipsJobsThatCouldNotBeIndexedOntoElasticSearch() + { + $producerDir = vfsStream::url('root/producer_dir'); + $workerFile = vfsStream::url('root/worker.data'); + self::createKernel([ + 'services' => [ + DummyFilesystemRepeatProducer::class => ['arguments' => [$producerDir]], + DummyFilesystemWorker::class => ['arguments' => [$workerFile]], + ], + 'flows' => [ + self::FLOW_CODE => [ + 'description' => 'ElasticSearch Indexing Test Repeat Flow', + 'producer' => ['service' => DummyFilesystemRepeatProducer::class], + 'worker' => ['service' => DummyFilesystemWorker::class], + ] + ] + ]); + mkdir($producerDir); + Loop::delay( + 200, + function () use ($producerDir) { + $veryLargeDocument = json_encode(array_fill_keys(range(1, 1001), 'value')); + file_put_contents($producerDir . DIRECTORY_SEPARATOR . 'job1', $veryLargeDocument); + touch($producerDir . DIRECTORY_SEPARATOR . 'job2'); + } + ); + $this->stopWhen(function () { + $successLog = array_filter( + $this->logHandler()->getRecords(), + function ($log) { + return strpos($log['message'], 'Successfully worked a Job') !== false; + } + ); + return count($successLog) >= 1; + }); + self::$kernel->boot(); + + Promise\wait($this->esClient->refresh()); + $search = Promise\wait($this->esClient->uriSearchOneIndex(self::FLOW_CODE, '')); + $this->assertCount(1, $search['hits']['hits']); + $this->assertTrue($this->logHandler()->hasErrorThatContains('Job could not be indexed in ElasticSearch')); + $logRecords = $this->logHandler()->getRecords(); + $successfullyIndexedLog = array_filter( + $logRecords, + function ($log) { + return strpos($log['message'], 'Successfully enqueued a new Job') !== false; + } + ); + $this->assertCount(1, $successfullyIndexedLog); + } + private function assertForEachJob(callable $callable, array $jobsData) { /** @var Serializer $serializer */