Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
Migrate timer_task (#1846)
Browse files Browse the repository at this point in the history
  • Loading branch information
chkeita authored Apr 26, 2022
1 parent c71cdb6 commit 8003b1d
Show file tree
Hide file tree
Showing 9 changed files with 483 additions and 65 deletions.
63 changes: 47 additions & 16 deletions src/ApiService/ApiService/OneFuzzTypes/Enums.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,43 @@ public static TaskState[] Available()
};
});
}
}

internal static TaskState[] NeedsWork()
{
return
_states.GetOrAdd(nameof(TaskStateHelper.HasStarted), k =>
new[]{
TaskState.Init,
TaskState.Stopping
}
);
}


public static TaskState[] ShuttingDown()
{
return
_states.GetOrAdd(nameof(TaskStateHelper.ShuttingDown), k =>
new[]{
TaskState.Stopping,
TaskState.Stopping,
}
);
}

internal static TaskState[] HasStarted()
{
return
_states.GetOrAdd(nameof(TaskStateHelper.HasStarted), k =>
new[]{
TaskState.Running,
TaskState.Stopping,
TaskState.Stopped
}
);
}

}
public enum PoolState
{
Init,
Expand All @@ -272,30 +307,26 @@ public static PoolState[] NeedsWork()
{
return
_states.GetOrAdd("NeedsWork", k =>
{
return
new[]{
PoolState.Init,
PoolState.Shutdown,
PoolState.Halt
};
});
new[]{
PoolState.Init,
PoolState.Shutdown,
PoolState.Halt
}
);
}

public static PoolState[] Available()
{
return
_states.GetOrAdd("Available", k =>
{
return
new[]{
PoolState.Running
};
});
new[]{
PoolState.Running
}
);
}
}

public enum Architecture
public enum Architecture
{
x86_64
}
70 changes: 39 additions & 31 deletions src/ApiService/ApiService/OneFuzzTypes/Events.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public EventType GetEventType()
EventCrashReported _ => EventType.CrashReported,
EventRegressionReported _ => EventType.RegressionReported,
EventFileAdded _ => EventType.FileAdded,
EventTaskFailed _ => EventType.TaskFailed,
EventTaskStopped _ => EventType.TaskStopped,
EventTaskStateUpdated _ => EventType.TaskStateUpdated,
_ => throw new NotImplementedException(),
};

Expand All @@ -76,6 +79,10 @@ public static Type GetTypeInfo(EventType eventType)
EventType.CrashReported => typeof(EventCrashReported),
EventType.RegressionReported => typeof(EventRegressionReported),
EventType.FileAdded => typeof(EventFileAdded),
EventType.TaskFailed => typeof(EventTaskFailed),
EventType.TaskStopped => typeof(EventTaskStopped),
EventType.TaskStateUpdated => typeof(EventTaskStateUpdated),

_ => throw new ArgumentException($"invalid input {eventType}"),

};
Expand All @@ -90,21 +97,21 @@ public Type GetTypeInfo(object input)
}
}

//public record EventTaskStopped(
// Guid JobId,
// Guid TaskId,
// UserInfo? UserInfo,
// TaskConfig Config
//) : BaseEvent();
public record EventTaskStopped(
Guid JobId,
Guid TaskId,
UserInfo? UserInfo,
TaskConfig Config
) : BaseEvent();


//record EventTaskFailed(
// Guid JobId,
// Guid TaskId,
// Error Error,
// UserInfo? UserInfo,
// TaskConfig Config
// ) : BaseEvent();
record EventTaskFailed(
Guid JobId,
Guid TaskId,
Error Error,
UserInfo? UserInfo,
TaskConfig Config
) : BaseEvent();


//record EventJobCreated(
Expand All @@ -114,18 +121,19 @@ public Type GetTypeInfo(object input)
// ) : BaseEvent();


//record JobTaskStopped(
// Guid TaskId,
// TaskType TaskType,
// Error? Error
// ) : BaseEvent();
record JobTaskStopped(
Guid TaskId,
TaskType TaskType,
Error? Error
) : BaseEvent();

//record EventJobStopped(
// Guid JobId: UUId,
// JobConfig Config,
// UserInfo? UserInfo,
// List<JobTaskStopped> TaskInfo
//): BaseEvent();

record EventJobStopped(
Guid JobId,
JobConfig Config,
UserInfo? UserInfo,
List<JobTaskStopped> TaskInfo
) : BaseEvent();


//record EventTaskCreated(
Expand All @@ -136,13 +144,13 @@ public Type GetTypeInfo(object input)
// ) : BaseEvent();


//record EventTaskStateUpdated(
// Guid JobId,
// Guid TaskId,
// TaskState State,
// DateTimeOffset? EndTime,
// TaskConfig Config
// ) : BaseEvent();
record EventTaskStateUpdated(
Guid JobId,
Guid TaskId,
TaskState State,
DateTimeOffset? EndTime,
TaskConfig Config
) : BaseEvent();


public record EventTaskHeartbeat(
Expand Down
12 changes: 7 additions & 5 deletions src/ApiService/ApiService/OneFuzzTypes/Model.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
using System.Text.Json.Serialization;
using Region = System.String;
using PoolName = System.String;
Expand Down Expand Up @@ -600,7 +600,9 @@ public record Job(
JobState State,
JobConfig Config,
string? Error,
DateTimeOffset? EndTime,
List<JobTaskInfo>? TaskInfo,
UserInfo UserInfo
) : StatefulEntityBase<JobState>(State);
DateTimeOffset? EndTime
) : StatefulEntityBase<JobState>(State)
{
public List<JobTaskInfo>? TaskInfo { get; set; }
public UserInfo? UserInfo { get; set; }
}
4 changes: 2 additions & 2 deletions src/ApiService/ApiService/TimerProxy.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker;

namespace Microsoft.OneFuzz.Service;

Expand Down Expand Up @@ -30,7 +30,7 @@ public TimerProxy(ILogTracer logTracer, IProxyOperations proxies, IScalesetOpera
_subnet = subnet;
}

//[Function("TimerDaily")]
//[Function("TimerProxy")]
public async Async.Task Run([TimerTrigger("1.00:00:00")] TimerInfo myTimer)
{
var proxies = await _proxYOperations.QueryAsync().ToListAsync();
Expand Down
66 changes: 66 additions & 0 deletions src/ApiService/ApiService/TimerTasks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using Microsoft.Azure.Functions.Worker;

namespace Microsoft.OneFuzz.Service;


public class TimerTasks
{
private readonly ILogTracer _logger;


private readonly ITaskOperations _taskOperations;

private readonly IJobOperations _jobOperations;

private readonly IScheduler _scheduler;


public TimerTasks(ILogTracer logger, ITaskOperations taskOperations, IJobOperations jobOperations, IScheduler scheduler)
{
_logger = logger;
_taskOperations = taskOperations;
_jobOperations = jobOperations;
_scheduler = scheduler;
}

//[Function("TimerTasks")]
public async Async.Task Run([TimerTrigger("1.00:00:00")] TimerInfo myTimer)
{
var expriredTasks = _taskOperations.SearchExpired();
await foreach (var task in expriredTasks)
{
_logger.Info($"stopping expired task. job_id:{task.JobId} task_id:{task.TaskId}");
await _taskOperations.MarkStopping(task);
}


var expiredJobs = _jobOperations.SearchExpired();

await foreach (var job in expiredJobs)
{
_logger.Info($"stopping expired job. job_id:{job.JobId }");
await _jobOperations.Stopping(job);
}

var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork);

await foreach (var job in jobs)
{
_logger.Info($"update job: {job.JobId}");
await _jobOperations.ProcessStateUpdates(job);
}

var tasks = _taskOperations.SearchStates(states: TaskStateHelper.NeedsWork());
await foreach (var task in tasks)
{
_logger.Info($"update task: {task.TaskId}");
await _taskOperations.ProcessStateUpdate(task);
}

await _scheduler.ScheduleTasks();

await _jobOperations.StopNeverStartedJobs();
}
}


73 changes: 69 additions & 4 deletions src/ApiService/ApiService/onefuzzlib/JobOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,81 @@ namespace Microsoft.OneFuzz.Service;

public interface IJobOperations : IStatefulOrm<Job, JobState>
{

System.Threading.Tasks.Task<Job?> Get(Guid jobId);
System.Threading.Tasks.Task OnStart(Job job);
IAsyncEnumerable<Job> SearchExpired();
System.Threading.Tasks.Task Stopping(Job job);
IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states);
System.Threading.Tasks.Task StopNeverStartedJobs();
}


public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations
{
private readonly ITaskOperations _taskOperations;
private readonly IEvents _events;

public JobOperations(IStorage storage, ILogTracer logTracer, IServiceConfig config, ITaskOperations taskOperations, IEvents events) : base(storage, logTracer, config)
{
_taskOperations = taskOperations;
_events = events;
}

public async System.Threading.Tasks.Task<Job?> Get(Guid jobId)
{
return await QueryAsync($"PartitionKey eq '{jobId}'").FirstOrDefaultAsync();
}

public async System.Threading.Tasks.Task OnStart(Job job)
{
if (job.EndTime == null)
{
await Replace(job with { EndTime = DateTimeOffset.UtcNow + TimeSpan.FromHours(job.Config.Duration) });
}
}

public IAsyncEnumerable<Job> SearchExpired()
{
return QueryAsync(filter: $"end_time lt datetime'{DateTimeOffset.UtcNow}'");
}

public IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states)
{
var query =
string.Join(" or ",
states.Select(x => $"state eq '{x}'"));

return QueryAsync(filter: query);
}

public JobOperations(IStorage storage, ILogTracer log, IServiceConfig config)
: base(storage, log, config)
public System.Threading.Tasks.Task StopNeverStartedJobs()
{
throw new NotImplementedException();
}

public async System.Threading.Tasks.Task Stopping(Job job)
{
job = job with { State = JobState.Stopping };
var tasks = await _taskOperations.QueryAsync(filter: $"job_id eq '{job.JobId}'").ToListAsync();
var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped);

var notStopped = taskNotStopped[true];
var stopped = taskNotStopped[false];

if (notStopped.Any())
{
foreach (var task in notStopped)
{
await _taskOperations.MarkStopping(task);
}
}
else
{
job = job with { State = JobState.Stopped };
var taskInfo = stopped.Select(t => new JobTaskStopped(t.TaskId, t.Config.Task.Type, t.Error)).ToList();
await _events.SendEvent(new EventJobStopped(job.JobId, job.Config, job.UserInfo, taskInfo));
}

await Replace(job);

}
}
Loading

0 comments on commit 8003b1d

Please sign in to comment.