Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
Introducing OnefuzzContext to prevent circular references (#1886)
Browse files Browse the repository at this point in the history
  • Loading branch information
chkeita authored May 3, 2022
1 parent 7676549 commit eca88cb
Show file tree
Hide file tree
Showing 17 changed files with 262 additions and 282 deletions.
10 changes: 6 additions & 4 deletions src/ApiService/ApiService/OneFuzzTypes/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,19 @@ public record Node
[RowKey] Guid MachineId,
Guid? PoolId,
string Version,

DateTimeOffset? Heartbeat = null,
DateTimeOffset? InitializedAt = null,
NodeState State = NodeState.Init,
List<NodeTasks>? Tasks = null,
List<NodeCommand>? Messages = null,

Guid? ScalesetId = null,
bool ReimageRequested = false,
bool DeleteRequested = false,
bool DebugKeepNode = false
) : StatefulEntityBase<NodeState>(State);
) : StatefulEntityBase<NodeState>(State) {

public List<NodeTasks>? Tasks { get; set; }
public List<NodeCommand>? Messages { get; set; }
}


public record Forward
Expand Down
1 change: 1 addition & 0 deletions src/ApiService/ApiService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public static void Main() {
.AddScoped<IVmssOperations, VmssOperations>()
.AddScoped<INodeTasksOperations, NodeTasksOperations>()
.AddScoped<INodeMessageOperations, NodeMessageOperations>()
.AddScoped<IOnefuzzContext, OnefuzzContext>()

.AddSingleton<ICreds, Creds>()
.AddSingleton<IServiceConfig, ServiceConfiguration>()
Expand Down
16 changes: 8 additions & 8 deletions src/ApiService/ApiService/QueueNodeHearbeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ namespace Microsoft.OneFuzz.Service;
public class QueueNodeHearbeat {
private readonly ILogTracer _log;

private readonly IEvents _events;
private readonly INodeOperations _nodes;
private readonly IOnefuzzContext _context;

public QueueNodeHearbeat(ILogTracer log, INodeOperations nodes, IEvents events) {
public QueueNodeHearbeat(ILogTracer log, IOnefuzzContext context) {
_log = log;
_nodes = nodes;
_events = events;
_context = context;
}

[Function("QueueNodeHearbeat")]
public async Async.Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg) {
_log.Info($"heartbeat: {msg}");
var nodes = _context.NodeOperations;
var events = _context.Events;

var hb = JsonSerializer.Deserialize<NodeHeartbeatEntry>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}");

var node = await _nodes.GetByMachineId(hb.NodeId);
var node = await nodes.GetByMachineId(hb.NodeId);

var log = _log.WithTag("NodeId", hb.NodeId.ToString());

Expand All @@ -34,14 +34,14 @@ public async Async.Task Run([QueueTrigger("myqueue-items", Connection = "AzureWe

var newNode = node with { Heartbeat = DateTimeOffset.UtcNow };

var r = await _nodes.Replace(newNode);
var r = await nodes.Replace(newNode);

if (!r.IsOk) {
var (status, reason) = r.ErrorV;
log.Error($"Failed to replace heartbeat info due to [{status}] {reason}");
}

// TODO: do we still send event if we fail do update the table ?
await _events.SendEvent(new EventNodeHeartbeat(node.MachineId, node.ScalesetId, node.PoolName));
await events.SendEvent(new EventNodeHeartbeat(node.MachineId, node.ScalesetId, node.PoolName));
}
}
8 changes: 3 additions & 5 deletions src/ApiService/ApiService/onefuzzlib/InstanceConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ public interface IConfigOperations : IOrm<InstanceConfig> {
}

public class ConfigOperations : Orm<InstanceConfig>, IConfigOperations {
private readonly IEvents _events;
private readonly ILogTracer _log;

public ConfigOperations(IStorage storage, IEvents events, ILogTracer log, IServiceConfig config) : base(storage, log, config) {
_events = events;
public ConfigOperations(ILogTracer log, IOnefuzzContext context) : base(log, context) {
_log = log;
}

public async Task<InstanceConfig> Fetch() {
var key = _config.OneFuzzInstanceName ?? throw new Exception("Environment variable ONEFUZZ_INSTANCE_NAME is not set");
var key = _context.ServiceConfiguration.OneFuzzInstanceName ?? throw new Exception("Environment variable ONEFUZZ_INSTANCE_NAME is not set");
var config = await GetEntityAsync(key, key);
return config;
}
Expand All @@ -44,6 +42,6 @@ public async Async.Task Save(InstanceConfig config, bool isNew = false, bool req
}
}

await _events.SendEvent(new EventInstanceConfigUpdated(config));
await _context.Events.SendEvent(new EventInstanceConfigUpdated(config));
}
}
6 changes: 2 additions & 4 deletions src/ApiService/ApiService/onefuzzlib/JobOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ public interface IJobOperations : IStatefulOrm<Job, JobState> {
}

public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
private readonly IEvents _events;

public JobOperations(IStorage storage, ILogTracer logTracer, IServiceConfig config, IEvents events) : base(storage, logTracer, config) {
_events = events;
public JobOperations(ILogTracer logTracer, IOnefuzzContext context) : base(logTracer, context) {
}

public async Async.Task<Job?> Get(Guid jobId) {
Expand Down Expand Up @@ -56,7 +54,7 @@ public async Async.Task Stopping(Job job, ITaskOperations taskOperations) {
} else {
job = job with { State = JobState.Stopped };
var taskInfo = stopped.Select(t => new JobTaskStopped(t.TaskId, t.Config.Task.Type, t.Error)).ToList();
await _events.SendEvent(new EventJobStopped(job.JobId, job.Config, job.UserInfo, taskInfo));
await _context.Events.SendEvent(new EventJobStopped(job.JobId, job.Config, job.UserInfo, taskInfo));
}

await Replace(job);
Expand Down
95 changes: 34 additions & 61 deletions src/ApiService/ApiService/onefuzzlib/NodeOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,47 +52,20 @@ Async.Task<Node> Create(
/// https://docs.microsoft.com/en-us/azure/azure-monitor/platform/autoscale-common-metrics#commonly-used-storage-metrics

public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations {
private IScalesetOperations _scalesetOperations;
private IPoolOperations _poolOperations;
private readonly INodeTasksOperations _nodeTasksOps;
private readonly ITaskOperations _taskOps;
private readonly INodeMessageOperations _nodeMessageOps;
private readonly IEvents _events;
private readonly ILogTracer _log;
private readonly ICreds _creds;

private readonly IVmssOperations _vmssOperations;

public NodeOperations(
IStorage storage,
ILogTracer log,
IServiceConfig config,
ITaskOperations taskOps,
INodeTasksOperations nodeTasksOps,
INodeMessageOperations nodeMessageOps,
IEvents events,
IScalesetOperations scalesetOperations,
IPoolOperations poolOperations,
IVmssOperations vmssOperations,
ICreds creds
IOnefuzzContext context
)
: base(storage, log, config) {

_taskOps = taskOps;
_nodeTasksOps = nodeTasksOps;
_nodeMessageOps = nodeMessageOps;
_events = events;
_scalesetOperations = scalesetOperations;
_poolOperations = poolOperations;
_vmssOperations = vmssOperations;
_creds = creds;
_log = log;
: base(log, context) {

}

public async Task<OneFuzzResultVoid> AcquireScaleInProtection(Node node) {
if (await ScalesetNodeExists(node) && node.ScalesetId != null) {
_logTracer.Info($"Setting scale-in protection on node {node.MachineId}");
return await _vmssOperations.UpdateScaleInProtection((Guid)node.ScalesetId, node.MachineId, protectFromScaleIn: true);
return await _context.VmssOperations.UpdateScaleInProtection((Guid)node.ScalesetId, node.MachineId, protectFromScaleIn: true);
}
return OneFuzzResultVoid.Ok();
}
Expand All @@ -102,19 +75,19 @@ public async Async.Task<bool> ScalesetNodeExists(Node node) {
return false;
}

var scalesetResult = await _scalesetOperations.GetById((Guid)(node.ScalesetId!));
var scalesetResult = await _context.ScalesetOperations.GetById((Guid)(node.ScalesetId!));
if (!scalesetResult.IsOk || scalesetResult.OkV == null) {
return false;
}
var scaleset = scalesetResult.OkV;

var instanceId = await _vmssOperations.GetInstanceId(scaleset.ScalesetId, node.MachineId);
var instanceId = await _context.VmssOperations.GetInstanceId(scaleset.ScalesetId, node.MachineId);
return instanceId.IsOk;
}

public async Task<bool> CanProcessNewWork(Node node) {
if (IsOutdated(node)) {
_logTracer.Info($"can_process_new_work agent and service versions differ, stopping node. machine_id:{node.MachineId} agent_version:{node.Version} service_version:{_config.OneFuzzVersion}");
_logTracer.Info($"can_process_new_work agent and service versions differ, stopping node. machine_id:{node.MachineId} agent_version:{node.Version} service_version:{_context.ServiceConfiguration.OneFuzzVersion}");
await Stop(node, done: true);
return false;
}
Expand Down Expand Up @@ -154,7 +127,7 @@ public async Task<bool> CanProcessNewWork(Node node) {
}

if (node.ScalesetId != null) {
var scalesetResult = await _scalesetOperations.GetById(node.ScalesetId.Value);
var scalesetResult = await _context.ScalesetOperations.GetById(node.ScalesetId.Value);
if (!scalesetResult.IsOk || scalesetResult.OkV == null) {
_logTracer.Info($"can_process_new_work invalid scaleset. scaleset_id:{node.ScalesetId} machine_id:{node.MachineId}");
return false;
Expand All @@ -167,7 +140,7 @@ public async Task<bool> CanProcessNewWork(Node node) {
}
}

var poolResult = await _poolOperations.GetByName(node.PoolName);
var poolResult = await _context.PoolOperations.GetByName(node.PoolName);
if (!poolResult.IsOk || poolResult.OkV == null) {
_logTracer.Info($"can_schedule - invalid pool. pool_name:{node.PoolName} machine_id:{node.MachineId}");
return false;
Expand All @@ -192,7 +165,7 @@ public async Async.Task ReimageLongLivedNodes(Guid scaleSetId) {

await foreach (var node in QueryAsync($"(scaleset_id eq {scaleSetId}) and {timeFilter}")) {
if (node.DebugKeepNode) {
_log.Info($"removing debug_keep_node for expired node. scaleset_id:{node.ScalesetId} machine_id:{node.MachineId}");
_logTracer.Info($"removing debug_keep_node for expired node. scaleset_id:{node.ScalesetId} machine_id:{node.MachineId}");
}
await ToReimage(node with { DebugKeepNode = false });
}
Expand All @@ -209,7 +182,7 @@ public async Async.Task ToReimage(Node node, bool done = false) {

var reimageRequested = node.ReimageRequested;
if (!node.ReimageRequested && !node.DeleteRequested) {
_log.Info($"setting reimage_requested: {node.MachineId}");
_logTracer.Info($"setting reimage_requested: {node.MachineId}");
reimageRequested = true;
}

Expand All @@ -219,7 +192,7 @@ public async Async.Task ToReimage(Node node, bool done = false) {

var r = await Replace(updatedNode);
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error("Failed to save Node record");
_logTracer.WithHttpStatus(r.ErrorV).Error("Failed to save Node record");
}
}

Expand Down Expand Up @@ -248,9 +221,9 @@ public async Async.Task<Node> Create(
r = await Update(node);
}
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error($"failed to save NodeRecord, isNew: {isNew}");
_logTracer.WithHttpStatus(r.ErrorV).Error($"failed to save NodeRecord, isNew: {isNew}");
} else {
await _events.SendEvent(
await _context.Events.SendEvent(
new EventNodeCreated(
node.MachineId,
node.ScalesetId,
Expand All @@ -273,23 +246,23 @@ public async Async.Task Stop(Node node, bool done = false) {
/// <param name="node"></param>
/// <returns></returns>
public async Async.Task SetHalt(Node node) {
_log.Info($"setting halt: {node.MachineId}");
_logTracer.Info($"setting halt: {node.MachineId}");
var updatedNode = node with { DeleteRequested = true };
await Stop(updatedNode, true);
await SendStopIfFree(updatedNode);
}

public async Async.Task SendStopIfFree(Node node) {
var ver = new Version(_config.OneFuzzVersion.Split('-')[0]);
var ver = new Version(_context.ServiceConfiguration.OneFuzzVersion.Split('-')[0]);
if (ver >= Version.Parse("2.16.1")) {
await SendMessage(node, new NodeCommand(StopIfFree: new NodeCommandStopIfFree()));
}
}

public async Async.Task SendMessage(Node node, NodeCommand message) {
var r = await _nodeMessageOps.Replace(new NodeMessage(node.MachineId, message));
var r = await _context.NodeMessageOperations.Replace(new NodeMessage(node.MachineId, message));
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error($"failed to replace NodeMessge record for machine_id: {node.MachineId}");
_logTracer.WithHttpStatus(r.ErrorV).Error($"failed to replace NodeMessge record for machine_id: {node.MachineId}");
}
}

Expand All @@ -301,7 +274,7 @@ public async Async.Task SendMessage(Node node, NodeCommand message) {
}

public bool IsOutdated(Node node) {
return node.Version != _config.OneFuzzVersion;
return node.Version != _context.ServiceConfiguration.OneFuzzVersion;
}

public bool IsTooOld(Node node) {
Expand All @@ -318,7 +291,7 @@ public async Async.Task SetState(Node node, NodeState state) {
var newNode = node;
if (node.State != state) {
newNode = newNode with { State = state };
await _events.SendEvent(new EventNodeStateUpdated(
await _context.Events.SendEvent(new EventNodeStateUpdated(
node.MachineId,
node.ScalesetId,
node.PoolName,
Expand Down Expand Up @@ -375,7 +348,7 @@ public IAsyncEnumerable<Node> SearchStates(
string? poolName = default,
bool excludeUpdateScheduled = false,
int? numResults = default) {
var query = NodeOperations.SearchStatesQuery(_config.OneFuzzVersion, poolId, scaleSetId, states, poolName, excludeUpdateScheduled, numResults);
var query = NodeOperations.SearchStatesQuery(_context.ServiceConfiguration.OneFuzzVersion, poolId, scaleSetId, states, poolName, excludeUpdateScheduled, numResults);
return QueryAsync(query);
}

Expand All @@ -384,10 +357,10 @@ public async Async.Task MarkTasksStoppedEarly(Node node, Error? error = null) {
error = new Error(ErrorCode.TASK_FAILED, new[] { $"node reimaged during task execution. machine_id: {node.MachineId}" });
}

await foreach (var entry in _nodeTasksOps.GetByMachineId(node.MachineId)) {
var task = await _taskOps.GetByTaskId(entry.TaskId);
await foreach (var entry in _context.NodeTasksOperations.GetByMachineId(node.MachineId)) {
var task = await _context.TaskOperations.GetByTaskId(entry.TaskId);
if (task is not null) {
await _taskOps.MarkFailed(task, error);
await _context.TaskOperations.MarkFailed(task, error);
}
if (!node.DebugKeepNode) {
await Delete(node);
Expand All @@ -397,11 +370,11 @@ public async Async.Task MarkTasksStoppedEarly(Node node, Error? error = null) {

public new async Async.Task Delete(Node node) {
await MarkTasksStoppedEarly(node);
await _nodeTasksOps.ClearByMachineId(node.MachineId);
await _nodeMessageOps.ClearMessages(node.MachineId);
await _context.NodeTasksOperations.ClearByMachineId(node.MachineId);
await _context.NodeMessageOperations.ClearMessages(node.MachineId);
await base.Delete(node);

await _events.SendEvent(new EventNodeDeleted(node.MachineId, node.ScalesetId, node.PoolName));
await _context.Events.SendEvent(new EventNodeDeleted(node.MachineId, node.ScalesetId, node.PoolName));
}

}
Expand All @@ -419,8 +392,8 @@ public class NodeTasksOperations : StatefulOrm<NodeTasks, NodeTaskState>, INodeT

ILogTracer _log;

public NodeTasksOperations(IStorage storage, ILogTracer log, IServiceConfig config)
: base(storage, log, config) {
public NodeTasksOperations(ILogTracer log, IOnefuzzContext context)
: base(log, context) {
_log = log;
}

Expand Down Expand Up @@ -453,11 +426,11 @@ public IAsyncEnumerable<NodeTasks> GetByTaskId(Guid taskId) {
}

public async Async.Task ClearByMachineId(Guid machineId) {
_log.Info($"clearing tasks for node {machineId}");
_logTracer.Info($"clearing tasks for node {machineId}");
await foreach (var entry in GetByMachineId(machineId)) {
var res = await Delete(entry);
if (!res.IsOk) {
_log.WithHttpStatus(res.ErrorV).Error($"failed to delete node task entry for machine_id: {entry.MachineId}");
_logTracer.WithHttpStatus(res.ErrorV).Error($"failed to delete node task entry for machine_id: {entry.MachineId}");
}
}
}
Expand All @@ -484,7 +457,7 @@ public interface INodeMessageOperations : IOrm<NodeMessage> {
public class NodeMessageOperations : Orm<NodeMessage>, INodeMessageOperations {

private readonly ILogTracer _log;
public NodeMessageOperations(IStorage storage, ILogTracer log, IServiceConfig config) : base(storage, log, config) {
public NodeMessageOperations(ILogTracer log, IOnefuzzContext context) : base(log, context) {
_log = log;
}

Expand All @@ -493,12 +466,12 @@ public IAsyncEnumerable<NodeMessage> GetMessage(Guid machineId) {
}

public async Async.Task ClearMessages(Guid machineId) {
_log.Info($"clearing messages for node {machineId}");
_logTracer.Info($"clearing messages for node {machineId}");

await foreach (var message in GetMessage(machineId)) {
var r = await Delete(message);
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error($"failed to delete message for node {machineId}");
_logTracer.WithHttpStatus(r.ErrorV).Error($"failed to delete message for node {machineId}");
}
}
}
Expand Down
Loading

0 comments on commit eca88cb

Please sign in to comment.