Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Introducing OnefuzzContext to prevent circular references #1886

Merged
merged 4 commits into from
May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/ApiService/ApiService/OneFuzzTypes/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,19 @@ public record Node
[RowKey] Guid MachineId,
Guid? PoolId,
string Version,

DateTimeOffset? Heartbeat = null,
DateTimeOffset? InitializedAt = null,
NodeState State = NodeState.Init,
List<NodeTasks>? Tasks = null,
List<NodeCommand>? Messages = null,

Guid? ScalesetId = null,
bool ReimageRequested = false,
bool DeleteRequested = false,
bool DebugKeepNode = false
) : StatefulEntityBase<NodeState>(State);
) : StatefulEntityBase<NodeState>(State) {

public List<NodeTasks>? Tasks { get; set; }
public List<NodeCommand>? Messages { get; set; }
}


public record Forward
Expand Down
1 change: 1 addition & 0 deletions src/ApiService/ApiService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public static void Main() {
.AddScoped<IVmssOperations, VmssOperations>()
.AddScoped<INodeTasksOperations, NodeTasksOperations>()
.AddScoped<INodeMessageOperations, NodeMessageOperations>()
.AddScoped<IOnefuzzContext, OnefuzzContext>()
chkeita marked this conversation as resolved.
Show resolved Hide resolved

.AddSingleton<ICreds, Creds>()
.AddSingleton<IServiceConfig, ServiceConfiguration>()
Expand Down
16 changes: 8 additions & 8 deletions src/ApiService/ApiService/QueueNodeHearbeat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ namespace Microsoft.OneFuzz.Service;
public class QueueNodeHearbeat {
private readonly ILogTracer _log;

private readonly IEvents _events;
private readonly INodeOperations _nodes;
private readonly IOnefuzzContext _context;

public QueueNodeHearbeat(ILogTracer log, INodeOperations nodes, IEvents events) {
public QueueNodeHearbeat(ILogTracer log, IOnefuzzContext context) {
_log = log;
_nodes = nodes;
_events = events;
_context = context;
}

[Function("QueueNodeHearbeat")]
public async Async.Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg) {
_log.Info($"heartbeat: {msg}");
var nodes = _context.NodeOperations;
var events = _context.Events;

var hb = JsonSerializer.Deserialize<NodeHeartbeatEntry>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}");

var node = await _nodes.GetByMachineId(hb.NodeId);
var node = await nodes.GetByMachineId(hb.NodeId);

var log = _log.WithTag("NodeId", hb.NodeId.ToString());

Expand All @@ -34,14 +34,14 @@ public async Async.Task Run([QueueTrigger("myqueue-items", Connection = "AzureWe

var newNode = node with { Heartbeat = DateTimeOffset.UtcNow };

var r = await _nodes.Replace(newNode);
var r = await nodes.Replace(newNode);

if (!r.IsOk) {
var (status, reason) = r.ErrorV;
log.Error($"Failed to replace heartbeat info due to [{status}] {reason}");
}

// TODO: do we still send event if we fail do update the table ?
await _events.SendEvent(new EventNodeHeartbeat(node.MachineId, node.ScalesetId, node.PoolName));
await events.SendEvent(new EventNodeHeartbeat(node.MachineId, node.ScalesetId, node.PoolName));
}
}
8 changes: 3 additions & 5 deletions src/ApiService/ApiService/onefuzzlib/InstanceConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ public interface IConfigOperations : IOrm<InstanceConfig> {
}

public class ConfigOperations : Orm<InstanceConfig>, IConfigOperations {
private readonly IEvents _events;
private readonly ILogTracer _log;

public ConfigOperations(IStorage storage, IEvents events, ILogTracer log, IServiceConfig config) : base(storage, log, config) {
_events = events;
public ConfigOperations(ILogTracer log, IOnefuzzContext context) : base(log, context) {
_log = log;
}

public async Task<InstanceConfig> Fetch() {
var key = _config.OneFuzzInstanceName ?? throw new Exception("Environment variable ONEFUZZ_INSTANCE_NAME is not set");
var key = _context.ServiceConfiguration.OneFuzzInstanceName ?? throw new Exception("Environment variable ONEFUZZ_INSTANCE_NAME is not set");
var config = await GetEntityAsync(key, key);
return config;
}
Expand All @@ -44,6 +42,6 @@ public async Async.Task Save(InstanceConfig config, bool isNew = false, bool req
}
}

await _events.SendEvent(new EventInstanceConfigUpdated(config));
await _context.Events.SendEvent(new EventInstanceConfigUpdated(config));
}
}
6 changes: 2 additions & 4 deletions src/ApiService/ApiService/onefuzzlib/JobOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ public interface IJobOperations : IStatefulOrm<Job, JobState> {
}

public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
private readonly IEvents _events;

public JobOperations(IStorage storage, ILogTracer logTracer, IServiceConfig config, IEvents events) : base(storage, logTracer, config) {
_events = events;
public JobOperations(ILogTracer logTracer, IOnefuzzContext context) : base(logTracer, context) {
}

public async Async.Task<Job?> Get(Guid jobId) {
Expand Down Expand Up @@ -56,7 +54,7 @@ public async Async.Task Stopping(Job job, ITaskOperations taskOperations) {
} else {
job = job with { State = JobState.Stopped };
var taskInfo = stopped.Select(t => new JobTaskStopped(t.TaskId, t.Config.Task.Type, t.Error)).ToList();
await _events.SendEvent(new EventJobStopped(job.JobId, job.Config, job.UserInfo, taskInfo));
await _context.Events.SendEvent(new EventJobStopped(job.JobId, job.Config, job.UserInfo, taskInfo));
}

await Replace(job);
Expand Down
95 changes: 34 additions & 61 deletions src/ApiService/ApiService/onefuzzlib/NodeOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,47 +52,20 @@ Async.Task<Node> Create(
/// https://docs.microsoft.com/en-us/azure/azure-monitor/platform/autoscale-common-metrics#commonly-used-storage-metrics

public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations {
private IScalesetOperations _scalesetOperations;
private IPoolOperations _poolOperations;
private readonly INodeTasksOperations _nodeTasksOps;
private readonly ITaskOperations _taskOps;
private readonly INodeMessageOperations _nodeMessageOps;
private readonly IEvents _events;
private readonly ILogTracer _log;
private readonly ICreds _creds;

private readonly IVmssOperations _vmssOperations;

public NodeOperations(
IStorage storage,
ILogTracer log,
IServiceConfig config,
ITaskOperations taskOps,
INodeTasksOperations nodeTasksOps,
INodeMessageOperations nodeMessageOps,
IEvents events,
IScalesetOperations scalesetOperations,
IPoolOperations poolOperations,
IVmssOperations vmssOperations,
ICreds creds
IOnefuzzContext context
)
: base(storage, log, config) {

_taskOps = taskOps;
_nodeTasksOps = nodeTasksOps;
_nodeMessageOps = nodeMessageOps;
_events = events;
_scalesetOperations = scalesetOperations;
_poolOperations = poolOperations;
_vmssOperations = vmssOperations;
_creds = creds;
_log = log;
: base(log, context) {

}

public async Task<OneFuzzResultVoid> AcquireScaleInProtection(Node node) {
if (await ScalesetNodeExists(node) && node.ScalesetId != null) {
_logTracer.Info($"Setting scale-in protection on node {node.MachineId}");
return await _vmssOperations.UpdateScaleInProtection((Guid)node.ScalesetId, node.MachineId, protectFromScaleIn: true);
return await _context.VmssOperations.UpdateScaleInProtection((Guid)node.ScalesetId, node.MachineId, protectFromScaleIn: true);
}
return OneFuzzResultVoid.Ok();
}
Expand All @@ -102,19 +75,19 @@ public async Async.Task<bool> ScalesetNodeExists(Node node) {
return false;
}

var scalesetResult = await _scalesetOperations.GetById((Guid)(node.ScalesetId!));
var scalesetResult = await _context.ScalesetOperations.GetById((Guid)(node.ScalesetId!));
if (!scalesetResult.IsOk || scalesetResult.OkV == null) {
return false;
}
var scaleset = scalesetResult.OkV;

var instanceId = await _vmssOperations.GetInstanceId(scaleset.ScalesetId, node.MachineId);
var instanceId = await _context.VmssOperations.GetInstanceId(scaleset.ScalesetId, node.MachineId);
return instanceId.IsOk;
}

public async Task<bool> CanProcessNewWork(Node node) {
if (IsOutdated(node)) {
_logTracer.Info($"can_process_new_work agent and service versions differ, stopping node. machine_id:{node.MachineId} agent_version:{node.Version} service_version:{_config.OneFuzzVersion}");
_logTracer.Info($"can_process_new_work agent and service versions differ, stopping node. machine_id:{node.MachineId} agent_version:{node.Version} service_version:{_context.ServiceConfiguration.OneFuzzVersion}");
await Stop(node, done: true);
return false;
}
Expand Down Expand Up @@ -154,7 +127,7 @@ public async Task<bool> CanProcessNewWork(Node node) {
}

if (node.ScalesetId != null) {
var scalesetResult = await _scalesetOperations.GetById(node.ScalesetId.Value);
var scalesetResult = await _context.ScalesetOperations.GetById(node.ScalesetId.Value);
if (!scalesetResult.IsOk || scalesetResult.OkV == null) {
_logTracer.Info($"can_process_new_work invalid scaleset. scaleset_id:{node.ScalesetId} machine_id:{node.MachineId}");
return false;
Expand All @@ -167,7 +140,7 @@ public async Task<bool> CanProcessNewWork(Node node) {
}
}

var poolResult = await _poolOperations.GetByName(node.PoolName);
var poolResult = await _context.PoolOperations.GetByName(node.PoolName);
if (!poolResult.IsOk || poolResult.OkV == null) {
_logTracer.Info($"can_schedule - invalid pool. pool_name:{node.PoolName} machine_id:{node.MachineId}");
return false;
Expand All @@ -192,7 +165,7 @@ public async Async.Task ReimageLongLivedNodes(Guid scaleSetId) {

await foreach (var node in QueryAsync($"(scaleset_id eq {scaleSetId}) and {timeFilter}")) {
if (node.DebugKeepNode) {
_log.Info($"removing debug_keep_node for expired node. scaleset_id:{node.ScalesetId} machine_id:{node.MachineId}");
_logTracer.Info($"removing debug_keep_node for expired node. scaleset_id:{node.ScalesetId} machine_id:{node.MachineId}");
}
await ToReimage(node with { DebugKeepNode = false });
}
Expand All @@ -209,7 +182,7 @@ public async Async.Task ToReimage(Node node, bool done = false) {

var reimageRequested = node.ReimageRequested;
if (!node.ReimageRequested && !node.DeleteRequested) {
_log.Info($"setting reimage_requested: {node.MachineId}");
_logTracer.Info($"setting reimage_requested: {node.MachineId}");
reimageRequested = true;
}

Expand All @@ -219,7 +192,7 @@ public async Async.Task ToReimage(Node node, bool done = false) {

var r = await Replace(updatedNode);
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error("Failed to save Node record");
_logTracer.WithHttpStatus(r.ErrorV).Error("Failed to save Node record");
}
}

Expand Down Expand Up @@ -248,9 +221,9 @@ public async Async.Task<Node> Create(
r = await Update(node);
}
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error($"failed to save NodeRecord, isNew: {isNew}");
_logTracer.WithHttpStatus(r.ErrorV).Error($"failed to save NodeRecord, isNew: {isNew}");
} else {
await _events.SendEvent(
await _context.Events.SendEvent(
new EventNodeCreated(
node.MachineId,
node.ScalesetId,
Expand All @@ -273,23 +246,23 @@ public async Async.Task Stop(Node node, bool done = false) {
/// <param name="node"></param>
/// <returns></returns>
public async Async.Task SetHalt(Node node) {
_log.Info($"setting halt: {node.MachineId}");
_logTracer.Info($"setting halt: {node.MachineId}");
var updatedNode = node with { DeleteRequested = true };
await Stop(updatedNode, true);
await SendStopIfFree(updatedNode);
}

public async Async.Task SendStopIfFree(Node node) {
var ver = new Version(_config.OneFuzzVersion.Split('-')[0]);
var ver = new Version(_context.ServiceConfiguration.OneFuzzVersion.Split('-')[0]);
if (ver >= Version.Parse("2.16.1")) {
await SendMessage(node, new NodeCommand(StopIfFree: new NodeCommandStopIfFree()));
}
}

public async Async.Task SendMessage(Node node, NodeCommand message) {
var r = await _nodeMessageOps.Replace(new NodeMessage(node.MachineId, message));
var r = await _context.NodeMessageOperations.Replace(new NodeMessage(node.MachineId, message));
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error($"failed to replace NodeMessge record for machine_id: {node.MachineId}");
_logTracer.WithHttpStatus(r.ErrorV).Error($"failed to replace NodeMessge record for machine_id: {node.MachineId}");
}
}

Expand All @@ -301,7 +274,7 @@ public async Async.Task SendMessage(Node node, NodeCommand message) {
}

public bool IsOutdated(Node node) {
return node.Version != _config.OneFuzzVersion;
return node.Version != _context.ServiceConfiguration.OneFuzzVersion;
}

public bool IsTooOld(Node node) {
Expand All @@ -318,7 +291,7 @@ public async Async.Task SetState(Node node, NodeState state) {
var newNode = node;
if (node.State != state) {
newNode = newNode with { State = state };
await _events.SendEvent(new EventNodeStateUpdated(
await _context.Events.SendEvent(new EventNodeStateUpdated(
node.MachineId,
node.ScalesetId,
node.PoolName,
Expand Down Expand Up @@ -375,7 +348,7 @@ public IAsyncEnumerable<Node> SearchStates(
string? poolName = default,
bool excludeUpdateScheduled = false,
int? numResults = default) {
var query = NodeOperations.SearchStatesQuery(_config.OneFuzzVersion, poolId, scaleSetId, states, poolName, excludeUpdateScheduled, numResults);
var query = NodeOperations.SearchStatesQuery(_context.ServiceConfiguration.OneFuzzVersion, poolId, scaleSetId, states, poolName, excludeUpdateScheduled, numResults);
return QueryAsync(query);
}

Expand All @@ -384,10 +357,10 @@ public async Async.Task MarkTasksStoppedEarly(Node node, Error? error = null) {
error = new Error(ErrorCode.TASK_FAILED, new[] { $"node reimaged during task execution. machine_id: {node.MachineId}" });
}

await foreach (var entry in _nodeTasksOps.GetByMachineId(node.MachineId)) {
var task = await _taskOps.GetByTaskId(entry.TaskId);
await foreach (var entry in _context.NodeTasksOperations.GetByMachineId(node.MachineId)) {
var task = await _context.TaskOperations.GetByTaskId(entry.TaskId);
if (task is not null) {
await _taskOps.MarkFailed(task, error);
await _context.TaskOperations.MarkFailed(task, error);
}
if (!node.DebugKeepNode) {
await Delete(node);
Expand All @@ -397,11 +370,11 @@ public async Async.Task MarkTasksStoppedEarly(Node node, Error? error = null) {

public new async Async.Task Delete(Node node) {
await MarkTasksStoppedEarly(node);
await _nodeTasksOps.ClearByMachineId(node.MachineId);
await _nodeMessageOps.ClearMessages(node.MachineId);
await _context.NodeTasksOperations.ClearByMachineId(node.MachineId);
await _context.NodeMessageOperations.ClearMessages(node.MachineId);
await base.Delete(node);

await _events.SendEvent(new EventNodeDeleted(node.MachineId, node.ScalesetId, node.PoolName));
await _context.Events.SendEvent(new EventNodeDeleted(node.MachineId, node.ScalesetId, node.PoolName));
}

}
Expand All @@ -419,8 +392,8 @@ public class NodeTasksOperations : StatefulOrm<NodeTasks, NodeTaskState>, INodeT

ILogTracer _log;

public NodeTasksOperations(IStorage storage, ILogTracer log, IServiceConfig config)
: base(storage, log, config) {
public NodeTasksOperations(ILogTracer log, IOnefuzzContext context)
: base(log, context) {
_log = log;
}

Expand Down Expand Up @@ -453,11 +426,11 @@ public IAsyncEnumerable<NodeTasks> GetByTaskId(Guid taskId) {
}

public async Async.Task ClearByMachineId(Guid machineId) {
_log.Info($"clearing tasks for node {machineId}");
_logTracer.Info($"clearing tasks for node {machineId}");
await foreach (var entry in GetByMachineId(machineId)) {
var res = await Delete(entry);
if (!res.IsOk) {
_log.WithHttpStatus(res.ErrorV).Error($"failed to delete node task entry for machine_id: {entry.MachineId}");
_logTracer.WithHttpStatus(res.ErrorV).Error($"failed to delete node task entry for machine_id: {entry.MachineId}");
}
}
}
Expand All @@ -484,7 +457,7 @@ public interface INodeMessageOperations : IOrm<NodeMessage> {
public class NodeMessageOperations : Orm<NodeMessage>, INodeMessageOperations {

private readonly ILogTracer _log;
public NodeMessageOperations(IStorage storage, ILogTracer log, IServiceConfig config) : base(storage, log, config) {
public NodeMessageOperations(ILogTracer log, IOnefuzzContext context) : base(log, context) {
_log = log;
}

Expand All @@ -493,12 +466,12 @@ public IAsyncEnumerable<NodeMessage> GetMessage(Guid machineId) {
}

public async Async.Task ClearMessages(Guid machineId) {
_log.Info($"clearing messages for node {machineId}");
_logTracer.Info($"clearing messages for node {machineId}");

await foreach (var message in GetMessage(machineId)) {
var r = await Delete(message);
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error($"failed to delete message for node {machineId}");
_logTracer.WithHttpStatus(r.ErrorV).Error($"failed to delete message for node {machineId}");
}
}
}
Expand Down
Loading