Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
Merge branch 'main' into Release-8.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamL-Microsoft authored Aug 18, 2023
2 parents 74c04cf + 804b299 commit 3df9987
Show file tree
Hide file tree
Showing 33 changed files with 671 additions and 40 deletions.
60 changes: 60 additions & 0 deletions src/ApiService/ApiService/Functions/QueueJobResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System.Text.Json;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service.Functions;


public class QueueJobResult {
private readonly ILogger _log;
private readonly IOnefuzzContext _context;

public QueueJobResult(ILogger<QueueJobResult> logTracer, IOnefuzzContext context) {
_log = logTracer;
_context = context;
}

[Function("QueueJobResult")]
public async Async.Task Run([QueueTrigger("job-result", Connection = "AzureWebJobsStorage")] string msg) {

var _tasks = _context.TaskOperations;
var _jobs = _context.JobOperations;

_log.LogInformation("job result: {msg}", msg);
var jr = JsonSerializer.Deserialize<TaskJobResultEntry>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}");

var task = await _tasks.GetByTaskId(jr.TaskId);
if (task == null) {
_log.LogWarning("invalid {TaskId}", jr.TaskId);
return;
}

var job = await _jobs.Get(task.JobId);
if (job == null) {
_log.LogWarning("invalid {JobId}", task.JobId);
return;
}

JobResultData? data = jr.Data;
if (data == null) {
_log.LogWarning($"job result data is empty, throwing out: {jr}");
return;
}

var jobResultType = data.Type;
_log.LogInformation($"job result data type: {jobResultType}");

Dictionary<string, double> value;
if (jr.Value.Count > 0) {
value = jr.Value;
} else {
_log.LogWarning($"job result data is empty, throwing out: {jr}");
return;
}

var jobResult = await _context.JobResultOperations.CreateOrUpdate(job.JobId, jobResultType, value);
if (!jobResult.IsOk) {
_log.LogError("failed to create or update with job result {JobId}", job.JobId);
}
}
}
45 changes: 45 additions & 0 deletions src/ApiService/ApiService/OneFuzzTypes/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ public enum HeartbeatType {
TaskAlive,
}

[SkipRename]
public enum JobResultType {
NewCrashingInput,
NoReproCrashingInput,
NewReport,
NewUniqueReport,
NewRegressionReport,
NewCoverage,
NewCrashDump,
CoverageData,
RuntimeStats,
}

public record HeartbeatData(HeartbeatType Type);

public record TaskHeartbeatEntry(
Expand All @@ -41,6 +54,16 @@ public record TaskHeartbeatEntry(
Guid MachineId,
HeartbeatData[] Data);

public record JobResultData(JobResultType Type);

public record TaskJobResultEntry(
Guid TaskId,
Guid? JobId,
Guid MachineId,
JobResultData Data,
Dictionary<string, double> Value
);

public record NodeHeartbeatEntry(Guid NodeId, HeartbeatData[] Data);

public record NodeCommandStopIfFree();
Expand Down Expand Up @@ -892,6 +915,27 @@ public record SecretAddress<T>(Uri Url) : ISecret<T> {
public record SecretData<T>(ISecret<T> Secret) {
}

public record JobResult(
[PartitionKey][RowKey] Guid JobId,
string Project,
string Name,
double NewCrashingInput = 0,
double NoReproCrashingInput = 0,
double NewReport = 0,
double NewUniqueReport = 0,
double NewRegressionReport = 0,
double NewCrashDump = 0,
double InstructionsCovered = 0,
double TotalInstructions = 0,
double CoverageRate = 0,
double IterationCount = 0
) : EntityBase() {
public JobResult(Guid JobId, string Project, string Name) : this(
JobId: JobId,
Project: Project,
Name: Name, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) { }
}

public record JobConfig(
string Project,
string Name,
Expand Down Expand Up @@ -1056,6 +1100,7 @@ public record TaskUnitConfig(
string? InstanceTelemetryKey,
string? MicrosoftTelemetryKey,
Uri HeartbeatQueue,
Uri JobResultQueue,
Dictionary<string, string> Tags
) {
public Uri? inputQueue { get; set; }
Expand Down
1 change: 1 addition & 0 deletions src/ApiService/ApiService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public static async Async.Task Main() {
.AddScoped<IVmOperations, VmOperations>()
.AddScoped<ISecretsOperations, SecretsOperations>()
.AddScoped<IJobOperations, JobOperations>()
.AddScoped<IJobResultOperations, JobResultOperations>()
.AddScoped<INsgOperations, NsgOperations>()
.AddScoped<IScheduler, Scheduler>()
.AddScoped<IConfig, Config>()
Expand Down
1 change: 1 addition & 0 deletions src/ApiService/ApiService/onefuzzlib/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ private static BlobContainerSasPermissions ConvertPermissions(ContainerPermissio
InstanceTelemetryKey: _serviceConfig.ApplicationInsightsInstrumentationKey,
MicrosoftTelemetryKey: _serviceConfig.OneFuzzTelemetry,
HeartbeatQueue: await _queue.GetQueueSas("task-heartbeat", StorageType.Config, QueueSasPermissions.Add) ?? throw new Exception("unable to get heartbeat queue sas"),
JobResultQueue: await _queue.GetQueueSas("job-result", StorageType.Config, QueueSasPermissions.Add) ?? throw new Exception("unable to get heartbeat queue sas"),
Tags: task.Config.Tags ?? new Dictionary<string, string>()
);

Expand Down
125 changes: 125 additions & 0 deletions src/ApiService/ApiService/onefuzzlib/JobResultOperations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
using ApiService.OneFuzzLib.Orm;
using Microsoft.Extensions.Logging;
using Polly;
namespace Microsoft.OneFuzz.Service;

public interface IJobResultOperations : IOrm<JobResult> {

Async.Task<JobResult?> GetJobResult(Guid jobId);
Async.Task<OneFuzzResultVoid> CreateOrUpdate(Guid jobId, JobResultType resultType, Dictionary<string, double> resultValue);

}
public class JobResultOperations : Orm<JobResult>, IJobResultOperations {

public JobResultOperations(ILogger<JobResultOperations> log, IOnefuzzContext context)
: base(log, context) {
}

public async Async.Task<JobResult?> GetJobResult(Guid jobId) {
return await SearchByPartitionKeys(new[] { jobId.ToString() }).SingleOrDefaultAsync();
}

private JobResult UpdateResult(JobResult result, JobResultType type, Dictionary<string, double> resultValue) {

var newResult = result;
double newValue;
switch (type) {
case JobResultType.NewCrashingInput:
newValue = result.NewCrashingInput + resultValue["count"];
newResult = result with { NewCrashingInput = newValue };
break;
case JobResultType.NewReport:
newValue = result.NewReport + resultValue["count"];
newResult = result with { NewReport = newValue };
break;
case JobResultType.NewUniqueReport:
newValue = result.NewUniqueReport + resultValue["count"];
newResult = result with { NewUniqueReport = newValue };
break;
case JobResultType.NewRegressionReport:
newValue = result.NewRegressionReport + resultValue["count"];
newResult = result with { NewRegressionReport = newValue };
break;
case JobResultType.NewCrashDump:
newValue = result.NewCrashDump + resultValue["count"];
newResult = result with { NewCrashDump = newValue };
break;
case JobResultType.CoverageData:
double newCovered = resultValue["covered"];
double newTotalCovered = resultValue["features"];
double newCoverageRate = resultValue["rate"];
newResult = result with { InstructionsCovered = newCovered, TotalInstructions = newTotalCovered, CoverageRate = newCoverageRate };
break;
case JobResultType.RuntimeStats:
double newTotalIterations = resultValue["total_count"];
newResult = result with { IterationCount = newTotalIterations };
break;
default:
_logTracer.LogWarning($"Invalid Field {type}.");
break;
}
_logTracer.LogInformation($"Attempting to log new result: {newResult}");
return newResult;
}

private async Async.Task<bool> TryUpdate(Job job, JobResultType resultType, Dictionary<string, double> resultValue) {
var jobId = job.JobId;

var jobResult = await GetJobResult(jobId);

if (jobResult == null) {
_logTracer.LogInformation("Creating new JobResult for Job {JobId}", jobId);

var entry = new JobResult(JobId: jobId, Project: job.Config.Project, Name: job.Config.Name);

jobResult = UpdateResult(entry, resultType, resultValue);

var r = await Insert(jobResult);
if (!r.IsOk) {
_logTracer.AddHttpStatus(r.ErrorV);
_logTracer.LogError("failed to insert job result {JobId}", jobResult.JobId);
throw new InvalidOperationException($"failed to insert job result {jobResult.JobId}");
}
_logTracer.LogInformation("created job result {JobId}", jobResult.JobId);
} else {
_logTracer.LogInformation("Updating existing JobResult entry for Job {JobId}", jobId);

jobResult = UpdateResult(jobResult, resultType, resultValue);

var r = await Update(jobResult);
if (!r.IsOk) {
_logTracer.AddHttpStatus(r.ErrorV);
_logTracer.LogError("failed to update job result {JobId}", jobResult.JobId);
throw new InvalidOperationException($"failed to insert job result {jobResult.JobId}");
}
_logTracer.LogInformation("updated job result {JobId}", jobResult.JobId);
}

return true;
}

public async Async.Task<OneFuzzResultVoid> CreateOrUpdate(Guid jobId, JobResultType resultType, Dictionary<string, double> resultValue) {

var job = await _context.JobOperations.Get(jobId);
if (job == null) {
return OneFuzzResultVoid.Error(ErrorCode.INVALID_REQUEST, "invalid job");
}

var success = false;
try {
_logTracer.LogInformation("attempt to update job result {JobId}", job.JobId);
var policy = Policy.Handle<InvalidOperationException>().WaitAndRetryAsync(50, _ => new TimeSpan(0, 0, 5));
await policy.ExecuteAsync(async () => {
success = await TryUpdate(job, resultType, resultValue);
_logTracer.LogInformation("attempt {success}", success);
});
return OneFuzzResultVoid.Ok;
} catch (Exception e) {
return OneFuzzResultVoid.Error(ErrorCode.UNABLE_TO_UPDATE, new string[] {
$"Unexpected failure when attempting to update job result for {job.JobId}",
$"Exception: {e}"
});
}
}
}

2 changes: 2 additions & 0 deletions src/ApiService/ApiService/onefuzzlib/OnefuzzContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public interface IOnefuzzContext {
IExtensions Extensions { get; }
IIpOperations IpOperations { get; }
IJobOperations JobOperations { get; }
IJobResultOperations JobResultOperations { get; }
ILogAnalytics LogAnalytics { get; }
INodeMessageOperations NodeMessageOperations { get; }
INodeOperations NodeOperations { get; }
Expand Down Expand Up @@ -83,6 +84,7 @@ public OnefuzzContext(IServiceProvider serviceProvider) {
public IVmOperations VmOperations => _serviceProvider.GetRequiredService<IVmOperations>();
public ISecretsOperations SecretsOperations => _serviceProvider.GetRequiredService<ISecretsOperations>();
public IJobOperations JobOperations => _serviceProvider.GetRequiredService<IJobOperations>();
public IJobResultOperations JobResultOperations => _serviceProvider.GetRequiredService<IJobResultOperations>();
public IScheduler Scheduler => _serviceProvider.GetRequiredService<IScheduler>();
public IConfig Config => _serviceProvider.GetRequiredService<IConfig>();
public ILogAnalytics LogAnalytics => _serviceProvider.GetRequiredService<ILogAnalytics>();
Expand Down
3 changes: 3 additions & 0 deletions src/ApiService/IntegrationTests/Fakes/TestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public TestContext(IHttpClientFactory httpClientFactory, OneFuzzLoggerProvider p
TaskOperations = new TaskOperations(provider.CreateLogger<TaskOperations>(), Cache, this);
NodeOperations = new NodeOperations(provider.CreateLogger<NodeOperations>(), this);
JobOperations = new JobOperations(provider.CreateLogger<JobOperations>(), this);
JobResultOperations = new JobResultOperations(provider.CreateLogger<JobResultOperations>(), this);
NodeTasksOperations = new NodeTasksOperations(provider.CreateLogger<NodeTasksOperations>(), this);
TaskEventOperations = new TaskEventOperations(provider.CreateLogger<TaskEventOperations>(), this);
NodeMessageOperations = new NodeMessageOperations(provider.CreateLogger<NodeMessageOperations>(), this);
Expand All @@ -57,6 +58,7 @@ public Async.Task InsertAll(params EntityBase[] objs)
Node n => NodeOperations.Insert(n),
Pool p => PoolOperations.Insert(p),
Job j => JobOperations.Insert(j),
JobResult jr => JobResultOperations.Insert(jr),
Repro r => ReproOperations.Insert(r),
Scaleset ss => ScalesetOperations.Insert(ss),
NodeTasks nt => NodeTasksOperations.Insert(nt),
Expand Down Expand Up @@ -84,6 +86,7 @@ public Async.Task InsertAll(params EntityBase[] objs)

public ITaskOperations TaskOperations { get; }
public IJobOperations JobOperations { get; }
public IJobResultOperations JobResultOperations { get; }
public INodeOperations NodeOperations { get; }
public INodeTasksOperations NodeTasksOperations { get; }
public ITaskEventOperations TaskEventOperations { get; }
Expand Down
16 changes: 16 additions & 0 deletions src/agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
"onefuzz",
"onefuzz-task",
"onefuzz-agent",
"onefuzz-result",
"onefuzz-file-format",
"onefuzz-telemetry",
"reqwest-retry",
Expand Down
Loading

0 comments on commit 3df9987

Please sign in to comment.