From 8d37d23236db9f69cdc1e11f08d915688361fe02 Mon Sep 17 00:00:00 2001 From: stas Date: Mon, 25 Apr 2022 16:07:18 -0700 Subject: [PATCH] Timer workers (part 1) --- src/ApiService/ApiService/Log.cs | 16 +- .../ApiService/OneFuzzTypes/Enums.cs | 36 +++- .../ApiService/OneFuzzTypes/Events.cs | 24 ++- .../ApiService/OneFuzzTypes/Model.cs | 13 +- src/ApiService/ApiService/Program.cs | 3 + src/ApiService/ApiService/TimerWorkers.cs | 40 ++++ .../ApiService/onefuzzlib/Events.cs | 6 +- .../ApiService/onefuzzlib/NodeOperations.cs | 202 +++++++++++++++++- .../onefuzzlib/NotificationOperations.cs | 2 +- src/ApiService/ApiService/onefuzzlib/Queue.cs | 131 ++++++++++-- .../onefuzzlib/ScalesetOperations.cs | 47 +++- .../ApiService/onefuzzlib/ShrinkQueue.cs | 58 +++++ .../ApiService/onefuzzlib/TaskOperations.cs | 2 + .../ApiService/onefuzzlib/orm/Queries.cs | 17 ++ src/ApiService/Tests/OrmModelsTest.cs | 22 ++ src/ApiService/Tests/QueryTest.cs | 37 ++++ 16 files changed, 593 insertions(+), 63 deletions(-) create mode 100644 src/ApiService/ApiService/TimerWorkers.cs create mode 100644 src/ApiService/ApiService/onefuzzlib/ShrinkQueue.cs create mode 100644 src/ApiService/ApiService/onefuzzlib/orm/Queries.cs create mode 100644 src/ApiService/Tests/QueryTest.cs diff --git a/src/ApiService/ApiService/Log.cs b/src/ApiService/ApiService/Log.cs index 1fc04436bd..93407dedf9 100644 --- a/src/ApiService/ApiService/Log.cs +++ b/src/ApiService/ApiService/Log.cs @@ -8,7 +8,7 @@ namespace Microsoft.OneFuzz.Service; public interface ILog { void Log(Guid correlationId, String message, SeverityLevel level, IReadOnlyDictionary tags, string? caller); void LogEvent(Guid correlationId, String evt, IReadOnlyDictionary tags, IReadOnlyDictionary? metrics, string? caller); - void LogException(Guid correlationId, Exception ex, IReadOnlyDictionary tags, IReadOnlyDictionary? metrics, string? caller); + void LogException(Guid correlationId, Exception ex, string message, IReadOnlyDictionary tags, IReadOnlyDictionary? metrics, string? caller); void Flush(); } @@ -37,7 +37,7 @@ public void LogEvent(Guid correlationId, String evt, IReadOnlyDictionary tags, IReadOnlyDictionary? metrics, string? caller) { + public void LogException(Guid correlationId, Exception ex, string message, IReadOnlyDictionary tags, IReadOnlyDictionary? metrics, string? caller) { Dictionary copyTags = new(tags); copyTags["Correlation ID"] = correlationId.ToString(); if (caller is not null) copyTags["CalledBy"] = caller; @@ -47,6 +47,8 @@ public void LogException(Guid correlationId, Exception ex, IReadOnlyDictionary tags, IReadOnlyDictionary? metrics, string? caller) { - System.Console.Out.WriteLine($"[{correlationId}][Exception] {ex}"); + public void LogException(Guid correlationId, Exception ex, string message, IReadOnlyDictionary tags, IReadOnlyDictionary? metrics, string? caller) { + System.Console.Out.WriteLine($"[{correlationId}][Exception] {message}:{ex}"); LogTags(correlationId, tags); LogMetrics(correlationId, metrics); } @@ -105,7 +107,7 @@ public interface ILogTracer { void Critical(string message); void Error(string message); void Event(string evt, IReadOnlyDictionary? metrics); - void Exception(Exception ex, IReadOnlyDictionary? metrics = null); + void Exception(Exception ex, string message = "", IReadOnlyDictionary? metrics = null); void ForceFlush(); void Info(string message); void Warning(string message); @@ -238,10 +240,10 @@ public void Event(string evt, IReadOnlyDictionary? metrics) { } } - public void Exception(Exception ex, IReadOnlyDictionary? metrics) { + public void Exception(Exception ex, string message, IReadOnlyDictionary? metrics) { var caller = GetCaller(); foreach (var logger in _loggers) { - logger.LogException(CorrelationId, ex, Tags, metrics, caller); + logger.LogException(CorrelationId, ex, message, Tags, metrics, caller); } } diff --git a/src/ApiService/ApiService/OneFuzzTypes/Enums.cs b/src/ApiService/ApiService/OneFuzzTypes/Enums.cs index bea8fb2532..c203fc9181 100644 --- a/src/ApiService/ApiService/OneFuzzTypes/Enums.cs +++ b/src/ApiService/ApiService/OneFuzzTypes/Enums.cs @@ -121,9 +121,9 @@ public enum JobState { } public static class JobStateHelper { - private static readonly HashSet _shuttingDown = new HashSet(new[] { JobState.Stopping, JobState.Stopped }); - private static readonly HashSet _avaiable = new HashSet(new[] { JobState.Init, JobState.Enabled }); - private static readonly HashSet _needsWork = new HashSet(new[] { JobState.Init, JobState.Stopping }); + private static readonly IReadOnlySet _shuttingDown = new HashSet(new[] { JobState.Stopping, JobState.Stopped }); + private static readonly IReadOnlySet _avaiable = new HashSet(new[] { JobState.Init, JobState.Enabled }); + private static readonly IReadOnlySet _needsWork = new HashSet(new[] { JobState.Init, JobState.Stopping }); public static IReadOnlySet Available => _avaiable; public static IReadOnlySet NeedsWork => _needsWork; @@ -354,3 +354,33 @@ public enum AgentMode { Repro, Proxy } + + +public enum NodeState { + Init, + Free, + SettingUp, + Rebooting, + Ready, + Busy, + Done, + Shutdown, + Halt, +} + +public static class NodeStateHelper { + + private static readonly IReadOnlySet _needsWork = new HashSet(new[] { NodeState.Done, NodeState.Shutdown, NodeState.Halt }); + private static readonly IReadOnlySet _readyForReset = new HashSet(new[] { NodeState.Done, NodeState.Shutdown, NodeState.Halt }); + private static readonly IReadOnlySet _canProcessNewWork = new HashSet(new[] { NodeState.Free }); + + + public static IReadOnlySet NeedsWork => _needsWork; + + ///If Node is in one of these states, ignore updates from the agent. + public static IReadOnlySet ReadyForReset => _readyForReset; + + public static IReadOnlySet CanProcessNewWork => _canProcessNewWork; +} + + diff --git a/src/ApiService/ApiService/OneFuzzTypes/Events.cs b/src/ApiService/ApiService/OneFuzzTypes/Events.cs index dc5d8bbb27..98c45edeb7 100644 --- a/src/ApiService/ApiService/OneFuzzTypes/Events.cs +++ b/src/ApiService/ApiService/OneFuzzTypes/Events.cs @@ -60,6 +60,7 @@ public EventType GetEventType() { EventScalesetFailed _ => EventType.ScalesetFailed, EventScalesetResizeScheduled _ => EventType.ScalesetResizeScheduled, EventScalesetStateUpdated _ => EventType.ScalesetStateUpdated, + EventNodeDeleted _ => EventType.NodeDeleted, _ => throw new NotImplementedException(), }; @@ -81,7 +82,10 @@ public static Type GetTypeInfo(EventType eventType) { EventType.TaskFailed => typeof(EventTaskFailed), EventType.TaskStopped => typeof(EventTaskStopped), EventType.TaskStateUpdated => typeof(EventTaskStateUpdated), - + EventType.ScalesetFailed => typeof(EventScalesetFailed), + EventType.ScalesetResizeScheduled => typeof(EventScalesetResizeScheduled), + EventType.ScalesetStateUpdated => typeof(EventScalesetStateUpdated), + EventType.NodeDeleted => typeof(EventNodeDeleted), _ => throw new ArgumentException($"invalid input {eventType}"), }; @@ -102,7 +106,7 @@ TaskConfig Config ) : BaseEvent(); -record EventTaskFailed( +public record EventTaskFailed( Guid JobId, Guid TaskId, Error Error, @@ -118,14 +122,14 @@ TaskConfig Config // ) : BaseEvent(); -record JobTaskStopped( +public record JobTaskStopped( Guid TaskId, TaskType TaskType, Error? Error ) : BaseEvent(); -record EventJobStopped( +public record EventJobStopped( Guid JobId, JobConfig Config, UserInfo? UserInfo, @@ -141,7 +145,7 @@ List TaskInfo // ) : BaseEvent(); -record EventTaskStateUpdated( +public record EventTaskStateUpdated( Guid JobId, Guid TaskId, TaskState State, @@ -245,11 +249,11 @@ PoolName PoolName ) : BaseEvent(); -// record EventNodeDeleted( -// Guid MachineId, -// Guid ScalesetId, -// PoolName PoolName -// ) : BaseEvent(); +public record EventNodeDeleted( + Guid MachineId, + Guid? ScalesetId, + PoolName PoolName +) : BaseEvent(); public record EventScalesetStateUpdated( diff --git a/src/ApiService/ApiService/OneFuzzTypes/Model.cs b/src/ApiService/ApiService/OneFuzzTypes/Model.cs index 25a9b223cc..1361fe84cb 100644 --- a/src/ApiService/ApiService/OneFuzzTypes/Model.cs +++ b/src/ApiService/ApiService/OneFuzzTypes/Model.cs @@ -70,19 +70,8 @@ public record NodeTasks Guid MachineId, Guid TaskId, NodeTaskState State = NodeTaskState.Init -); +) : StatefulEntityBase(State); -public enum NodeState { - Init, - Free, - SettingUp, - Rebooting, - Ready, - Busy, - Done, - Shutdown, - Halt, -} public record ProxyHeartbeat ( diff --git a/src/ApiService/ApiService/Program.cs b/src/ApiService/ApiService/Program.cs index 093436400f..13c928f5b1 100644 --- a/src/ApiService/ApiService/Program.cs +++ b/src/ApiService/ApiService/Program.cs @@ -88,6 +88,9 @@ public static void Main() { .AddScoped() .AddScoped() .AddScoped() + .AddScoped() + .AddScoped() + .AddSingleton() .AddSingleton() .AddHttpClient() diff --git a/src/ApiService/ApiService/TimerWorkers.cs b/src/ApiService/ApiService/TimerWorkers.cs new file mode 100644 index 0000000000..d52a166c95 --- /dev/null +++ b/src/ApiService/ApiService/TimerWorkers.cs @@ -0,0 +1,40 @@ +namespace Microsoft.OneFuzz.Service; + +public class TimerWorkers { + ILogTracer _log; + IScalesetOperations _scaleSetOps; + + public TimerWorkers(ILogTracer log, IScalesetOperations scaleSetOps) { + _log = log; + _scaleSetOps = scaleSetOps; + } + + void ProcessScaleSets(Scaleset scaleset) { + _log.Verbose($"checking scaleset for updates: {scaleset.ScalesetId}"); + + _scaleSetOps.UpdateConfigs(scaleset); + + + + } + + + //public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) { + // NOTE: Update pools first, such that scalesets impacted by pool updates + // (such as shutdown or resize) happen during this iteration `timer_worker` + // rather than the following iteration. + + + + + // NOTE: Nodes, and Scalesets should be processed in a consistent order such + // during 'pool scale down' operations. This means that pools that are + // scaling down will more likely remove from the same scalesets over time. + // By more likely removing from the same scalesets, we are more likely to + // get to empty scalesets, which can safely be deleted. + + + //} + + +} diff --git a/src/ApiService/ApiService/onefuzzlib/Events.cs b/src/ApiService/ApiService/onefuzzlib/Events.cs index ae8dc31caa..15b79d826f 100644 --- a/src/ApiService/ApiService/onefuzzlib/Events.cs +++ b/src/ApiService/ApiService/onefuzzlib/Events.cs @@ -1,5 +1,4 @@ -using System.Text; -using System.Text.Json; +using System.Text.Json; using System.Text.Json.Serialization; using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; @@ -32,8 +31,7 @@ public Events(IQueue queue, IWebhookOperations webhook, ILogTracer log) { public async Async.Task QueueSignalrEvent(EventMessage eventMessage) { var message = new SignalREvent("events", new List() { eventMessage }); - var encodedMessage = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message)); - await _queue.SendMessage("signalr-events", encodedMessage, StorageType.Config); + await _queue.SendMessage("signalr-events", JsonSerializer.Serialize(message), StorageType.Config); } public async Async.Task SendEvent(BaseEvent anEvent) { diff --git a/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs b/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs index 87e7661120..73367ba19d 100644 --- a/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs +++ b/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs @@ -1,17 +1,45 @@ -using System.Threading.Tasks; +using System.Text.Json; +using System.Threading.Tasks; using ApiService.OneFuzzLib.Orm; +using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; namespace Microsoft.OneFuzz.Service; public interface INodeOperations : IStatefulOrm { Task GetByMachineId(Guid machineId); + + IAsyncEnumerable SearchStates(Guid? poolId = default, + Guid? scaleSetId = default, + IList? states = default, + string? poolName = default, + bool excludeUpdateScheduled = false, + int? numResults = default); + + new Async.Task Delete(Node node); } public class NodeOperations : StatefulOrm, INodeOperations { - public NodeOperations(IStorage storage, ILogTracer log, IServiceConfig config) + private readonly INodeTasksOperations _nodeTasksOps; + private readonly ITaskOperations _taskOps; + private readonly INodeMessageOperations _nodeMessageOps; + private readonly IEvents _events; + + public NodeOperations( + IStorage storage, + ILogTracer log, + IServiceConfig config, + ITaskOperations taskOps, + INodeTasksOperations nodeTasksOps, + INodeMessageOperations nodeMessageOps, + IEvents events + ) : base(storage, log, config) { + _taskOps = taskOps; + _nodeTasksOps = nodeTasksOps; + _nodeMessageOps = nodeMessageOps; + _events = events; } public async Task GetByMachineId(Guid machineId) { @@ -20,4 +48,174 @@ public NodeOperations(IStorage storage, ILogTracer log, IServiceConfig config) return await data.FirstOrDefaultAsync(); } + public static string SearchStatesQuery( + string oneFuzzVersion, + Guid? poolId = default, + Guid? scaleSetId = default, + IEnumerable? states = default, + string? poolName = default, + bool excludeUpdateScheduled = false, + int? numResults = default) { + + List queryParts = new(); + + if (poolId is not null) { + queryParts.Add($"(pool_id eq '{poolId}')"); + } + + if (scaleSetId is not null) { + queryParts.Add($"(scaleset_id eq '{scaleSetId}')"); + } + + if (states is not null) { + IEnumerable convertedStates = states.Select(x => JsonSerializer.Serialize(x, EntityConverter.GetJsonSerializerOptions()).Trim('"')); + var q = Query.EqualAny("state", convertedStates); + queryParts.Add($"({q})"); + } + + if (excludeUpdateScheduled) { + queryParts.Add($"reimage_requested eq false"); + queryParts.Add($"delete_requested eq false"); + } + + //# azure table query always return false when the column does not exist + //# We write the query this way to allow us to get the nodes where the + //# version is not defined as well as the nodes with a mismatched version + var versionQuery = $"not (version eq '{oneFuzzVersion}')"; + queryParts.Add(versionQuery); + + return Query.And(queryParts); + } + + + public IAsyncEnumerable SearchStates( + Guid? poolId = default, + Guid? scaleSetId = default, + IList? states = default, + string? poolName = default, + bool excludeUpdateScheduled = false, + int? numResults = default) { + var query = NodeOperations.SearchStatesQuery(_config.OneFuzzVersion, poolId, scaleSetId, states, poolName, excludeUpdateScheduled, numResults); + return QueryAsync(query); + } + + public async Async.Task MarkTasksStoppedEarly(Node node, Error? error = null) { + if (error is null) { + error = new Error(ErrorCode.TASK_FAILED, new[] { $"node reimaged during task execution. machine_id: {node.MachineId}" }); + } + + await foreach (var entry in _nodeTasksOps.GetByMachineId(node.MachineId)) { + var task = await _taskOps.GetByTaskId(entry.TaskId); + if (task is not null) { + await _taskOps.MarkFailed(task, error); + } + if (!node.DebugKeepNode) { + await Delete(node); + } + } + } + + public new async Async.Task Delete(Node node) { + await MarkTasksStoppedEarly(node); + await _nodeTasksOps.ClearByMachineId(node.MachineId); + await _nodeMessageOps.ClearMessages(node.MachineId); + await base.Delete(node); + + await _events.SendEvent(new EventNodeDeleted(node.MachineId, node.ScalesetId, node.PoolName)); + } +} + + +public interface INodeTasksOperations : IStatefulOrm { + IAsyncEnumerable GetNodesByTaskId(Guid taskId, INodeOperations nodeOps); + IAsyncEnumerable GetNodeAssignments(Guid taskId, INodeOperations nodeOps); + IAsyncEnumerable GetByMachineId(Guid machineId); + IAsyncEnumerable GetByTaskId(Guid taskId); + Async.Task ClearByMachineId(Guid machineId); +} + +public class NodeTasksOperations : StatefulOrm, INodeTasksOperations { + + ILogTracer _log; + + public NodeTasksOperations(IStorage storage, ILogTracer log, IServiceConfig config) + : base(storage, log, config) { + _log = log; + } + + //TODO: suggest by Cheick: this can probably be optimize by query all NodesTasks then query the all machine in single request + public async IAsyncEnumerable GetNodesByTaskId(Guid taskId, INodeOperations nodeOps) { + List results = new(); + await foreach (var entry in QueryAsync($"task_id eq '{taskId}'")) { + var node = await nodeOps.GetByMachineId(entry.MachineId); + if (node is not null) { + yield return node; + } + } + } + public async IAsyncEnumerable GetNodeAssignments(Guid taskId, INodeOperations nodeOps) { + + await foreach (var entry in QueryAsync($"task_id eq '{taskId}'")) { + var node = await nodeOps.GetByMachineId(entry.MachineId); + if (node is not null) { + var nodeAssignment = new NodeAssignment(node.MachineId, node.ScalesetId, entry.State); + yield return nodeAssignment; + } + } + } + + public IAsyncEnumerable GetByMachineId(Guid machineId) { + return QueryAsync($"macine_id eq '{machineId}'"); + } + + public IAsyncEnumerable GetByTaskId(Guid taskId) { + return QueryAsync($"task_id eq '{taskId}'"); + } + + public async Async.Task ClearByMachineId(Guid machineId) { + _log.Info($"clearing tasks for node {machineId}"); + await foreach (var entry in GetByMachineId(machineId)) { + var res = await Delete(entry); + if (!res.IsOk) { + _log.Error($"failed to delete node task entry for machine_id: {entry.MachineId} due to [{res.ErrorV.Item1}] {res.ErrorV.Item2}"); + } + } + } +} + +//# this isn't anticipated to be needed by the client, hence it not +//# being in onefuzztypes +public record NodeMessage( + [PartitionKey] Guid MachineId, + [RowKey] string MessageId, + NodeCommand Message +) : EntityBase; + +public interface INodeMessageOperations : IOrm { + IAsyncEnumerable GetMessage(Guid machineId); + Async.Task ClearMessages(Guid machineId); +} + + +public class NodeMessageOperations : Orm, INodeMessageOperations { + + private readonly ILogTracer _log; + public NodeMessageOperations(IStorage storage, ILogTracer log, IServiceConfig config) : base(storage, log, config) { + _log = log; + } + + public IAsyncEnumerable GetMessage(Guid machineId) { + return QueryAsync($"machine_id eq '{machineId}'"); + } + + public async Async.Task ClearMessages(Guid machineId) { + _log.Info($"clearing messages for node {machineId}"); + + await foreach (var message in GetMessage(machineId)) { + var r = await Delete(message); + if (!r.IsOk) { + _log.Error($"failed to delete message for node {machineId} due to [{r.ErrorV.Item1}] {r.ErrorV.Item2}"); + } + } + } } diff --git a/src/ApiService/ApiService/onefuzzlib/NotificationOperations.cs b/src/ApiService/ApiService/onefuzzlib/NotificationOperations.cs index 6063f4d5cf..c1bba766c7 100644 --- a/src/ApiService/ApiService/onefuzzlib/NotificationOperations.cs +++ b/src/ApiService/ApiService/onefuzzlib/NotificationOperations.cs @@ -65,7 +65,7 @@ public async Async.Task NewFiles(Container container, string filename, bool fail if (containers.Contains(container.ContainerName)) { _logTracer.Info($"queuing input {container.ContainerName} {filename} {task.TaskId}"); var url = _containers.GetFileSasUrl(container, filename, StorageType.Corpus, BlobSasPermissions.Read | BlobSasPermissions.Delete); - await _queue.SendMessage(task.TaskId.ToString(), System.Text.Encoding.UTF8.GetBytes(url?.ToString() ?? ""), StorageType.Corpus); + await _queue.SendMessage(task.TaskId.ToString(), url?.ToString() ?? "", StorageType.Corpus); } } diff --git a/src/ApiService/ApiService/onefuzzlib/Queue.cs b/src/ApiService/ApiService/onefuzzlib/Queue.cs index 84e663f5ab..ebed45f6d0 100644 --- a/src/ApiService/ApiService/onefuzzlib/Queue.cs +++ b/src/ApiService/ApiService/onefuzzlib/Queue.cs @@ -1,5 +1,6 @@ using System.Text.Json; using System.Threading.Tasks; +using Azure.Core; using Azure.Storage; using Azure.Storage.Queues; using Azure.Storage.Sas; @@ -7,9 +8,15 @@ namespace Microsoft.OneFuzz.Service; public interface IQueue { - Async.Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null); - Async.Task QueueObject(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null); + Async.Task SendMessage(string name, string message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null); + Async.Task QueueObject(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null); Async.Task GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration = null); + ResourceIdentifier GeResourceId(string queueName, StorageType storageType); + Task> PeekQueue(string name, StorageType storageType); + Async.Task RemoveFirstMessage(string name, StorageType storageType); + Async.Task ClearQueue(string name, StorageType storageType); + Async.Task DeleteQueue(string name, StorageType storageType); + Async.Task CreateQueue(string name, StorageType storageType); } @@ -25,41 +32,123 @@ public Queue(IStorage storage, ILogTracer log) { } - public async Async.Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) { - var queue = await GetQueue(name, storageType); - if (queue != null) { - await queue.SendMessageAsync(Convert.ToBase64String(message), visibilityTimeout: visibilityTimeout, timeToLive: timeToLive); + public async Async.Task SendMessage(string name, string message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) { + var queue = await GetQueueClient(name, storageType); + try { + await queue.SendMessageAsync(message, visibilityTimeout: visibilityTimeout, timeToLive: timeToLive); + } catch (Exception ex) { + _log.Exception(ex, $"Failed to send message {message}"); + throw; } } - public async Task GetQueue(string name, StorageType storageType) { - var client = await GetQueueClient(storageType); + public async Task GetQueueClient(string name, StorageType storageType) { + var client = await GetQueueClientService(storageType); return client.GetQueueClient(name); } - - public async Task GetQueueClient(StorageType storageType) { + public async Task GetQueueClientService(StorageType storageType) { var accountId = _storage.GetPrimaryAccount(storageType); - //_logger.LogDEbug("getting blob container (account_id: %s)", account_id) + _log.Verbose($"getting blob container (account_id: {accountId})"); var (name, key) = await _storage.GetStorageAccountNameAndKey(accountId); var accountUrl = new Uri($"https://{name}.queue.core.windows.net"); - var client = new QueueServiceClient(accountUrl, new StorageSharedKeyCredential(name, key)); - return client; + var options = new QueueClientOptions { MessageEncoding = QueueMessageEncoding.Base64 }; + return new QueueServiceClient(accountUrl, new StorageSharedKeyCredential(name, key), options); } - public async Task QueueObject(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout) { - var queue = await GetQueue(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}"); - - var serialized = JsonSerializer.Serialize(obj, EntityConverter.GetJsonSerializerOptions()); - //var encoded = Encoding.UTF8.GetBytes(serialized); - var response = await queue.SendMessageAsync(serialized, visibilityTimeout: visibilityTimeout); - return !response.GetRawResponse().IsError; + public async Task QueueObject(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) { + var queueClient = await GetQueueClient(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}"); + try { + var serialized = JsonSerializer.Serialize(obj, EntityConverter.GetJsonSerializerOptions()); + var res = await queueClient.SendMessageAsync(serialized, visibilityTimeout: visibilityTimeout, timeToLive); + if (res.GetRawResponse().IsError) { + _log.Error($"Failed to send message {serialized} in queue {name} due to {res.GetRawResponse().ReasonPhrase}"); + return false; + } else { + return true; + } + } catch (Exception ex) { + _log.Exception(ex, $"Failed to queue message in queue {name}"); + return false; + } } public async Task GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration) { - var queue = await GetQueue(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}"); + var queue = await GetQueueClient(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}"); var sasaBuilder = new QueueSasBuilder(permissions, DateTimeOffset.UtcNow + (duration ?? DEFAULT_DURATION)); var url = queue.GenerateSasUri(sasaBuilder); return url; } + + + public async Async.Task CreateQueue(string name, StorageType storageType) { + var client = await GetQueueClient(name, storageType); + var resp = await client.CreateIfNotExistsAsync(); + + if (resp.IsError) { + _log.Error($"failed to create queue {name} due to {resp.ReasonPhrase}"); + } + } + + public async Async.Task DeleteQueue(string name, StorageType storageType) { + var client = await GetQueueClient(name, storageType); + var resp = await client.DeleteIfExistsAsync(); + if (resp.GetRawResponse().IsError) { + _log.Error($"failed to delete queue {name} due to {resp.GetRawResponse().ReasonPhrase}"); + } + } + + public async Async.Task ClearQueue(string name, StorageType storageType) { + var client = await GetQueueClient(name, storageType); + var resp = await client.ClearMessagesAsync(); + if (resp.IsError) { + _log.Error($"failed to clear the queue {name} due to {resp.ReasonPhrase}"); + } + } + + public async Async.Task RemoveFirstMessage(string name, StorageType storageType) { + var client = await GetQueueClient(name, storageType); + + var msgs = await client.ReceiveMessagesAsync(); + foreach (var msg in msgs.Value) { + var resp = await client.DeleteMessageAsync(msg.MessageId, msg.PopReceipt); + if (resp.IsError) { + _log.Error($"failed to delete message from the queue {name} due to {resp.ReasonPhrase}"); + return false; + } else { + return true; + } + } + return false; ; + } + + public async Task> PeekQueue(string name, StorageType storageType) { + var client = await GetQueueClient(name, storageType); + + var result = new List(); + + var msgs = await client.PeekMessagesAsync(client.MaxPeekableMessages); + if (msgs is null) { + return result; + } else if (msgs.GetRawResponse().IsError) { + + _log.Error($"failed to peek messages due to {msgs.GetRawResponse().ReasonPhrase}"); + return result; + } else { + foreach (var msg in msgs.Value) { + + var obj = JsonSerializer.Deserialize(msg.Body.ToString(), EntityConverter.GetJsonSerializerOptions()); + if (obj is not null) { + result.Add(obj); + } + } + } + return result; + } + + public ResourceIdentifier GeResourceId(string queueName, StorageType storageType) { + var account = _storage.GetPrimaryAccount(storageType); + return new ResourceIdentifier($"{account}/services/queue/queues/{queueName}"); + } + } diff --git a/src/ApiService/ApiService/onefuzzlib/ScalesetOperations.cs b/src/ApiService/ApiService/onefuzzlib/ScalesetOperations.cs index 0147825e7b..ed5cc7e4a3 100644 --- a/src/ApiService/ApiService/onefuzzlib/ScalesetOperations.cs +++ b/src/ApiService/ApiService/onefuzzlib/ScalesetOperations.cs @@ -17,14 +17,18 @@ public class ScalesetOperations : StatefulOrm, IScalese IEvents _events; IExtensions _extensions; IVmssOperations _vmssOps; + IQueue _queue; + INodeOperations _nodeOps; - public ScalesetOperations(IStorage storage, ILogTracer log, IServiceConfig config, IPoolOperations poolOps, IEvents events, IExtensions extensions, IVmssOperations vmssOps) + public ScalesetOperations(IStorage storage, ILogTracer log, IServiceConfig config, IPoolOperations poolOps, IEvents events, IExtensions extensions, IVmssOperations vmssOps, IQueue queue, INodeOperations nodeOps) : base(storage, log, config) { _log = log; _poolOps = poolOps; _events = events; _extensions = extensions; _vmssOps = vmssOps; + _queue = queue; + _nodeOps = nodeOps; } public IAsyncEnumerable Search() { @@ -87,7 +91,7 @@ public async Async.Task UpdateConfigs(Scaleset scaleSet) { var pool = await _poolOps.GetByName(scaleSet.PoolName); if (!pool.IsOk || pool.OkV is null) { - _log.Error($"{SCALESET_LOG_PREFIX}: unable to find pool during config update. pool:{scaleSet.PoolName}, scaleset_id:{scaleSet.ScalesetId}"); + _log.Error($"{SCALESET_LOG_PREFIX} unable to find pool during config update. pool:{scaleSet.PoolName}, scaleset_id:{scaleSet.ScalesetId}"); await SetFailed(scaleSet, pool.ErrorV!); return; } @@ -97,7 +101,44 @@ public async Async.Task UpdateConfigs(Scaleset scaleSet) { var res = await _vmssOps.UpdateExtensions(scaleSet.ScalesetId, extensions); if (!res.IsOk) { - _log.Info($"{SCALESET_LOG_PREFIX}: unable to update configs {string.Join(',', res.ErrorV.Errors!)}"); + _log.Info($"{SCALESET_LOG_PREFIX} unable to update configs {string.Join(',', res.ErrorV.Errors!)}"); } } + + + public async Async.Task Halt(Scaleset scaleset) { + var shrinkQueue = new ShrinkQueue(scaleset.ScalesetId, _queue, _log); + await shrinkQueue.Delete(); + + await foreach (var node in _nodeOps.SearchStates(scaleSetId: scaleset.ScalesetId)) { + _log.Info($"{SCALESET_LOG_PREFIX} deleting node scaleset_id {scaleset.ScalesetId} machine_id {node.MachineId}"); + + + } + //_nodeOps. + + + } + + + /// + /// Cleanup scaleset nodes + /// + /// + /// true if scaleset got modified + public async Async.Task CleanupNodes(Scaleset scaleSet) { + _log.Info($"{SCALESET_LOG_PREFIX} cleaning up nodes. scaleset_id {scaleSet.ScalesetId}"); + + if (scaleSet.State == ScalesetState.Halt) { + _log.Info($"{SCALESET_LOG_PREFIX} halting scaleset scaleset_id {scaleSet.ScalesetId}"); + + await Halt(scaleSet); + + return true; + } + + throw new NotImplementedException(); + } + + } diff --git a/src/ApiService/ApiService/onefuzzlib/ShrinkQueue.cs b/src/ApiService/ApiService/onefuzzlib/ShrinkQueue.cs new file mode 100644 index 0000000000..5449b871a9 --- /dev/null +++ b/src/ApiService/ApiService/onefuzzlib/ShrinkQueue.cs @@ -0,0 +1,58 @@ +namespace Microsoft.OneFuzz.Service; + +public record ShrinkEntry(Guid ShrinkId); + + +public class ShrinkQueue { + Guid _baseId; + IQueue _queueOps; + ILogTracer _log; + + public ShrinkQueue(Guid baseId, IQueue queueOps, ILogTracer log) { + _baseId = baseId; + _queueOps = queueOps; + _log = log; + } + + public override string ToString() { + return $"to-shrink-{_baseId.ToString("N")}"; + } + public string QueueName => this.ToString(); + + public async Async.Task Clear() { + await _queueOps.ClearQueue(QueueName, StorageType.Config); + } + + public async Async.Task Create() { + await _queueOps.CreateQueue(QueueName, StorageType.Config); + } + + public async Async.Task Delete() { + await _queueOps.DeleteQueue(QueueName, StorageType.Config); + } + + public async Async.Task AddEntry() { + return await _queueOps.QueueObject(QueueName, new ShrinkEntry(Guid.NewGuid()), StorageType.Config); + } + + public async Async.Task SetSize(int size) { + await Clear(); + var i = 0; + + while (i < size) { + var r = await AddEntry(); + if (r) { + i++; + } else { + //TODO: retry after a delay ? I guess make a decision on this + //if we hit this error message... For now just log and move on to + //make it behave same as Python code. + _log.Error($"failed to add entry to shrink queue"); + i++; + } + } + + } + + +} diff --git a/src/ApiService/ApiService/onefuzzlib/TaskOperations.cs b/src/ApiService/ApiService/onefuzzlib/TaskOperations.cs index dcccdc8806..e968841e2f 100644 --- a/src/ApiService/ApiService/onefuzzlib/TaskOperations.cs +++ b/src/ApiService/ApiService/onefuzzlib/TaskOperations.cs @@ -14,6 +14,8 @@ public interface ITaskOperations : IStatefulOrm { IAsyncEnumerable SearchExpired(); Async.Task MarkStopping(Task task); + Async.Task MarkFailed(Task task, Error error, List? taskInJob = null); + Async.Task GetReproVmConfig(Task task); Async.Task CheckPrereqTasks(Task task); System.Threading.Tasks.Task GetPool(Task task); diff --git a/src/ApiService/ApiService/onefuzzlib/orm/Queries.cs b/src/ApiService/ApiService/onefuzzlib/orm/Queries.cs new file mode 100644 index 0000000000..165bde53c2 --- /dev/null +++ b/src/ApiService/ApiService/onefuzzlib/orm/Queries.cs @@ -0,0 +1,17 @@ +namespace ApiService.OneFuzzLib.Orm { + public static class Query { + + public static string Or(IEnumerable queries) { + return string.Join(" or ", queries.Select(x => $"({x})")); + } + + public static string And(IEnumerable queries) { + return string.Join(" and ", queries.Select(x => $"({x})")); + } + + public static string EqualAny(string property, IEnumerable values) { + return Or(values.Select(x => $"{property} eq '{x}'")); + } + + } +} diff --git a/src/ApiService/Tests/OrmModelsTest.cs b/src/ApiService/Tests/OrmModelsTest.cs index 1352f90c45..b4b531d94e 100644 --- a/src/ApiService/Tests/OrmModelsTest.cs +++ b/src/ApiService/Tests/OrmModelsTest.cs @@ -24,6 +24,13 @@ public static Gen BaseEvent() { Arb.Generate().Select(e => e as BaseEvent), Arb.Generate().Select(e => e as BaseEvent), Arb.Generate().Select(e => e as BaseEvent), + Arb.Generate().Select(e => e as BaseEvent), + Arb.Generate().Select(e => e as BaseEvent), + Arb.Generate().Select(e => e as BaseEvent), + Arb.Generate().Select(e => e as BaseEvent), + Arb.Generate().Select(e => e as BaseEvent), + Arb.Generate().Select(e => e as BaseEvent), + Arb.Generate().Select(e => e as BaseEvent), }); } @@ -47,6 +54,17 @@ public static Gen WebhookMessageLog() { )); } + public static Gen NodeTasks() { + return Arb.Generate>().Select( + arg => + new NodeTasks( + MachineId: arg.Item1, + TaskId: arg.Item2, + State: arg.Item3 + ) + ); + } + public static Gen Node() { return Arb.Generate, Tuple>>().Select( arg => new Node( @@ -311,6 +329,10 @@ public static Arbitrary BaseEvent() { return Arb.From(OrmGenerators.BaseEvent()); } + public static Arbitrary NodeTasks() { + return Arb.From(OrmGenerators.NodeTasks()); + } + public static Arbitrary Node() { return Arb.From(OrmGenerators.Node()); } diff --git a/src/ApiService/Tests/QueryTest.cs b/src/ApiService/Tests/QueryTest.cs new file mode 100644 index 0000000000..8d5ca800fc --- /dev/null +++ b/src/ApiService/Tests/QueryTest.cs @@ -0,0 +1,37 @@ +using System; +using Microsoft.OneFuzz.Service; +using Xunit; + + +namespace Tests { + + public class QueryTests { + + [Fact] + public void NodeOperationsSearchStatesQuery() { + + var query1 = NodeOperations.SearchStatesQuery("1.2.3"); + Assert.Equal("(not (version eq '1.2.3'))", query1); + + var query2 = NodeOperations.SearchStatesQuery("1.2.3", poolId: Guid.Parse("3b0426d3-9bde-4ae8-89ac-4edf0d3b3618")); + Assert.Equal("((pool_id eq '3b0426d3-9bde-4ae8-89ac-4edf0d3b3618')) and (not (version eq '1.2.3'))", query2); + + var query3 = NodeOperations.SearchStatesQuery("1.2.3", scaleSetId: Guid.Parse("4c96dd6b-9bdb-4758-9720-1010c244fa4b")); + Assert.Equal("((scaleset_id eq '4c96dd6b-9bdb-4758-9720-1010c244fa4b')) and (not (version eq '1.2.3'))", query3); + + var query4 = NodeOperations.SearchStatesQuery("1.2.3", states: new[] { NodeState.Free, NodeState.Done, NodeState.Ready }); + Assert.Equal("(((state eq 'free') or (state eq 'done') or (state eq 'ready'))) and (not (version eq '1.2.3'))", query4); + + var query5 = NodeOperations.SearchStatesQuery("1.2.3", excludeUpdateScheduled: true); + Assert.Equal("(reimage_requested eq false) and (delete_requested eq false) and (not (version eq '1.2.3'))", query5); + + var query7 = NodeOperations.SearchStatesQuery( + "1.2.3", + poolId: Guid.Parse("3b0426d3-9bde-4ae8-89ac-4edf0d3b3618"), + scaleSetId: Guid.Parse("4c96dd6b-9bdb-4758-9720-1010c244fa4b"), + states: new[] { NodeState.Free, NodeState.Done, NodeState.Ready }, + excludeUpdateScheduled: true); + Assert.Equal("((pool_id eq '3b0426d3-9bde-4ae8-89ac-4edf0d3b3618')) and ((scaleset_id eq '4c96dd6b-9bdb-4758-9720-1010c244fa4b')) and (((state eq 'free') or (state eq 'done') or (state eq 'ready'))) and (reimage_requested eq false) and (delete_requested eq false) and (not (version eq '1.2.3'))", query7); + } + } +}