Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
mark tasks as failed if a work unit cannot be created for the task (#…
Browse files Browse the repository at this point in the history
…2409)

* mark tasks as failed if a work unit cannot be created for the task

* fix up time queries

* query improvements

Co-authored-by: stas <statis@microsoft.com>
  • Loading branch information
stishkin and stas authored Sep 17, 2022
1 parent 867cdbc commit b647e4a
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 41 deletions.
2 changes: 1 addition & 1 deletion src/ApiService/ApiService/Functions/TimerRetention.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public async Async.Task Run([TimerTrigger("20:00:00")] TimerInfo t) {
var timeRetainedNewer = now + SEARCH_EXTENT;

var timeFilter = Query.TimeRange(timeRetainedOlder, timeRetainedNewer);
var timeFilterNewer = Query.NewerThan(timeRetainedOlder);
var timeFilterNewer = Query.TimestampNewerThan(timeRetainedOlder);

// Collecting 'still relevant' task containers.
// NOTE: This must be done before potentially modifying tasks otherwise
Expand Down
21 changes: 12 additions & 9 deletions src/ApiService/ApiService/onefuzzlib/JobOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@ public JobOperations(ILogTracer logTracer, IOnefuzzContext context) : base(logTr
}

public async Async.Task<Job?> Get(Guid jobId) {
return await QueryAsync($"PartitionKey eq '{jobId}'").FirstOrDefaultAsync();
return await QueryAsync(Query.PartitionKey(jobId.ToString())).FirstOrDefaultAsync();
}

public async Async.Task OnStart(Job job) {
if (job.EndTime == null) {
await Replace(job with { EndTime = DateTimeOffset.UtcNow + TimeSpan.FromHours(job.Config.Duration) });
var r = await Replace(job with { EndTime = DateTimeOffset.UtcNow + TimeSpan.FromHours(job.Config.Duration) });
if (!r.IsOk) {
_logTracer.Error($"failed to replace job {job.JobId} when calling OnStart due to {r.ErrorV}");
}
}
}

public IAsyncEnumerable<Job> SearchExpired() {
var timeFilter = $"end_time lt datetime'{DateTimeOffset.UtcNow.ToString("o")}'";
var timeFilter = Query.OlderThan("end_time", DateTimeOffset.UtcNow);
var stateFilter = Query.EqualAnyEnum("state", JobStateHelper.Available);
var filter = Query.And(stateFilter, timeFilter);
return QueryAsync(filter: filter);
Expand All @@ -48,20 +51,20 @@ public IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states) {

public async Async.Task StopIfAllDone(Job job) {

var jobs = _context.TaskOperations.GetByJobId(job.JobId);
var tasks = _context.TaskOperations.GetByJobId(job.JobId);

if (!await jobs.AnyAsync()) {
if (!await tasks.AnyAsync()) {
_logTracer.Warning($"StopIfAllDone could not find any tasks for job with id {job.JobId}");
}

var anyNotStoppedTasks = await jobs.AnyAsync(task => task.State != TaskState.Stopped);
var anyNotStoppedTasks = await tasks.AnyAsync(task => task.State != TaskState.Stopped);

if (anyNotStoppedTasks) {
return;
}

_logTracer.Info($"stopping job as all tasks are stopped: {job.JobId}");
await Stopping(job);
var _ = await Stopping(job);
}

public async Async.Task StopNeverStartedJobs() {
Expand All @@ -72,7 +75,7 @@ public async Async.Task StopNeverStartedJobs() {

var filter = Query.And(new[] {
$"Timestamp lt datetime'{lastTimeStamp}' and not(end_time ge datetime'2000-01-11T00:00:00.0Z')",
Query.EqualAnyEnum("state", new[] {JobState.Enabled})
Query.EqualEnum("state", JobState.Enabled)
});

var jobs = this.QueryAsync(filter);
Expand All @@ -99,7 +102,7 @@ public async Async.Task<Job> Init(Job job) {

public async Async.Task<Job> Stopping(Job job) {
job = job with { State = JobState.Stopping };
var tasks = await _context.TaskOperations.QueryAsync(filter: $"job_id eq '{job.JobId}'").ToListAsync();
var tasks = await _context.TaskOperations.QueryAsync(Query.PartitionKey(job.JobId.ToString())).ToListAsync();
var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped);

var notStopped = taskNotStopped[true];
Expand Down
11 changes: 7 additions & 4 deletions src/ApiService/ApiService/onefuzzlib/PoolOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,18 @@ public async Async.Task<Pool> SetState(Pool pool, PoolState state) {
}

pool = pool with { State = state };
await Replace(pool);
var r = await Replace(pool);
if (!r.IsOk) {
_logTracer.Error($"failed to replace pool {pool.PoolId} when setting state due to {r.ErrorV}");
}
return pool;
}

public async Async.Task<Pool> Init(Pool pool) {
await _context.Queue.CreateQueue(GetPoolQueue(pool.PoolId), StorageType.Corpus);
var shrinkQueue = new ShrinkQueue(pool.PoolId, _context.Queue, _logTracer);
await shrinkQueue.Create();
await SetState(pool, PoolState.Running);
var _ = await SetState(pool, PoolState.Running);
return pool;
}

Expand All @@ -180,7 +183,7 @@ public async Async.Task<Pool> Shutdown(Pool pool) {
if (scalesets is not null) {
await foreach (var scaleset in scalesets) {
if (scaleset is not null) {
await _context.ScalesetOperations.SetShutdown(scaleset, now: true);
var _ = await _context.ScalesetOperations.SetShutdown(scaleset, now: true);
}
}
}
Expand Down Expand Up @@ -217,7 +220,7 @@ public async Async.Task<Pool> Halt(Pool pool) {
if (scalesets is not null) {
await foreach (var scaleset in scalesets) {
if (scaleset is not null) {
await _context.ScalesetOperations.SetState(scaleset, ScalesetState.Halt);
var _ = await _context.ScalesetOperations.SetState(scaleset, ScalesetState.Halt);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/ApiService/ApiService/onefuzzlib/ReproOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ReproOperations(ILogTracer log, IOnefuzzContext context)
}

public IAsyncEnumerable<Repro> SearchExpired() {
return QueryAsync(filter: $"end_time lt datetime'{DateTime.UtcNow.ToString("o")}'");
return QueryAsync(filter: Query.OlderThan("end_time", DateTimeOffset.UtcNow));
}

public async Async.Task<Vm> GetVm(Repro repro, InstanceConfig config) {
Expand Down
2 changes: 1 addition & 1 deletion src/ApiService/ApiService/onefuzzlib/ScalesetOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ await Async.Task.WhenAll(nodes
}

public async Task<OneFuzzResult<Scaleset>> GetById(Guid scalesetId) {
var data = QueryAsync(filter: $"RowKey eq '{scalesetId}'");
var data = QueryAsync(filter: Query.RowKey(scalesetId.ToString()));
var scaleSets = data is not null ? (await data.ToListAsync()) : null;

if (scaleSets == null || scaleSets.Count == 0) {
Expand Down
23 changes: 13 additions & 10 deletions src/ApiService/ApiService/onefuzzlib/Scheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,16 @@ private async Async.Task<bool> ScheduleWorkset(WorkSet workSet, Pool pool, long
}

var result = await BuildWorkunit(task, poolCache);
if (result is var (newBucketConfig, workUnit)) {
if (result.IsOk) {
var (newBucketConfig, workUnit) = result.OkV;
if (bucketConfig is null) {
bucketConfig = newBucketConfig;
} else if (bucketConfig != newBucketConfig) {
throw new Exception($"bucket configs differ: {bucketConfig} VS {newBucketConfig}");
}

workUnits.Add(workUnit);
} else {
await _taskOperations.MarkFailed(task, result.ErrorV);
}
}

Expand Down Expand Up @@ -136,10 +138,10 @@ record PoolKey(
return null;
}

private async Async.Task<(BucketConfig, WorkUnit)?> BuildWorkunit(Task task, Dictionary<PoolKey, Pool> poolCache) {
private async Async.Task<OneFuzzResult<(BucketConfig, WorkUnit)>> BuildWorkunit(Task task, Dictionary<PoolKey, Pool> poolCache) {
var poolKey = GetPoolKey(task);
if (poolKey is null) {
return null;
return OneFuzzResult<(BucketConfig, WorkUnit)>.Error(ErrorCode.UNABLE_TO_FIND, $"unable to find pool key for the task {task.TaskId} in job {task.JobId}");
}

// we cache the pools by key so that we only fetch each pool once
Expand All @@ -149,7 +151,7 @@ record PoolKey(
var foundPool = await _taskOperations.GetPool(task);
if (foundPool is null) {
_logTracer.Info($"unable to find pool for task: {task.TaskId}");
return null;
return OneFuzzResult<(BucketConfig, WorkUnit)>.Error(ErrorCode.UNABLE_TO_FIND, $"unable to find pool for the task {task.TaskId} in job {task.JobId}");
}

pool = poolCache[poolKey] = foundPool;
Expand All @@ -159,13 +161,14 @@ record PoolKey(

var job = await _jobOperations.Get(task.JobId);
if (job is null) {
throw new Exception($"invalid job_id {task.JobId} for task {task.TaskId}");
_logTracer.Error($"invalid job_id {task.JobId} for task {task.TaskId}");
return OneFuzzResult<(BucketConfig, WorkUnit)>.Error(ErrorCode.INVALID_JOB, $"invalid job_id {task.JobId} for task {task.TaskId}");
}

var taskConfig = await _config.BuildTaskConfig(job, task);
if (taskConfig is null) {
_logTracer.Info($"unable to build task config for task: {task.TaskId}");
return null;
_logTracer.Error($"unable to build task config for task: {task.TaskId}");
return OneFuzzResult<(BucketConfig, WorkUnit)>.Error(ErrorCode.INVALID_CONFIGURATION, $"unable to build task config for task: {task.TaskId} in job {task.JobId}");
}
var setupContainer = task.Config.Containers?.FirstOrDefault(c => c.Type == ContainerType.Setup) ?? throw new Exception($"task missing setup container: task_type = {task.Config.Task.Type}");

Expand All @@ -190,7 +193,7 @@ record PoolKey(
count = vm.Count;
reboot = (vm.RebootAfterSetup ?? false) || (task.Config.Task.RebootAfterSetup ?? false);
} else {
throw new Exception("Either Pool or VM should be set");
return OneFuzzResult<(BucketConfig, WorkUnit)>.Error(ErrorCode.INVALID_CONFIGURATION, $"Either Pool or VM should be set for task: {task.TaskId} in job {task.JobId}");
}

var workUnit = new WorkUnit(
Expand All @@ -208,7 +211,7 @@ record PoolKey(
setupScript,
pool with { ETag = default, TimeStamp = default });

return (bucketConfig, workUnit);
return OneFuzzResult<(BucketConfig, WorkUnit)>.Ok((bucketConfig, workUnit));
}

public record struct BucketId(Os os, Guid jobId, (string, string)? vm, PoolName? pool, Container setupContainer, bool? reboot, Guid? unique);
Expand Down
21 changes: 13 additions & 8 deletions src/ApiService/ApiService/onefuzzlib/TaskOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,11 @@ public IAsyncEnumerable<Task> GetByTaskIds(IEnumerable<Guid> taskId) {
}

public IAsyncEnumerable<Task> GetByJobId(Guid jobId) {

return QueryAsync(filter: $"PartitionKey eq '{jobId}'");
return QueryAsync(Query.PartitionKey(jobId.ToString()));
}

public async Async.Task<Task?> GetByJobIdAndTaskId(Guid jobId, Guid taskId) {
var data = QueryAsync(filter: $"PartitionKey eq '{jobId}' and RowKey eq '{taskId}'");

var data = QueryAsync(Query.SingleEntity(jobId.ToString(), taskId.ToString()));
return await data.FirstOrDefaultAsync();
}
public IAsyncEnumerable<Task> SearchStates(Guid? jobId = null, IEnumerable<TaskState>? states = null) {
Expand Down Expand Up @@ -94,7 +92,7 @@ public IAsyncEnumerable<Task> SearchStates(Guid? jobId = null, IEnumerable<TaskS
}

public IAsyncEnumerable<Task> SearchExpired() {
var timeFilter = $"end_time lt datetime'{DateTimeOffset.UtcNow.ToString("o")}'";
var timeFilter = Query.OlderThan("end_time", DateTimeOffset.UtcNow);
var stateFilter = Query.EqualAnyEnum("state", TaskStateHelper.AvailableStates);
var filter = Query.And(stateFilter, timeFilter);
return QueryAsync(filter: filter);
Expand All @@ -109,7 +107,7 @@ public async Async.Task MarkStopping(Task task) {
if (!task.State.HasStarted()) {
await MarkFailed(task, new Error(Code: ErrorCode.TASK_FAILED, Errors: new[] { "task never started" }));
} else {
await SetState(task, TaskState.Stopping);
var _ = await SetState(task, TaskState.Stopping);
}
}

Expand All @@ -122,7 +120,7 @@ public async Async.Task MarkFailed(Task task, Error error, List<Task>? taskInJob
return;
}

_logTracer.Error($"task failed {task.JobId}:{task.TaskId} - {error}");
_logTracer.Info($"task failed {task.JobId}:{task.TaskId} - {error}");

task = await SetState(task with { Error = error }, TaskState.Stopping);
await MarkDependantsFailed(task, taskInJob);
Expand Down Expand Up @@ -209,7 +207,10 @@ public async Task<OneFuzzResult<Task>> Create(TaskConfig config, Guid jobId, Use

var task = new Task(jobId, Guid.NewGuid(), TaskState.Init, os, config, UserInfo: userInfo);

await _context.TaskOperations.Insert(task);
var r = await _context.TaskOperations.Insert(task);
if (!r.IsOk) {
_logTracer.Error($"failed to insert task {task.TaskId} due to {r.ErrorV}");
}
await _context.Events.SendEvent(new EventTaskCreated(jobId, task.TaskId, config, userInfo));

_logTracer.Info($"created task. job_id:{jobId} task_id:{task.TaskId} type:{task.Config.Task.Type}");
Expand Down Expand Up @@ -267,6 +268,10 @@ public async Async.Task<bool> CheckPrereqTasks(Task task) {
return false;
}

if (t.JobId != task.JobId) {
_logTracer.Critical("Tasks are not from the same job");
}

if (!t.State.HasStarted()) {
return false;
}
Expand Down
6 changes: 3 additions & 3 deletions src/ApiService/ApiService/onefuzzlib/WebhookOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,14 @@ private async Async.Task<bool> Send(WebhookMessageLog message) {
}

public IAsyncEnumerable<WebhookMessageLog> SearchExpired() {
var expireTime = (DateTimeOffset.UtcNow - TimeSpan.FromDays(EXPIRE_DAYS)).ToString("o");
var expireTime = DateTimeOffset.UtcNow - TimeSpan.FromDays(EXPIRE_DAYS);

var timeFilter = $"Timestamp lt datetime'{expireTime}'";
var timeFilter = Query.TimestampOlderThan(expireTime);
return QueryAsync(filter: timeFilter);
}

public async Async.Task<WebhookMessageLog?> GetWebhookMessageById(Guid webhookId, Guid eventId) {
var data = QueryAsync(filter: $"PartitionKey eq '{webhookId}' and RowKey eq '{eventId}'");
var data = QueryAsync(filter: Query.SingleEntity(webhookId.ToString(), eventId.ToString()));

return await data.FirstOrDefaultAsync();
}
Expand Down
6 changes: 5 additions & 1 deletion src/ApiService/ApiService/onefuzzlib/orm/Orm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ static StatefulOrm() {
null => null,
MethodInfo info => new Lazy<Func<object>>(() => (Func<object>)Delegate.CreateDelegate(typeof(Func<object>), info), true)
};

return;
}

public StatefulOrm(ILogTracer logTracer, IOnefuzzContext context) : base(logTracer, context) {
Expand All @@ -215,7 +217,9 @@ public StatefulOrm(ILogTracer logTracer, IOnefuzzContext context) : base(logTrac
};

if (func != null) {
_logTracer.Info($"processing state update: {typeof(T)} - PartitionKey {_partitionKeyGetter?.Value()} {_rowKeyGetter?.Value()} - {state}");
var partitionKey = _partitionKeyGetter?.Value();
var rowKey = _rowKeyGetter?.Value();
_logTracer.Info($"processing state update: {typeof(T)} - PartitionKey: {partitionKey} RowKey: {rowKey} - {state}");
return await func(entity);
} else {
_logTracer.Info($"State function for state: '{state}' not found on type {typeof(T)}");
Expand Down
12 changes: 9 additions & 3 deletions src/ApiService/ApiService/onefuzzlib/orm/Queries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,20 @@ public static string TimeRange(DateTimeOffset min, DateTimeOffset max) {
return TableClient.CreateQueryFilter($"Timestamp lt {max} and Timestamp gt {min}");
}

public static string NewerThan(DateTimeOffset t) {
public static string TimestampNewerThan(DateTimeOffset t) {
return TableClient.CreateQueryFilter($"Timestamp gt {t}");
}

public static string OlderThan(DateTimeOffset t) {
public static string NewerThan(string field, DateTimeOffset t) {
return $"{field} gt {TableClient.CreateQueryFilter($"{t}")}";
}
public static string TimestampOlderThan(DateTimeOffset t) {
return TableClient.CreateQueryFilter($"Timestamp lt {t}");
}

public static string OlderThan(string field, DateTimeOffset t) {
return $"{field} lt {TableClient.CreateQueryFilter($"{t}")}";
}

public static string StartsWith(string property, string prefix) {
var upperBound = prefix[..(prefix.Length - 1)] + (char)(prefix.Last() + 1);
// property name should not be escaped, but strings should be:
Expand Down

0 comments on commit b647e4a

Please sign in to comment.