Skip to content

Commit

Permalink
Improve handling grpc response headers, propagate response headers wi…
Browse files Browse the repository at this point in the history
…th grpc error metadata
  • Loading branch information
rauanmayemir committed Sep 9, 2024
1 parent ddb3e21 commit e041c42
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 52 deletions.
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@
"config": {
"sort-packages": true
},
"extra": {
"branch-alias": {
"3.x": "3.4.x-dev"
}
},
"minimum-stability": "dev",
"prefer-stable": true
}
40 changes: 40 additions & 0 deletions src/Internal/CallContext.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

namespace Spiral\RoadRunner\GRPC\Internal;

use Spiral\RoadRunner\GRPC\ServiceInterface;

/**
* @internal
* @psalm-internal Spiral\RoadRunner\GRPC
*/
final class CallContext
{
/**
* @param class-string<ServiceInterface> $service
* @param non-empty-string $method
* @param array<string, array<string>> $context
*/
public function __construct(
public string $service,
public string $method,
public array $context,
) {
}

/**
* @throws \JsonException
*/
public static function decode(string $payload): self
{
$data = Json::decode($payload);

return new self(
service: $data['service'],
method: $data['method'],
context: $data['context'],
);
}
}
88 changes: 36 additions & 52 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Spiral\RoadRunner\GRPC\Exception\GRPCExceptionInterface;
use Spiral\RoadRunner\GRPC\Exception\NotFoundException;
use Spiral\RoadRunner\GRPC\Exception\ServiceException;
use Spiral\RoadRunner\GRPC\Internal\CallContext;
use Spiral\RoadRunner\GRPC\Internal\Json;
use Spiral\RoadRunner\Payload;
use Spiral\RoadRunner\Worker;
Expand All @@ -21,12 +22,6 @@
* @psalm-type ServerOptions = array{
* debug?: bool
* }
*
* @psalm-type ContextResponse = array{
* service: class-string<ServiceInterface>,
* method: non-empty-string,
* context: array<string, array<string>>
* }
*/
final class Server
{
Expand Down Expand Up @@ -63,39 +58,6 @@ public function registerService(string $interface, ServiceInterface $service): v
$this->services[$service->getName()] = $service;
}

/**
* @param ContextResponse $data
* @return array{0: string, 1: string}
* @throws \JsonException
* @throws \Throwable
*/
private function tick(string $body, array $data): array
{
$context = (new Context($data['context']))
->withValue(ResponseHeaders::class, new ResponseHeaders());

$response = $this->invoke($data['service'], $data['method'], $context, $body);

/** @var ResponseHeaders|null $responseHeaders */
$responseHeaders = $context->getValue(ResponseHeaders::class);
$responseHeadersString = $responseHeaders ? $responseHeaders->packHeaders() : '{}';

return [$response, $responseHeadersString];
}

/**
* @psalm-suppress InaccessibleMethod
*/
private function workerSend(WorkerInterface $worker, string $body, string $headers): void
{
$worker->respond(new Payload($body, $headers));
}

private function workerError(WorkerInterface $worker, string $message): void
{
$worker->error($message);
}

/**
* Serve GRPC over given RoadRunner worker.
*/
Expand All @@ -111,15 +73,30 @@ public function serve(WorkerInterface $worker = null, callable $finalize = null)
return;
}

$responseHeaders = new ResponseHeaders();

try {
/** @var ContextResponse $context */
$context = Json::decode($request->header);
$call = CallContext::decode($request->header);

[$answerBody, $answerHeaders] = $this->tick($request->body, $context);
$context = (new Context($call->context))
->withValue(ResponseHeaders::class, $responseHeaders);

$this->workerSend($worker, $answerBody, $answerHeaders);
$response = $this->invoke($call->service, $call->method, $context, $request->body);

$this->workerSend(
worker: $worker,
body: $response,
headers: $responseHeaders->packHeaders()
);
} catch (GRPCExceptionInterface $e) {
$this->workerGrpcError($worker, $e);
$this->workerSend(
worker: $worker,
body: '',
headers: Json::encode([
'error' => $this->createGrpcError($e),
'headers' => $responseHeaders->packHeaders(),
]),
);
} catch (\Throwable $e) {
$this->workerError($worker, $this->isDebugMode() ? (string)$e : $e->getMessage());
} finally {
Expand All @@ -146,7 +123,20 @@ protected function invoke(string $service, string $method, ContextInterface $con
return $this->services[$service]->invoke($method, $context, $body);
}

private function workerGrpcError(WorkerInterface $worker, GRPCExceptionInterface $e): void
private function workerError(WorkerInterface $worker, string $message): void
{
$worker->error($message);
}

/**
* @psalm-suppress InaccessibleMethod
*/
private function workerSend(WorkerInterface $worker, string $body, string $headers): void
{
$worker->respond(new Payload($body, $headers));
}

private function createGrpcError(GRPCExceptionInterface $e): string
{
$status = new Status([
'code' => $e->getCode(),
Expand All @@ -162,13 +152,7 @@ static function ($detail) {
),
]);

$this->workerSend(
$worker,
'',
Json::encode([
'error' => \base64_encode($status->serializeToString()),
]),
);
return \base64_encode($status->serializeToString());
}

/**
Expand Down

0 comments on commit e041c42

Please sign in to comment.