Skip to content

Commit

Permalink
Refactored presence channels redis storage structure
Browse files Browse the repository at this point in the history
  • Loading branch information
slavarazum committed Nov 20, 2023
1 parent d558dc8 commit 453855d
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 160 deletions.
2 changes: 1 addition & 1 deletion src/Listeners/RemoveStoredConnectionListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function handle(SseConnectionClosedEvent $event)
return;
}

$fullyExitedChannels = $this->store->removeConnection($event->connectionId);
$fullyExitedChannels = $this->store->removeConnection($event->user, $event->connectionId);

foreach ($fullyExitedChannels as $exitInfo) {
broadcast(new PresenceChannelLeaveEvent($exitInfo['user_info'], Str::after($exitInfo['channel'], 'presence-')))->toOthers();
Expand Down
152 changes: 72 additions & 80 deletions src/Storage/PresenceChannelUsersRedisRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@ public function __construct()
$this->prefix = config('database.redis.options.prefix');
}

protected function connectionsKey(string $channel, Authenticatable $user): string
protected function channelMemberKey(string $channel, string ...$suffixes): string
{
return "presence_channel:$channel:user:".$this->userKey($user);
return implode(':', \array_merge([$this->prefix.'channels', $channel], $suffixes));
}

protected function userChannelsKey(Authenticatable $user): string
{
return implode(':', [$this->prefix.'channels', $this->userKey($user), 'user_channels']);
}

protected function serialize(array $value): string
Expand All @@ -43,115 +48,102 @@ private function unserialize(string $value)

public function join(string $channel, Authenticatable $user, array $userInfo, string $connectionId): bool
{
$key = $this->connectionsKey($channel, $user);
$userKey = $this->userKey($user);
$usersHashKey = $this->channelMemberKey($channel, 'users');

$firstJoin = false;

while (true) {
$this->db->watch($key);

if ((bool) $this->db->exists($key)) {
$value = $this->db->hget($key, 'connections');
$connections = $this->unserialize($value);

if (! in_array($connectionId, $connections)) {
$connections[] = $connectionId;
}

$fields = ['connections' => $this->serialize($connections)];
} else {
$fields = [
'connections' => $this->serialize([$connectionId]),
'user_info' => $this->serialize($userInfo),
];

$firstJoin = true;
}

if ($this->db->transaction(fn ($transaction) => $transaction->hmset($key, $fields))) {
break;
}
if (! $this->db->hexists($usersHashKey, $this->userKey($user))) {
$firstJoin = true;
}

$this->db->transaction(function ($transaction) use (
$user,
$channel,
$userKey,
$userInfo,
$usersHashKey,
$connectionId,
) {
$transaction->sadd(
$this->channelMemberKey($channel, $userKey, 'user_sockets'),
$connectionId
);

$transaction->hset(
$usersHashKey,
$this->userKey($user),
$this->serialize($userInfo)
);

$transaction->sadd(
$this->userChannelsKey($user),
$channel
);
});

return $firstJoin;
}

public function leave(string $channel, Authenticatable $user, string $connectionId): bool
{
$key = $this->connectionsKey($channel, $user);
$userKey = $this->userKey($user);
$usersHashKey = $this->channelMemberKey($channel, 'users');
$socketsSetKey = $this->channelMemberKey($channel, $userKey, 'user_sockets');

$lastLeave = false;

while (true) {
$this->db->watch($key);

if ((bool) $this->db->exists($key)) {
$connections = $this->unserialize($this->db->hget($key, 'connections'));

$connections = array_values(array_filter($connections, fn ($connection) => $connection !== $connectionId));

if ($connections === []) {
if ($this->db->transaction(fn ($transaction) => $transaction->del($key))) {
$lastLeave = true;
if ($this->db->scard($socketsSetKey) === 1) {
$lastLeave = true;
}

break;
}
} elseif ($this->db->transaction(fn ($transaction) => $transaction->hset($key, 'connections', $this->serialize($connections)))) {
break;
}
} else {
break;
$this->db->transaction(function ($transaction) use (
$user,
$channel,
$connectionId,
$usersHashKey,
$socketsSetKey,
&$lastLeave,
) {
$transaction->srem($socketsSetKey, $connectionId);

if ($lastLeave) {
$transaction->srem($this->userChannelsKey($user), $channel);
$transaction->hdel($usersHashKey, $this->userKey($user));
}
}
});

return $lastLeave;
}

public function getUsers(string $channel): array
{
// TODO: test for Redis cluster
$keys = $this->db->keys("presence_channel:$channel:user:*");

$users = [];

foreach ($keys as $key) {
$userInfo = $this->db->hget(Str::after($key, $this->prefix), 'user_info');

$users[] = $this->unserialize($userInfo);
}

return $users;
return collect($this->db->hgetall($this->channelMemberKey($channel, 'users')))
->map(fn ($userInfo) => $this->unserialize($userInfo))
->values()
->toArray();
}

public function removeConnection(string $connectionId): array
public function removeConnection(Authenticatable $user, string $connectionId): array
{
// TODO: test for Redis cluster
return $this->lock('remove_connection', function () use ($connectionId) {
$keys = $this->db->keys('presence_channel:*');
$fullyExitedChannels = [];

foreach ($keys as $key) {
$key = Str::after($key, $this->prefix);

$connections = $this->unserialize($this->db->hget($key, 'connections'));
$fullyExitedChannels = [];

$connections = array_values(array_filter($connections, fn ($connection) => $connection !== $connectionId));

if ($connections === []) {
$userInfo = $this->unserialize($this->db->hget($key, 'user_info'));
$this->db->del($key);
collect($this->db->smembers($this->userChannelsKey($user)))
->each(function ($channel) use ($user, $connectionId, &$fullyExitedChannels) {
$userInfo = $this->unserialize($this->db->hget(
$this->channelMemberKey($channel, 'users'),
$this->userKey($user)
));

if ($this->leave($channel, $user, $connectionId)) {
$fullyExitedChannels[] = [
'channel' => $this->extractChannelNameFromKey($key),
'channel' => $channel,
'user_info' => $userInfo,
];
} else {
$this->db->hset($key, 'connections', $this->serialize($connections));
}
}
});

return $fullyExitedChannels;
});
return $fullyExitedChannels;
}

private function extractChannelNameFromKey(string $key): string

Check failure on line 149 in src/Storage/PresenceChannelUsersRedisRepository.php

View workflow job for this annotation

GitHub Actions / phpstan

Method Qruto\LaravelWave\Storage\PresenceChannelUsersRedisRepository::extractChannelNameFromKey() is unused.
Expand Down
2 changes: 1 addition & 1 deletion src/Storage/PresenceChannelUsersRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ public function leave(string $channel, Authenticatable $user, string $connection

public function getUsers(string $channel);

public function removeConnection(string $connectionId): array;
public function removeConnection(Authenticatable $user, string $connectionId): array;
}
Loading

0 comments on commit 453855d

Please sign in to comment.