Skip to content

Commit

Permalink
Added webhooks for triggering new orchestration instances (#483)
Browse files Browse the repository at this point in the history
Resolves #61
  • Loading branch information
kashimiz authored and cgillum committed Oct 23, 2018
1 parent fe8cef6 commit 097fb1e
Show file tree
Hide file tree
Showing 6 changed files with 331 additions and 50 deletions.
4 changes: 1 addition & 3 deletions samples/csx/HttpStart/run.csx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,5 @@ public static async Task<HttpResponseMessage> Run(

log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

var res = starter.CreateCheckStatusResponse(req, instanceId);
res.Headers.RetryAfter = new RetryConditionHeaderValue(TimeSpan.FromSeconds(10));
return res;
return starter.CreateCheckStatusResponse(req, instanceId);
}
4 changes: 1 addition & 3 deletions samples/precompiled/HttpStart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ public static async Task<HttpResponseMessage> Run(

log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

var res = starter.CreateCheckStatusResponse(req, instanceId);
res.Headers.RetryAfter = new RetryConditionHeaderValue(TimeSpan.FromSeconds(10));
return res;
return starter.CreateCheckStatusResponse(req, instanceId);
}
}
}
1 change: 1 addition & 0 deletions src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Http;
using System.Reflection;
using System.Text;
Expand Down
170 changes: 132 additions & 38 deletions src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
Expand All @@ -19,6 +20,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
internal class HttpApiHandler
{
private const string InstancesControllerSegment = "/instances/";
private const string OrchestratorsControllerSegment = "/orchestrators/";
private const string TaskHubParameter = "taskHub";
private const string ConnectionParameter = "connection";
private const string RaiseEventOperation = "raiseEvent";
Expand Down Expand Up @@ -106,65 +108,105 @@ internal async Task<HttpResponseMessage> WaitForCompletionOrCreateCheckStatusRes

public async Task<HttpResponseMessage> HandleRequestAsync(HttpRequestMessage request)
{
string path = request.RequestUri.AbsolutePath.TrimEnd('/');
int i = path.IndexOf(InstancesControllerSegment, StringComparison.OrdinalIgnoreCase);
if (i < 0)
try
{
// Retrive All Status or conditional query in case of the request URL ends e.g. /instances/
if (request.Method == HttpMethod.Get
&& path.EndsWith(InstancesControllerSegment.TrimEnd('/'), StringComparison.OrdinalIgnoreCase))
string path = request.RequestUri.AbsolutePath.TrimEnd('/');
int i = path.IndexOf(OrchestratorsControllerSegment, StringComparison.OrdinalIgnoreCase);
int nextSlash = -1;
if (i >= 0 && request.Method == HttpMethod.Post)
{
return await this.HandleGetStatusRequestAsync(request);
}
string functionName;
string instanceId = string.Empty;

return request.CreateResponse(HttpStatusCode.NotFound);
}
i += OrchestratorsControllerSegment.Length;
nextSlash = path.IndexOf('/', i);

i += InstancesControllerSegment.Length;
int nextSlash = path.IndexOf('/', i);
if (nextSlash < 0)
{
functionName = path.Substring(i);
}
else
{
functionName = path.Substring(i, nextSlash - i);
i = nextSlash + 1;
instanceId = path.Substring(i);
}

if (nextSlash < 0)
{
string instanceId = path.Substring(i);
if (request.Method == HttpMethod.Get)
return await this.HandleStartOrchestratorRequestAsync(request, functionName, instanceId);
}

i = path.IndexOf(InstancesControllerSegment, StringComparison.OrdinalIgnoreCase);
if (i < 0)
{
return await this.HandleGetStatusRequestAsync(request, instanceId);
// Retrive All Status or conditional query in case of the request URL ends e.g. /instances/
if (request.Method == HttpMethod.Get
&& path.EndsWith(InstancesControllerSegment.TrimEnd('/'), StringComparison.OrdinalIgnoreCase))
{
return await this.HandleGetStatusRequestAsync(request);
}

return request.CreateResponse(HttpStatusCode.NotFound);
}
}
else if (request.Method == HttpMethod.Post)
{
string instanceId = path.Substring(i, nextSlash - i);
i = nextSlash + 1;

i += InstancesControllerSegment.Length;
nextSlash = path.IndexOf('/', i);

if (nextSlash < 0)
{
string operation = path.Substring(i);
if (string.Equals(operation, TerminateOperation, StringComparison.OrdinalIgnoreCase))
{
return await this.HandleTerminateInstanceRequestAsync(request, instanceId);
}
else if (string.Equals(operation, RewindOperation, StringComparison.OrdinalIgnoreCase))
string instanceId = path.Substring(i);
if (request.Method == HttpMethod.Get)
{
return await this.HandleRewindInstanceRequestAsync(request, instanceId);
return await this.HandleGetStatusRequestAsync(request, instanceId);
}
}
else
else if (request.Method == HttpMethod.Post)
{
string operation = path.Substring(i, nextSlash - i);
if (string.Equals(operation, RaiseEventOperation, StringComparison.OrdinalIgnoreCase))
string instanceId = path.Substring(i, nextSlash - i);
i = nextSlash + 1;
nextSlash = path.IndexOf('/', i);
if (nextSlash < 0)
{
i = nextSlash + 1;
nextSlash = path.IndexOf('/', i);
if (nextSlash < 0)
string operation = path.Substring(i);
if (string.Equals(operation, TerminateOperation, StringComparison.OrdinalIgnoreCase))
{
return await this.HandleTerminateInstanceRequestAsync(request, instanceId);
}
else if (string.Equals(operation, RewindOperation, StringComparison.OrdinalIgnoreCase))
{
return await this.HandleRewindInstanceRequestAsync(request, instanceId);
}
}
else
{
string operation = path.Substring(i, nextSlash - i);
if (string.Equals(operation, RaiseEventOperation, StringComparison.OrdinalIgnoreCase))
{
string eventName = path.Substring(i);
return await this.HandleRaiseEventRequestAsync(request, instanceId, eventName);
i = nextSlash + 1;
nextSlash = path.IndexOf('/', i);
if (nextSlash < 0)
{
string eventName = path.Substring(i);
return await this.HandleRaiseEventRequestAsync(request, instanceId, eventName);
}
}
}
}

return request.CreateErrorResponse(HttpStatusCode.BadRequest, "No such API");
}

return request.CreateErrorResponse(HttpStatusCode.BadRequest, "No such API");
/* Some handler methods throw ArgumentExceptions in specialized cases which should be returned to the client, such as when:
* - the function name is not found (starting a new function)
* - the orchestration instance is not in a Failed state (rewinding an orchestration instance)
*/
catch (ArgumentException e)
{
return request.CreateErrorResponse(HttpStatusCode.BadRequest, "One or more of the arguments submitted is incorrect", e);
}
catch (Exception e)
{
return request.CreateErrorResponse(HttpStatusCode.InternalServerError, "Something went wrong while processing your request", e);
}
}

private async Task<HttpResponseMessage> HandleGetStatusRequestAsync(
Expand Down Expand Up @@ -326,6 +368,47 @@ private async Task<HttpResponseMessage> HandleTerminateInstanceRequestAsync(
return request.CreateResponse(HttpStatusCode.Accepted);
}

private async Task<HttpResponseMessage> HandleStartOrchestratorRequestAsync(
HttpRequestMessage request,
string functionName,
string instanceId)
{
try
{
DurableOrchestrationClientBase client = this.GetClient(request);

object input = null;
if (request.Content != null)
{
using (Stream s = await request.Content.ReadAsStreamAsync())
using (StreamReader sr = new StreamReader(s))
using (JsonReader reader = new JsonTextReader(sr))
{
JsonSerializer serializer = JsonSerializer.Create(MessagePayloadDataConverter.MessageSettings);
input = serializer.Deserialize<object>(reader);
}
}

string id = await client.StartNewAsync(functionName, instanceId, input);

TimeSpan? timeout = GetTimeSpan(request, "timeout");
TimeSpan? pollingInterval = GetTimeSpan(request, "pollingInterval");

if (timeout.HasValue && pollingInterval.HasValue)
{
return await client.WaitForCompletionOrCreateCheckStatusResponseAsync(request, id, timeout.Value, pollingInterval.Value);
}
else
{
return client.CreateCheckStatusResponse(request, id);
}
}
catch (JsonReaderException e)
{
return request.CreateErrorResponse(HttpStatusCode.BadRequest, "Invalid JSON content", e);
}
}

private async Task<HttpResponseMessage> HandleRewindInstanceRequestAsync(
HttpRequestMessage request,
string instanceId)
Expand Down Expand Up @@ -493,5 +576,16 @@ private HttpResponseMessage CreateCheckStatusResponseMessage(HttpRequestMessage
response.Headers.RetryAfter = new RetryConditionHeaderValue(TimeSpan.FromSeconds(10));
return response;
}

private static TimeSpan? GetTimeSpan(HttpRequestMessage request, string queryParameterName)
{
string queryParameterStringValue = request.GetQueryNameValuePairs()[queryParameterName];
if (string.IsNullOrEmpty(queryParameterStringValue))
{
return null;
}

return TimeSpan.FromSeconds(double.Parse(queryParameterStringValue));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal class MessagePayloadDataConverter : JsonDataConverter
// The default JsonDataConverter for DTFx includes type information in JSON objects. This causes issues
// because the type information generated from C# scripts cannot be understood by DTFx. For this reason, explicitly
// configure the JsonDataConverter to not include CLR type information. This is also safer from a security perspective.
private static readonly JsonSerializerSettings MessageSettings = new JsonSerializerSettings
internal static readonly JsonSerializerSettings MessageSettings = new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.None,
};
Expand Down
Loading

0 comments on commit 097fb1e

Please sign in to comment.