Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Implement scaleset function for C# #2191

Merged
merged 14 commits into from
Aug 17, 2022
260 changes: 260 additions & 0 deletions src/ApiService/ApiService/Functions/Scaleset.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
using System.Buffers.Binary;
using System.Diagnostics;
using System.Security.Cryptography;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;

namespace Microsoft.OneFuzz.Service.Functions;

public class Scaleset {
private readonly ILogTracer _log;
private readonly IEndpointAuthorization _auth;
private readonly IOnefuzzContext _context;

public Scaleset(ILogTracer log, IEndpointAuthorization auth, IOnefuzzContext context) {
_log = log;
_auth = auth;
_context = context;
}

[Function("Scaleset")]
public Async.Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymous, "GET", "PATCH", "POST", "DELETE")] HttpRequestData req) {
return _auth.CallIfUser(req, r => r.Method switch {
"GET" => Get(r),
"PATCH" => Patch(r),
"POST" => Post(r),
"DELETE" => Delete(r),
_ => throw new InvalidOperationException("Unsupported HTTP method"),
});
}

private async Task<HttpResponseData> Delete(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<ScalesetStop>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "ScalesetDelete");
}

var answer = await _auth.CheckRequireAdmins(req);
if (!answer.IsOk) {
return await _context.RequestHandling.NotOk(req, answer.ErrorV, "ScalesetDelete");
}

var scalesetResult = await _context.ScalesetOperations.GetById(request.OkV.ScalesetId);
if (!scalesetResult.IsOk) {
return await _context.RequestHandling.NotOk(req, scalesetResult.ErrorV, "ScalesetDelete");
}

var scaleset = scalesetResult.OkV;
await _context.ScalesetOperations.SetShutdown(scaleset, request.OkV.Now);
return await RequestHandling.Ok(req, true);
}

private async Task<HttpResponseData> Post(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<ScalesetCreate>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "ScalesetCreate");
}

var answer = await _auth.CheckRequireAdmins(req);
if (!answer.IsOk) {
return await _context.RequestHandling.NotOk(req, answer.ErrorV, "ScalesetCreate");
}

var create = request.OkV;
// verify the pool exists
var poolResult = await _context.PoolOperations.GetByName(create.PoolName);
if (!poolResult.IsOk) {
return await _context.RequestHandling.NotOk(req, answer.ErrorV, "ScalesetCreate");
}

var pool = poolResult.OkV;
if (!pool.Managed) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.UNABLE_TO_CREATE,
Errors: new string[] { "scalesets can only be added to managed pools " }),
context: "ScalesetCreate");
}

string region;
if (create.Region is null) {
region = await _context.Creds.GetBaseRegion();
} else {
var validRegions = await _context.Creds.GetRegions();
if (!validRegions.Contains(create.Region)) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.UNABLE_TO_CREATE,
Errors: new string[] { "invalid region" }),
context: "ScalesetCreate");
}

region = create.Region;
}

var availableSkus = await _context.VmssOperations.ListAvailableSkus(region);
if (!availableSkus.Contains(create.VmSku)) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.UNABLE_TO_CREATE,
Errors: new string[] { $"The specified VM SKU '{create.VmSku}' is not available in the location ${region}" }),
context: "ScalesetCreate");
}

var tags = create.Tags;
var configTags = (await _context.ConfigOperations.Fetch()).VmssTags;
if (configTags is not null) {
foreach (var (key, value) in configTags) {
tags[key] = value;
}
}

var scaleset = new Service.Scaleset(
ScalesetId: Guid.NewGuid(),
State: ScalesetState.Init,
NeedsConfigUpdate: false,
Auth: GenerateAuthentication(),
PoolName: create.PoolName,
VmSku: create.VmSku,
Image: create.Image,
Region: region,
Size: create.Size,
SpotInstances: create.SpotInstances,
EphemeralOsDisks: create.EphemeralOsDisks,
Tags: tags);

var inserted = await _context.ScalesetOperations.Insert(scaleset);
if (!inserted.IsOk) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.UNABLE_TO_CREATE,
new string[] { $"unable to insert scaleset: {inserted.ErrorV}" }
),
context: "ScalesetCreate");
}

if (create.AutoScale is AutoScaleOptions options) {
var autoScale = new AutoScale(
scaleset.ScalesetId,
Min: options.Min,
Max: options.Max,
Default: options.Default,
ScaleOutAmount: options.ScaleOutAmount,
ScaleOutCooldown: options.ScaleOutCooldown,
ScaleInAmount: options.ScaleInAmount,
ScaleInCooldown: options.ScaleInCooldown);

await _context.AutoScaleOperations.Insert(autoScale);
}

return await RequestHandling.Ok(req, ScalesetResponse.ForScaleset(scaleset));
}

private static Authentication GenerateAuthentication() {
using var rsa = RSA.Create(2048);
var privateKey = rsa.ExportRSAPrivateKey();
var formattedPrivateKey = $"-----BEGIN RSA PRIVATE KEY-----\n{Convert.ToBase64String(privateKey)}\n-----END RSA PRIVATE KEY-----\n";

var publicKey = BuildPublicKey(rsa);
var formattedPublicKey = $"ssh-rsa {Convert.ToBase64String(publicKey)} onefuzz-generated-key";

return new Authentication(
Password: Guid.NewGuid().ToString(),
PublicKey: formattedPublicKey,
PrivateKey: formattedPrivateKey);
}

private static ReadOnlySpan<byte> SSHRSABytes => new byte[] { (byte)'s', (byte)'s', (byte)'h', (byte)'-', (byte)'r', (byte)'s', (byte)'a' };

private static byte[] BuildPublicKey(RSA rsa) {
static Span<byte> WriteLengthPrefixedBytes(ReadOnlySpan<byte> src, Span<byte> dest) {
BinaryPrimitives.WriteInt32BigEndian(dest, src.Length);
dest = dest[sizeof(int)..];
src.CopyTo(dest);
return dest[src.Length..];
}

var parameters = rsa.ExportParameters(includePrivateParameters: false);

// public key format is "ssh-rsa", exponent, modulus, all written
// as (big-endian) length-prefixed bytes
var result = new byte[sizeof(int) + SSHRSABytes.Length + sizeof(int) + parameters.Modulus!.Length + sizeof(int) + parameters.Exponent!.Length];
var spanResult = result.AsSpan();
spanResult = WriteLengthPrefixedBytes(SSHRSABytes, spanResult);
spanResult = WriteLengthPrefixedBytes(parameters.Exponent, spanResult);
spanResult = WriteLengthPrefixedBytes(parameters.Modulus, spanResult);
Debug.Assert(spanResult.Length == 0);

return result;
}

private async Task<HttpResponseData> Patch(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<ScalesetUpdate>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "ScalesetUpdate");
}

var answer = await _auth.CheckRequireAdmins(req);
if (!answer.IsOk) {
return await _context.RequestHandling.NotOk(req, answer.ErrorV, "ScalesetUpdate");
}

var scalesetResult = await _context.ScalesetOperations.GetById(request.OkV.ScalesetId);
if (!scalesetResult.IsOk) {
return await _context.RequestHandling.NotOk(req, scalesetResult.ErrorV, "ScalesetUpdate");
}

var scaleset = scalesetResult.OkV;
if (!scaleset.State.CanUpdate()) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.INVALID_REQUEST,
Errors: new[] { $"scaleset must be in one of the following states to update: {string.Join(", ", ScalesetStateHelper.CanUpdateStates)}" }),
"ScalesetUpdate");
}

if (request.OkV.Size is long size) {
scaleset = await _context.ScalesetOperations.SetSize(scaleset, size);
}

scaleset = scaleset with { Auth = null };
return await RequestHandling.Ok(req, ScalesetResponse.ForScaleset(scaleset));
}

private async Task<HttpResponseData> Get(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<ScalesetSearch>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "ScalesetSearch");
}

var search = request.OkV;
if (search.ScalesetId is Guid id) {
var scalesetResult = await _context.ScalesetOperations.GetById(id);
if (!scalesetResult.IsOk) {
return await _context.RequestHandling.NotOk(req, scalesetResult.ErrorV, "ScalesetSearch");
}

var scaleset = scalesetResult.OkV;

var response = ScalesetResponse.ForScaleset(scaleset);
response = response with { Nodes = await _context.ScalesetOperations.GetNodes(scaleset) };
if (!search.IncludeAuth) {
response = response with { Auth = null };
}

return await RequestHandling.Ok(req, response);
}

var states = search.State ?? Enumerable.Empty<ScalesetState>();
var scalesets = await _context.ScalesetOperations.SearchStates(states).ToListAsync();
// don't return auths during list actions, only 'get'
var result = scalesets.Select(ss => ScalesetResponse.ForScaleset(ss with { Auth = null }));
return await RequestHandling.Ok(req, result);
}
}
2 changes: 1 addition & 1 deletion src/ApiService/ApiService/Functions/TimerWorkers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public TimerWorkers(ILogTracer log, IOnefuzzContext context) {
_nodeOps = context.NodeOperations;
}

public async Async.Task ProcessScalesets(Scaleset scaleset) {
public async Async.Task ProcessScalesets(Service.Scaleset scaleset) {
_log.Verbose($"checking scaleset for updates: {scaleset.ScalesetId}");

await _scaleSetOps.UpdateConfigs(scaleset);
Expand Down
40 changes: 30 additions & 10 deletions src/ApiService/ApiService/OneFuzzTypes/Enums.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,29 +138,49 @@ public static class JobStateHelper {


public static class ScalesetStateHelper {
private static readonly IReadOnlySet<ScalesetState> _canUpdate = new HashSet<ScalesetState> { ScalesetState.Init, ScalesetState.Resize };
private static readonly IReadOnlySet<ScalesetState> _needsWork =
new HashSet<ScalesetState>{
private static readonly HashSet<ScalesetState> _canUpdate =
new() {
ScalesetState.Init,
ScalesetState.Resize,
};

private static readonly HashSet<ScalesetState> _needsWork =
new() {
ScalesetState.Init,
ScalesetState.Setup,
ScalesetState.Resize,
ScalesetState.Shutdown,
ScalesetState.Halt
ScalesetState.Halt,
};

private static readonly HashSet<ScalesetState> _available =
new() {
ScalesetState.Resize,
ScalesetState.Running,
};

private static readonly HashSet<ScalesetState> _resizing =
new() {
ScalesetState.Halt,
ScalesetState.Init,
ScalesetState.Setup,
};
private static readonly IReadOnlySet<ScalesetState> _available = new HashSet<ScalesetState> { ScalesetState.Resize, ScalesetState.Running };
private static readonly IReadOnlySet<ScalesetState> _resizing = new HashSet<ScalesetState> { ScalesetState.Halt, ScalesetState.Init, ScalesetState.Setup };

/// set of states that indicate the scaleset can be updated
public static IReadOnlySet<ScalesetState> CanUpdate => _canUpdate;
public static bool CanUpdate(this ScalesetState state) => _canUpdate.Contains(state);
public static IReadOnlySet<ScalesetState> CanUpdateStates => _canUpdate;

/// set of states that indicate work is needed during eventing
public static IReadOnlySet<ScalesetState> NeedsWork => _needsWork;
public static bool NeedsWork(this ScalesetState state) => _needsWork.Contains(state);
public static IReadOnlySet<ScalesetState> NeedsWorkStates => _needsWork;

/// set of states that indicate if it's available for work
public static IReadOnlySet<ScalesetState> Available => _available;
public static bool IsAvailable(this ScalesetState state) => _available.Contains(state);
public static IReadOnlySet<ScalesetState> AvailableStates => _available;

/// set of states that indicate scaleset is resizing
public static IReadOnlySet<ScalesetState> Resizing => _resizing;
public static bool IsResizing(this ScalesetState state) => _resizing.Contains(state);
public static IReadOnlySet<ScalesetState> ResizingStates => _resizing;
}


Expand Down
22 changes: 9 additions & 13 deletions src/ApiService/ApiService/OneFuzzTypes/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -378,38 +378,34 @@ public record AutoScale(
long Max,
long Default,
long ScaleOutAmount,
long ScaleOutCoolDown,
long ScaleOutCooldown,
long ScaleInAmount,
long ScaleInCoolDown
) : EntityBase();


long ScaleInCooldown
) : EntityBase;

public record ScalesetNodeState(
Guid MachineId,
string InstanceId,
NodeState? State

);


public record Scaleset(
[PartitionKey] PoolName PoolName,
[RowKey] Guid ScalesetId,
ScalesetState State,
Authentication? Auth,
string VmSku,
string Image,
Region Region,
long Size,
bool? SpotInstances,
bool EphemeralOsDisks,
bool NeedsConfigUpdate,
Error? Error,
List<ScalesetNodeState>? Nodes,
Guid? ClientId,
Guid? ClientObjectId,
Dictionary<string, string> Tags
Dictionary<string, string> Tags,
Authentication? Auth = null,
Error? Error = null,
Guid? ClientId = null,
Guid? ClientObjectId = null
// 'Nodes' removed when porting from Python: only used in search response
) : StatefulEntityBase<ScalesetState>(State);

[JsonConverter(typeof(ContainerConverter))]
Expand Down
Loading