From 4954aac34a368ccff15d05b6363c676bcd82e95d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Dach?= Date: Wed, 5 Jun 2024 10:37:24 +0200 Subject: [PATCH 1/3] Rename some members in preparation for multiplayer room watch capabilities --- .../Multiplayer/MatchTypeTests.cs | 2 +- .../Multiplayer/MultiplayerInviteTest.cs | 2 +- .../Multiplayer/MultiplayerQueueTests.cs | 2 +- .../Multiplayer/MultiplayerTest.cs | 4 +-- .../Multiplayer/RoomParticipationTests.cs | 8 ++--- .../Database/DatabaseAccess.cs | 2 +- .../Database/IDatabaseAccess.cs | 2 +- .../Hubs/Multiplayer/MultiplayerHub.cs | 2 +- .../Spectator/IScoreProcessedSubscriber.cs | 8 +---- .../Spectator/ScoreProcessedSubscriber.cs | 36 +++++++++---------- .../Hubs/Spectator/SpectatorHub.cs | 2 +- 11 files changed, 32 insertions(+), 38 deletions(-) diff --git a/osu.Server.Spectator.Tests/Multiplayer/MatchTypeTests.cs b/osu.Server.Spectator.Tests/Multiplayer/MatchTypeTests.cs index 9a66d379..175e4c09 100644 --- a/osu.Server.Spectator.Tests/Multiplayer/MatchTypeTests.cs +++ b/osu.Server.Spectator.Tests/Multiplayer/MatchTypeTests.cs @@ -138,7 +138,7 @@ public async Task ChangeMatchType() [Fact] public async Task JoinRoomWithTypeCreatesCorrectInstance() { - Database.Setup(db => db.GetRoomAsync(ROOM_ID)) + Database.Setup(db => db.GetRealtimeRoomAsync(ROOM_ID)) .Callback(InitialiseRoom) .ReturnsAsync(new multiplayer_room { diff --git a/osu.Server.Spectator.Tests/Multiplayer/MultiplayerInviteTest.cs b/osu.Server.Spectator.Tests/Multiplayer/MultiplayerInviteTest.cs index c5efc76d..9e49c510 100644 --- a/osu.Server.Spectator.Tests/Multiplayer/MultiplayerInviteTest.cs +++ b/osu.Server.Spectator.Tests/Multiplayer/MultiplayerInviteTest.cs @@ -127,7 +127,7 @@ public async Task UserCanInviteIntoRoomWithPassword() { const string password = "password"; - Database.Setup(db => db.GetRoomAsync(It.IsAny())) + Database.Setup(db => db.GetRealtimeRoomAsync(It.IsAny())) .Callback(InitialiseRoom) .ReturnsAsync(new multiplayer_room { diff --git a/osu.Server.Spectator.Tests/Multiplayer/MultiplayerQueueTests.cs b/osu.Server.Spectator.Tests/Multiplayer/MultiplayerQueueTests.cs index e5d4234b..85596cb0 100644 --- a/osu.Server.Spectator.Tests/Multiplayer/MultiplayerQueueTests.cs +++ b/osu.Server.Spectator.Tests/Multiplayer/MultiplayerQueueTests.cs @@ -74,7 +74,7 @@ public async Task RoomStartsWithCurrentPlaylistItem() public async Task RoomStartsWithCorrectQueueingMode() { Database.Setup(d => d.GetBeatmapAsync(3333)).ReturnsAsync(new database_beatmap { checksum = "3333" }); - Database.Setup(db => db.GetRoomAsync(ROOM_ID)) + Database.Setup(db => db.GetRealtimeRoomAsync(ROOM_ID)) .Callback(InitialiseRoom) .ReturnsAsync(() => new multiplayer_room { diff --git a/osu.Server.Spectator.Tests/Multiplayer/MultiplayerTest.cs b/osu.Server.Spectator.Tests/Multiplayer/MultiplayerTest.cs index 384e5bb1..ea91cb44 100644 --- a/osu.Server.Spectator.Tests/Multiplayer/MultiplayerTest.cs +++ b/osu.Server.Spectator.Tests/Multiplayer/MultiplayerTest.cs @@ -214,7 +214,7 @@ private void setUpMockDatabase() { DatabaseFactory.Setup(factory => factory.GetInstance()).Returns(Database.Object); - Database.Setup(db => db.GetRoomAsync(ROOM_ID)) + Database.Setup(db => db.GetRealtimeRoomAsync(ROOM_ID)) .Callback(InitialiseRoom) .ReturnsAsync(() => new multiplayer_room { @@ -223,7 +223,7 @@ private void setUpMockDatabase() user_id = int.Parse(Hub.Context.UserIdentifier!), }); - Database.Setup(db => db.GetRoomAsync(ROOM_ID_2)) + Database.Setup(db => db.GetRealtimeRoomAsync(ROOM_ID_2)) .Callback(InitialiseRoom) .ReturnsAsync(() => new multiplayer_room { diff --git a/osu.Server.Spectator.Tests/Multiplayer/RoomParticipationTests.cs b/osu.Server.Spectator.Tests/Multiplayer/RoomParticipationTests.cs index 293f15cd..2c2afc73 100644 --- a/osu.Server.Spectator.Tests/Multiplayer/RoomParticipationTests.cs +++ b/osu.Server.Spectator.Tests/Multiplayer/RoomParticipationTests.cs @@ -22,7 +22,7 @@ public async Task UserCanJoinWithPasswordEvenWhenNotRequired() [Fact] public async Task UserCanJoinWithCorrectPassword() { - Database.Setup(db => db.GetRoomAsync(It.IsAny())) + Database.Setup(db => db.GetRealtimeRoomAsync(It.IsAny())) .Callback(InitialiseRoom) .ReturnsAsync(new multiplayer_room { @@ -36,7 +36,7 @@ public async Task UserCanJoinWithCorrectPassword() [Fact] public async Task UserCantJoinWithIncorrectPassword() { - Database.Setup(db => db.GetRoomAsync(It.IsAny())) + Database.Setup(db => db.GetRealtimeRoomAsync(It.IsAny())) .Callback(InitialiseRoom) .ReturnsAsync(new multiplayer_room { @@ -61,7 +61,7 @@ public async Task UserCantJoinWhenRestricted() [Fact] public async Task UserCantJoinAlreadyEnded() { - Database.Setup(db => db.GetRoomAsync(It.IsAny())) + Database.Setup(db => db.GetRealtimeRoomAsync(It.IsAny())) .ReturnsAsync(new multiplayer_room { ends_at = DateTimeOffset.Now.AddMinutes(-5), @@ -159,7 +159,7 @@ public async Task UserJoinLeaveNotifiesOtherUsers() [Fact] public async Task UserJoinPreRetrievalFailureCleansUpRoom() { - Database.Setup(db => db.GetRoomAsync(ROOM_ID)) + Database.Setup(db => db.GetRealtimeRoomAsync(ROOM_ID)) .Callback(InitialiseRoom) .ReturnsAsync(() => new multiplayer_room { diff --git a/osu.Server.Spectator/Database/DatabaseAccess.cs b/osu.Server.Spectator/Database/DatabaseAccess.cs index 51f7feca..ba2a7eea 100644 --- a/osu.Server.Spectator/Database/DatabaseAccess.cs +++ b/osu.Server.Spectator/Database/DatabaseAccess.cs @@ -48,7 +48,7 @@ public async Task IsUserRestrictedAsync(int userId) }) != 0; } - public async Task GetRoomAsync(long roomId) + public async Task GetRealtimeRoomAsync(long roomId) { var connection = await getConnectionAsync(); diff --git a/osu.Server.Spectator/Database/IDatabaseAccess.cs b/osu.Server.Spectator/Database/IDatabaseAccess.cs index 0ed2ad63..bd1eea94 100644 --- a/osu.Server.Spectator/Database/IDatabaseAccess.cs +++ b/osu.Server.Spectator/Database/IDatabaseAccess.cs @@ -33,7 +33,7 @@ public interface IDatabaseAccess : IDisposable /// /// Returns the with the given . /// - Task GetRoomAsync(long roomId); + Task GetRealtimeRoomAsync(long roomId); /// /// Retrieves a beatmap corresponding to the given . diff --git a/osu.Server.Spectator/Hubs/Multiplayer/MultiplayerHub.cs b/osu.Server.Spectator/Hubs/Multiplayer/MultiplayerHub.cs index d9dfdf95..e953d213 100644 --- a/osu.Server.Spectator/Hubs/Multiplayer/MultiplayerHub.cs +++ b/osu.Server.Spectator/Hubs/Multiplayer/MultiplayerHub.cs @@ -181,7 +181,7 @@ private async Task retrieveRoom(long roomId) // This will allow for other instances to know not to reinitialise the room if the host arrives there. // Alternatively, we can move lobby retrieval away from osu-web and not require this in the first place. // Needs further discussion and consideration either way. - var databaseRoom = await db.GetRoomAsync(roomId); + var databaseRoom = await db.GetRealtimeRoomAsync(roomId); if (databaseRoom == null) throw new InvalidOperationException("Specified match does not exist."); diff --git a/osu.Server.Spectator/Hubs/Spectator/IScoreProcessedSubscriber.cs b/osu.Server.Spectator/Hubs/Spectator/IScoreProcessedSubscriber.cs index 15d37865..3cb81399 100644 --- a/osu.Server.Spectator/Hubs/Spectator/IScoreProcessedSubscriber.cs +++ b/osu.Server.Spectator/Hubs/Spectator/IScoreProcessedSubscriber.cs @@ -16,12 +16,6 @@ public interface IScoreProcessedSubscriber /// The ID of the connection that should receive the notifications. /// The ID of the user who set the score. /// The ID of the score which is being processed. - Task RegisterForNotificationAsync(string receiverConnectionId, int userId, long scoreId); + Task RegisterForSingleScoreAsync(string receiverConnectionId, int userId, long scoreId); } - - /// - /// Callback delegate that will be invoked when a score has been successfully processed. - /// - /// The ID of the score that was processed. - public delegate Task ScoreProcessedAsyncCallback(string receiverConnectionId, int userId, long scoreId); } diff --git a/osu.Server.Spectator/Hubs/Spectator/ScoreProcessedSubscriber.cs b/osu.Server.Spectator/Hubs/Spectator/ScoreProcessedSubscriber.cs index c89bfe49..94e110e6 100644 --- a/osu.Server.Spectator/Hubs/Spectator/ScoreProcessedSubscriber.cs +++ b/osu.Server.Spectator/Hubs/Spectator/ScoreProcessedSubscriber.cs @@ -30,7 +30,7 @@ public sealed class ScoreProcessedSubscriber : IScoreProcessedSubscriber, IDispo private readonly IDatabaseFactory databaseFactory; private readonly ISubscriber? subscriber; - private readonly ConcurrentDictionary subscriptions = new ConcurrentDictionary(); + private readonly ConcurrentDictionary singleScoreSubscriptions = new ConcurrentDictionary(); private readonly Timer timer; private readonly ILogger logger; private readonly IHubContext spectatorHubContext; @@ -67,22 +67,22 @@ private void onMessageReceived(string? message) if (scoreProcessed == null) return; - if (subscriptions.TryRemove(scoreProcessed.ScoreId, out var subscription)) + if (singleScoreSubscriptions.TryRemove(scoreProcessed.ScoreId, out var subscription)) { using (subscription) subscription.InvokeAsync().Wait(); } - DogStatsd.Increment($"{statsd_prefix}.messages.delivered"); + DogStatsd.Increment($"{statsd_prefix}.messages.single-score.delivered"); } catch (Exception ex) { logger.LogError(ex, "Failed to process message"); - DogStatsd.Increment($"{statsd_prefix}.messages.dropped"); + DogStatsd.Increment($"{statsd_prefix}.messages.single-score.dropped"); } } - public async Task RegisterForNotificationAsync(string receiverConnectionId, int userId, long scoreToken) + public async Task RegisterForSingleScoreAsync(string receiverConnectionId, int userId, long scoreToken) { try { @@ -92,11 +92,11 @@ public async Task RegisterForNotificationAsync(string receiverConnectionId, int if (score == null) { - DogStatsd.Increment($"{statsd_prefix}.subscriptions.dropped"); + DogStatsd.Increment($"{statsd_prefix}.subscriptions.single-score.dropped"); return; } - var subscription = new ScoreProcessedSubscription(receiverConnectionId, userId, (long)score.id, spectatorHubContext); + var subscription = new SingleScoreSubscription(receiverConnectionId, userId, (long)score.id, spectatorHubContext); // because the score submission flow happens concurrently with the spectator play finished flow, // it is theoretically possible for the score processing to complete before the spectator hub had a chance to register for notifications. @@ -105,12 +105,12 @@ public async Task RegisterForNotificationAsync(string receiverConnectionId, int { using (subscription) await subscription.InvokeAsync(); - DogStatsd.Increment($"{statsd_prefix}.messages.delivered-immediately"); + DogStatsd.Increment($"{statsd_prefix}.messages.single-score.delivered-immediately"); return; } - subscriptions.TryAdd((long)score.id, subscription); - DogStatsd.Gauge($"{statsd_prefix}.subscriptions.total", subscriptions.Count); + singleScoreSubscriptions.TryAdd((long)score.id, subscription); + DogStatsd.Gauge($"{statsd_prefix}.subscriptions.single-score.total", singleScoreSubscriptions.Count); } catch (Exception ex) { @@ -118,30 +118,30 @@ public async Task RegisterForNotificationAsync(string receiverConnectionId, int receiverConnectionId, userId, scoreToken); - DogStatsd.Increment($"{statsd_prefix}.subscriptions.failed"); + DogStatsd.Increment($"{statsd_prefix}.subscriptions.single-score.failed"); } } private void purgeTimedOutSubscriptions() { - var scoreIds = subscriptions.Keys.ToArray(); + var scoreIds = singleScoreSubscriptions.Keys.ToArray(); int purgedCount = 0; foreach (var scoreId in scoreIds) { - if (subscriptions.TryGetValue(scoreId, out var subscription) && subscription.TimedOut) + if (singleScoreSubscriptions.TryGetValue(scoreId, out var subscription) && subscription.TimedOut) { subscription.Dispose(); - if (subscriptions.TryRemove(scoreId, out _)) + if (singleScoreSubscriptions.TryRemove(scoreId, out _)) purgedCount += 1; } } if (purgedCount > 0) { - DogStatsd.Gauge($"{statsd_prefix}.subscriptions.total", subscriptions.Count); - DogStatsd.Increment($"{statsd_prefix}.subscriptions.timed-out", purgedCount); + DogStatsd.Gauge($"{statsd_prefix}.subscriptions.single-score.total", singleScoreSubscriptions.Count); + DogStatsd.Increment($"{statsd_prefix}.subscriptions.single-score.timed-out", purgedCount); } if (!disposed) @@ -162,7 +162,7 @@ public void Dispose() private record ScoreProcessed(long ScoreId); - private class ScoreProcessedSubscription : IDisposable + private class SingleScoreSubscription : IDisposable { private readonly string receiverConnectionId; private readonly int userId; @@ -172,7 +172,7 @@ private class ScoreProcessedSubscription : IDisposable private readonly CancellationTokenSource cancellationTokenSource; public bool TimedOut => cancellationTokenSource.IsCancellationRequested; - public ScoreProcessedSubscription(string receiverConnectionId, int userId, long scoreId, IHubContext spectatorHubContext) + public SingleScoreSubscription(string receiverConnectionId, int userId, long scoreId, IHubContext spectatorHubContext) { this.receiverConnectionId = receiverConnectionId; this.userId = userId; diff --git a/osu.Server.Spectator/Hubs/Spectator/SpectatorHub.cs b/osu.Server.Spectator/Hubs/Spectator/SpectatorHub.cs index f58442a8..370ba649 100644 --- a/osu.Server.Spectator/Hubs/Spectator/SpectatorHub.cs +++ b/osu.Server.Spectator/Hubs/Spectator/SpectatorHub.cs @@ -176,7 +176,7 @@ private async Task processScore(Score score, long scoreToken) score.ScoreInfo.Rank = StandardisedScoreMigrationTools.ComputeRank(score.ScoreInfo); await scoreUploader.EnqueueAsync(scoreToken, score); - await scoreProcessedSubscriber.RegisterForNotificationAsync(Context.ConnectionId, Context.GetUserId(), scoreToken); + await scoreProcessedSubscriber.RegisterForSingleScoreAsync(Context.ConnectionId, Context.GetUserId(), scoreToken); } public async Task StartWatchingUser(int userId) From 13984231085e888471db2acf8d3ea9263a97e9be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Dach?= Date: Wed, 5 Jun 2024 12:44:34 +0200 Subject: [PATCH 2/3] Allow clients to receive realtime updates for a given playlist --- osu.Server.Spectator.Tests/MetadataHubTest.cs | 9 +- .../ScoreUploaderTests.cs | 8 +- .../SpectatorHubTest.cs | 12 +- .../Database/DatabaseAccess.cs | 94 ++++++++++- .../Database/IDatabaseAccess.cs | 34 +++- .../Models/multiplayer_scores_high.cs | 23 +++ .../Hubs/Metadata/MetadataHub.cs | 24 ++- osu.Server.Spectator/Hubs/ScoreUploader.cs | 2 +- .../Spectator/IScoreProcessedSubscriber.cs | 15 ++ .../Spectator/ScoreProcessedSubscriber.cs | 156 +++++++++++++++++- 10 files changed, 359 insertions(+), 18 deletions(-) create mode 100644 osu.Server.Spectator/Database/Models/multiplayer_scores_high.cs diff --git a/osu.Server.Spectator.Tests/MetadataHubTest.cs b/osu.Server.Spectator.Tests/MetadataHubTest.cs index 158824fb..a47ecca2 100644 --- a/osu.Server.Spectator.Tests/MetadataHubTest.cs +++ b/osu.Server.Spectator.Tests/MetadataHubTest.cs @@ -15,6 +15,7 @@ using osu.Server.Spectator.Database; using osu.Server.Spectator.Entities; using osu.Server.Spectator.Hubs.Metadata; +using osu.Server.Spectator.Hubs.Spectator; using Xunit; namespace osu.Server.Spectator.Tests @@ -41,7 +42,13 @@ public MetadataHubTest() loggerFactoryMock.Setup(factory => factory.CreateLogger(It.IsAny())) .Returns(new Mock().Object); - hub = new MetadataHub(loggerFactoryMock.Object, cache, userStates, databaseFactory.Object, new Mock().Object); + hub = new MetadataHub( + loggerFactoryMock.Object, + cache, + userStates, + databaseFactory.Object, + new Mock().Object, + new Mock().Object); var mockContext = new Mock(); mockContext.Setup(ctx => ctx.UserIdentifier).Returns(user_id.ToString()); diff --git a/osu.Server.Spectator.Tests/ScoreUploaderTests.cs b/osu.Server.Spectator.Tests/ScoreUploaderTests.cs index 5f619719..9b390bd7 100644 --- a/osu.Server.Spectator.Tests/ScoreUploaderTests.cs +++ b/osu.Server.Spectator.Tests/ScoreUploaderTests.cs @@ -26,7 +26,7 @@ public class ScoreUploaderTests public ScoreUploaderTests() { mockDatabase = new Mock(); - mockDatabase.Setup(db => db.GetScoreFromToken(1)).Returns(Task.FromResult(new SoloScore + mockDatabase.Setup(db => db.GetScoreFromTokenAsync(1)).Returns(Task.FromResult(new SoloScore { id = 2, passed = true @@ -124,7 +124,7 @@ public async Task ScoreUploadsWithDelayedScoreToken() mockStorage.Verify(s => s.WriteAsync(It.IsAny()), Times.Never); // Give the score a token. - mockDatabase.Setup(db => db.GetScoreFromToken(2)).Returns(Task.FromResult(new SoloScore + mockDatabase.Setup(db => db.GetScoreFromTokenAsync(2)).Returns(Task.FromResult(new SoloScore { id = 3, passed = true @@ -150,7 +150,7 @@ public async Task TimedOutScoreDoesNotUpload() mockStorage.Verify(s => s.WriteAsync(It.IsAny()), Times.Never); // Give the score a token now. It should still not upload because it has timed out. - mockDatabase.Setup(db => db.GetScoreFromToken(2)).Returns(Task.FromResult(new SoloScore + mockDatabase.Setup(db => db.GetScoreFromTokenAsync(2)).Returns(Task.FromResult(new SoloScore { id = 3, passed = true @@ -158,7 +158,7 @@ public async Task TimedOutScoreDoesNotUpload() mockStorage.Verify(s => s.WriteAsync(It.IsAny()), Times.Never); // New score that has a token (ensure the loop keeps running). - mockDatabase.Setup(db => db.GetScoreFromToken(3)).Returns(Task.FromResult(new SoloScore + mockDatabase.Setup(db => db.GetScoreFromTokenAsync(3)).Returns(Task.FromResult(new SoloScore { id = 4, passed = true diff --git a/osu.Server.Spectator.Tests/SpectatorHubTest.cs b/osu.Server.Spectator.Tests/SpectatorHubTest.cs index 97fbcf90..9874a2e9 100644 --- a/osu.Server.Spectator.Tests/SpectatorHubTest.cs +++ b/osu.Server.Spectator.Tests/SpectatorHubTest.cs @@ -128,7 +128,7 @@ public async Task ReplayDataIsSaved(bool savingEnabled) hub.Context = mockContext.Object; hub.Clients = mockClients.Object; - mockDatabase.Setup(db => db.GetScoreFromToken(1234)).Returns(Task.FromResult(new SoloScore + mockDatabase.Setup(db => db.GetScoreFromTokenAsync(1234)).Returns(Task.FromResult(new SoloScore { id = 456, passed = true @@ -184,7 +184,7 @@ public async Task ReplaysWithoutAnyHitsAreDiscarded() hub.Context = mockContext.Object; hub.Clients = mockClients.Object; - mockDatabase.Setup(db => db.GetScoreFromToken(1234)).Returns(Task.FromResult(new SoloScore + mockDatabase.Setup(db => db.GetScoreFromTokenAsync(1234)).Returns(Task.FromResult(new SoloScore { id = 456, passed = true @@ -348,7 +348,7 @@ public async Task ScoresAreOnlySavedOnRankedBeatmaps(BeatmapOnlineStatus status, hub.Context = mockContext.Object; hub.Clients = mockClients.Object; - mockDatabase.Setup(db => db.GetScoreFromToken(1234)).Returns(Task.FromResult(new SoloScore + mockDatabase.Setup(db => db.GetScoreFromTokenAsync(1234)).Returns(Task.FromResult(new SoloScore { id = 456, passed = true @@ -434,7 +434,7 @@ public async Task ScoresHaveAllUserRelatedMetadataFilledOutConsistently() hub.Context = mockContext.Object; hub.Clients = mockClients.Object; - mockDatabase.Setup(db => db.GetScoreFromToken(1234)).Returns(Task.FromResult(new SoloScore + mockDatabase.Setup(db => db.GetScoreFromTokenAsync(1234)).Returns(Task.FromResult(new SoloScore { id = 456, passed = true @@ -497,7 +497,7 @@ public async Task FailedScoresAreNotSaved() hub.Context = mockContext.Object; hub.Clients = mockClients.Object; - mockDatabase.Setup(db => db.GetScoreFromToken(1234)).Returns(Task.FromResult(new SoloScore + mockDatabase.Setup(db => db.GetScoreFromTokenAsync(1234)).Returns(Task.FromResult(new SoloScore { id = 456, passed = false @@ -543,7 +543,7 @@ public async Task ScoreRankPopulatedCorrectly() hub.Context = mockContext.Object; hub.Clients = mockClients.Object; - mockDatabase.Setup(db => db.GetScoreFromToken(1234)).Returns(Task.FromResult(new SoloScore + mockDatabase.Setup(db => db.GetScoreFromTokenAsync(1234)).Returns(Task.FromResult(new SoloScore { id = 456, passed = true diff --git a/osu.Server.Spectator/Database/DatabaseAccess.cs b/osu.Server.Spectator/Database/DatabaseAccess.cs index ba2a7eea..8655f661 100644 --- a/osu.Server.Spectator/Database/DatabaseAccess.cs +++ b/osu.Server.Spectator/Database/DatabaseAccess.cs @@ -48,6 +48,16 @@ public async Task IsUserRestrictedAsync(int userId) }) != 0; } + public async Task GetRoomAsync(long roomId) + { + var connection = await getConnectionAsync(); + + return await connection.QueryFirstOrDefaultAsync("SELECT * FROM multiplayer_rooms WHERE id = @RoomID", new + { + RoomID = roomId + }); + } + public async Task GetRealtimeRoomAsync(long roomId) { var connection = await getConnectionAsync(); @@ -294,7 +304,7 @@ public async Task MarkScoreHasReplay(Score score) }); } - public async Task GetScoreFromToken(long token) + public async Task GetScoreFromTokenAsync(long token) { var connection = await getConnectionAsync(); @@ -305,6 +315,16 @@ public async Task MarkScoreHasReplay(Score score) }); } + public async Task GetScoreAsync(long id) + { + var connection = await getConnectionAsync(); + + return await connection.QuerySingleOrDefaultAsync("SELECT * FROM `scores` WHERE `id` = @Id", new + { + Id = id + }); + } + public async Task IsScoreProcessedAsync(long scoreId) { var connection = await getConnectionAsync(); @@ -382,6 +402,78 @@ public async Task> GetActiveDailyChallengeRoomsAsy + "AND `ends_at` > NOW()"); } + public async Task<(long roomID, long playlistItemID)?> GetMultiplayerRoomIdForScoreAsync(long scoreId) + { + var connection = await getConnectionAsync(); + + return await connection.QuerySingleOrDefaultAsync<(long, long)?>( + "SELECT `multiplayer_playlist_items`.`room_id`, `multiplayer_playlist_items`.`id` " + + "FROM `multiplayer_score_links` " + + "JOIN `multiplayer_playlist_items` " + + "ON `multiplayer_score_links`.`playlist_item_id` = `multiplayer_playlist_items`.`id` " + + "WHERE `multiplayer_score_links`.`score_id` = @scoreId", + new { scoreId = scoreId }); + } + + public async Task GetMultiplayerRoomStatsAsync(long roomId) + { + var connection = await getConnectionAsync(); + + long[] playlistItemIds = (await GetAllPlaylistItemsAsync(roomId)).Select(item => item.id).ToArray(); + var result = new MultiplayerPlaylistItemStats[playlistItemIds.Length]; + + for (int i = 0; i < playlistItemIds.Length; ++i) + { + long[] totalScores = (await connection.QueryAsync( + "SELECT `scores`.`total_score` FROM `scores` " + + "JOIN `multiplayer_score_links` ON `multiplayer_score_links`.`score_id` = `scores`.`id` " + + "WHERE `multiplayer_score_links`.`playlist_item_id` = @playlistItemId", new + { + playlistItemId = playlistItemIds[i] + })).ToArray(); + + var totals = totalScores.GroupBy(score => (int)Math.Clamp(Math.Floor((float)score / 100000), 0, MultiplayerPlaylistItemStats.TOTAL_SCORE_DISTRIBUTION_BINS - 1)) + .OrderBy(grp => grp.Key) + .ToDictionary(grp => grp.Key, grp => grp.LongCount()); + + var stats = new MultiplayerPlaylistItemStats + { + PlaylistItemID = playlistItemIds[i], + TotalScoreDistribution = Enumerable.Range(0, MultiplayerPlaylistItemStats.TOTAL_SCORE_DISTRIBUTION_BINS).Select(i => totals.GetValueOrDefault(i)).ToArray(), + }; + + result[i] = stats; + } + + return result; + } + + public async Task GetUserBestScoreAsync(long playlistItemId, int userId) + { + var connection = await getConnectionAsync(); + + return await connection.QuerySingleOrDefaultAsync( + "SELECT * FROM `multiplayer_scores_high` WHERE `playlist_item_id` = @playlistItemId AND `user_id` = @userId", new + { + playlistItemId = playlistItemId, + userId = userId + }); + } + + public async Task GetUserRankInRoomAsync(long roomId, int userId) + { + var connection = await getConnectionAsync(); + + return await connection.QuerySingleAsync( + "SELECT COUNT(1) + 1 FROM `multiplayer_rooms_high` WHERE `room_id` = @roomId AND `user_id` != @userId " + + "AND `total_score` > (SELECT `total_score` FROM `multiplayer_rooms_high` WHERE `room_id` = @roomId AND `user_id` = @userId)", + new + { + roomId = roomId, + userId = userId, + }); + } + public void Dispose() { openConnection?.Dispose(); diff --git a/osu.Server.Spectator/Database/IDatabaseAccess.cs b/osu.Server.Spectator/Database/IDatabaseAccess.cs index bd1eea94..e6d4445f 100644 --- a/osu.Server.Spectator/Database/IDatabaseAccess.cs +++ b/osu.Server.Spectator/Database/IDatabaseAccess.cs @@ -33,6 +33,12 @@ public interface IDatabaseAccess : IDisposable /// /// Returns the with the given . /// + Task GetRoomAsync(long roomId); + + /// + /// Returns the with the given . + /// Rooms of type are not returned by this method. + /// Task GetRealtimeRoomAsync(long roomId); /// @@ -126,7 +132,12 @@ public interface IDatabaseAccess : IDisposable /// /// The score token. /// The . - Task GetScoreFromToken(long token); + Task GetScoreFromTokenAsync(long token); + + /// + /// Returns the for the given ID. + /// + Task GetScoreAsync(long scoreId); /// /// Returns if the score with the supplied has been successfully processed. @@ -167,5 +178,26 @@ public interface IDatabaseAccess : IDisposable /// Retrieves all active rooms from the category. /// Task> GetActiveDailyChallengeRoomsAsync(); + + /// + /// If is associated with a multiplayer score, returns the room ID and playlist item ID which the score was set on. + /// Otherwise, returns . + /// + Task<(long roomID, long playlistItemID)?> GetMultiplayerRoomIdForScoreAsync(long scoreId); + + /// + /// Returns for all playlist items in the room with the given . + /// + Task GetMultiplayerRoomStatsAsync(long roomId); + + /// + /// Returns the best score of user with on the playlist item with . + /// + Task GetUserBestScoreAsync(long playlistItemId, int userId); + + /// + /// Gets the overall rank of user in the room with . + /// + Task GetUserRankInRoomAsync(long roomId, int userId); } } diff --git a/osu.Server.Spectator/Database/Models/multiplayer_scores_high.cs b/osu.Server.Spectator/Database/Models/multiplayer_scores_high.cs new file mode 100644 index 00000000..a83b0a45 --- /dev/null +++ b/osu.Server.Spectator/Database/Models/multiplayer_scores_high.cs @@ -0,0 +1,23 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +using System; + +namespace osu.Server.Spectator.Database.Models +{ + // ReSharper disable InconsistentNaming + [Serializable] + public class multiplayer_scores_high + { + public ulong id { get; set; } + public ulong? score_id { get; set; } + public uint user_id { get; set; } + public ulong playlist_item_id { get; set; } + public uint total_score { get; set; } + public float accuracy { get; set; } + public float? pp { get; set; } + public uint attempts { get; set; } + public DateTimeOffset created_at { get; set; } + public DateTimeOffset updated_at { get; set; } + } +} diff --git a/osu.Server.Spectator/Hubs/Metadata/MetadataHub.cs b/osu.Server.Spectator/Hubs/Metadata/MetadataHub.cs index 2b8db05b..b3242297 100644 --- a/osu.Server.Spectator/Hubs/Metadata/MetadataHub.cs +++ b/osu.Server.Spectator/Hubs/Metadata/MetadataHub.cs @@ -12,6 +12,7 @@ using osu.Server.Spectator.Database; using osu.Server.Spectator.Entities; using osu.Server.Spectator.Extensions; +using osu.Server.Spectator.Hubs.Spectator; namespace osu.Server.Spectator.Hubs.Metadata { @@ -19,19 +20,24 @@ public class MetadataHub : StatefulUserHub { private readonly IDatabaseFactory databaseFactory; private readonly IDailyChallengeUpdater dailyChallengeUpdater; + private readonly IScoreProcessedSubscriber scoreProcessedSubscriber; internal const string ONLINE_PRESENCE_WATCHERS_GROUP = "metadata:online-presence-watchers"; + internal static string MultiplayerRoomWatchersGroup(long roomId) => $"metadata:multiplayer-room-watchers:{roomId}"; + public MetadataHub( ILoggerFactory loggerFactory, IDistributedCache cache, EntityStore userStates, IDatabaseFactory databaseFactory, - IDailyChallengeUpdater dailyChallengeUpdater) + IDailyChallengeUpdater dailyChallengeUpdater, + IScoreProcessedSubscriber scoreProcessedSubscriber) : base(loggerFactory, cache, userStates) { this.databaseFactory = databaseFactory; this.dailyChallengeUpdater = dailyChallengeUpdater; + this.scoreProcessedSubscriber = scoreProcessedSubscriber; } public override async Task OnConnectedAsync() @@ -100,10 +106,26 @@ public async Task UpdateStatus(UserStatus? status) } } + public async Task BeginWatchingMultiplayerRoom(long id) + { + await Groups.AddToGroupAsync(Context.ConnectionId, MultiplayerRoomWatchersGroup(id)); + await scoreProcessedSubscriber.RegisterForMultiplayerRoomAsync(Context.GetUserId(), id); + + using var db = databaseFactory.GetInstance(); + return await db.GetMultiplayerRoomStatsAsync(id); + } + + public async Task EndWatchingMultiplayerRoom(long id) + { + await Groups.RemoveFromGroupAsync(Context.ConnectionId, MultiplayerRoomWatchersGroup(id)); + await scoreProcessedSubscriber.UnregisterFromMultiplayerRoomAsync(Context.GetUserId(), id); + } + protected override async Task CleanUpState(MetadataClientState state) { await base.CleanUpState(state); await broadcastUserPresenceUpdate(state.UserId, null); + await scoreProcessedSubscriber.UnregisterFromAllMultiplayerRoomsAsync(state.UserId); } private Task broadcastUserPresenceUpdate(int userId, UserPresence? userPresence) diff --git a/osu.Server.Spectator/Hubs/ScoreUploader.cs b/osu.Server.Spectator/Hubs/ScoreUploader.cs index 3195c558..9975a442 100644 --- a/osu.Server.Spectator/Hubs/ScoreUploader.cs +++ b/osu.Server.Spectator/Hubs/ScoreUploader.cs @@ -81,7 +81,7 @@ private async Task readLoop() try { - SoloScore? dbScore = await db.GetScoreFromToken(item.Token); + SoloScore? dbScore = await db.GetScoreFromTokenAsync(item.Token); if (dbScore == null && !item.Cancellation.IsCancellationRequested) { diff --git a/osu.Server.Spectator/Hubs/Spectator/IScoreProcessedSubscriber.cs b/osu.Server.Spectator/Hubs/Spectator/IScoreProcessedSubscriber.cs index 3cb81399..78308e73 100644 --- a/osu.Server.Spectator/Hubs/Spectator/IScoreProcessedSubscriber.cs +++ b/osu.Server.Spectator/Hubs/Spectator/IScoreProcessedSubscriber.cs @@ -17,5 +17,20 @@ public interface IScoreProcessedSubscriber /// The ID of the user who set the score. /// The ID of the score which is being processed. Task RegisterForSingleScoreAsync(string receiverConnectionId, int userId, long scoreId); + + /// + /// Registers a hub client for future notifications about incoming scores in a given . + /// + Task RegisterForMultiplayerRoomAsync(int userId, long roomId); + + /// + /// Unregisters a hub client from future notifications about incoming scores in a given . + /// + Task UnregisterFromMultiplayerRoomAsync(int userId, long roomId); + + /// + /// Unregisters a hub client from all multiplayer room subscriptions. + /// + Task UnregisterFromAllMultiplayerRoomsAsync(int userId); } } diff --git a/osu.Server.Spectator/Hubs/Spectator/ScoreProcessedSubscriber.cs b/osu.Server.Spectator/Hubs/Spectator/ScoreProcessedSubscriber.cs index 94e110e6..de1e4994 100644 --- a/osu.Server.Spectator/Hubs/Spectator/ScoreProcessedSubscriber.cs +++ b/osu.Server.Spectator/Hubs/Spectator/ScoreProcessedSubscriber.cs @@ -3,15 +3,19 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; using Newtonsoft.Json; +using osu.Game.Online.Metadata; using osu.Game.Online.Spectator; using osu.Server.Spectator.Database; using osu.Server.Spectator.Database.Models; +using osu.Server.Spectator.Hubs.Metadata; using StackExchange.Redis; using StatsdClient; using Timer = System.Timers.Timer; @@ -31,18 +35,24 @@ public sealed class ScoreProcessedSubscriber : IScoreProcessedSubscriber, IDispo private readonly ISubscriber? subscriber; private readonly ConcurrentDictionary singleScoreSubscriptions = new ConcurrentDictionary(); + + private readonly Dictionary multiplayerRoomSubscriptions = new Dictionary(); + private readonly Timer timer; private readonly ILogger logger; private readonly IHubContext spectatorHubContext; + private readonly IHubContext metadataHubContext; public ScoreProcessedSubscriber( IDatabaseFactory databaseFactory, IConnectionMultiplexer redis, IHubContext spectatorHubContext, + IHubContext metadataHubContext, ILoggerFactory loggerFactory) { this.databaseFactory = databaseFactory; this.spectatorHubContext = spectatorHubContext; + this.metadataHubContext = metadataHubContext; timer = new Timer(1000); timer.AutoReset = true; @@ -73,6 +83,8 @@ private void onMessageReceived(string? message) subscription.InvokeAsync().Wait(); } + Task.Run(async () => await notifyMultiplayerRoomSubscribers(scoreProcessed)); + DogStatsd.Increment($"{statsd_prefix}.messages.single-score.delivered"); } catch (Exception ex) @@ -82,13 +94,69 @@ private void onMessageReceived(string? message) } } + private async Task notifyMultiplayerRoomSubscribers(ScoreProcessed scoreProcessed) + { + try + { + using var db = databaseFactory.GetInstance(); + + (long roomID, long playlistItemID)? multiplayerLookup = await db.GetMultiplayerRoomIdForScoreAsync(scoreProcessed.ScoreId); + + if (multiplayerLookup == null) + return; + + // do one early check to attempt to ensure the database queries we are about to do are not for naught. + lock (multiplayerRoomSubscriptions) + { + if (!multiplayerRoomSubscriptions.TryGetValue(multiplayerLookup.Value.roomID, out _)) + return; + } + + var score = await db.GetScoreAsync(scoreProcessed.ScoreId); + Debug.Assert(score != null); + + if (!score.passed) + return; + + int? newRank = null; + var userBest = await db.GetUserBestScoreAsync(multiplayerLookup.Value.playlistItemID, (int)score.user_id); + + if (userBest?.score_id == score.id) + newRank = await db.GetUserRankInRoomAsync(multiplayerLookup.Value.roomID, (int)score.user_id); + + lock (multiplayerRoomSubscriptions) + { + // do another check just in case something has shifted under us while we were fetching all of the data. + if (!multiplayerRoomSubscriptions.TryGetValue(multiplayerLookup.Value.roomID, out var roomSubscription)) + return; + + roomSubscription.InvokeAsync(new MultiplayerRoomScoreSetEvent + { + RoomID = multiplayerLookup.Value.roomID, + PlaylistItemID = multiplayerLookup.Value.playlistItemID, + ScoreID = (long)score.id, + UserID = (int)score.user_id, + TotalScore = score.total_score, + NewRank = newRank, + }); + } + + DogStatsd.Increment($"{statsd_prefix}.messages.room.delivered", tags: [multiplayerLookup.Value.roomID.ToString()]); + } + catch (Exception ex) + { + logger.LogError(ex, "Error when attempting to deliver room subscription update"); + DogStatsd.Increment($"{statsd_prefix}.messages.room.dropped"); + } + } + public async Task RegisterForSingleScoreAsync(string receiverConnectionId, int userId, long scoreToken) { try { using var db = databaseFactory.GetInstance(); - SoloScore? score = await db.GetScoreFromToken(scoreToken); + SoloScore? score = await db.GetScoreFromTokenAsync(scoreToken); if (score == null) { @@ -122,12 +190,73 @@ public async Task RegisterForSingleScoreAsync(string receiverConnectionId, int u } } + public async Task RegisterForMultiplayerRoomAsync(int userId, long roomId) + { + using var db = databaseFactory.GetInstance(); + var room = await db.GetRoomAsync(roomId); + + if (room == null) + return; + + if (room.type != database_match_type.playlists) + { + logger.LogError("User {userId} attempted to subscribe for notifications for non-playlists multiplayer room {roomId}. This is currently unsupported.", userId, roomId); + return; + } + + lock (multiplayerRoomSubscriptions) + { + if (!multiplayerRoomSubscriptions.TryGetValue(roomId, out var existing)) + multiplayerRoomSubscriptions[roomId] = existing = new MultiplayerRoomSubscription(roomId, metadataHubContext); + + existing.AddUser(userId); + DogStatsd.Gauge($"{statsd_prefix}.subscriptions.room.total", existing.UserIds.Count, tags: [roomId.ToString()]); + } + } + + public Task UnregisterFromMultiplayerRoomAsync(int userId, long roomId) + { + lock (multiplayerRoomSubscriptions) + { + if (!multiplayerRoomSubscriptions.TryGetValue(roomId, out var subscription)) + return Task.CompletedTask; + + subscription.RemoveUser(userId); + if (subscription.UserIds.Count == 0) + multiplayerRoomSubscriptions.Remove(roomId); + DogStatsd.Gauge($"{statsd_prefix}.subscriptions.room.total", subscription.UserIds.Count, tags: [roomId.ToString()]); + } + + return Task.CompletedTask; + } + + public Task UnregisterFromAllMultiplayerRoomsAsync(int userId) + { + lock (multiplayerRoomSubscriptions) + { + foreach (var (roomId, subscription) in multiplayerRoomSubscriptions) + { + subscription.RemoveUser(userId); + DogStatsd.Gauge($"{statsd_prefix}.subscriptions.room.total", subscription.UserIds.Count, tags: [roomId.ToString()]); + } + + long[] emptySubscriptions = multiplayerRoomSubscriptions.Where(kv => kv.Value.UserIds.Count == 0) + .Select(kv => kv.Key) + .ToArray(); + + foreach (long key in emptySubscriptions) + multiplayerRoomSubscriptions.Remove(key); + } + + return Task.CompletedTask; + } + private void purgeTimedOutSubscriptions() { - var scoreIds = singleScoreSubscriptions.Keys.ToArray(); + long[] scoreIds = singleScoreSubscriptions.Keys.ToArray(); int purgedCount = 0; - foreach (var scoreId in scoreIds) + foreach (long scoreId in scoreIds) { if (singleScoreSubscriptions.TryGetValue(scoreId, out var subscription) && subscription.TimedOut) { @@ -187,5 +316,26 @@ public Task InvokeAsync() public void Dispose() => cancellationTokenSource.Dispose(); } + + private class MultiplayerRoomSubscription + { + public IReadOnlySet UserIds => userIds; + + private readonly HashSet userIds = new HashSet(); + private readonly long roomId; + private readonly IHubContext metadataHubContext; + + public MultiplayerRoomSubscription(long roomId, IHubContext metadataHubContext) + { + this.roomId = roomId; + this.metadataHubContext = metadataHubContext; + } + + public void AddUser(int userId) => userIds.Add(userId); + public void RemoveUser(int userId) => userIds.Remove(userId); + + public Task InvokeAsync(MultiplayerRoomScoreSetEvent roomScoreSetEvent) + => metadataHubContext.Clients.Group(MetadataHub.MultiplayerRoomWatchersGroup(roomId)).SendAsync(nameof(IMetadataClient.MultiplayerRoomScoreSet), roomScoreSetEvent); + } } } From 6528c368619511fcf5f90c4d7d59ec642f395aca Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Fri, 28 Jun 2024 18:16:01 +0900 Subject: [PATCH 3/3] Update game packages --- SampleMultiplayerClient/SampleMultiplayerClient.csproj | 2 +- SampleSpectatorClient/SampleSpectatorClient.csproj | 2 +- osu.Server.Spectator/osu.Server.Spectator.csproj | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/SampleMultiplayerClient/SampleMultiplayerClient.csproj b/SampleMultiplayerClient/SampleMultiplayerClient.csproj index b7ebbdf3..d33aa350 100644 --- a/SampleMultiplayerClient/SampleMultiplayerClient.csproj +++ b/SampleMultiplayerClient/SampleMultiplayerClient.csproj @@ -11,7 +11,7 @@ - + diff --git a/SampleSpectatorClient/SampleSpectatorClient.csproj b/SampleSpectatorClient/SampleSpectatorClient.csproj index b7ebbdf3..d33aa350 100644 --- a/SampleSpectatorClient/SampleSpectatorClient.csproj +++ b/SampleSpectatorClient/SampleSpectatorClient.csproj @@ -11,7 +11,7 @@ - + diff --git a/osu.Server.Spectator/osu.Server.Spectator.csproj b/osu.Server.Spectator/osu.Server.Spectator.csproj index fef8bd47..0da10764 100644 --- a/osu.Server.Spectator/osu.Server.Spectator.csproj +++ b/osu.Server.Spectator/osu.Server.Spectator.csproj @@ -15,11 +15,11 @@ - - - - - + + + + +