Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add websocket feature #3

Merged
merged 6 commits into from
Dec 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 258 additions & 60 deletions README.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"require": {
"php": "^8.1",
"farzai/support": "^1.2",
"farzai/transport": "^1.2"
"farzai/transport": "^1.2",
"phrity/websocket": "^2.0"
},
"require-dev": {
"pestphp/pest": "^2.15",
Expand Down
22 changes: 22 additions & 0 deletions example/live-order-book.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

require_once __DIR__.'/../vendor/autoload.php';

use Farzai\Bitkub\ClientBuilder;
use Farzai\Bitkub\WebSocket\Endpoints\LiveOrderBookEndpoint;
use Farzai\Bitkub\WebSocket\Message;
use Farzai\Bitkub\WebSocketClient;

$websocket = new LiveOrderBookEndpoint(
new WebSocketClient(
ClientBuilder::create()
->setCredentials('YOUR_API_KEY', 'YOUR_SECRET')
->build()
)
);

$websocket->listen('THB_ADA', function (Message $message) {
echo $message->json('event').PHP_EOL;
});

$websocket->run();
17 changes: 17 additions & 0 deletions example/stream-trade.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

require_once __DIR__.'/../vendor/autoload.php';

$websocket = new \Farzai\Bitkub\WebSocket\Endpoints\MarketEndpoint(
new \Farzai\Bitkub\WebSocketClient(
\Farzai\Bitkub\ClientBuilder::create()
->setCredentials('YOUR_API_KEY', 'YOUR_SECRET')
->build()
)
);

$websocket->listen('trade.thb_ada', function (Farzai\Bitkub\WebSocket\Message $message) {
echo $message->json('sym').PHP_EOL;
});

$websocket->run();
8 changes: 8 additions & 0 deletions src/Contracts/WebSocketEngineInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Farzai\Bitkub\Contracts;

interface WebSocketEngineInterface
{
public function handle(array $listeners): void;
}
6 changes: 3 additions & 3 deletions src/Endpoints/AbstractEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

namespace Farzai\Bitkub\Endpoints;

use Farzai\Bitkub\Client;
use Farzai\Bitkub\Contracts\ClientInterface;
use Farzai\Bitkub\Requests\PendingRequest;

abstract class AbstractEndpoint
{
protected Client $client;
protected ClientInterface $client;

public function __construct(Client $client)
public function __construct(ClientInterface $client)
{
$this->client = $client;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Endpoints/UserEndpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public function tradingCredits(): ResponseInterface
* }
* }
*/
public function userLimits(): ResponseInterface
public function limits(): ResponseInterface
{
$config = $this->client->getConfig();

Expand Down
11 changes: 0 additions & 11 deletions src/Exceptions/BadRequestException.php

This file was deleted.

14 changes: 14 additions & 0 deletions src/UriFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Farzai\Bitkub;

use Phrity\Net\Uri;
use Psr\Http\Message\UriInterface;

class UriFactory
{
public static function createFromUri(string $uri): UriInterface
{
return new Uri($uri);
}
}
28 changes: 28 additions & 0 deletions src/WebSocket/Endpoints/AbstractEndpoint.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

namespace Farzai\Bitkub\WebSocket\Endpoints;

use Farzai\Bitkub\WebSocketClient;

abstract class AbstractEndpoint
{
protected WebSocketClient $websocket;

public function __construct(WebSocketClient $websocket)
{
$this->websocket = $websocket;
}

/**
* Run the websocket.
*/
public function run()
{
$this->websocket->run();
}

protected function getLogger()
{
return $this->websocket->getLogger();
}
}
49 changes: 49 additions & 0 deletions src/WebSocket/Endpoints/LiveOrderBookEndpoint.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

namespace Farzai\Bitkub\WebSocket\Endpoints;

use Farzai\Bitkub\Endpoints as RestApiEndpoints;

class LiveOrderBookEndpoint extends AbstractEndpoint
{
/**
* Add event listener.
*
* @example $websocket->listen('thb_btc', function (Message $message) {
* echo $message->json('sym').PHP_EOL;
* });
*
* @param string|int $symbol Symbol name or id.
* @param callable|array<callable> $listeners
*/
public function listen($symbol, $listeners)
{
// Check if symbol is numeric.
if (! is_numeric($symbol)) {

$this->getLogger()->debug('Find symbol id by name: '.$symbol);

// Find symbol id by name.
$market = new RestApiEndpoints\MarketEndpoint($this->websocket->getClient());

foreach ($market->symbols()->throw()->json('result') as $item) {
if ($item['symbol'] === strtoupper(trim($symbol))) {
$symbol = $item['id'];

$this->getLogger()->debug('Found symbol id: '.$symbol);
break;
}
}

if (! is_numeric($symbol)) {
$this->getLogger()->debug('Invalid symbol name. Given: '.$symbol);

throw new \InvalidArgumentException('Invalid symbol name. Given: '.$symbol);
}
}

$this->websocket->addListener('orderbook/'.$symbol, $listeners);

return $this;
}
}
48 changes: 48 additions & 0 deletions src/WebSocket/Endpoints/MarketEndpoint.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

namespace Farzai\Bitkub\WebSocket\Endpoints;

class MarketEndpoint extends AbstractEndpoint
{
/**
* Add event listener.
*
* @example $websocket->listen('market.trade.thb_btc', function (Message $message) {
* echo $message->json('sym').PHP_EOL;
* });
*
* @param string[]|string $streamName
* @param callable|array<callable> $listeners
*/
public function listen($streamName, $listeners)
{
if (is_string($streamName)) {
$streamNames = array_map('trim', explode(',', $streamName));
} else {
$streamNames = $streamName;
}

$this->getLogger()->debug('Add event listener for stream: '.implode(', ', $streamNames));

foreach ($streamNames as $name) {
$this->websocket->addListener($this->getStreamName($name), $listeners);
}

return $this;
}

private function getStreamName(string $streamName): string
{
$segments = explode('.', $streamName);

if (count($segments) === 3) {
return $streamName;
}

if (count($segments) === 2) {
return 'market.'.$streamName;
}

throw new \InvalidArgumentException('Invalid stream name format. Given: '.$streamName);

Check warning on line 46 in src/WebSocket/Endpoints/MarketEndpoint.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Endpoints/MarketEndpoint.php#L46

Added line #L46 was not covered by tests
}
}
74 changes: 74 additions & 0 deletions src/WebSocket/Engine.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

namespace Farzai\Bitkub\WebSocket;

use Farzai\Bitkub\Contracts\WebSocketEngineInterface;
use Farzai\Support\Carbon;
use Psr\Log\LoggerInterface;
use WebSocket\Client as WebSocketClient;
use WebSocket\Connection as WebSocketConnection;
use WebSocket\Message\Message as WebSocketMessage;
use WebSocket\Middleware as WebSocketMiddleware;

class Engine implements WebSocketEngineInterface
{
public function __construct(
private LoggerInterface $logger,
) {
}

public function handle(array $listeners): void

Check warning on line 20 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L20

Added line #L20 was not covered by tests
{
$events = $this->getEventNames($listeners);

Check warning on line 22 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L22

Added line #L22 was not covered by tests

$this->logger->info('[WebSocket] - Connecting to WebSocket server...');

Check warning on line 24 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L24

Added line #L24 was not covered by tests

$this->logger->debug('[WebSocket] - Events: '.implode(', ', $events));

Check warning on line 26 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L26

Added line #L26 was not covered by tests

$client = new WebSocketClient('wss://api.bitkub.com/websocket-api/'.implode(',', $events));

Check warning on line 28 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L28

Added line #L28 was not covered by tests

$client
->addMiddleware(new WebSocketMiddleware\CloseHandler())
->addMiddleware(new WebSocketMiddleware\PingResponder());

Check warning on line 32 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L30-L32

Added lines #L30 - L32 were not covered by tests

$client->onText(function (WebSocketClient $client, WebSocketConnection $connection, WebSocketMessage $message) use ($listeners) {
$receivedAt = Carbon::now();

Check warning on line 35 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L34-L35

Added lines #L34 - L35 were not covered by tests

$data = @json_decode($message->getContent(), true) ?? [];
if (! isset($data['stream'])) {
$this->logger->warning('[WebSocket] - Unknown data: '.$message->getContent());

Check warning on line 39 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L37-L39

Added lines #L37 - L39 were not covered by tests

return;

Check warning on line 41 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L41

Added line #L41 was not covered by tests
}

$event = $data['stream'];
if (! isset($listeners[$event])) {
$this->logger->warning('[WebSocket] - Unknown event: '.$event);

Check warning on line 46 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L44-L46

Added lines #L44 - L46 were not covered by tests

return;

Check warning on line 48 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L48

Added line #L48 was not covered by tests
}

$message = new Message(
$message->getContent(),
$receivedAt->toDateTimeImmutable(),
);

Check warning on line 54 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L51-L54

Added lines #L51 - L54 were not covered by tests

foreach ($listeners[$event] as $listener) {
$this->logger->info('[WebSocket] - Event: '.$event);

Check warning on line 57 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L56-L57

Added lines #L56 - L57 were not covered by tests

$listener($message);

Check warning on line 59 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L59

Added line #L59 was not covered by tests
}
});

Check warning on line 61 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L61

Added line #L61 was not covered by tests

$client->onClose(function () {
$this->logger->info('[WebSocket] - Connection closed.');
});

Check warning on line 65 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L63-L65

Added lines #L63 - L65 were not covered by tests

$client->start();

Check warning on line 67 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L67

Added line #L67 was not covered by tests
}

private function getEventNames(array $listeners): array

Check warning on line 70 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L70

Added line #L70 was not covered by tests
{
return array_unique(array_keys($listeners));

Check warning on line 72 in src/WebSocket/Engine.php

View check run for this annotation

Codecov / codecov/patch

src/WebSocket/Engine.php#L72

Added line #L72 was not covered by tests
}
}
Loading