Skip to content

Commit

Permalink
Locks for presence channel users storing
Browse files Browse the repository at this point in the history
  • Loading branch information
slavarazum committed Dec 3, 2023
1 parent 1f0ea10 commit 69774d3
Showing 1 changed file with 85 additions and 78 deletions.
163 changes: 85 additions & 78 deletions src/Storage/PresenceChannelUsersRedisRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,72 +48,86 @@ private function unserialize(string $value)

public function join(string $channel, Authenticatable $user, array $userInfo, string $connectionId): bool
{
$userKey = $this->userKey($user);
$usersHashKey = $this->channelMemberKey($channel, 'users');

$firstJoin = false;

if (! $this->db->hexists($usersHashKey, $this->userKey($user))) {
$firstJoin = true;
}

$this->db->transaction(function ($transaction) use (
$user,
return $this->lock($this->channelMemberKey($channel, 'users'), function () use (
$channel,
$userKey,
$user,
$userInfo,
$usersHashKey,
$connectionId,
$connectionId
) {
$transaction->sadd(
$this->channelMemberKey($channel, $userKey, 'user_sockets'),
$connectionId
);
$userKey = $this->userKey($user);
$usersHashKey = $this->channelMemberKey($channel, 'users');

$firstJoin = false;

// TODO: race condition here ?
if (! $this->db->hexists($usersHashKey, $this->userKey($user))) {
$firstJoin = true;
}

$transaction->hset(
$this->db->transaction(function ($transaction) use (
$user,
$channel,
$userKey,
$userInfo,
$usersHashKey,
$this->userKey($user),
$this->serialize($userInfo)
);

$transaction->sadd(
$this->userChannelsKey($user),
$channel
);
});
$connectionId,
) {
$transaction->sadd(
$this->channelMemberKey($channel, $userKey, 'user_sockets'),
$connectionId
);

$transaction->hset(
$usersHashKey,
$this->userKey($user),
$this->serialize($userInfo)
);

$transaction->sadd(
$this->userChannelsKey($user),
$channel
);
});

return $firstJoin;
return $firstJoin;
});
}

public function leave(string $channel, Authenticatable $user, string $connectionId): bool
{
$userKey = $this->userKey($user);
$usersHashKey = $this->channelMemberKey($channel, 'users');
$socketsSetKey = $this->channelMemberKey($channel, $userKey, 'user_sockets');

$lastLeave = false;

if ($this->db->scard($socketsSetKey) === 1) {
$lastLeave = true;
}

$this->db->transaction(function ($transaction) use (
$user,
return $this->lock($this->channelMemberKey($channel, 'users'), function () use (
$channel,
$connectionId,
$usersHashKey,
$socketsSetKey,
&$lastLeave,
$user,
$connectionId
) {
$transaction->srem($socketsSetKey, $connectionId);
$userKey = $this->userKey($user);
$usersHashKey = $this->channelMemberKey($channel, 'users');
$socketsSetKey = $this->channelMemberKey($channel, $userKey, 'user_sockets');

$lastLeave = false;

if ($lastLeave) {
$transaction->srem($this->userChannelsKey($user), $channel);
$transaction->hdel($usersHashKey, $this->userKey($user));
if ($this->db->sIsMember($socketsSetKey, $connectionId) && $this->db->scard($socketsSetKey) === 1) {
$lastLeave = true;
}
});

return $lastLeave;
$this->db->transaction(function ($transaction) use (
$user,
$channel,
$connectionId,
$usersHashKey,
$socketsSetKey,
$lastLeave,
) {
$transaction->srem($socketsSetKey, $connectionId);

if ($lastLeave) {
$transaction->srem($this->userChannelsKey($user), $channel);
$transaction->hdel($usersHashKey, $this->userKey($user));
}
});

return $lastLeave;
});
}

public function getUsers(string $channel): array
Expand All @@ -126,36 +140,29 @@ public function getUsers(string $channel): array

public function removeConnection(Authenticatable $user, string $connectionId): array
{
$fullyExitedChannels = [];

collect($this->db->smembers($this->userChannelsKey($user)))
->each(function ($channel) use ($user, $connectionId, &$fullyExitedChannels) {
$userInfo = $this->unserialize($this->db->hget(
$this->channelMemberKey($channel, 'users'),
$this->userKey($user)
));

if ($this->leave($channel, $user, $connectionId)) {
$fullyExitedChannels[] = [
'channel' => $channel,
'user_info' => $userInfo,
];
}
});

return $fullyExitedChannels;
}

private function extractChannelNameFromKey(string $key): string
{
$keyParts = explode(':', $key);
// It is assumed that the channel name is always the second part of the key.
// Adjust this according to your key structure.
return $keyParts[1];
return $this->lock($this->userChannelsKey($user), function () use ($user, $connectionId) {
$fullyExitedChannels = [];
collect($this->db->smembers($this->userChannelsKey($user)))
->each(function ($channel) use ($user, $connectionId, &$fullyExitedChannels) {
$userInfo = $this->unserialize($this->db->hget(
$this->channelMemberKey($channel, 'users'),
$this->userKey($user)
));

if ($this->leave($channel, $user, $connectionId)) {
$fullyExitedChannels[] = [
'channel' => $channel,
'user_info' => $userInfo,
];
}
});

return $fullyExitedChannels;
});
}

private function lock(string $key, Closure $callback)
{
return cache()->lock($key.':lock', 10)->block(5, $callback);
return cache()->lock($key.':lock', 100)->block(115, $callback);
}
}

0 comments on commit 69774d3

Please sign in to comment.