Skip to content

Commit

Permalink
Add client diagnostics - Add sync APIs to RemoteFetcher and LocalFetc…
Browse files Browse the repository at this point in the history
…her (#18807)
  • Loading branch information
azabbasi authored Feb 16, 2021
1 parent 1df6709 commit 0f86217
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,39 @@
</EmbeddedResource>
</ItemGroup>

<!-- Common source from Azure.Core -->
<ItemGroup>
<Compile Include="$(AzureCoreSharedSources)ArrayBufferWriter.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)ClientDiagnostics.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)ContentTypeUtilities.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)DiagnosticScope.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)DiagnosticScopeFactory.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)HttpMessageSanitizer.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)TaskExtensions.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)OperationHelpers.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)Argument.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
<Compile Include="$(AzureCoreSharedSources)AzureResourceProviderNamespaceAttribute.cs">
<LinkBase>Shared\Azure.Core</LinkBase>
</Compile>
</ItemGroup>

<Import Project="$(RepoRoot)\sdk\core\Azure.Core\src\Azure.Core.props" />
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ public enum DependencyResolutionOption
/// Do not process external dependencies.
/// </summary>
Disabled,

/// <summary>
/// Enable external dependencies.
/// </summary>
Enabled,

/// <summary>
/// Try to get external dependencies using .expanded.json.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@
using System.Collections.Generic;
using System.Threading;
using System.Globalization;
using Azure.Core.Pipeline;

namespace Azure.Iot.ModelsRepository.Fetchers
{
internal class LocalModelFetcher : IModelFetcher
{
private readonly bool _tryExpanded;
private readonly ClientDiagnostics _clientDiagnostics;

public LocalModelFetcher(ResolverClientOptions clientOptions)
public LocalModelFetcher(ClientDiagnostics clientDiagnostics, ResolverClientOptions clientOptions)
{
_clientDiagnostics = clientDiagnostics;
_tryExpanded = clientOptions.DependencyResolution == DependencyResolutionOption.TryFromExpanded;
}

Expand All @@ -27,35 +30,48 @@ public Task<FetchResult> FetchAsync(string dtmi, Uri repositoryUri, Cancellation

public FetchResult Fetch(string dtmi, Uri repositoryUri, CancellationToken cancellationToken = default)
{
var work = new Queue<string>();
using DiagnosticScope scope = _clientDiagnostics.CreateScope("LocalModelFetcher.Fetch");
scope.Start();

if (_tryExpanded)
try
{
work.Enqueue(GetPath(dtmi, repositoryUri, true));
}
var work = new Queue<string>();

work.Enqueue(GetPath(dtmi, repositoryUri, false));
if (_tryExpanded)
{
work.Enqueue(GetPath(dtmi, repositoryUri, true));
}

string fnfError = string.Empty;
while (work.Count != 0 && !cancellationToken.IsCancellationRequested)
{
string tryContentPath = work.Dequeue();
ResolverEventSource.Shared.FetchingModelContent(tryContentPath);
work.Enqueue(GetPath(dtmi, repositoryUri, false));

if (File.Exists(tryContentPath))
string fnfError = string.Empty;
while (work.Count != 0)
{
return new FetchResult()
cancellationToken.ThrowIfCancellationRequested();

string tryContentPath = work.Dequeue();
ResolverEventSource.Instance.FetchingModelContent(tryContentPath);

if (File.Exists(tryContentPath))
{
Definition = File.ReadAllText(tryContentPath, Encoding.UTF8),
Path = tryContentPath
};
return new FetchResult
{
Definition = File.ReadAllText(tryContentPath, Encoding.UTF8),
Path = tryContentPath
};
}

ResolverEventSource.Instance.ErrorFetchingModelContent(tryContentPath);
fnfError = string.Format(CultureInfo.CurrentCulture, ServiceStrings.ErrorFetchingModelContent, tryContentPath);
}

ResolverEventSource.Shared.ErrorFetchingModelContent(tryContentPath);
fnfError = string.Format(CultureInfo.InvariantCulture, StandardStrings.ErrorFetchingModelContent, tryContentPath);
throw new RequestFailedException(fnfError, new FileNotFoundException(fnfError));
}
catch (Exception ex)
{
scope.Failed(ex);
throw;
}

throw new FileNotFoundException(fnfError);
}

private static string GetPath(string dtmi, Uri repositoryUri, bool expanded = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,110 @@ namespace Azure.Iot.ModelsRepository.Fetchers
internal class RemoteModelFetcher : IModelFetcher
{
private readonly HttpPipeline _pipeline;
private readonly ClientDiagnostics _clientDiagnostics;
private readonly bool _tryExpanded;

public RemoteModelFetcher(ResolverClientOptions clientOptions)
public RemoteModelFetcher(ClientDiagnostics clientDiagnostics, ResolverClientOptions clientOptions)
{
_pipeline = CreatePipeline(clientOptions);
_tryExpanded = clientOptions.DependencyResolution == DependencyResolutionOption.TryFromExpanded;
_clientDiagnostics = clientDiagnostics;
}

public FetchResult Fetch(string dtmi, Uri repositoryUri, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
using DiagnosticScope scope = _clientDiagnostics.CreateScope("RemoteModelFetcher.Fetch");
scope.Start();
try
{
Queue<string> work = PrepareWork(dtmi, repositoryUri);

public async Task<FetchResult> FetchAsync(string dtmi, Uri repositoryUri, CancellationToken cancellationToken = default)
{
Queue<string> work = new Queue<string>();
string remoteFetchError = string.Empty;

if (_tryExpanded)
while (work.Count != 0)
{
cancellationToken.ThrowIfCancellationRequested();

string tryContentPath = work.Dequeue();
ResolverEventSource.Instance.FetchingModelContent(tryContentPath);

try
{
string content = EvaluatePath(tryContentPath, cancellationToken);
return new FetchResult
{
Definition = content,
Path = tryContentPath
};
}
catch (Exception)
{
remoteFetchError = string.Format(CultureInfo.CurrentCulture, StandardStrings.ErrorFetchingModelContent, tryContentPath);
}
}

throw new RequestFailedException(remoteFetchError);
}
catch (Exception ex)
{
work.Enqueue(GetPath(dtmi, repositoryUri, true));
scope.Failed(ex);
throw;
}
}

work.Enqueue(GetPath(dtmi, repositoryUri, false));

string remoteFetchError = string.Empty;
while (work.Count != 0 && !cancellationToken.IsCancellationRequested)
public async Task<FetchResult> FetchAsync(string dtmi, Uri repositoryUri, CancellationToken cancellationToken = default)
{
using DiagnosticScope scope = _clientDiagnostics.CreateScope("RemoteModelFetcher.Fetch");
scope.Start();
try
{
string tryContentPath = work.Dequeue();
ResolverEventSource.Shared.FetchingModelContent(tryContentPath);
Queue<string> work = PrepareWork(dtmi, repositoryUri);

string remoteFetchError = string.Empty;

string content = await EvaluatePathAsync(tryContentPath, cancellationToken).ConfigureAwait(false);
if (!string.IsNullOrEmpty(content))
while (work.Count != 0)
{
return new FetchResult()
cancellationToken.ThrowIfCancellationRequested();

string tryContentPath = work.Dequeue();
ResolverEventSource.Instance.FetchingModelContent(tryContentPath);

try
{
Definition = content,
Path = tryContentPath
};
string content = await EvaluatePathAsync(tryContentPath, cancellationToken).ConfigureAwait(false);
return new FetchResult()
{
Definition = content,
Path = tryContentPath
};
}
catch (Exception)
{
remoteFetchError = string.Format(CultureInfo.CurrentCulture, StandardStrings.ErrorFetchingModelContent, tryContentPath);
}
}

ResolverEventSource.Shared.ErrorFetchingModelContent(tryContentPath);
remoteFetchError = string.Format(CultureInfo.CurrentCulture, StandardStrings.ErrorFetchingModelContent, tryContentPath);
throw new RequestFailedException(remoteFetchError);
}
catch (Exception ex)
{
scope.Failed(ex);
throw;
}
}

private Queue<string> PrepareWork(string dtmi, Uri repositoryUri)
{
Queue<string> work = new Queue<string>();

throw new RequestFailedException(remoteFetchError);
if (_tryExpanded)
{
work.Enqueue(GetPath(dtmi, repositoryUri, true));
}

work.Enqueue(GetPath(dtmi, repositoryUri, false));

return work;
}

private static string GetPath(string dtmi, Uri repositoryUri, bool expanded = false)
Expand All @@ -69,32 +128,89 @@ private static string GetPath(string dtmi, Uri repositoryUri, bool expanded = fa
return DtmiConventions.DtmiToQualifiedPath(dtmi, absoluteUri, expanded);
}

private async Task<string> EvaluatePathAsync(string path, CancellationToken cancellationToken)
private string EvaluatePath(string path, CancellationToken cancellationToken = default)
{
Request request = _pipeline.CreateRequest();
request.Method = RequestMethod.Get;
request.Uri = new RequestUriBuilder();
request.Uri.Reset(new Uri(path));
using DiagnosticScope scope = _clientDiagnostics.CreateScope("RemoteModelFetcher.EvaluatePath");
scope.Start();

try
{
using HttpMessage message = CreateGetRequest(path);

Response response = await _pipeline.SendRequestAsync(request, cancellationToken).ConfigureAwait(false);
_pipeline.Send(message, cancellationToken);

if (response.Status >= 200 && response.Status <= 299)
switch (message.Response.Status)
{
case 200:
{
return GetContent(message.Response.ContentStream);
}
default:
throw _clientDiagnostics.CreateRequestFailedException(message.Response);
}
}
catch (Exception ex)
{
return await GetContentAsync(response.ContentStream, cancellationToken).ConfigureAwait(false);
scope.Failed(ex);
throw;
}

return null;
}

private static async Task<string> GetContentAsync(Stream content, CancellationToken cancellationToken)
private async Task<string> EvaluatePathAsync(string path, CancellationToken cancellationToken = default)
{
using (JsonDocument json = await JsonDocument.ParseAsync(content, default, cancellationToken).ConfigureAwait(false))
using DiagnosticScope scope = _clientDiagnostics.CreateScope("RemoteModelFetcher.EvaluatePath");
scope.Start();

try
{
using HttpMessage message = CreateGetRequest(path);

await _pipeline.SendAsync(message, cancellationToken).ConfigureAwait(false);

switch (message.Response.Status)
{
case 200:
{
return await GetContentAsync(message.Response.ContentStream, cancellationToken).ConfigureAwait(false);
}
default:
throw _clientDiagnostics.CreateRequestFailedException(message.Response);
}
}
catch (Exception ex)
{
JsonElement root = json.RootElement;
return root.GetRawText();
scope.Failed(ex);
throw;
}
}

private HttpMessage CreateGetRequest(string path)
{
HttpMessage message = _pipeline.CreateMessage();
Request request = message.Request;
request.Method = RequestMethod.Get;
var uri = new RequestUriBuilder();
uri.Reset(new Uri(path));
request.Uri = uri;

return message;
}

private static string GetContent(Stream content)
{
using JsonDocument json = JsonDocument.Parse(content);
JsonElement root = json.RootElement;
return root.GetRawText();
}

private static async Task<string> GetContentAsync(Stream content, CancellationToken cancellationToken)
{
using JsonDocument json = await JsonDocument.ParseAsync(content, default, cancellationToken).ConfigureAwait(false);

JsonElement root = json.RootElement;
return root.GetRawText();
}

private static HttpPipeline CreatePipeline(ResolverClientOptions options)
{
return HttpPipelineBuilder.Build(options);
Expand Down
Loading

0 comments on commit 0f86217

Please sign in to comment.