diff --git a/composer.json b/composer.json index 70448ed..af7ed51 100644 --- a/composer.json +++ b/composer.json @@ -33,7 +33,7 @@ }, "require-dev": { "laravel/pint": "^1.13", - "m6web/redis-mock": "^5.4", + "m6web/redis-mock": "v5.6", "nunomaduro/collision": "^7.10", "nunomaduro/larastan": "^2.6", "orchestra/testbench": "^7.22.1|^8.0", diff --git a/phpunit.xml.dist b/phpunit.xml.dist index cbad2e4..88fb4f7 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,14 +1,10 @@ - - - tests - - - - - ./src - - - - - - - - - - + + + tests + + + + + + + + + + + + + + + ./src + + diff --git a/src/Events/PresenceChannelLeaveEvent.php b/src/Events/PresenceChannelLeaveEvent.php index bfccbbb..0fb2f38 100644 --- a/src/Events/PresenceChannelLeaveEvent.php +++ b/src/Events/PresenceChannelLeaveEvent.php @@ -37,14 +37,4 @@ public function broadcastAs(): string { return 'leave'; } - // - // /** - // * Get the data to broadcast. - // * - // * @return array - // */ - // public function broadcastWith(): array - // { - // return $this->userInfo; - // } } diff --git a/src/Storage/BroadcastEventHistoryRedisStream.php b/src/Storage/BroadcastEventHistoryRedisStream.php index 41b6e13..86e37a8 100644 --- a/src/Storage/BroadcastEventHistoryRedisStream.php +++ b/src/Storage/BroadcastEventHistoryRedisStream.php @@ -37,13 +37,15 @@ public function getEventsFrom(string $id): Collection public function lastEventTimestamp(): int { - $keys = array_keys($this->db->xRevRange('broadcasted_events', '+', '-', 1)); + $keys = array_keys($this->db->xRevRange('broadcasted_events', '-', '+', 1)); - return explode('-', reset($keys))[0] ?? 0; + return empty($keys) ? 0 : explode('-', reset($keys))[0]; } public function pushEvent(BroadcastingEvent $event) { + $this->removeOldEvents(); + $eventData = \get_object_vars($event); $eventData['data'] = json_encode($eventData['data']); $id = $this->db->xAdd('broadcasted_events', '*', $eventData); @@ -52,4 +54,15 @@ public function pushEvent(BroadcastingEvent $event) return $id; } + + public function removeOldEvents() + { + // Calculate the threshold timestamp. Events older than this should be removed. + $thresholdTimestamp = now()->subSeconds($this->lifetime)->getPreciseTimestamp(3); + + // Fetch all events up to the threshold + $oldEvents = $this->db->xRange('broadcasted_events', '-', $thresholdTimestamp.'-0'); + + $this->db->xDel('broadcasted_events', \array_keys($oldEvents)); + } } diff --git a/src/Storage/BroadcastingEvent.php b/src/Storage/BroadcastingEvent.php index 2334378..3d328ae 100644 --- a/src/Storage/BroadcastingEvent.php +++ b/src/Storage/BroadcastingEvent.php @@ -14,7 +14,6 @@ public function __construct( public ?string $id, public ?string $socket, ) { - // $this->timestamp = now()->getTimestamp(); } public function send(): void @@ -32,7 +31,7 @@ public static function fake(array $attributes = []): self return new self( channel: $attributes['channel'] ?? fake()->word, name: $attributes['event'] ?? fake()->word, - id: $attributes['id'] ?? fake()->uuid, + id: null, data: $attributes['data'] ?? ['message' => fake()->sentence], socket: $attributes['socket'] ?? fake()->randomNumber(6, true).'.'.fake()->randomNumber(6, true), ); diff --git a/tests/EventStreamResumeTest.php b/tests/Feature/EventStreamResumeTest.php similarity index 93% rename from tests/EventStreamResumeTest.php rename to tests/Feature/EventStreamResumeTest.php index 0ffd1dc..3120777 100644 --- a/tests/EventStreamResumeTest.php +++ b/tests/Feature/EventStreamResumeTest.php @@ -1,8 +1,8 @@ logout(); $connection = waveConnection(); diff --git a/tests/PresenceChannelTest.php b/tests/Feature/PresenceChannelTest.php similarity index 84% rename from tests/PresenceChannelTest.php rename to tests/Feature/PresenceChannelTest.php index b4d8e28..6777112 100644 --- a/tests/PresenceChannelTest.php +++ b/tests/Feature/PresenceChannelTest.php @@ -6,8 +6,8 @@ use Illuminate\Support\Facades\Redis; use Qruto\LaravelWave\Events\PresenceChannelJoinEvent; use Qruto\LaravelWave\Events\SseConnectionClosedEvent; -use Qruto\LaravelWave\Tests\Events\SomePresenceEvent; -use Qruto\LaravelWave\Tests\Events\SomePrivateEvent; +use Qruto\LaravelWave\Tests\Support\Events\SomePresenceEvent; +use Qruto\LaravelWave\Tests\Support\Events\SomePrivateEvent; use Qruto\LaravelWave\Tests\Support\User; use function Pest\Laravel\actingAs; @@ -22,17 +22,20 @@ Event::assertDispatched(PresenceChannelJoinEvent::class); }); -it('stores user in redis presence channel pool', function () { +it('stores user in redis presence channel hash', function () { $connection = waveConnection(); - $key = 'presence_channel:presence-presence-channel:user:'.auth()->user()->getAuthIdentifierForBroadcasting(); + $key = channelMemberKey('presence-presence-channel', 'users'); joinRequest('presence-channel', $this->user, $connection->id()); - expect((bool) Redis::exists($key)) + expect((bool) Redis::hexists($key, $this->user->getAuthIdentifier())) ->toBeTrue() - ->and(json_decode(Redis::hget($key, 'connections'), true)[0]) - ->toBe($connection->id()); + ->and(json_decode(Redis::hget($key, $this->user->getAuthIdentifier()), true)) + ->toBe([ + 'id' => auth()->user()->id, + 'name' => auth()->user()->name, + ]); }); test('join request respond with actual count of channel users', function () { @@ -124,6 +127,9 @@ $connectionRick->assertEventReceived('presence-presence-channel.leave'); $connectionRick->assertEventReceived('presence-presence-channel-2.leave'); + + $connectionMorty->assertEventNotReceived('presence-presence-channel.leave'); + $connectionMorty->assertEventNotReceived('presence-presence-channel-2.leave'); }); it('successfully stores several connections', function () { @@ -133,9 +139,9 @@ joinRequest('presence-channel', $this->user, $connectionOne->id()); joinRequest('presence-channel', $this->user, $connectionTwo->id()); - $key = 'presence_channel:presence-presence-channel:user:'.auth()->user()->getAuthIdentifier(); + $key = channelMemberKey('presence-presence-channel', auth()->user()->getAuthIdentifier(), 'user_sockets'); - $storedUserConnections = json_decode(Redis::hget($key, 'connections'), true); + $storedUserConnections = Redis::smembers($key); expect($storedUserConnections)->toBe([ $connectionOne->id(), @@ -152,12 +158,12 @@ leaveRequest('presence-channel', $this->user, $connectionOne->id()); - $key = 'presence_channel:presence-presence-channel:user:'.auth()->user()->getAuthIdentifier(); + $key = channelMemberKey('presence-presence-channel', auth()->user()->getAuthIdentifier(), 'user_sockets'); - $storedUserConnections = json_decode(Redis::hget($key, 'connections'), true); + $storedUserConnections = Redis::smembers($key); expect($storedUserConnections)->toBe([ - $connectionTwo->id(), + 1 => $connectionTwo->id(), ]); }); diff --git a/tests/WhisperTest.php b/tests/Feature/WhisperTest.php similarity index 100% rename from tests/WhisperTest.php rename to tests/Feature/WhisperTest.php diff --git a/tests/Pest.php b/tests/Pest.php index 1385bf1..23a774a 100644 --- a/tests/Pest.php +++ b/tests/Pest.php @@ -9,6 +9,7 @@ use Qruto\LaravelWave\Tests\Support\User; use Qruto\LaravelWave\Tests\TestCase; +use function Pest\Laravel\actingAs; use function PHPUnit\Framework\assertCount; use function PHPUnit\Framework\assertSame; use function PHPUnit\Framework\assertTrue; @@ -16,15 +17,33 @@ uses(TestCase::class)->in(__DIR__); uses()->beforeEach(function () { + $redisMock = new RedisConnectionMock(); + $this->instance('redis', $redisMock); + $redisMock->flushdb(); + $redisMock->flushEventsQueue(); + $this->user = User::factory()->create(); Broadcast::channel('private-channel', fn () => true); - Broadcast::channel('presence-channel', fn () => ['id' => request()->user()->id, 'name' => request()->user()->name]); + Broadcast::channel('presence-channel', fn () => [ + 'id' => request()->user()->id, + 'name' => request()->user()->name, + ]); $this->actingAs($this->user); })->in(__DIR__); -function waveConnection(Authenticatable $user = null, string $lastEventId = null) +function channelMemberKey(string $channel, string ...$suffixes): string +{ + return implode(':', array_merge(["broadcasting_channels:$channel"], $suffixes)); +} + +function userChannelsKey(Authenticatable $user): string +{ + return implode(':', ['broadcasting_channels', $user->getAuthIdentifier(), 'user_channels']); +} + +function waveConnection(?Authenticatable $user = null, ?string $lastEventId = null): object { return new class($user, $lastEventId) { @@ -141,6 +160,12 @@ public function hasReceived($event) public function getSentEvents() { + $user = auth()->user(); + + if ($this->user) { + actingAs($this->user); + } + if (! $this->sentEvents) { $rawEvents = array_filter(explode("\n\n", $this->response->streamedContent())); $this->sentEvents = Collection::make($rawEvents)->map(function ($event) { @@ -159,6 +184,10 @@ public function getSentEvents() })->mapToGroups(fn ($item) => [$item['event'] => ['data' => $item['data'], 'id' => $item['id']]]); } + if ($user) { + actingAs($user); + } + return $this->sentEvents; } }; diff --git a/tests/RedisConnectionMock.php b/tests/RedisConnectionMock.php index 4250baa..db2bd57 100644 --- a/tests/RedisConnectionMock.php +++ b/tests/RedisConnectionMock.php @@ -17,6 +17,8 @@ class RedisConnectionMock extends RedisMock implements Connection, Factory { public static $events = []; + protected $streams = []; + public function eval($script, $numberOfKeys, ...$arguments) { if (Str::contains($script, 'publish')) { @@ -42,6 +44,31 @@ public function eval($script, $numberOfKeys, ...$arguments) return; } + + if (strpos($script, 'firstJoin') !== false) { + // Handle 'join' script logic + $firstJoin = ! $this->hExists($arguments[0], $arguments[3]); + $this->sAdd($arguments[1], $arguments[5]); + $this->hSet($arguments[0], $arguments[3], $arguments[4]); + $this->sAdd($arguments[2], $arguments[6]); + + return $firstJoin ? 1 : 0; + } + + if (strpos($script, 'sismember') !== false) { + // Handle 'leave' script logic + if ($this->sIsMember($arguments[0], $arguments[3])) { + $this->sRem($arguments[0], $arguments[3]); + if ($this->sCard($arguments[0]) == 0) { + $this->sRem($arguments[2], $arguments[5]); + $this->hDel($arguments[1], $arguments[4]); + + return 1; + } + } + + return 0; + } } public function psubscribe($channels, Closure $callback) @@ -97,4 +124,100 @@ public function transaction($callback) return true; } + + public function xAdd($stream, $id, array $fields) + { + // Create the stream if it doesn't exist + if (! isset($this->streams[$stream])) { + $this->streams[$stream] = []; + } + + // Generate the entry ID + if ($id === '*') { + $timestamp = now()->getPreciseTimestamp(3); // Convert to milliseconds + $sequence = isset($this->lastId[$stream]) ? $this->lastId[$stream][1] + 1 : 0; + $entryId = $timestamp.'-'.$sequence; + + // Update the last ID for the stream + $this->lastId[$stream] = [$timestamp, $sequence]; + } else { + $entryId = $id; + } + + // Add the entry + $this->streams[$stream][$entryId] = $fields; + + return $entryId; + } + + public function xRange($stream, $start, $end, $count = null) + { + if (! isset($this->streams[$stream])) { + return []; + } + + // Filter entries based on start and end IDs + $entries = array_filter( + $this->streams[$stream], + function ($entryId) use ($start, $end) { + $startCheck = $start === '-' || strcmp($entryId, $start) >= 0; + $endCheck = $end === '+' || strcmp($entryId, $end) <= 0; + + return $startCheck && $endCheck; + }, + ARRAY_FILTER_USE_KEY + ); + + // Limit the number of entries if count is specified + if ($count !== null) { + $entries = array_slice($entries, 0, $count, true); + } + + return $entries; + } + + public function xRevRange($stream, $end, $start, $count = null) + { + if (! isset($this->streams[$stream])) { + return []; + } + + // Filter entries based on start and end IDs + $entries = array_filter( + $this->streams[$stream], + function ($entryId) use ($start, $end) { + $startCheck = $start === '+' || strcmp($entryId, $start) <= 0; + $endCheck = $end === '-' || strcmp($entryId, $end) >= 0; + + return $startCheck && $endCheck; + }, + ARRAY_FILTER_USE_KEY + ); + + // Reverse the entries and apply count limit + $entries = array_reverse($entries, true); + if ($count !== null) { + $entries = array_slice($entries, 0, $count, true); + } + + return $entries; + } + + public function xDel($stream, array $ids) + { + if (! isset($this->streams[$stream])) { + return 0; + } + + $deletedCount = 0; + + foreach ($ids as $id) { + if (isset($this->streams[$stream][$id])) { + unset($this->streams[$stream][$id]); + $deletedCount++; + } + } + + return $deletedCount; + } } diff --git a/tests/Events/PublicEvent.php b/tests/Support/Events/PublicEvent.php similarity index 94% rename from tests/Events/PublicEvent.php rename to tests/Support/Events/PublicEvent.php index fea00ca..a79726f 100644 --- a/tests/Events/PublicEvent.php +++ b/tests/Support/Events/PublicEvent.php @@ -1,6 +1,6 @@ history->pushEvent($event)) - ->toBe($event->timestamp) - ->and(cache()->get('broadcasted_events')) - ->toEqual(collect([$event])); + $this->freezeTime(function (Carbon $time) use ($event) { + expect($this->history->pushEvent($event)) + ->toBe($time->getPreciseTimestamp(3).'-0') + ->and(\Illuminate\Support\Facades\Redis::xRange('broadcasted_events', '0', '+')) + ->toEqual([ + $time->getPreciseTimestamp(3).'-0' => [ + 'id' => '', + 'name' => $event->name, + 'channel' => $event->channel, + 'data' => json_encode($event->data), + 'socket' => $event->socket, + ], + ]); + }); + // ->and(cache()->get('broadcasted_events')) + // ->toEqual(collect([$event])); }); it('removes outdated events from Redis history', function () { @@ -29,7 +42,22 @@ $this->history->pushEvent($event2); $this->history->pushEvent($event3); - expect(cache()->get('broadcasted_events'))->toEqual(collect([$event2, $event3])); + expect(\Illuminate\Support\Facades\Redis::xRange('broadcasted_events', '0', '+'))->toEqual([ + $event2->id => [ + 'id' => '', + 'name' => $event2->name, + 'channel' => $event2->channel, + 'data' => json_encode($event2->data), + 'socket' => $event2->socket, + ], + $event3->id => [ + 'id' => '', + 'name' => $event3->name, + 'channel' => $event3->channel, + 'data' => json_encode($event3->data), + 'socket' => $event3->socket, + ], + ]); }); it('gets events from the given id', function () { @@ -58,7 +86,7 @@ $this->history->pushEvent($event2); $this->history->pushEvent($event3); - expect($this->history->lastEventTimestamp())->toBe($event3->timestamp); + expect($this->history->lastEventTimestamp())->toEqual(explode('-', $event3->id)[0]); }); it('returns 0 when there are no events', function () { diff --git a/tests/PresenceChannelUsersRedisRepositoryTest.php b/tests/Unit/PresenceChannelUsersRedisRepositoryTest.php similarity index 91% rename from tests/PresenceChannelUsersRedisRepositoryTest.php rename to tests/Unit/PresenceChannelUsersRedisRepositoryTest.php index 3a525ab..7cfb3ea 100644 --- a/tests/PresenceChannelUsersRedisRepositoryTest.php +++ b/tests/Unit/PresenceChannelUsersRedisRepositoryTest.php @@ -1,6 +1,5 @@ getAuthIdentifier(), 'user_channels']); -} - it('can add a new user to a presence channel', function () use ($connectionId, $channel) { $userKey = $this->user->getAuthIdentifierForBroadcasting(); $usersHashKey = channelMemberKey($channel, 'users'); @@ -73,7 +62,7 @@ function userChannelsKey(Authenticatable $user): string }); it('can remove a user connection from a presence channel', function () use ($connectionId, $channel) { - $this->repository->join($channel, $this->user, ['email' => $this->user->email],'first-connection-id'); + $this->repository->join($channel, $this->user, ['email' => $this->user->email], 'first-connection-id'); $this->repository->join($channel, $this->user, ['email' => $this->user->email], $connectionId); expect($this->repository->leave($channel, $this->user, $connectionId))->toBeFalse(); @@ -116,7 +105,7 @@ function userChannelsKey(Authenticatable $user): string $removedConnections = $this->repository->removeConnection($this->user, $connectionId); - expect($removedConnections)->toEqual([ + expect($removedConnections)->toEqualCanonicalizing([ [ 'channel' => 'channel1', 'user_info' => ['email' => $this->user->email],