From 06bec1153e415c6378779f6deebd4e069cef11d9 Mon Sep 17 00:00:00 2001 From: tjoubert Date: Wed, 14 Sep 2022 13:15:03 +0400 Subject: [PATCH] Added Pregel API support with 3.10 features --- arangodb-net-standard/ArangoDBClient.cs | 7 + arangodb-net-standard/IArangoDBClient.cs | 6 + .../PregelApi/IPregelApiClient.cs | 76 +++++++++ .../PregelApi/Models/PostStartJobBody.cs | 57 +++++++ .../PregelApi/Models/PregelAlgorithms.cs | 58 +++++++ .../Models/PregelJobAggregatedStatus.cs | 10 ++ .../PregelApi/Models/PregelJobDetail.cs | 7 + .../Models/PregelJobGraphStoreStatus.cs | 10 ++ .../PregelApi/Models/PregelJobGssStatus.cs | 9 + .../Models/PregelJobGssStatusItem.cs | 10 ++ .../PregelApi/Models/PregelJobStatus.cs | 144 ++++++++++++++++ .../PregelApi/PregelApiClient.cs | 159 ++++++++++++++++++ 12 files changed, 553 insertions(+) create mode 100644 arangodb-net-standard/PregelApi/IPregelApiClient.cs create mode 100644 arangodb-net-standard/PregelApi/Models/PostStartJobBody.cs create mode 100644 arangodb-net-standard/PregelApi/Models/PregelAlgorithms.cs create mode 100644 arangodb-net-standard/PregelApi/Models/PregelJobAggregatedStatus.cs create mode 100644 arangodb-net-standard/PregelApi/Models/PregelJobDetail.cs create mode 100644 arangodb-net-standard/PregelApi/Models/PregelJobGraphStoreStatus.cs create mode 100644 arangodb-net-standard/PregelApi/Models/PregelJobGssStatus.cs create mode 100644 arangodb-net-standard/PregelApi/Models/PregelJobGssStatusItem.cs create mode 100644 arangodb-net-standard/PregelApi/Models/PregelJobStatus.cs create mode 100644 arangodb-net-standard/PregelApi/PregelApiClient.cs diff --git a/arangodb-net-standard/ArangoDBClient.cs b/arangodb-net-standard/ArangoDBClient.cs index c187fee5..e5d54b28 100644 --- a/arangodb-net-standard/ArangoDBClient.cs +++ b/arangodb-net-standard/ArangoDBClient.cs @@ -10,6 +10,7 @@ using ArangoDBNetStandard.DocumentApi; using ArangoDBNetStandard.GraphApi; using ArangoDBNetStandard.IndexApi; +using ArangoDBNetStandard.PregelApi; using ArangoDBNetStandard.Serialization; using ArangoDBNetStandard.TransactionApi; using ArangoDBNetStandard.Transport; @@ -99,6 +100,11 @@ public class ArangoDBClient : IArangoDBClient /// public AdminApiClient Admin { get; private set; } + /// + /// Pregel management API + /// + public PregelApiClient Pregel { get; private set; } + /// /// Create an instance of from an existing /// instance, using the default JSON serialization. @@ -166,6 +172,7 @@ private void InitializeApis( View = new ViewApiClient(transport, serialization); Analyzer = new AnalyzerApiClient(transport, serialization); Admin = new AdminApiClient(transport, serialization); + Pregel = new PregelApiClient(transport, serialization); } } } \ No newline at end of file diff --git a/arangodb-net-standard/IArangoDBClient.cs b/arangodb-net-standard/IArangoDBClient.cs index 132d1701..b9b0087e 100644 --- a/arangodb-net-standard/IArangoDBClient.cs +++ b/arangodb-net-standard/IArangoDBClient.cs @@ -10,6 +10,7 @@ using ArangoDBNetStandard.DocumentApi; using ArangoDBNetStandard.GraphApi; using ArangoDBNetStandard.IndexApi; +using ArangoDBNetStandard.PregelApi; using ArangoDBNetStandard.TransactionApi; using ArangoDBNetStandard.UserApi; using ArangoDBNetStandard.ViewApi; @@ -87,5 +88,10 @@ public interface IArangoDBClient : IDisposable /// Admin API /// AdminApiClient Admin { get; } + + /// + /// Pregel API + /// + PregelApiClient Pregel { get; } } } \ No newline at end of file diff --git a/arangodb-net-standard/PregelApi/IPregelApiClient.cs b/arangodb-net-standard/PregelApi/IPregelApiClient.cs new file mode 100644 index 00000000..45a37090 --- /dev/null +++ b/arangodb-net-standard/PregelApi/IPregelApiClient.cs @@ -0,0 +1,76 @@ +using ArangoDBNetStandard.PregelApi.Models; +using ArangoDBNetStandard.ViewApi.Models; +using System.Collections.Generic; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using static System.Net.WebRequestMethods; + +namespace ArangoDBNetStandard.PregelApi +{ + /// + /// Defines a client to access the ArangoDB API for Pregel + /// (Distributed Iterative Graph Processing). + /// + public interface IPregelApiClient + { + /// + /// Start the execution of a Pregel algorithm. + /// POST /_api/control_pregel + /// + /// + /// To start an execution you need to specify the + /// algorithm name and a named graph (SmartGraph in cluster). + /// Alternatively you can specify the vertex and edge collections. + /// Additionally you can specify custom parameters which + /// vary for each algorithm + /// + /// The body of the request containing required properties. + /// A CancellationToken to observe while waiting for the task to complete or to cancel the task. + /// The ID of the newly started job. + Task PostStartJobAsync( + PostStartJobBody body, + CancellationToken token = default); + + /// + /// Get the status of a Pregel job execution. + /// GET /_api/control_pregel/{id} + /// + /// The ID of the job. + /// A CancellationToken to observe while waiting for the task to complete or to cancel the task. + /// + Task GetJobStatusAsync( + string jobId, + CancellationToken token = default); + + /// + /// Get the overview of currently running Pregel jobs. + /// GET /_api/control_pregel + /// + /// A CancellationToken to observe while waiting for the task to complete or to cancel the task. + /// + /// Returns a list of currently running and recently + /// finished Pregel jobs without retrieving their results. + /// + Task> GetAllRunningJobsAsync( + CancellationToken token = default); + + /// + /// Cancel an ongoing Pregel execution. + /// DELETE /_api/control_pregel/{id} + /// + /// + /// Cancel an execution which is still running, and + /// discard any intermediate results. This will immediately + /// free all memory taken up by the execution, and will + /// make you lose all intermediary data. + /// For more information + /// + /// The ID of the job. + /// A CancellationToken to observe while waiting for the task to complete or to cancel the task. + /// + Task DeleteJobAsync( + string jobId, + CancellationToken token = default); + } +} \ No newline at end of file diff --git a/arangodb-net-standard/PregelApi/Models/PostStartJobBody.cs b/arangodb-net-standard/PregelApi/Models/PostStartJobBody.cs new file mode 100644 index 00000000..cb1e2888 --- /dev/null +++ b/arangodb-net-standard/PregelApi/Models/PostStartJobBody.cs @@ -0,0 +1,57 @@ +using System; +using System.Collections.Generic; +using System.Text; +using static System.Net.WebRequestMethods; + +namespace ArangoDBNetStandard.PregelApi.Models +{ + /// + /// Body parameter for + /// + /// + public class PostStartJobBody + { + /// + /// Required. Name of the algorithm. + /// See for possible values. + /// + public string Algorithm { get; set; } + + /// + /// Optional. Name of a graph. Either this or the parameters + /// and + /// are required. + /// + /// + /// Please note that there are special sharding requirements + /// for graphs in order to be used with Pregel. + /// + public string GraphName { get; set; } + + /// + /// List of vertex collection names. + /// + /// + /// Please note that there are special sharding requirements + /// for collections in order to be used with Pregel. + /// + public IEnumerable VertexCollections { get; set; } + + /// + /// List of edge collection names. + /// + /// + /// Please note that there are special sharding requirements + /// for collections in order to be used with Pregel. + /// + public IEnumerable EdgeCollections { get; set; } + + /// + /// General as well as algorithm-specific options. + /// + /// + /// + /// + public Dictionary Params { get; set; } + } +} \ No newline at end of file diff --git a/arangodb-net-standard/PregelApi/Models/PregelAlgorithms.cs b/arangodb-net-standard/PregelApi/Models/PregelAlgorithms.cs new file mode 100644 index 00000000..61962502 --- /dev/null +++ b/arangodb-net-standard/PregelApi/Models/PregelAlgorithms.cs @@ -0,0 +1,58 @@ +namespace ArangoDBNetStandard.PregelApi.Models +{ + /// + /// List of pregel algorithm names + /// + public struct PregelAlgorithms + { + /// + /// Page Rank + /// + public const string PageRank = "pagerank"; + + /// + /// Single-Source Shortest Path + /// + public const string SSSP = "sssp"; + + /// + /// Connected Components + /// + public const string ConnectedComponents = "connectedcomponents"; + + /// + /// Weakly Connected Components + /// + public const string WCC = "wcc"; + + /// + /// Strongly Connected Components + /// + public const string SCC = "scc"; + + /// + /// Hyperlink-Induced Topic Search + /// + public const string HITS = "hits"; + + /// + /// Effective Closeness + /// + public const string EffectiveCloseness = "effectivecloseness"; + + /// + /// LineRank + /// + public const string LineRank = "linerank"; + + /// + /// Label Propagation + /// + public const string LabelPropagation = "labelpropagation"; + + /// + /// Speaker-Listener Label Propagation + /// + public const string SLPA = "slpa"; + } +} diff --git a/arangodb-net-standard/PregelApi/Models/PregelJobAggregatedStatus.cs b/arangodb-net-standard/PregelApi/Models/PregelJobAggregatedStatus.cs new file mode 100644 index 00000000..87549700 --- /dev/null +++ b/arangodb-net-standard/PregelApi/Models/PregelJobAggregatedStatus.cs @@ -0,0 +1,10 @@ +namespace ArangoDBNetStandard.PregelApi.Models +{ + public class PregelJobAggregatedStatus + { + public string TimeStamp { get; set; } + public PregelJobGraphStoreStatus GraphStoreStatus { get; set; } + public PregelJobGssStatus AllGssStatus { get; set; } + public object WorkerStatus { get; set; } + } +} \ No newline at end of file diff --git a/arangodb-net-standard/PregelApi/Models/PregelJobDetail.cs b/arangodb-net-standard/PregelApi/Models/PregelJobDetail.cs new file mode 100644 index 00000000..7c693a56 --- /dev/null +++ b/arangodb-net-standard/PregelApi/Models/PregelJobDetail.cs @@ -0,0 +1,7 @@ +namespace ArangoDBNetStandard.PregelApi.Models +{ + public class PregelJobDetail + { + public PregelJobAggregatedStatus AggregatedStatus { get; set; } + } +} \ No newline at end of file diff --git a/arangodb-net-standard/PregelApi/Models/PregelJobGraphStoreStatus.cs b/arangodb-net-standard/PregelApi/Models/PregelJobGraphStoreStatus.cs new file mode 100644 index 00000000..8357c5b9 --- /dev/null +++ b/arangodb-net-standard/PregelApi/Models/PregelJobGraphStoreStatus.cs @@ -0,0 +1,10 @@ +namespace ArangoDBNetStandard.PregelApi.Models +{ + public class PregelJobGraphStoreStatus + { + public int? VerticesLoaded { get; set; } + public int? EdgesLoaded { get; set; } + public long? MemoryBytesUsed { get; set; } + public int? VerticesStored { get; set; } + } +} \ No newline at end of file diff --git a/arangodb-net-standard/PregelApi/Models/PregelJobGssStatus.cs b/arangodb-net-standard/PregelApi/Models/PregelJobGssStatus.cs new file mode 100644 index 00000000..30a9ca50 --- /dev/null +++ b/arangodb-net-standard/PregelApi/Models/PregelJobGssStatus.cs @@ -0,0 +1,9 @@ +using System.Collections.Generic; + +namespace ArangoDBNetStandard.PregelApi.Models +{ + public class PregelJobGssStatus + { + public List Items { get; set; } + } +} \ No newline at end of file diff --git a/arangodb-net-standard/PregelApi/Models/PregelJobGssStatusItem.cs b/arangodb-net-standard/PregelApi/Models/PregelJobGssStatusItem.cs new file mode 100644 index 00000000..10ee8805 --- /dev/null +++ b/arangodb-net-standard/PregelApi/Models/PregelJobGssStatusItem.cs @@ -0,0 +1,10 @@ +namespace ArangoDBNetStandard.PregelApi.Models +{ + public class PregelJobGssStatusItem + { + public int? VerticesProcessed { get; set; } + public int? MessagesSent { get; set; } + public int? MessagesReceived { get; set; } + public long? MemoryBytesUsedForMessages { get; set; } + } +} \ No newline at end of file diff --git a/arangodb-net-standard/PregelApi/Models/PregelJobStatus.cs b/arangodb-net-standard/PregelApi/Models/PregelJobStatus.cs new file mode 100644 index 00000000..82b09c54 --- /dev/null +++ b/arangodb-net-standard/PregelApi/Models/PregelJobStatus.cs @@ -0,0 +1,144 @@ +using Newtonsoft.Json; +using System; +using System.Data.Common; +using System.Text; + +namespace ArangoDBNetStandard.PregelApi.Models +{ + public class PregelJobStatus + { + /// + /// ID of the Pregel job. + /// + public string ID { get; set; } + + /// + /// An algorithm used by the job. + /// + public string Algorithm { get; set; } + + /// + /// The date and time when the job was created. + /// + public string Created { get; set; } + + /// + /// The date and time when the job results expire. + /// + /// + /// The expiration date is only meaningful for jobs + /// that were completed, canceled or resulted in an error. + /// Such jobs are cleaned up by the garbage collection + /// when they reach their expiration date/time. + /// + public string Expires { get; set; } + + /// + /// The TTL (time to live) value for the job results, + /// specified in seconds. The TTL is used to calculate + /// the expiration date for the job’s results. + /// + public int TTL { get; set; } + + /// + /// State of the job execution. + /// + /// + /// The following values can be returned: + /// "loading": Job is loading. Introduced in v3.10. + /// "running": Algorithm is executing normally. + /// "storing": The algorithm finished, but the results + /// are still being written back into the collections. + /// Occurs only if the store parameter is set to true. + /// "done": The execution is done.In version 3.7.1 and + /// later, this means that storing is also done. + /// In earlier versions, the results may not be written + /// back into the collections yet.This event is announced + /// in the server log (requires at least info log level + /// for the pregel log topic). + /// "canceled": The execution was permanently canceled, + /// either by the user or by an error. + /// "fatal error": The execution has failed and cannot recover. + /// "in error" (currently unused): The execution is in an + /// error state.This can be caused by DB-Servers being not + /// reachable or being non responsive. The execution might + /// recover later, or switch to "canceled" if it was not + /// able to recover successfully. + /// "recovering" (currently unused): The execution is actively + /// recovering, will switch back to running if the recovery + /// was successful. + /// + public string State { get; set; } + + /// + /// The number of global supersteps executed. + /// + public int GSS { get; set; } + + /// + /// Total runtime of the execution up to now + /// (if the execution is still ongoing). + /// + public decimal TotalRuntime { get; set; } + + /// + /// Startup runtime of the execution. The startup + /// time includes the data loading time and can be + /// substantial. The startup time will be reported + /// as 0 if the startup is still ongoing. + /// + public decimal StartupTime { get; set; } + + /// + /// Algorithm execution time. The computation + /// time will be reported as 0 if the + /// computation still ongoing. + /// + public decimal ComputationTime { get; set; } + + /// + /// Time for storing the results if the job + /// includes results storage. The storage time + /// be reported as 0 if storing the results is + /// still ongoing. + /// + public decimal StorageTime { get; set; } + + /// + /// Database in which the job is executing. + /// + public string Database { get; set; } + + /// + /// + /// + public int SendCount { get; set; } + + /// + /// + /// + public int ReceivedCount { get; set; } + + /// + /// Total number of vertices processed. + /// + public int VertexCount { get; set; } + + /// + /// Total number of edges processed. + /// + public int EdgeCount { get; set; } + + /// + /// Statistics about the Pregel execution. + /// The value will only be populated once + /// the algorithm has finished. + /// + public object Reports { get; set; } + + /// + /// Additional Pregel job run details + /// + public PregelJobDetail Detail { get; set; } + } +} \ No newline at end of file diff --git a/arangodb-net-standard/PregelApi/PregelApiClient.cs b/arangodb-net-standard/PregelApi/PregelApiClient.cs new file mode 100644 index 00000000..a2d90bd5 --- /dev/null +++ b/arangodb-net-standard/PregelApi/PregelApiClient.cs @@ -0,0 +1,159 @@ +using ArangoDBNetStandard.ViewApi.Models; +using ArangoDBNetStandard.Serialization; +using ArangoDBNetStandard.Transport; +using System.Net; +using System.Threading.Tasks; +using System.Threading; +using ArangoDBNetStandard.PregelApi.Models; +using System.Collections.Generic; +using static System.Net.WebRequestMethods; +using System; +using System.Security.Cryptography; + +namespace ArangoDBNetStandard.PregelApi +{ + /// + /// A client to interact with ArangoDB HTTP API endpoints + /// for Pregel jobs management. + /// + public class PregelApiClient : ApiClientBase, IPregelApiClient + { + /// + /// The transport client used to communicate with the ArangoDB host. + /// + protected readonly IApiClientTransport _transport; + + /// + /// The root path of the API. + /// + protected readonly string _apiPath = "_api/control_pregel"; + + /// + /// Create an instance of + /// using the provided transport layer and the default JSON serialization. + /// + /// + public PregelApiClient(IApiClientTransport transport) + : base(new JsonNetApiClientSerialization()) + { + _transport = transport; + } + + /// + /// Create an instance of + /// using the provided transport and serialization layers. + /// + /// + /// + public PregelApiClient(IApiClientTransport transport, IApiClientSerialization serializer) + : base(serializer) + { + _transport = transport; + } + + /// + /// Start the execution of a Pregel algorithm. + /// POST /_api/control_pregel + /// + /// + /// To start an execution you need to specify the + /// algorithm name and a named graph (SmartGraph in cluster). + /// Alternatively you can specify the vertex and edge collections. + /// Additionally you can specify custom parameters which + /// vary for each algorithm + /// + /// The body of the request containing required properties. + /// A CancellationToken to observe while waiting for the task to complete or to cancel the task. + /// The ID of the newly started job. + public virtual async Task PostStartJobAsync(PostStartJobBody body, CancellationToken token = default) + { + string uri = _apiPath; + var content = GetContent(body, new ApiClientSerializationOptions(true, true)); + using (var response = await _transport.PostAsync(uri, content, token: token).ConfigureAwait(false)) + { + if (response.IsSuccessStatusCode) + { + var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); + return DeserializeJsonFromStream(stream); + } + throw await GetApiErrorException(response).ConfigureAwait(false); + } + } + + /// + /// Get the status of a Pregel job execution. + /// GET /_api/control_pregel/{id} + /// + /// The ID of the job. + /// A CancellationToken to observe while waiting for the task to complete or to cancel the task. + /// + public virtual async Task GetJobStatusAsync(string jobId, CancellationToken token = default) + { + if (string.IsNullOrWhiteSpace(jobId)) + { + throw new ArgumentNullException(nameof(jobId)); + } + string uri = _apiPath + '/' + jobId; + using (var response = await _transport.GetAsync(uri, token: token).ConfigureAwait(false)) + { + if (response.IsSuccessStatusCode) + { + var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); + return DeserializeJsonFromStream(stream); + } + throw await GetApiErrorException(response).ConfigureAwait(false); + } + } + + /// + /// Get the overview of currently running Pregel jobs. + /// GET /_api/control_pregel + /// + /// A CancellationToken to observe while waiting for the task to complete or to cancel the task. + /// + /// Returns a list of currently running and recently + /// finished Pregel jobs without retrieving their results. + /// + public virtual async Task> GetAllRunningJobsAsync(CancellationToken token = default) + { + string uri = _apiPath; + using (var response = await _transport.GetAsync(uri, token: token).ConfigureAwait(false)) + { + if (response.IsSuccessStatusCode) + { + var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); + return DeserializeJsonFromStream>(stream); + } + throw await GetApiErrorException(response).ConfigureAwait(false); + } + } + + /// + /// Cancel an ongoing Pregel execution. + /// DELETE /_api/control_pregel/{id} + /// + /// + /// Cancel an execution which is still running, and + /// discard any intermediate results. This will immediately + /// free all memory taken up by the execution, and will + /// make you lose all intermediary data. + /// For more information + /// + /// The ID of the job. + /// A CancellationToken to observe while waiting for the task to complete or to cancel the task. + /// + public virtual async Task DeleteJobAsync(string jobId, CancellationToken token = default) + { + string uri = _apiPath + '/' + jobId; + using (var response = await _transport.DeleteAsync(uri, token: token).ConfigureAwait(false)) + { + if (response.IsSuccessStatusCode) + { + var stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); + return DeserializeJsonFromStream(stream); + } + throw await GetApiErrorException(response).ConfigureAwait(false); + } + } + } +} \ No newline at end of file