Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Meaningful HTTP Producer responses #21

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
sudo: required

language: php
php:
- 7.2
- 7.3
- 7.4

jobs:
include:
# Combination PHP <7.4 and maglnet/composer-require-checker doesn't work with Composer 2
- php: 7.2
env: COMPOSER_VERSION=1.10.16
- php: 7.3
env: COMPOSER_VERSION=1.10.16
- php: 7.4
env: COMPOSER_VERSION=--stable

env:
- ESB_CONSOLE_PORT=8080 ESB_HTTP_SERVER_PORT=34981 ESB_BEANSTALKD_URL=tcp://127.0.0.1:11300 ES_BASE_URI=http://127.0.0.1:9200
global:
- ESB_CONSOLE_PORT=8080 ESB_HTTP_SERVER_PORT=34981 ESB_BEANSTALKD_URL=tcp://127.0.0.1:11300 ES_BASE_URI=http://127.0.0.1:9200

cache:
directories:
Expand All @@ -24,6 +31,8 @@ before_install:
- echo -e '-Ddiscovery.type=single-node\n-XX:+DisableExplicitGC\n-Djdk.io.permissionsUseCanonicalPath=true\n-Dlog4j.skipJansi=true\n-server\n' | sudo tee -a /etc/elasticsearch/jvm.options
- sudo chown -R elasticsearch:elasticsearch /etc/default/elasticsearch
- sudo systemctl start elasticsearch
- composer --verbose self-update $COMPOSER_VERSION
- composer --version

install:
- sudo apt-get update
Expand Down
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"require": {
"php": "~7.2.0|~7.3.0|~7.4.0",
"ext-pcntl": "*",
"ext-json": "*",
"symfony/dependency-injection": "^3.3",
"symfony/config": "^3.3",
"symfony/yaml": "^3.3",
Expand Down
53 changes: 53 additions & 0 deletions src/Exception/HttpResponseException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

declare(strict_types=1);

namespace Webgriffe\Esb\Exception;

class HttpResponseException extends \Exception
{
/**
* @var int
*/
private $httpResponseCode;

/**
* @var string
*/
private $clientMessage;

/**
* @param int $httpResponseCode The HTTP response code to use
* @param string $clientMessage The message to send to the client
* @param string $internalMessage The message to use internally (e.g. store in error logs), when empty the client message is used
* @param int $code The Exception code.
* @param \Throwable|null $previous The previous throwable used for the exception chaining.
*/
public function __construct(
int $httpResponseCode,
string $clientMessage,
string $internalMessage = "",
int $code = 0,
?\Throwable $previous = null
) {
$this->httpResponseCode = $httpResponseCode;
$this->clientMessage = $clientMessage;
parent::__construct($internalMessage ?: $clientMessage, $code, $previous);
}

/**
* @return int
*/
public function getHttpResponseCode(): int
{
return $this->httpResponseCode;
}

/**
* @return string
*/
public function getClientMessage(): string
{
return $this->clientMessage;
}
}
22 changes: 18 additions & 4 deletions src/ProducerInstance.php
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ function ($watcherId) {
public function produceAndQueueJobs($data = null): Promise
{
return call(function () use ($data) {
$jobsCount = 0;
$flushedJobsCount = 0;
$queuedJobsCount = 0;
$caughtException = null;
$job = null;
$test = false;
try {
Expand All @@ -173,7 +175,8 @@ public function produceAndQueueJobs($data = null): Promise
/** @var Job $job */
$job = $jobs->getCurrent();
$job->addEvent(new ProducedJobEvent(new \DateTime(), \get_class($this->producer)));
$jobsCount += yield $this->queueManager->enqueue($job);
$flushedJobsCount += yield $this->queueManager->enqueue($job);
$queuedJobsCount++;
$this->logger->info(
'Successfully produced a new Job',
[
Expand All @@ -184,8 +187,9 @@ public function produceAndQueueJobs($data = null): Promise
);
}

$jobsCount += yield $this->queueManager->flush();
$flushedJobsCount += yield $this->queueManager->flush();
} catch (\Throwable $error) {
$caughtException = $error;
$this->logger->error(
'An error occurred producing/queueing jobs.',
[
Expand All @@ -195,8 +199,18 @@ public function produceAndQueueJobs($data = null): Promise
'test' => $test
]
);

// At least try to flush any previously successfully queued jobs. Don't let an error in parsing job 3
// details also fail jobs 1 and 2.
if ($queuedJobsCount > $flushedJobsCount) {
try {
$flushedJobsCount += yield $this->queueManager->flush();
} catch (\Throwable $nestedError) {
// Ignore any further (duplicated) errors
}
}
}
return $jobsCount;
return new ProducerResult($flushedJobsCount, $caughtException);
});
}

Expand Down
44 changes: 44 additions & 0 deletions src/ProducerResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

declare(strict_types=1);

namespace Webgriffe\Esb;

class ProducerResult
{
/**
* @var int
*/
private $jobsCount;

/**
* @var \Throwable|null
*/
private $exception;

/**
* @param int $jobsCount
* @param \Throwable|null $exception
*/
public function __construct(int $jobsCount, ?\Throwable $exception = null)
{
$this->jobsCount = $jobsCount;
$this->exception = $exception;
}

/**
* @return int
*/
public function getJobsCount(): int
{
return $this->jobsCount;
}

/**
* @return \Throwable|null
*/
public function getException(): ?\Throwable
{
return $this->exception;
}
}
39 changes: 35 additions & 4 deletions src/Service/HttpProducersServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
use Amp\Socket;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Webgriffe\Esb\Exception\HttpResponseException;
use Webgriffe\Esb\HttpRequestProducerInterface;
use Webgriffe\Esb\ProducerInstance;
use Webgriffe\Esb\ProducerResult;

use function Amp\call;

/**
Expand Down Expand Up @@ -59,7 +62,7 @@ public function start(): Promise
Socket\listen("[::]:{$this->port}"),
];

$this->httpServer = new \Amp\Http\Server\Server(
$this->httpServer = new Server(
$sockets,
new CallableRequestHandler($this->callableFromInstanceMethod('requestHandler')),
new NullLogger()
Expand Down Expand Up @@ -105,9 +108,37 @@ private function requestHandler(Request $request)
'request' => sprintf('%s %s', strtoupper($request->getMethod()), $request->getUri())
]
);
$jobsCount = yield $producerInstance->produceAndQueueJobs($request);
$responseMessage = sprintf('Successfully scheduled %s job(s) to be queued.', $jobsCount);
return new Response(Status::OK, [], sprintf('"%s"', $responseMessage));
$producerResult = yield $producerInstance->produceAndQueueJobs($request);
return $this->buildResponse($producerResult);
}

/**
* @param ProducerResult $producerResult
* @return Response
*/
private function buildResponse(ProducerResult $producerResult): Response
{
$producerException = $producerResult->getException();
if ($producerException === null) {
$responseCode = Status::OK;
$responseMessage = sprintf('Successfully scheduled %d job(s) to be queued.', $producerResult->getJobsCount());
} else {
$responseCode = Status::INTERNAL_SERVER_ERROR;
$errorMessage = 'Internal server error';

if ($producerException instanceof HttpResponseException) {
$responseCode = $producerException->getHttpResponseCode();
$errorMessage = $producerException->getClientMessage();
}

if ($producerResult->getJobsCount() === 0) {
$responseMessage = sprintf('%s, could not schedule any jobs.', $errorMessage);
} else {
$responseMessage = sprintf('%s, only scheduled the first %d job(s) to be queued.', $errorMessage, $producerResult->getJobsCount());
}
}

return new Response($responseCode, [], sprintf('"%s"', $responseMessage));
}

/**
Expand Down
14 changes: 13 additions & 1 deletion tests/DummyHttpRequestProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
namespace Webgriffe\Esb;

use Amp\Http\Server\Request;
use Amp\Http\Status;
use Amp\Iterator;
use Amp\Producer;
use Amp\Promise;
use Amp\Success;
use Webgriffe\Esb\Exception\HttpResponseException;
use Webgriffe\Esb\Model\Job;

final class DummyHttpRequestProducer implements HttpRequestProducerInterface
Expand Down Expand Up @@ -45,9 +47,19 @@ public function produce($data = null): Iterator
);
}
$body = json_decode(yield $data->getBody()->read(), true);
if (!is_array($body)) {
throw new HttpResponseException(Status::BAD_REQUEST, 'Request body contains invalid JSON');
}
$jobsData = $body['jobs'];
foreach ($jobsData as $jobData) {
yield $emit(new Job([$jobData]));
switch ($jobData) {
case 'throw http response exception':
throw new HttpResponseException(Status::PRECONDITION_FAILED, 'Some other custom message');
case 'throw other exception':
throw new \Exception('This message shouldn\'t be send to the client');
default:
yield $emit(new Job([$jobData]));
}
}
});
}
Expand Down
Loading