Skip to content

Commit

Permalink
Fixed presence channel joining and leaving
Browse files Browse the repository at this point in the history
  • Loading branch information
slavarazum committed Dec 3, 2023
1 parent 69774d3 commit e8407db
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 113 deletions.
7 changes: 0 additions & 7 deletions src/Listeners/RemoveStoredConnectionListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,6 @@ public function handle(SseConnectionClosedEvent $event)

$fullyExitedChannels = $this->store->removeConnection($event->user, $event->connectionId);

// ray(
// 'exited channels',
// Broadcast::socket(),
// $event->user->name,
// $fullyExitedChannels,
// )->color(request()->user()->id === 1 ? 'blue' : 'green')->label(Broadcast::socket());

foreach ($fullyExitedChannels as $exitInfo) {
broadcast(new PresenceChannelLeaveEvent($event->user->getAuthIdentifierForBroadcasting(), $exitInfo['user_info'], Str::after($exitInfo['channel'], 'presence-')))->toOthers();
}
Expand Down
25 changes: 20 additions & 5 deletions src/Sse/ServerSentEventStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Broadcast;
use Qruto\LaravelWave\BroadcastingUserIdentifier;
use Qruto\LaravelWave\PresenceChannelEvent;
use Qruto\LaravelWave\ServerSentEventSubscriber;
use Qruto\LaravelWave\Storage\BroadcastEventHistory;
use Qruto\LaravelWave\Storage\BroadcastingEvent;
Expand All @@ -36,6 +37,7 @@ public function __construct(
protected ResponseFactory $responseFactory,
protected PresenceChannelUsersRedisRepository $store,
protected BroadcastEventHistory $eventsHistory,
protected PresenceChannelEvent $presenceChannelEvent,
protected ConfigRepository $config
) {
}
Expand All @@ -55,16 +57,21 @@ public function toResponse($request)
$missedEvents = $this->eventsHistory->getEventsFrom($request->header('Last-Event-ID'));

$missedEvents
->filter(fn (BroadcastingEvent $event) => $event->event !== 'connected')
// TODO: except system channel
->filter(fn (BroadcastingEvent $event) => $event->channel !== 'general')
->each($this->eventHandler($request, $lastSocket));
}

$event = EventFactory::create('general', 'connected', $newSocket, $newSocket);
}

// TODO: change general channel name
$this->eventsHistory->pushEvent($event);
tap(
EventFactory::create('general', 'connected', $newSocket, $newSocket),
function (BroadcastingEvent $event) {
$this->eventsHistory->pushEvent($event);

$event->send();
$event->send();
}
);

$this->eventSubscriber->start(function (string $message, string $channel) use ($request, $newSocket) {
$this->eventHandler($request, $newSocket)(EventFactory::fromRedisMessage($message, $channel));
Expand All @@ -87,6 +94,10 @@ protected function eventHandler(Request $request, ?string $socket): Closure
return;
}

if ($request->user() && $this->presenceChannelEvent->isLeaveEvent($event, $request->user())) {
$this->presenceChannelEvent->formatLeaveEventForSending($event);
}

$event->send();
};
}
Expand All @@ -109,6 +120,10 @@ protected function shouldNotSend(BroadcastingEvent $event, ?string $socket, ?Aut
return false;
}

if ($user !== null && $this->presenceChannelEvent->isSelfLeaveEvent($event, $user)) {
return true;
}

return $event->socket === $socket;
}

Expand Down
158 changes: 57 additions & 101 deletions src/Storage/PresenceChannelUsersRedisRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@

namespace Qruto\LaravelWave\Storage;

use Closure;
use Illuminate\Contracts\Auth\Authenticatable;
use Illuminate\Contracts\Redis\Connection;
use Illuminate\Redis\Connections\PhpRedisConnection;
use Illuminate\Redis\Connections\PredisConnection;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Str;
use Qruto\LaravelWave\BroadcastingUserIdentifier;

class PresenceChannelUsersRedisRepository implements PresenceChannelUsersRepository
Expand Down Expand Up @@ -48,86 +46,52 @@ private function unserialize(string $value)

public function join(string $channel, Authenticatable $user, array $userInfo, string $connectionId): bool
{
return $this->lock($this->channelMemberKey($channel, 'users'), function () use (
$channel,
$user,
$userInfo,
$connectionId
) {
$userKey = $this->userKey($user);
$usersHashKey = $this->channelMemberKey($channel, 'users');

$firstJoin = false;

// TODO: race condition here ?
if (! $this->db->hexists($usersHashKey, $this->userKey($user))) {
$firstJoin = true;
}

$this->db->transaction(function ($transaction) use (
$user,
$channel,
$userKey,
$userInfo,
$usersHashKey,
$connectionId,
) {
$transaction->sadd(
$this->channelMemberKey($channel, $userKey, 'user_sockets'),
$connectionId
);

$transaction->hset(
$usersHashKey,
$this->userKey($user),
$this->serialize($userInfo)
);

$transaction->sadd(
$this->userChannelsKey($user),
$channel
);
});

return $firstJoin;
});
$userKey = $this->userKey($user);
$usersHashKey = $this->channelMemberKey($channel, 'users');
$socketsSetKey = $this->channelMemberKey($channel, $userKey, 'user_sockets');
$userChannelsKey = $this->userChannelsKey($user);

$luaScript = <<<'LUA'
local firstJoin = redis.call('hexists', KEYS[1], ARGV[1]) == 0
redis.call('sadd', KEYS[2], ARGV[3])
redis.call('hset', KEYS[1], ARGV[1], ARGV[2])
redis.call('sadd', KEYS[3], ARGV[4])
return firstJoin and 1 or 0
LUA;

return $this->db->eval(
$luaScript,
3,
$usersHashKey, $socketsSetKey, $userChannelsKey,
$userKey, $this->serialize($userInfo), $connectionId, $channel
);
}

public function leave(string $channel, Authenticatable $user, string $connectionId): bool
{
return $this->lock($this->channelMemberKey($channel, 'users'), function () use (
$channel,
$user,
$connectionId
) {
$userKey = $this->userKey($user);
$usersHashKey = $this->channelMemberKey($channel, 'users');
$socketsSetKey = $this->channelMemberKey($channel, $userKey, 'user_sockets');

$lastLeave = false;

if ($this->db->sIsMember($socketsSetKey, $connectionId) && $this->db->scard($socketsSetKey) === 1) {
$lastLeave = true;
}

$this->db->transaction(function ($transaction) use (
$user,
$channel,
$connectionId,
$usersHashKey,
$socketsSetKey,
$lastLeave,
) {
$transaction->srem($socketsSetKey, $connectionId);

if ($lastLeave) {
$transaction->srem($this->userChannelsKey($user), $channel);
$transaction->hdel($usersHashKey, $this->userKey($user));
}
});

return $lastLeave;
});
$userKey = $this->userKey($user);
$socketsSetKey = $this->channelMemberKey($channel, $userKey, 'user_sockets');
$usersHashKey = $this->channelMemberKey($channel, 'users');
$userChannelsKey = $this->userChannelsKey($user);

$luaScript = <<<'LUA'
if redis.call('sismember', KEYS[1], ARGV[1]) == 1 then
redis.call('srem', KEYS[1], ARGV[1])
if redis.call('scard', KEYS[1]) == 0 then
redis.call('srem', KEYS[3], ARGV[3])
redis.call('hdel', KEYS[2], ARGV[2])
return 1
end
end
return 0
LUA;

return $this->db->eval(
$luaScript,
3,
$socketsSetKey, $usersHashKey, $userChannelsKey,
$connectionId, $userKey, $channel
);
}

public function getUsers(string $channel): array
Expand All @@ -140,29 +104,21 @@ public function getUsers(string $channel): array

public function removeConnection(Authenticatable $user, string $connectionId): array
{
return $this->lock($this->userChannelsKey($user), function () use ($user, $connectionId) {
$fullyExitedChannels = [];
collect($this->db->smembers($this->userChannelsKey($user)))
->each(function ($channel) use ($user, $connectionId, &$fullyExitedChannels) {
$userInfo = $this->unserialize($this->db->hget(
$this->channelMemberKey($channel, 'users'),
$this->userKey($user)
));

if ($this->leave($channel, $user, $connectionId)) {
$fullyExitedChannels[] = [
'channel' => $channel,
'user_info' => $userInfo,
];
}
});

return $fullyExitedChannels;
});
}
return collect($this->db->smembers($this->userChannelsKey($user)))
->map(function ($channel) use ($user, $connectionId) {
$userInfo = $this->unserialize($this->db->hget(
$this->channelMemberKey($channel, 'users'),
$this->userKey($user)
));

if ($this->leave($channel, $user, $connectionId)) {
return [
'channel' => $channel,
'user_info' => $userInfo,
];
}

private function lock(string $key, Closure $callback)
{
return cache()->lock($key.':lock', 100)->block(115, $callback);
return null;
})->filter()->values()->toArray();
}
}

0 comments on commit e8407db

Please sign in to comment.