Skip to content

Commit

Permalink
Adding request cancellation to Desktop .NET scenario (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
tpeczek committed Feb 20, 2022
1 parent 358e071 commit 1a39b3e
Showing 1 changed file with 47 additions and 23 deletions.
70 changes: 47 additions & 23 deletions Demo.Ndjson.AsyncStreams.Net.Http/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
using System.Buffers;
using System.Net.Http;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using Ndjson.AsyncStreams.Net.Http;
using Demo.WeatherForecasts;

Expand All @@ -17,32 +19,54 @@ class Program

static async Task Main(string[] args)
{
await ConsumeNdjsonStreamAsync().ConfigureAwait(false);
using CancellationTokenSource cancellationTokenSource = new();

await NegotiateRawJsonStreamAsync().ConfigureAwait(false);
void consoleCancelEventHandler(object sender, ConsoleCancelEventArgs eventArgs)
{
Console.WriteLine("Attempting to cancel . . .");

cancellationTokenSource.Cancel();
eventArgs.Cancel = true;
};
Console.CancelKeyPress += consoleCancelEventHandler;

CancellationToken cancellationToken = cancellationTokenSource.Token;

try
{
//await ConsumeNdjsonStreamAsync(cancellationToken).ConfigureAwait(false);

await NegotiateJsonStreamAsync().ConfigureAwait(false);
//await NegotiateRawJsonStreamAsync(cancellationToken).ConfigureAwait(false);

await NegotiateNdjsonStreamAsync().ConfigureAwait(false);
//await NegotiateJsonStreamAsync(cancellationToken).ConfigureAwait(false);

await StreamNdjsonAsync().ConfigureAwait(false);
//await NegotiateNdjsonStreamAsync(cancellationToken).ConfigureAwait(false);

Console.WriteLine("Press any key to exit . . .");
Console.ReadKey(true);
await StreamNdjsonAsync(cancellationToken).ConfigureAwait(false);

Console.CancelKeyPress -= consoleCancelEventHandler;

Console.WriteLine("Press any key to exit . . .");
Console.ReadKey(true);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Console.WriteLine("Successfully cancelled.");
}
}

private static async Task ConsumeNdjsonStreamAsync()
private static async Task ConsumeNdjsonStreamAsync(CancellationToken cancellationToken)
{
Console.WriteLine($"-- {nameof(ConsumeNdjsonStreamAsync)} --");
Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] Receving weather forecasts . . .");

using HttpClient httpClient = new();

using HttpResponseMessage response = await httpClient.GetAsync("https://localhost:5001/api/WeatherForecasts/stream", HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
using HttpResponseMessage response = await httpClient.GetAsync("https://localhost:5001/api/WeatherForecasts/stream", HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

response.EnsureSuccessStatusCode();

await foreach (WeatherForecast weatherForecast in response.Content!.ReadFromNdjsonAsync<WeatherForecast>().ConfigureAwait(false))
await foreach (WeatherForecast weatherForecast in response.Content!.ReadFromNdjsonAsync<WeatherForecast>(cancellationToken: cancellationToken).ConfigureAwait(false))
{
Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] {weatherForecast.Summary}");
}
Expand All @@ -51,23 +75,23 @@ private static async Task ConsumeNdjsonStreamAsync()
Console.WriteLine();
}

private static async Task NegotiateRawJsonStreamAsync()
private static async Task NegotiateRawJsonStreamAsync(CancellationToken cancellationToken)
{
Console.WriteLine($"-- {nameof(NegotiateRawJsonStreamAsync)} --");
Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] Receving weather forecasts . . .");

using HttpClient httpClient = new();
httpClient.DefaultRequestHeaders.Add("Accept", "application/json");

using HttpResponseMessage response = await httpClient.GetAsync("https://localhost:5001/api/WeatherForecasts/negotiate-stream", HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
using HttpResponseMessage response = await httpClient.GetAsync("https://localhost:5001/api/WeatherForecasts/negotiate-stream", HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

response.EnsureSuccessStatusCode();
using Stream responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
using Stream responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);

while (true)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(128);
int bytesRead = await responseStream.ReadAsync(buffer);
int bytesRead = await responseStream.ReadAsync(buffer, cancellationToken);

Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] ({bytesRead}/{buffer.Length}) {Encoding.UTF8.GetString(buffer)}");

Expand All @@ -83,20 +107,20 @@ private static async Task NegotiateRawJsonStreamAsync()
Console.WriteLine();
}

private static async Task NegotiateJsonStreamAsync()
private static async Task NegotiateJsonStreamAsync(CancellationToken cancellationToken)
{
Console.WriteLine($"-- {nameof(NegotiateJsonStreamAsync)} --");
Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] Receving weather forecasts . . .");

using HttpClient httpClient = new();
httpClient.DefaultRequestHeaders.Add("Accept", "application/json");

using HttpResponseMessage response = await httpClient.GetAsync("https://localhost:5001/api/WeatherForecasts/negotiate-stream", HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
using HttpResponseMessage response = await httpClient.GetAsync("https://localhost:5001/api/WeatherForecasts/negotiate-stream", HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

response.EnsureSuccessStatusCode();
using Stream responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
using Stream responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);

await foreach (WeatherForecast weatherForecast in JsonSerializer.DeserializeAsyncEnumerable<WeatherForecast>(responseStream, new JsonSerializerOptions { PropertyNameCaseInsensitive = true, DefaultBufferSize = 128 }))
await foreach (WeatherForecast weatherForecast in JsonSerializer.DeserializeAsyncEnumerable<WeatherForecast>(responseStream, new JsonSerializerOptions { PropertyNameCaseInsensitive = true, DefaultBufferSize = 128 }, cancellationToken))
{
Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] {weatherForecast.Summary}");
}
Expand All @@ -105,19 +129,19 @@ private static async Task NegotiateJsonStreamAsync()
Console.WriteLine();
}

private static async Task NegotiateNdjsonStreamAsync()
private static async Task NegotiateNdjsonStreamAsync(CancellationToken cancellationToken)
{
Console.WriteLine($"-- {nameof(NegotiateNdjsonStreamAsync)} --");
Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] Receving weather forecasts . . .");

using HttpClient httpClient = new();
httpClient.DefaultRequestHeaders.Add("Accept", "application/x-ndjson");

using HttpResponseMessage response = await httpClient.GetAsync("https://localhost:5001/api/WeatherForecasts/negotiate-stream", HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
using HttpResponseMessage response = await httpClient.GetAsync("https://localhost:5001/api/WeatherForecasts/negotiate-stream", HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);

response.EnsureSuccessStatusCode();

await foreach (WeatherForecast weatherForecast in response.Content!.ReadFromNdjsonAsync<WeatherForecast>().ConfigureAwait(false))
await foreach (WeatherForecast weatherForecast in response.Content!.ReadFromNdjsonAsync<WeatherForecast>(cancellationToken: cancellationToken).ConfigureAwait(false))
{
Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] {weatherForecast.Summary}");
}
Expand All @@ -126,7 +150,7 @@ private static async Task NegotiateNdjsonStreamAsync()
Console.WriteLine();
}

private static async Task StreamNdjsonAsync()
private static async Task StreamNdjsonAsync(CancellationToken cancellationToken)
{
static async IAsyncEnumerable<WeatherForecast> streamWeatherForecastsAsync()
{
Expand All @@ -143,7 +167,7 @@ static async IAsyncEnumerable<WeatherForecast> streamWeatherForecastsAsync()

using HttpClient httpClient = new();

using HttpResponseMessage response = await httpClient.PostAsNdjsonAsync("https://localhost:5001/api/WeatherForecasts/stream", streamWeatherForecastsAsync()).ConfigureAwait(false);
using HttpResponseMessage response = await httpClient.PostAsNdjsonAsync("https://localhost:5001/api/WeatherForecasts/stream", streamWeatherForecastsAsync(), cancellationToken).ConfigureAwait(false);

response.EnsureSuccessStatusCode();

Expand Down

0 comments on commit 1a39b3e

Please sign in to comment.