Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial migration of sidecar code into this repo #331

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Microsoft.DurableTask.sln
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Analyzers.Tests", "test\Ana
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureFunctionsApp.Tests", "samples\AzureFunctionsUnitTests\AzureFunctionsApp.Tests.csproj", "{FC2692E7-79AE-400E-A50F-8E0BCC8C9BD9}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Services", "Services", "{A9CA1883-133C-49BE-8FA1-B6D6E27110A8}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sidecar", "src\Services\Sidecar\Sidecar.csproj", "{47ACE256-E8C8-4734-B1D6-B9B39EBF6990}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sidecar.App", "src\Services\Sidecar.App\Sidecar.App.csproj", "{2F2A8D76-6294-420D-B308-DC2D087EE6B1}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -185,6 +191,14 @@ Global
{FC2692E7-79AE-400E-A50F-8E0BCC8C9BD9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FC2692E7-79AE-400E-A50F-8E0BCC8C9BD9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FC2692E7-79AE-400E-A50F-8E0BCC8C9BD9}.Release|Any CPU.Build.0 = Release|Any CPU
{47ACE256-E8C8-4734-B1D6-B9B39EBF6990}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{47ACE256-E8C8-4734-B1D6-B9B39EBF6990}.Debug|Any CPU.Build.0 = Debug|Any CPU
{47ACE256-E8C8-4734-B1D6-B9B39EBF6990}.Release|Any CPU.ActiveCfg = Release|Any CPU
{47ACE256-E8C8-4734-B1D6-B9B39EBF6990}.Release|Any CPU.Build.0 = Release|Any CPU
{2F2A8D76-6294-420D-B308-DC2D087EE6B1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2F2A8D76-6294-420D-B308-DC2D087EE6B1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2F2A8D76-6294-420D-B308-DC2D087EE6B1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2F2A8D76-6294-420D-B308-DC2D087EE6B1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -220,6 +234,9 @@ Global
{998E9D97-BD36-4A9D-81FC-5DAC1CE40083} = {8AFC9781-F6F1-4696-BB4A-9ED7CA9D612B}
{541FCCCE-1059-4691-B027-F761CD80DE92} = {E5637F81-2FB9-4CD7-900D-455363B142A7}
{FC2692E7-79AE-400E-A50F-8E0BCC8C9BD9} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17}
{A9CA1883-133C-49BE-8FA1-B6D6E27110A8} = {8AFC9781-F6F1-4696-BB4A-9ED7CA9D612B}
{47ACE256-E8C8-4734-B1D6-B9B39EBF6990} = {A9CA1883-133C-49BE-8FA1-B6D6E27110A8}
{2F2A8D76-6294-420D-B308-DC2D087EE6B1} = {A9CA1883-133C-49BE-8FA1-B6D6E27110A8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71}
Expand Down
15 changes: 15 additions & 0 deletions src/Services/Sidecar.App/BackendType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask.Sidecar.App;

/// <summary>
/// Represents the supported Durable Task storage provider backends.
/// </summary>
enum BackendType
{
AzureStorage,
MSSQL,
Netherite,
Emulator,
}
15 changes: 15 additions & 0 deletions src/Services/Sidecar.App/IInputReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace Microsoft.DurableTask.Sidecar.App;

/// <summary>
/// Abstraction for reading from standard input. This abstraction allows tests to mock stdin.
/// </summary>
interface IInputReader
{
/// <summary>
/// Reads a single line from standard input.
/// </summary>
Task<string?> ReadLineAsync();
}
42 changes: 42 additions & 0 deletions src/Services/Sidecar.App/Logs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Microsoft.Extensions.Logging;

namespace Microsoft.DurableTask.Sidecar.App
{
static partial class Logs
{
[LoggerMessage(
EventId = 1,
Level = LogLevel.Information,
Message = "Initializing the Durable Task sidecar. Listen address = {address}, backend type = {backendType}.")]
public static partial void InitializingSidecar(
this ILogger logger,
string address,
string backendType);

[LoggerMessage(
EventId = 2,
Level = LogLevel.Information,
Message = "Sidecar initialized successfully in {latencyMs}ms.")]
public static partial void SidecarInitialized(
this ILogger logger,
long latencyMs);

[LoggerMessage(
EventId = 3,
Level = LogLevel.Error,
Message = "Sidecar listen port {port} is in use by another process!")]
public static partial void SidecarListenPortAlreadyInUse(
this ILogger logger,
int port);

[LoggerMessage(
EventId = 4,
Level = LogLevel.Information,
Message = "The Durable Task sidecar is shutting down.")]
public static partial void SidecarShuttingDown(this ILogger logger);
}
}

254 changes: 254 additions & 0 deletions src/Services/Sidecar.App/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Diagnostics;
using CommandLine;
using DurableTask.AzureStorage;
using DurableTask.Core;
using DurableTask.SqlServer;
using Grpc.Net.Client;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.DurableTask.Protobuf;
using Microsoft.DurableTask.Sidecar.Grpc;
using static Microsoft.DurableTask.Protobuf.TaskHubSidecarService;

namespace Microsoft.DurableTask.Sidecar.App;

static class Program
{
public static readonly InMemoryOrchestrationService SingletonLocalOrchestrationService = new();

// We allow stdin to be overwritten for in-process testing
public static IInputReader InputReader { get; set; } = new StandardInputReader();

// We allow an additional logger provider for in-process testing
public static ILoggerProvider? AdditionalLoggerProvider { get; set; }

public static async Task<int> Main(string[] args) =>
await Parser.Default.ParseArguments<StartOptions>(args).MapResult(
(StartOptions options) => OnStartCommand(options),
errors => Task.FromResult(1));

static async Task<int> OnStartCommand(StartOptions options)
{
Stopwatch startupLatencyStopwatch = Stopwatch.StartNew();
ILoggerFactory loggerFactory = GetLoggerFactory();
ILogger log = loggerFactory.CreateLogger("Microsoft.DurableTask.Sidecar");

string listenAddress = $"http://0.0.0.0:{options.ListenPort}";
log.InitializingSidecar(listenAddress, options.BackendType.ToString());

IOrchestrationService orchestrationService = GetOrchestrationService(options, loggerFactory);
await orchestrationService.CreateIfNotExistsAsync();

// TODO: Support clients that don't share the same runtime type as the service
IOrchestrationServiceClient orchestrationServiceClient = (IOrchestrationServiceClient)orchestrationService;

IWebHost host;
try
{
host = new WebHostBuilder()
.UseKestrel(options =>
{
// Need to force Http2 in Kestrel in unencrypted scenarios
// https://docs.microsoft.com/en-us/aspnet/core/grpc/troubleshoot?view=aspnetcore-3.0
options.ConfigureEndpointDefaults(listenOptions => listenOptions.Protocols = HttpProtocols.Http2);
})
.UseUrls(listenAddress)
.ConfigureServices(services =>
{
services.AddGrpc();
services.AddSingleton<ILoggerFactory>(loggerFactory);
services.AddSingleton<IOrchestrationService>(orchestrationService);
services.AddSingleton<IOrchestrationServiceClient>(orchestrationServiceClient);
services.AddSingleton<TaskHubGrpcServer>();
})
.Configure(app =>
{
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<TaskHubGrpcServer>();
});
})
.Build();
await host.StartAsync();

log.SidecarInitialized(startupLatencyStopwatch.ElapsedMilliseconds);
}
catch (IOException e) when (e.InnerException is AddressInUseException)
{
log.SidecarListenPortAlreadyInUse(options.ListenPort);
return 1;
}

if (options.Interactive)
{
Console.ForegroundColor = ConsoleColor.White;
Console.WriteLine("Interactive mode. Type the name of an orchestrator and press [ENTER] to submit. Type 'exit' to quit.");
Console.WriteLine();
Console.Write("> ");
Console.ResetColor();

// Create a gRPC channel to talk to the management service endpoint that we just started.
// Alternatively, we could consider making direct calls using TaskHubClient.
string localListenAddress = $"http://localhost:{options.ListenPort}";
GrpcChannel grpcChannel = GrpcChannel.ForAddress(localListenAddress, new GrpcChannelOptions
{
// NOTE: This is a localhost connection, so we can safely disable TLS.
UnsafeUseInsecureChannelCallCredentials = true,
});

var client = new TaskHubSidecarServiceClient(grpcChannel);

try
{
while (true)
{
string? input = (await ReadLineAsync())?.Trim();
if (string.IsNullOrEmpty(input) || string.Equals(input, "help", StringComparison.OrdinalIgnoreCase))
{
Console.ForegroundColor = ConsoleColor.White;
Console.WriteLine("Usage: {orchestrator-name} [{orchestrator-input}]");
Console.Write("> ");
Console.ResetColor();
continue;
}

if (string.Equals(input, "exit", StringComparison.OrdinalIgnoreCase))
{
break;
}

string[] parts = input.Split(' ');
string name = parts.First();

var request = new CreateInstanceRequest
{
Name = name,
InstanceId = $"dt-interactive-{Guid.NewGuid():N}",
};

if (parts.Length > 1)
{
request.Input = parts[1];
}

await client.StartInstanceAsync(request);
}
}
finally
{
await grpcChannel.ShutdownAsync();
}
}
else
{
// TODO: Block until we receive a SIGTERM or SIGKILL
await Task.Delay(Timeout.Infinite);
}

log.SidecarShuttingDown();
await host.StopAsync();
host.Dispose();

return 0;
}

static ILoggerFactory GetLoggerFactory() => LoggerFactory.Create(builder =>
{
builder.AddSimpleConsole(options =>
{
options.SingleLine = true;
options.UseUtcTimestamp = true;
options.TimestampFormat = "yyyy-MM-ddThh:mm:ss.ffffffZ ";
});

// TODO: Support Application Insights URLs for sovereign clouds
string? appInsightsKey = Environment.GetEnvironmentVariable("APPINSIGHTS_INSTRUMENTATIONKEY");
if (!string.IsNullOrEmpty(appInsightsKey))
{
builder.AddApplicationInsights(appInsightsKey);
}

// Support a statically configured logger provider for in-memory testing.
if (AdditionalLoggerProvider != null)
{
builder.AddProvider(AdditionalLoggerProvider);
}

// Sidecar logging can be optionally configured using environment variables.
string? sidecarLogLevelString = Environment.GetEnvironmentVariable("DURABLETASK_SIDECAR_LOGLEVEL");
if (!Enum.TryParse(sidecarLogLevelString, ignoreCase: true, out LogLevel sidecarLogLevel))
{
sidecarLogLevel = LogLevel.Information;
}

// Storage provider logs should be warning+ by default and
// core execution logs (DurableTask.Core) should be information+ by default
// to support basic tracking.
builder.AddFilter("DurableTask", LogLevel.Warning);
builder.AddFilter("DurableTask.Core", LogLevel.Information);
builder.AddFilter("Microsoft.DurableTask.Sidecar", sidecarLogLevel);

// ASP.NET Core logs to warning since they can otherwise be noisy.
// This should be increased if it's necessary to debug gRPC request/response issues.
builder.AddFilter("Microsoft.AspNetCore", LogLevel.Warning);
});

static IOrchestrationService GetOrchestrationService(StartOptions options, ILoggerFactory loggerFactory)
{
switch (options.BackendType)
{
case BackendType.AzureStorage:
const string AzureStorageConnectionStringName = "DURABLETASK_AZURESTORAGE_CONNECTIONSTRING";
string? storageConnectionString = Environment.GetEnvironmentVariable(AzureStorageConnectionStringName);
if (string.IsNullOrEmpty(storageConnectionString))
{
// Local storage emulator: "UseDevelopmentStorage=true"
throw new InvalidOperationException($"The Azure Storage provider requires a {AzureStorageConnectionStringName} environment variable.");
}

var azureStorageSettings = new AzureStorageOrchestrationServiceSettings
{
TaskHubName = "DurableServerTests",
StorageConnectionString = storageConnectionString,
MaxQueuePollingInterval = TimeSpan.FromSeconds(5),
LoggerFactory = loggerFactory,
};
return new AzureStorageOrchestrationService(azureStorageSettings);

case BackendType.Emulator:
return SingletonLocalOrchestrationService;

case BackendType.MSSQL:
const string SqlConnectionStringName = "DURABLETASK_MSSQL_CONNECTIONSTRING";
string? sqlConnectionString = Environment.GetEnvironmentVariable(SqlConnectionStringName);
if (string.IsNullOrEmpty(sqlConnectionString))
{
// Local Windows install: "Server=localhost;Database=DurableDB;Trusted_Connection=True;"
throw new InvalidOperationException($"The MSSQL storage provider requires a {SqlConnectionStringName} environment variable.");
}

var mssqlSettings = new SqlOrchestrationServiceSettings(sqlConnectionString)
{
LoggerFactory = loggerFactory,
};
return new SqlOrchestrationService(mssqlSettings);

case BackendType.Netherite:
throw new NotSupportedException("Netherite is not yet supported.");

default:
throw new ArgumentException($"Unknown backend type: {options.BackendType}");
}
}

static Task<string?> ReadLineAsync() => InputReader.ReadLineAsync();

class StandardInputReader : IInputReader
{
public Task<string?> ReadLineAsync() => Console.In.ReadLineAsync();
}
}
8 changes: 8 additions & 0 deletions src/Services/Sidecar.App/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"profiles": {
"DurableTask.Sidecar.App": {
"commandName": "Project",
"commandLineArgs": "--backend Emulator"
}
}
}
Loading
Loading