Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

cleanup queues for non-existent pools and non-existent tasks #2433

Merged
merged 2 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion src/ApiService/ApiService/Functions/TimerRetention.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,24 @@ public class TimerRetention {
private readonly INotificationOperations _notificaitonOps;
private readonly IJobOperations _jobOps;
private readonly IReproOperations _reproOps;
private readonly IQueue _queue;
private readonly IPoolOperations _poolOps;

public TimerRetention(
ILogTracer log,
ITaskOperations taskOps,
INotificationOperations notificaitonOps,
IJobOperations jobOps,
IReproOperations reproOps) {
IReproOperations reproOps,
IQueue queue,
IPoolOperations poolOps) {
_log = log;
_taskOps = taskOps;
_notificaitonOps = notificaitonOps;
_jobOps = jobOps;
_reproOps = reproOps;
_queue = queue;
_poolOps = poolOps;
}


Expand Down Expand Up @@ -100,5 +106,34 @@ from container in task.Config.Containers
}
}
}

//delete Task queues for tasks that do not exist in the table (manually deleted from the table)
//delete Pool queues for pools that were deleted before https://github.com/microsoft/onefuzz/issues/2430 got fixed
await foreach (var q in _queue.ListQueues(StorageType.Corpus)) {
Guid queueId;
if (q.Name.StartsWith(IPoolOperations.PoolQueueNamePrefix)) {
var queueIdStr = q.Name[IPoolOperations.PoolQueueNamePrefix.Length..];
if (Guid.TryParse(queueIdStr, out queueId)) {
var pool = await _poolOps.GetById(queueId);
if (!pool.IsOk) {
//pool does not exist. Ok to delete the pool queue
_log.Info($"Deleting pool queue since pool could not be found in Pool table {q.Name}");
await _queue.DeleteQueue(q.Name, StorageType.Corpus);
}
}
} else if (Guid.TryParse(q.Name, out queueId)) {
//this is a task queue
var taskQueue = await _taskOps.GetByTaskId(queueId);
if (taskQueue is null) {
// task does not exist. Ok to delete the task queue
_log.Info($"Deleting task queue, since task could not be found in Task table {q.Name}");
await _queue.DeleteQueue(q.Name, StorageType.Corpus);
}
} else if (q.Name.StartsWith(ShrinkQueue.ShrinkQueueNamePrefix)) {
//ignore Shrink Queues, since they seem to behave ok
} else {
_log.Warning($"Unhandled queue name {q.Name} when doing garbage collection on queues");
}
}
}
}
4 changes: 3 additions & 1 deletion src/ApiService/ApiService/onefuzzlib/PoolOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public interface IPoolOperations : IStatefulOrm<Pool, PoolState> {
Async.Task<Pool> Running(Pool pool);
Async.Task<Pool> Shutdown(Pool pool);
Async.Task<Pool> Halt(Pool pool);

public static string PoolQueueNamePrefix => "pool-";
}

public class PoolOperations : StatefulOrm<Pool, PoolState, PoolOperations>, IPoolOperations {
Expand Down Expand Up @@ -91,7 +93,7 @@ public IAsyncEnumerable<Pool> GetByClientId(Guid clientId) {
}

public string GetPoolQueue(Guid poolId)
=> $"pool-{poolId:N}";
=> $"{IPoolOperations.PoolQueueNamePrefix}{poolId:N}";

public async Async.Task<List<ScalesetSummary>> GetScalesetSummary(PoolName name)
=> await _context.ScalesetOperations.SearchByPool(name)
Expand Down
10 changes: 10 additions & 0 deletions src/ApiService/ApiService/onefuzzlib/Queue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading.Tasks;
using Azure.Core;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using Azure.Storage.Sas;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;

Expand All @@ -16,9 +17,12 @@ public interface IQueue {
Async.Task ClearQueue(string name, StorageType storageType);
Async.Task DeleteQueue(string name, StorageType storageType);
Async.Task CreateQueue(string name, StorageType storageType);
IAsyncEnumerable<QueueItem> ListQueues(StorageType storageType);
}




public class Queue : IQueue {
IStorage _storage;
ILogTracer _log;
Expand All @@ -30,6 +34,12 @@ public Queue(IStorage storage, ILogTracer log) {
_log = log;
}

public async IAsyncEnumerable<QueueItem> ListQueues(StorageType storageType) {
var queueServiceClient = await GetQueueClientService(storageType);
await foreach (var q in queueServiceClient.GetQueuesAsync()) {
stishkin marked this conversation as resolved.
Show resolved Hide resolved
yield return q;
}
}

public async Async.Task SendMessage(string name, string message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) {
var queue = await GetQueueClient(name, storageType);
Expand Down
4 changes: 3 additions & 1 deletion src/ApiService/ApiService/onefuzzlib/ShrinkQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ public ShrinkQueue(Guid baseId, IQueue queueOps, ILogTracer log) {
_log = log;
}

public static string ShrinkQueueNamePrefix => "to-shrink-";

public override string ToString() {
return $"to-shrink-{_baseId:N}";
return $"{ShrinkQueue.ShrinkQueueNamePrefix}{_baseId:N}";
}

public string QueueName => this.ToString();
Expand Down