Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
Migrating QueueTaskHeartbeat (#1777)
Browse files Browse the repository at this point in the history
* Migrating QueueTaskHeartbeat

* changing the name of the input queue

* rename type alias Tasks to Async

* Fix property casing

* fixing types

* Removing IStorageProvider

* fix function name

* address PR comments
  • Loading branch information
chkeita authored and AdamL-Microsoft committed Apr 18, 2022
1 parent 1835f3d commit 8eb8aa9
Show file tree
Hide file tree
Showing 14 changed files with 288 additions and 122 deletions.
63 changes: 63 additions & 0 deletions src/ApiService/ApiService/OneFuzzTypes/Enums.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,67 @@ public enum WebhookMessageState
Retrying,
Succeeded,
Failed
}

public enum TaskState
{
Init,
Waiting,
Scheduled,
Setting_up,
Running,
Stopping,
Stopped,
WaitJob
}

public enum TaskType
{
Coverage,
LibfuzzerFuzz,
LibfuzzerCoverage,
LibfuzzerCrashReport,
LibfuzzerMerge,
LibfuzzerRegression,
GenericAnalysis,
GenericSupervisor,
GenericMerge,
GenericGenerator,
GenericCrashReport,
GenericRegression
}

public enum Os
{
Windows,
Linux
}

public enum ContainerType
{
Analysis,
Coverage,
Crashes,
Inputs,
NoRepro,
ReadonlyInputs,
Reports,
Setup,
Tools,
UniqueInputs,
UniqueReports,
RegressionReports,
Logs
}


public enum StatsFormat
{
AFL
}

public enum TaskDebugFlag
{
KeepNodeOnFailure,
KeepNodeOnCompletion,
}
10 changes: 5 additions & 5 deletions src/ApiService/ApiService/OneFuzzTypes/Events.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ public EventType GetEventType()
// ) : BaseEvent();


//record EventTaskHeartbeat(
// JobId: Guid,
// TaskId: Guid,
// Config: TaskConfig
//): BaseEvent();
record EventTaskHeartbeat(
Guid JobId,
Guid TaskId,
TaskConfig Config
) : BaseEvent();


//record EventPing(
Expand Down
159 changes: 123 additions & 36 deletions src/ApiService/ApiService/OneFuzzTypes/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System;
using System.Collections.Generic;
using PoolName = System.String;
using Region = System.String;
using Container = System.String;

namespace Microsoft.OneFuzz.Service;

Expand Down Expand Up @@ -29,9 +31,15 @@ public enum HeartbeatType
TaskAlive,
}

public record HeartbeatData(HeartbeatType type);
public record HeartbeatData(HeartbeatType Type);

public record NodeHeartbeatEntry(Guid NodeId, HeartbeatData[] data);
public record TaskHeartbeatEntry(
Guid TaskId,
Guid? JobId,
Guid MachineId,
HeartbeatData[] Data
);
public record NodeHeartbeatEntry(Guid NodeId, HeartbeatData[] Data);

public record NodeCommandStopIfFree();

Expand Down Expand Up @@ -79,7 +87,7 @@ public enum NodeState

public record ProxyHeartbeat
(
string Region,
Region Region,
Guid ProxyId,
List<ProxyForward> Forwards,
DateTimeOffset TimeStamp
Expand All @@ -102,35 +110,35 @@ bool DebugKeepNode


public partial record ProxyForward
(
[PartitionKey] string Region,
(
[PartitionKey] Region Region,
[RowKey] int DstPort,
int SrcPort,
int SrcPort,
string DstIp
) : EntityBase();

public partial record ProxyConfig
(
Uri Url,
string Notification,
string Region,
Guid? ProxyId,
List<ProxyForward> Forwards,
string InstanceTelemetryKey,
public partial record ProxyConfig
(
Uri Url,
string Notification,
Region Region,
Guid? ProxyId,
List<ProxyForward> Forwards,
string InstanceTelemetryKey,
string MicrosoftTelemetryKey

);

public partial record Proxy
(
[PartitionKey] string Region,
[PartitionKey] Region Region,
[RowKey] Guid ProxyId,
DateTimeOffset? CreatedTimestamp,
VmState State,
Authentication Auth,
string? Ip,
Error? Error,
string Version,
VmState State,
Authentication Auth,
string? Ip,
Error? Error,
string Version,
ProxyHeartbeat? heartbeat
) : EntityBase();

Expand All @@ -148,23 +156,102 @@ String InstanceName
) : EntityBase();


//record AnyHttpUrl(AnyUrl):
// allowed_schemes = {'http', 'https
//


public record TaskDetails(

TaskType Type,
int Duration,
string? TargetExe,
Dictionary<string, string>? TargetEnv,
List<string>? TargetOptions,
int? TargetWorkers,
bool? TargetOptionsMerge,
bool? CheckAsanLog,
bool? CheckDebugger,
int? CheckRetryCount,
bool? CheckFuzzerHelp,
bool? ExpectCrashOnFailure,
bool? RenameOutput,
string? SupervisorExe,
Dictionary<string, string>? SupervisorEnv,
List<string>? SupervisorOptions,
string? SupervisorInputMarker,
string? GeneratorExe,
Dictionary<string, string>? GeneratorEnv,
List<string>? GeneratorOptions,
string? AnalyzerExe,
Dictionary<string, string>? AnalyzerEnv,
List<string> AnalyzerOptions,
ContainerType? WaitForFiles,
string? StatsFile,
StatsFormat? StatsFormat,
bool? RebootAfterSetup,
int? TargetTimeout,
int? EnsembleSyncDelay,
bool? PreserveExistingOutputs,
List<string>? ReportList,
int? MinimizedStackDepth,
string? CoverageFilter
);

public record TaskVm(
Region Region,
string Sku,
string Image,
int Count,
bool SpotInstance,
bool? RebootAfterSetup
);

public record TaskPool(
int Count,
PoolName PoolName
);

//public record TaskConfig(
// Guid jobId,
// List<Guid> PrereqTasks,
// TaskDetails Task,
// TaskVm? vm,
// TaskPool pool: Optional[]
// containers: List[TaskContainers]
// tags: Dict[str, str]
// debug: Optional[List[TaskDebugFlag]]
// colocate: Optional[bool]
// ): EntityBase();
public record TaskContainers(
ContainerType Type,
Container Name
);
public record TaskConfig(
Guid JobId,
List<Guid>? PrereqTasks,
TaskDetails Task,
TaskVm? Vm,
TaskPool? Pool,
List<TaskContainers>? Containers,
Dictionary<string, string>? Tags,
List<TaskDebugFlag>? Debug,
bool? Colocate
);


public record TaskEventSummary(
DateTimeOffset? Timestamp,
string EventData,
string EventType
);


public record NodeAssignment(
Guid NodeId,
Guid? ScalesetId,
NodeTaskState State
);


public record Task(
// Timestamp: Optional[datetime] = Field(alias="Timestamp")
[PartitionKey] Guid JobId,
[RowKey] Guid TaskId,
TaskState State,
Os Os,
TaskConfig Config,
Error? Error,
Authentication? Auth,
DateTimeOffset? Heartbeat,
DateTimeOffset? EndTime,
UserInfo? UserInfo) : EntityBase()
{
List<TaskEventSummary> Events { get; set; } = new List<TaskEventSummary>();
List<NodeAssignment> Nodes { get; set; } = new List<NodeAssignment>();

}
8 changes: 6 additions & 2 deletions src/ApiService/ApiService/Program.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// to avoid collision with Task in model.cs
global using Async = System.Threading.Tasks;

using System;
using System.Collections.Generic;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
using ApiService.OneFuzzLib;



namespace Microsoft.OneFuzz.Service;

public class Program
Expand Down Expand Up @@ -34,11 +38,11 @@ public static void Main()
.ConfigureServices((context, services) =>
services
.AddSingleton<ILogTracerFactory>(_ => new LogTracerFactory(GetLoggers()))
.AddSingleton<IStorageProvider>(_ => new StorageProvider(EnvironmentVariables.OneFuzz.FuncStorage ?? throw new InvalidOperationException("Missing account id")))
.AddSingleton<INodeOperations, NodeOperations>()
.AddSingleton<IEvents, Events>()
.AddSingleton<IWebhookOperations, WebhookOperations>()
.AddSingleton<IWebhookMessageLogOperations, WebhookMessageLogOperations>()
.AddSingleton<ITaskOperations, TaskOperations>()
.AddSingleton<IQueue, Queue>()
.AddSingleton<ICreds>(_ => new Creds())
.AddSingleton<IStorage, Storage>()
Expand Down
13 changes: 5 additions & 8 deletions src/ApiService/ApiService/QueueFileChanges.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Microsoft.Azure.Functions.Worker;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
using System.Linq;

Expand All @@ -15,19 +14,17 @@ public class QueueFileChanges
const int MAX_DEQUEUE_COUNT = 5;

private readonly ILogTracerFactory _loggerFactory;
private readonly IStorageProvider _storageProvider;

private readonly IStorage _storage;

public QueueFileChanges(ILogTracerFactory loggerFactory, IStorageProvider storageProvider, IStorage storage)
public QueueFileChanges(ILogTracerFactory loggerFactory, IStorage storage)
{
_loggerFactory = loggerFactory;
_storageProvider = storageProvider;
_storage = storage;
}

[Function("QueueFileChanges")]
public Task Run(
public Async.Task Run(
[QueueTrigger("file-changes-refactored", Connection = "AzureWebJobsStorage")] string msg,
int dequeueCount)
{
Expand All @@ -42,18 +39,18 @@ public Task Run(
if (!fileChangeEvent.ContainsKey(eventType)
|| fileChangeEvent[eventType] != "Microsoft.Storage.BlobCreated")
{
return Task.CompletedTask;
return Async.Task.CompletedTask;
}

const string topic = "topic";
if (!fileChangeEvent.ContainsKey(topic)
|| !_storage.CorpusAccounts(log).Contains(fileChangeEvent[topic]))
{
return Task.CompletedTask;
return Async.Task.CompletedTask;
}

file_added(log, fileChangeEvent, lastTry);
return Task.CompletedTask;
return Async.Task.CompletedTask;
}

private void file_added(ILogTracer log, Dictionary<string, string> fileChangeEvent, bool failTaskOnTransientError)
Expand Down
3 changes: 1 addition & 2 deletions src/ApiService/ApiService/QueueNodeHearbeat.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using Microsoft.Azure.Functions.Worker;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;

namespace Microsoft.OneFuzz.Service;
Expand All @@ -22,7 +21,7 @@ public QueueNodeHearbeat(ILogTracerFactory loggerFactory, INodeOperations nodes,
}

[Function("QueueNodeHearbeat")]
public async Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg)
public async Async.Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg)
{
var log = _loggerFactory.MakeLogTracer(Guid.NewGuid());
log.Info($"heartbeat: {msg}");
Expand Down
Loading

0 comments on commit 8eb8aa9

Please sign in to comment.