From 8003b1d3e05ab68191bf044760a205661f2ef025 Mon Sep 17 00:00:00 2001 From: Cheick Keita Date: Tue, 26 Apr 2022 09:30:14 -0700 Subject: [PATCH] Migrate timer_task (#1846) --- .../ApiService/OneFuzzTypes/Enums.cs | 63 ++++++-- .../ApiService/OneFuzzTypes/Events.cs | 70 +++++---- .../ApiService/OneFuzzTypes/Model.cs | 12 +- src/ApiService/ApiService/TimerProxy.cs | 4 +- src/ApiService/ApiService/TimerTasks.cs | 66 ++++++++ .../ApiService/onefuzzlib/JobOperations.cs | 73 ++++++++- .../ApiService/onefuzzlib/Scheduler.cs | 110 +++++++++++++ .../ApiService/onefuzzlib/TaskOperations.cs | 146 +++++++++++++++++- src/ApiService/Tests/OrmModelsTest.cs | 4 +- 9 files changed, 483 insertions(+), 65 deletions(-) create mode 100644 src/ApiService/ApiService/TimerTasks.cs create mode 100644 src/ApiService/ApiService/onefuzzlib/Scheduler.cs diff --git a/src/ApiService/ApiService/OneFuzzTypes/Enums.cs b/src/ApiService/ApiService/OneFuzzTypes/Enums.cs index 248b93e3d7..2e968d6e61 100644 --- a/src/ApiService/ApiService/OneFuzzTypes/Enums.cs +++ b/src/ApiService/ApiService/OneFuzzTypes/Enums.cs @@ -255,8 +255,43 @@ public static TaskState[] Available() }; }); } -} + internal static TaskState[] NeedsWork() + { + return + _states.GetOrAdd(nameof(TaskStateHelper.HasStarted), k => + new[]{ + TaskState.Init, + TaskState.Stopping + } + ); + } + + + public static TaskState[] ShuttingDown() + { + return + _states.GetOrAdd(nameof(TaskStateHelper.ShuttingDown), k => + new[]{ + TaskState.Stopping, + TaskState.Stopping, + } + ); + } + + internal static TaskState[] HasStarted() + { + return + _states.GetOrAdd(nameof(TaskStateHelper.HasStarted), k => + new[]{ + TaskState.Running, + TaskState.Stopping, + TaskState.Stopped + } + ); + } + +} public enum PoolState { Init, @@ -272,30 +307,26 @@ public static PoolState[] NeedsWork() { return _states.GetOrAdd("NeedsWork", k => - { - return - new[]{ - PoolState.Init, - PoolState.Shutdown, - PoolState.Halt - }; - }); + new[]{ + PoolState.Init, + PoolState.Shutdown, + PoolState.Halt + } + ); } public static PoolState[] Available() { return _states.GetOrAdd("Available", k => - { - return - new[]{ - PoolState.Running - }; - }); + new[]{ + PoolState.Running + } + ); } } -public enum Architecture +public enum Architecture { x86_64 } diff --git a/src/ApiService/ApiService/OneFuzzTypes/Events.cs b/src/ApiService/ApiService/OneFuzzTypes/Events.cs index 205b875826..99e53f4439 100644 --- a/src/ApiService/ApiService/OneFuzzTypes/Events.cs +++ b/src/ApiService/ApiService/OneFuzzTypes/Events.cs @@ -57,6 +57,9 @@ public EventType GetEventType() EventCrashReported _ => EventType.CrashReported, EventRegressionReported _ => EventType.RegressionReported, EventFileAdded _ => EventType.FileAdded, + EventTaskFailed _ => EventType.TaskFailed, + EventTaskStopped _ => EventType.TaskStopped, + EventTaskStateUpdated _ => EventType.TaskStateUpdated, _ => throw new NotImplementedException(), }; @@ -76,6 +79,10 @@ public static Type GetTypeInfo(EventType eventType) EventType.CrashReported => typeof(EventCrashReported), EventType.RegressionReported => typeof(EventRegressionReported), EventType.FileAdded => typeof(EventFileAdded), + EventType.TaskFailed => typeof(EventTaskFailed), + EventType.TaskStopped => typeof(EventTaskStopped), + EventType.TaskStateUpdated => typeof(EventTaskStateUpdated), + _ => throw new ArgumentException($"invalid input {eventType}"), }; @@ -90,21 +97,21 @@ public Type GetTypeInfo(object input) } } -//public record EventTaskStopped( -// Guid JobId, -// Guid TaskId, -// UserInfo? UserInfo, -// TaskConfig Config -//) : BaseEvent(); +public record EventTaskStopped( + Guid JobId, + Guid TaskId, + UserInfo? UserInfo, + TaskConfig Config +) : BaseEvent(); -//record EventTaskFailed( -// Guid JobId, -// Guid TaskId, -// Error Error, -// UserInfo? UserInfo, -// TaskConfig Config -// ) : BaseEvent(); +record EventTaskFailed( + Guid JobId, + Guid TaskId, + Error Error, + UserInfo? UserInfo, + TaskConfig Config + ) : BaseEvent(); //record EventJobCreated( @@ -114,18 +121,19 @@ public Type GetTypeInfo(object input) // ) : BaseEvent(); -//record JobTaskStopped( -// Guid TaskId, -// TaskType TaskType, -// Error? Error -// ) : BaseEvent(); +record JobTaskStopped( + Guid TaskId, + TaskType TaskType, + Error? Error + ) : BaseEvent(); -//record EventJobStopped( -// Guid JobId: UUId, -// JobConfig Config, -// UserInfo? UserInfo, -// List TaskInfo -//): BaseEvent(); + +record EventJobStopped( + Guid JobId, + JobConfig Config, + UserInfo? UserInfo, + List TaskInfo +) : BaseEvent(); //record EventTaskCreated( @@ -136,13 +144,13 @@ public Type GetTypeInfo(object input) // ) : BaseEvent(); -//record EventTaskStateUpdated( -// Guid JobId, -// Guid TaskId, -// TaskState State, -// DateTimeOffset? EndTime, -// TaskConfig Config -// ) : BaseEvent(); +record EventTaskStateUpdated( + Guid JobId, + Guid TaskId, + TaskState State, + DateTimeOffset? EndTime, + TaskConfig Config + ) : BaseEvent(); public record EventTaskHeartbeat( diff --git a/src/ApiService/ApiService/OneFuzzTypes/Model.cs b/src/ApiService/ApiService/OneFuzzTypes/Model.cs index 516b122762..abdfd64439 100644 --- a/src/ApiService/ApiService/OneFuzzTypes/Model.cs +++ b/src/ApiService/ApiService/OneFuzzTypes/Model.cs @@ -1,4 +1,4 @@ -using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; +using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; using System.Text.Json.Serialization; using Region = System.String; using PoolName = System.String; @@ -600,7 +600,9 @@ public record Job( JobState State, JobConfig Config, string? Error, - DateTimeOffset? EndTime, - List? TaskInfo, - UserInfo UserInfo -) : StatefulEntityBase(State); + DateTimeOffset? EndTime +) : StatefulEntityBase(State) +{ + public List? TaskInfo { get; set; } + public UserInfo? UserInfo { get; set; } +} diff --git a/src/ApiService/ApiService/TimerProxy.cs b/src/ApiService/ApiService/TimerProxy.cs index edb3e3bf86..634220b9b7 100644 --- a/src/ApiService/ApiService/TimerProxy.cs +++ b/src/ApiService/ApiService/TimerProxy.cs @@ -1,4 +1,4 @@ -using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker; namespace Microsoft.OneFuzz.Service; @@ -30,7 +30,7 @@ public TimerProxy(ILogTracer logTracer, IProxyOperations proxies, IScalesetOpera _subnet = subnet; } - //[Function("TimerDaily")] + //[Function("TimerProxy")] public async Async.Task Run([TimerTrigger("1.00:00:00")] TimerInfo myTimer) { var proxies = await _proxYOperations.QueryAsync().ToListAsync(); diff --git a/src/ApiService/ApiService/TimerTasks.cs b/src/ApiService/ApiService/TimerTasks.cs new file mode 100644 index 0000000000..ba7f660b43 --- /dev/null +++ b/src/ApiService/ApiService/TimerTasks.cs @@ -0,0 +1,66 @@ +using Microsoft.Azure.Functions.Worker; + +namespace Microsoft.OneFuzz.Service; + + +public class TimerTasks +{ + private readonly ILogTracer _logger; + + + private readonly ITaskOperations _taskOperations; + + private readonly IJobOperations _jobOperations; + + private readonly IScheduler _scheduler; + + + public TimerTasks(ILogTracer logger, ITaskOperations taskOperations, IJobOperations jobOperations, IScheduler scheduler) + { + _logger = logger; + _taskOperations = taskOperations; + _jobOperations = jobOperations; + _scheduler = scheduler; + } + + //[Function("TimerTasks")] + public async Async.Task Run([TimerTrigger("1.00:00:00")] TimerInfo myTimer) + { + var expriredTasks = _taskOperations.SearchExpired(); + await foreach (var task in expriredTasks) + { + _logger.Info($"stopping expired task. job_id:{task.JobId} task_id:{task.TaskId}"); + await _taskOperations.MarkStopping(task); + } + + + var expiredJobs = _jobOperations.SearchExpired(); + + await foreach (var job in expiredJobs) + { + _logger.Info($"stopping expired job. job_id:{job.JobId }"); + await _jobOperations.Stopping(job); + } + + var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork); + + await foreach (var job in jobs) + { + _logger.Info($"update job: {job.JobId}"); + await _jobOperations.ProcessStateUpdates(job); + } + + var tasks = _taskOperations.SearchStates(states: TaskStateHelper.NeedsWork()); + await foreach (var task in tasks) + { + _logger.Info($"update task: {task.TaskId}"); + await _taskOperations.ProcessStateUpdate(task); + } + + await _scheduler.ScheduleTasks(); + + await _jobOperations.StopNeverStartedJobs(); + } +} + + diff --git a/src/ApiService/ApiService/onefuzzlib/JobOperations.cs b/src/ApiService/ApiService/onefuzzlib/JobOperations.cs index fad7a59a21..21dbfee07d 100644 --- a/src/ApiService/ApiService/onefuzzlib/JobOperations.cs +++ b/src/ApiService/ApiService/onefuzzlib/JobOperations.cs @@ -4,16 +4,81 @@ namespace Microsoft.OneFuzz.Service; public interface IJobOperations : IStatefulOrm { - + System.Threading.Tasks.Task Get(Guid jobId); + System.Threading.Tasks.Task OnStart(Job job); + IAsyncEnumerable SearchExpired(); + System.Threading.Tasks.Task Stopping(Job job); + IAsyncEnumerable SearchState(IEnumerable states); + System.Threading.Tasks.Task StopNeverStartedJobs(); } - public class JobOperations : StatefulOrm, IJobOperations { + private readonly ITaskOperations _taskOperations; + private readonly IEvents _events; + + public JobOperations(IStorage storage, ILogTracer logTracer, IServiceConfig config, ITaskOperations taskOperations, IEvents events) : base(storage, logTracer, config) + { + _taskOperations = taskOperations; + _events = events; + } + + public async System.Threading.Tasks.Task Get(Guid jobId) + { + return await QueryAsync($"PartitionKey eq '{jobId}'").FirstOrDefaultAsync(); + } + + public async System.Threading.Tasks.Task OnStart(Job job) + { + if (job.EndTime == null) + { + await Replace(job with { EndTime = DateTimeOffset.UtcNow + TimeSpan.FromHours(job.Config.Duration) }); + } + } + + public IAsyncEnumerable SearchExpired() + { + return QueryAsync(filter: $"end_time lt datetime'{DateTimeOffset.UtcNow}'"); + } + + public IAsyncEnumerable SearchState(IEnumerable states) + { + var query = + string.Join(" or ", + states.Select(x => $"state eq '{x}'")); + + return QueryAsync(filter: query); + } - public JobOperations(IStorage storage, ILogTracer log, IServiceConfig config) - : base(storage, log, config) + public System.Threading.Tasks.Task StopNeverStartedJobs() { + throw new NotImplementedException(); } + public async System.Threading.Tasks.Task Stopping(Job job) + { + job = job with { State = JobState.Stopping }; + var tasks = await _taskOperations.QueryAsync(filter: $"job_id eq '{job.JobId}'").ToListAsync(); + var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped); + + var notStopped = taskNotStopped[true]; + var stopped = taskNotStopped[false]; + + if (notStopped.Any()) + { + foreach (var task in notStopped) + { + await _taskOperations.MarkStopping(task); + } + } + else + { + job = job with { State = JobState.Stopped }; + var taskInfo = stopped.Select(t => new JobTaskStopped(t.TaskId, t.Config.Task.Type, t.Error)).ToList(); + await _events.SendEvent(new EventJobStopped(job.JobId, job.Config, job.UserInfo, taskInfo)); + } + + await Replace(job); + + } } diff --git a/src/ApiService/ApiService/onefuzzlib/Scheduler.cs b/src/ApiService/ApiService/onefuzzlib/Scheduler.cs new file mode 100644 index 0000000000..80b48b99b8 --- /dev/null +++ b/src/ApiService/ApiService/onefuzzlib/Scheduler.cs @@ -0,0 +1,110 @@ +namespace Microsoft.OneFuzz.Service; + + +public interface IScheduler +{ + Async.Task ScheduleTasks(); +} + +public class Scheduler : IScheduler +{ + private readonly ITaskOperations _taskOperations; + private readonly IConfig _config; + + // TODO: eventually, this should be tied to the pool. + const int MAX_TASKS_PER_SET = 10; + + public Scheduler(ITaskOperations taskOperations, IConfig config) + { + _taskOperations = taskOperations; + _config = config; + } + + public async System.Threading.Tasks.Task ScheduleTasks() + { + var tasks = await _taskOperations.SearchStates(states: new[] { TaskState.Waiting }).ToDictionaryAsync(x => x.TaskId); + var seen = new HashSet(); + + var buckets = BucketTasks(tasks.Values); + + foreach (var bucketedTasks in buckets) + { + foreach (var chunks in bucketedTasks.Chunk(MAX_TASKS_PER_SET)) + { + var result = BuildWorkSet(chunks); + } + } + + throw new NotImplementedException(); + } + + private object BuildWorkSet(Task[] chunks) + { + throw new NotImplementedException(); + } + + record struct BucketId(Os os, Guid jobId, (string, string)? vm, string? pool, string setupContainer, bool? reboot, Guid? unique); + + private ILookup BucketTasks(IEnumerable tasks) + { + + // buckets are hashed by: + // OS, JOB ID, vm sku & image (if available), pool name (if available), + // if the setup script requires rebooting, and a 'unique' value + // + // The unique value is set based on the following conditions: + // * if the task is set to run on more than one VM, than we assume it can't be shared + // * if the task is missing the 'colocate' flag or it's set to False + + return tasks.ToLookup(task => + { + + Guid? unique = null; + + // check for multiple VMs for pre-1.0.0 tasks + (string, string)? vm = task.Config.Vm != null ? (task.Config.Vm.Sku, task.Config.Vm.Image) : null; + if ((task.Config.Vm?.Count ?? 0) > 1) + { + unique = Guid.NewGuid(); + } + + // check for multiple VMs for 1.0.0 and later tasks + string? pool = task.Config.Pool?.PoolName; + if ((task.Config.Pool?.Count ?? 0) > 1) + { + unique = Guid.NewGuid(); + } + + if (!(task.Config.Colocate ?? false)) + { + unique = Guid.NewGuid(); + } + + return new BucketId(task.Os, task.JobId, vm, pool, _config.GetSetupContainer(task.Config), task.Config.Task.RebootAfterSetup, unique); + + }); + } +} + + +public interface IConfig +{ + string GetSetupContainer(TaskConfig config); +} + +public class Config : IConfig +{ + public string GetSetupContainer(TaskConfig config) + { + + foreach (var container in config.Containers ?? throw new Exception("Missing containers")) + { + if (container.Type == ContainerType.Setup) + { + return container.Name.ContainerName; + } + } + + throw new Exception($"task missing setup container: task_type = {config.Task.Type}"); + } +} \ No newline at end of file diff --git a/src/ApiService/ApiService/onefuzzlib/TaskOperations.cs b/src/ApiService/ApiService/onefuzzlib/TaskOperations.cs index 992051af5b..ee882e9cb9 100644 --- a/src/ApiService/ApiService/onefuzzlib/TaskOperations.cs +++ b/src/ApiService/ApiService/onefuzzlib/TaskOperations.cs @@ -13,20 +13,26 @@ public interface ITaskOperations : IStatefulOrm IEnumerable? GetInputContainerQueues(TaskConfig config); + IAsyncEnumerable SearchExpired(); + Async.Task MarkStopping(Task task); Async.Task GetReproVmConfig(Task task); + } public class TaskOperations : StatefulOrm, ITaskOperations { + private readonly IEvents _events; + private readonly IJobOperations _jobOperations; + private readonly IPoolOperations _poolOperations; + private readonly IScalesetOperations _scalesetOperations; - private IPoolOperations _poolOperations; - private IScalesetOperations _scalesetOperations; - - public TaskOperations(IStorage storage, ILogTracer log, IServiceConfig config, IPoolOperations poolOperations, IScalesetOperations scalesetOperations) + public TaskOperations(IStorage storage, ILogTracer log, IServiceConfig config, IPoolOperations poolOperations, IScalesetOperations scalesetOperations, IEvents events, IJobOperations jobOperations) : base(storage, log, config) { _poolOperations = poolOperations; _scalesetOperations = scalesetOperations; + _events = events; + _jobOperations = jobOperations; } public async Async.Task GetByTaskId(Guid taskId) @@ -69,6 +75,137 @@ public IAsyncEnumerable SearchStates(Guid? jobId = null, IEnumerable SearchExpired() + { + var timeFilter = $"end_time lt datetime'{DateTimeOffset.UtcNow.ToString("o") }'"; + return QueryAsync(filter: timeFilter); + } + + public async System.Threading.Tasks.Task MarkStopping(Task task) + { + if (TaskStateHelper.ShuttingDown().Contains(task.State)) + { + _logTracer.Verbose($"ignoring post - task stop calls to stop {task.JobId}:{task.TaskId}"); + return; + } + + if (TaskStateHelper.HasStarted().Contains(task.State)) + { + await MarkFailed(task, new Error(Code: ErrorCode.TASK_FAILED, Errors: new[] { "task never started" })); + + } + } + + public async Async.Task MarkFailed(Task task, Error error, List? taskInJob = null) + { + if (TaskStateHelper.ShuttingDown().Contains(task.State)) + { + _logTracer.Verbose( + $"ignoring post-task stop failures for {task.JobId}:{task.TaskId}" + ); + return; + } + + if (task.Error != null) + { + _logTracer.Verbose( + $"ignoring additional task error {task.JobId}:{task.TaskId}" + ); + return; + } + + _logTracer.Error($"task failed {task.JobId}:{task.TaskId} - {error}"); + + task = await SetState(task with { Error = error }, TaskState.Stopping); + //self.set_state(TaskState.stopping) + await MarkDependantsFailed(task, taskInJob); + } + + private async System.Threading.Tasks.Task MarkDependantsFailed(Task task, List? taskInJob = null) + { + taskInJob = taskInJob ?? await QueryAsync(filter: $"job_id eq ''{task.JobId}").ToListAsync(); + + foreach (var t in taskInJob) + { + if (t.Config.PrereqTasks != null) + { + if (t.Config.PrereqTasks.Contains(t.TaskId)) + { + await MarkFailed(task, new Error(ErrorCode.TASK_FAILED, new[] { $"prerequisite task failed. task_id:{t.TaskId}" }), taskInJob); + } + } + } + } + + private async Async.Task SetState(Task task, TaskState state) + { + if (task.State == state) + { + return task; + } + + if (task.State == TaskState.Running || task.State == TaskState.SettingUp) + { + task = await OnStart(task with { State = state }); + } + + await this.Replace(task); + + if (task.State == TaskState.Stopped) + { + if (task.Error != null) + { + await _events.SendEvent(new EventTaskFailed( + JobId: task.JobId, + TaskId: task.TaskId, + Error: task.Error, + UserInfo: task.UserInfo, + Config: task.Config) + ); + } + else + { + await _events.SendEvent(new EventTaskStopped( + JobId: task.JobId, + TaskId: task.TaskId, + UserInfo: task.UserInfo, + Config: task.Config) + ); + } + } + else + { + await _events.SendEvent(new EventTaskStateUpdated( + JobId: task.JobId, + TaskId: task.TaskId, + State: task.State, + EndTime: task.EndTime, + Config: task.Config) + ); + } + + return task; + } + + private async Async.Task OnStart(Task task) + { + if (task.EndTime == null) + { + task = task with { EndTime = DateTimeOffset.UtcNow + TimeSpan.FromHours(task.Config.Task.Duration) }; + + Job? job = await _jobOperations.Get(task.JobId); + if (job != null) + { + await _jobOperations.OnStart(job); + } + + } + + return task; + + } + public async Async.Task GetReproVmConfig(Task task) { if (task.Config.Vm != null) @@ -100,4 +237,5 @@ public IAsyncEnumerable SearchStates(Guid? jobId = null, IEnumerable Job() State: arg.Item2, Config: arg.Item3, Error: arg.Item4, - EndTime: arg.Item5, - TaskInfo: arg.Item6, - UserInfo: arg.Item7 + EndTime: arg.Item5 ) ); }