Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Add deleteAll to the orm #2519

Merged
merged 2 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/ApiService/ApiService/onefuzzlib/NodeMessageOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ public IAsyncEnumerable<NodeMessage> GetMessage(Guid machineId)
public async Async.Task ClearMessages(Guid machineId) {
_logTracer.Info($"clearing messages for node {machineId:Tag:MachineId}");

await foreach (var message in GetMessage(machineId)) {
var r = await Delete(message);
if (!r.IsOk) {
_logTracer.WithHttpStatus(r.ErrorV).Error($"failed to delete message for node {machineId:Tag:MachineId}");
}
var result = await DeleteAll(new (string?, string?)[] { (machineId.ToString(), null) });

if (result.FailureCount > 0) {
_logTracer.Error($"failed to delete {result.FailureCount:Tag:FailedDeleteMessageCount} messages for node {machineId:Tag:MachineId}");
}
}

Expand Down
24 changes: 24 additions & 0 deletions src/ApiService/ApiService/onefuzzlib/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,27 @@ public static T EnsureNotNull<T>(this T? thisObject, string message) {
public static Async.Task IgnoreResult<T>(this Async.Task<T> task)
=> task;
}

public static class IAsyncEnumerableExtension {
public static async IAsyncEnumerable<List<TSource>> Chunk<TSource>(this IAsyncEnumerable<TSource> source, int size) {

chkeita marked this conversation as resolved.
Show resolved Hide resolved
if (size <= 0) {
throw new ArgumentException("size must be greater than 0");
}

var enumerator = source.GetAsyncEnumerator();
List<TSource> result = new List<TSource>(size);
while (await enumerator.MoveNextAsync()) {
result.Add(enumerator.Current);

if (result.Count == size) {
yield return result;
result = new List<TSource>(size);
}
}

if (result.Count > 0) {
yield return result;
}
}
}
57 changes: 57 additions & 0 deletions src/ApiService/ApiService/onefuzzlib/orm/Orm.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using System.Collections.Concurrent;
using System.Reflection;
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Data.Tables;
using Microsoft.OneFuzz.Service;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;


namespace ApiService.OneFuzzLib.Orm {
public interface IOrm<T> where T : EntityBase {
Task<TableClient> GetTableClient(string table, ResourceIdentifier? accountId = null);
Expand All @@ -17,6 +19,8 @@ public interface IOrm<T> where T : EntityBase {
Task<ResultVoid<(int, string)>> Update(T entity);
Task<ResultVoid<(int, string)>> Delete(T entity);

Task<DeleteAllResult> DeleteAll(IEnumerable<(string?, string?)> keys);

IAsyncEnumerable<T> SearchAll();
IAsyncEnumerable<T> SearchByPartitionKeys(IEnumerable<string> partitionKeys);
IAsyncEnumerable<T> SearchByRowKeys(IEnumerable<string> rowKeys);
Expand All @@ -27,6 +31,7 @@ IAsyncEnumerable<T> SearchByTimeRange((DateTimeOffset min, DateTimeOffset max) r
=> SearchByTimeRange(range.min, range.max);
}

public record DeleteAllResult(int SuccessCount, int FailureCount);

public abstract class Orm<T> : IOrm<T> where T : EntityBase {
#pragma warning disable CA1051 // permit visible instance fields
Expand All @@ -35,6 +40,7 @@ public abstract class Orm<T> : IOrm<T> where T : EntityBase {
protected readonly ILogTracer _logTracer;
#pragma warning restore CA1051

const int MAX_TRANSACTION_SIZE = 100;

public Orm(ILogTracer logTracer, IOnefuzzContext context) {
_context = context;
Expand All @@ -61,6 +67,7 @@ public async IAsyncEnumerable<T> QueryAsync(string? filter = null) {
var tableEntity = _entityConverter.ToTableEntity(entity);
var response = await tableClient.AddEntityAsync(tableEntity);


if (response.IsError) {
return ResultVoid<(int, string)>.Error((response.Status, response.ReasonPhrase));
} else {
Expand Down Expand Up @@ -134,6 +141,56 @@ public IAsyncEnumerable<T> SearchByRowKeys(IEnumerable<string> rowKeys)
public IAsyncEnumerable<T> SearchByTimeRange(DateTimeOffset min, DateTimeOffset max) {
return QueryAsync(Query.TimeRange(min, max));
}

public async Task<List<ResultVoid<(int, string)>>> BatchOperation(IAsyncEnumerable<T> entities, TableTransactionActionType actionType) {
var tableClient = await GetTableClient(typeof(T).Name);
var transactions = await entities.Select(e => new TableTransactionAction(actionType, _entityConverter.ToTableEntity(e))).ToListAsync();
var responses = await tableClient.SubmitTransactionAsync(transactions);
return responses.Value.Select(response =>
response.IsError ? ResultVoid<(int, string)>.Error((response.Status, response.ReasonPhrase)) : ResultVoid<(int, string)>.Ok()
).ToList();
}


public async Task<DeleteAllResult> DeleteAll(IEnumerable<(string?, string?)> keys) {
var query = Query.Or(
keys.Select(key =>
key switch {
(null, null) => throw new ArgumentException("partitionKey and rowKey cannot both be null"),
(string partitionKey, null) => Query.PartitionKey(partitionKey),
(null, string rowKey) => Query.RowKey(rowKey),
(string partitionKey, string rowKey) => Query.And(
Query.PartitionKey(partitionKey),
Query.RowKey(rowKey)
),
}
)
);

var tableClient = await GetTableClient(typeof(T).Name);
var pages = tableClient.QueryAsync<TableEntity>(query, select: new[] { "PartitionKey, RowKey" });

var requests = await pages
.Chunk(MAX_TRANSACTION_SIZE)
.Select(chunk => {
var transactions = chunk.Select(e => new TableTransactionAction(TableTransactionActionType.Delete, e));
return tableClient.SubmitTransactionAsync(transactions);
})
.ToListAsync();

var responses = await System.Threading.Tasks.Task.WhenAll(requests);
var (successes, failures) = responses
.SelectMany(x => x.Value)
.Aggregate(
(0, 0),
((int Successes, int Failures) acc, Response current) =>
current.IsError
? (acc.Successes, acc.Failures + 1)
: (acc.Successes + 1, acc.Failures)
);

return new DeleteAllResult(successes, failures);
}
}


Expand Down