Skip to content

Commit

Permalink
Improves Queue resilience to lockups
Browse files Browse the repository at this point in the history
  • Loading branch information
redbaty committed Jan 8, 2023
1 parent 32e5b13 commit a06681c
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 49 deletions.
90 changes: 62 additions & 28 deletions Guetta.App/GuildQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public GuildQueue(ILogger<GuildQueue> logger, Voice voice, ulong guildId)

private CancellationTokenSource CancellationTokenSource { get; set; }

private CancellationTokenSource QueueCancellationTokenSource { get; set; } = new();

private QueueItem CurrentItem { get; set; }

private void ReOrderQueue()
Expand All @@ -51,43 +53,75 @@ private void StartQueueLoop()
LoopQueue = null;
}

if (LoopQueue != null || Queue.Count <= 0)
if (Queue.Count <= 0)
return;

LoopQueue = Task.Run(async () =>
if (LoopQueue != null)
{
while (Queue.TryDequeue(out var queueItem))
if (Queue.Count > 0 && CurrentItem == null)
{
CurrentItem = queueItem;
queueItem.CurrentQueueIndex = 0;
ReOrderQueue();
Logger.LogInformation("Playing {@Title} requested by {@User}", queueItem.VideoInformation.Title, queueItem.User.Username);
await Voice.Join(queueItem.VoiceChannel);
CancellationTokenSource?.Dispose();
CancellationTokenSource = new CancellationTokenSource();
queueItem.Playing = true;
QueueCancellationTokenSource.Cancel();
QueueCancellationTokenSource = new CancellationTokenSource();
LoopQueue = null;
}
else
{
return;
}
}

try
{
await Voice.Play(queueItem, CancellationTokenSource);
}
catch(Exception ex)
LoopQueue = Task.Run(async () =>
{
while (Queue.TryDequeue(out var queueItem))
{
Logger.LogError(ex, "Failed to play song");
CurrentItem = queueItem;
queueItem.CurrentQueueIndex = 0;
ReOrderQueue();
Logger.LogInformation("Playing {@Title} requested by {@User}", queueItem.VideoInformation.Title, queueItem.User.Username);
await Voice.Join(queueItem.VoiceChannel);
CancellationTokenSource?.Dispose();
CancellationTokenSource = new CancellationTokenSource();
queueItem.Playing = true;
try
{
Logger.LogInformation("Goind to play song {@Url} queued by {@Requester}", queueItem.VideoInformation.Url, queueItem.User.Username);
await Voice.Play(queueItem, CancellationTokenSource);
}
catch (Exception ex)
{
Logger.LogError(ex, "Failed to play song");
}
finally
{
queueItem.Playing = false;
CurrentItem = null;
}
}
finally
Logger.LogInformation("Queue is ending");
await Voice.Disconnect()
.ContinueWith(t =>
{
if (!t.IsCompletedSuccessfully)
Logger.LogError(t.Exception, "Failed to disconnect from voice");
});
Logger.LogInformation("Queue has ended");
}, QueueCancellationTokenSource.Token)
.ContinueWith(t =>
{
LoopQueue = null;
if (!t.IsCompletedSuccessfully)
{
queueItem.Playing = false;
CurrentItem = null;
Logger.LogError(t.Exception, "Queue task has ended with an error");
}
}
LoopQueue = null;
await Voice.Disconnect();
});
});
}

public IEnumerable<QueueItem> GetQueueItems()
Expand Down
40 changes: 19 additions & 21 deletions Guetta.App/Voice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ public Voice(YoutubeDlService youtubeDlService, LocalisationService localisation
}

public ulong GuildId { get; }

public VoiceTransmitSink CurrentDiscordSink { get; set; }


private YoutubeDlService YoutubeDlService { get; }

Expand All @@ -42,18 +41,16 @@ public Voice(YoutubeDlService youtubeDlService, LocalisationService localisation

private ILogger<Voice> Logger { get; }

private SemaphoreSlim ConnectionSemaphore { get; } = new SemaphoreSlim(1);

public Task ChangeVolume(double newVolume)
{
CurrentDiscordSink.VolumeModifier = newVolume;
if(AudioClient?.GetTransmitSink() is { } transmitSink)
transmitSink.VolumeModifier = newVolume;

return Task.CompletedTask;
}

public async Task Join(DiscordChannel voiceChannel)
{
await ConnectionSemaphore.WaitAsync();

if (AudioClient != null && AudioClient.TargetChannel.Id == voiceChannel.Id)
return;

Expand All @@ -64,39 +61,40 @@ public async Task Join(DiscordChannel voiceChannel)

var audioClient = await voiceChannel.ConnectAsync();
AudioClient = audioClient;

ConnectionSemaphore.Release();
}

public async Task Disconnect()
{
await ConnectionSemaphore.WaitAsync();

if (AudioClient != null || AudioClient.IsDisposed())
{
if (AudioClient != null && !AudioClient.IsDisposed())
{
await AudioClient.WaitForPlaybackFinishAsync();
if (AudioClient.IsPlaying)
{
Logger.LogInformation("Going to wait for playback to finish");

var timeoutTask = Task.Delay(TimeSpan.FromSeconds(10));
var waitForFinishTask = AudioClient.WaitForPlaybackFinishAsync();
var endedTask = await Task.WhenAny(waitForFinishTask, timeoutTask);

if (endedTask == timeoutTask)
{
Logger.LogWarning("Waiting for playback to finish timed out");
}
}

AudioClient.Disconnect();
}

AudioClient = null;
}

if (CurrentDiscordSink != null)
{
CurrentDiscordSink.Dispose();
CurrentDiscordSink = null;
}

ConnectionSemaphore.Release();
}

public Task Play(QueueItem queueItem, CancellationTokenSource cancellationToken) => Play(new PlayRequest(queueItem, cancellationToken));

private async Task Play(PlayRequest playRequest)
{
CurrentDiscordSink ??= AudioClient.GetTransmitSink();
var CurrentDiscordSink = AudioClient.GetTransmitSink();
var webSocketClient = AudioClient.GetWebsocket();

Task WebSocketClientOnDisconnected(IWebSocketClient sender, SocketCloseEventArgs args)
Expand Down

0 comments on commit a06681c

Please sign in to comment.