Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf/read rpc message to end #6951

Merged
merged 8 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
Expand All @@ -11,6 +12,7 @@ namespace Nethermind.JsonRpc.Test
{
public static class JsonRpcProcessorExtensions
{
public static IAsyncEnumerable<JsonRpcResult> ProcessAsync(this IJsonRpcProcessor processor, string request, JsonRpcContext context) => processor.ProcessAsync(PipeReader.Create(new MemoryStream(Encoding.UTF8.GetBytes(request))), context);
public static IAsyncEnumerable<JsonRpcResult> ProcessAsync(this IJsonRpcProcessor processor, string request, JsonRpcContext context) =>
processor.ProcessAsync(PipeReader.Create(new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes(request))), context);
}
}
253 changes: 133 additions & 120 deletions src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.IO.Pipelines;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http;
Expand Down Expand Up @@ -119,152 +120,142 @@ public async IAsyncEnumerable<JsonRpcResult> ProcessAsync(PipeReader reader, Jso
{
reader = await RecordRequest(reader);
Stopwatch stopwatch = Stopwatch.StartNew();
CancellationTokenSource timeoutSource = new(_jsonRpcConfig.Timeout);

// Handles general exceptions during parsing and validation.
// Sends an error response and stops the stopwatch.
JsonRpcResult GetParsingError(string error, Exception? exception = null)
{
Metrics.JsonRpcRequestDeserializationFailures++;
if (_logger.IsError) _logger.Error(error, exception);
JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message");
TraceResult(response);
stopwatch.Stop();
return JsonRpcResult.Single(RecordResponse(response, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false)));
}

// Initializes a buffer to store the data read from the reader.
ReadOnlySequence<byte> buffer = default;
try
{
// Continuously read data from the PipeReader in a loop.
// Can read multiple requests, ends when there is no more requests to read or there is an error in deserialization.
while (true)
// Asynchronously reads data from the PipeReader.
ReadResult readResult = await reader.ReadToEndAsync(timeoutSource.Token);

buffer = readResult.Buffer;
// Placeholder for a result in case of deserialization failure.
JsonRpcResult? deserializationFailureResult = null;

// Processes the buffer while it's not empty; before going out to outer loop to get more data.
while (!buffer.IsEmpty)
{
// Asynchronously reads data from the PipeReader.
ReadResult readResult = await reader.ReadAsync();
buffer = readResult.Buffer;
// Placeholder for a result in case of deserialization failure.
JsonRpcResult? deserializationFailureResult = null;

// Processes the buffer while it's not empty; before going out to outer loop to get more data.
while (!buffer.IsEmpty)
JsonDocument? jsonDocument = null;
JsonRpcRequest? model = null;
ArrayPoolList<JsonRpcRequest>? collection = null;
try
{
JsonDocument? jsonDocument = null;
JsonRpcRequest? model = null;
ArrayPoolList<JsonRpcRequest>? collection = null;
try
// Tries to parse the JSON from the buffer.
if (!TryParseJson(ref buffer, out jsonDocument))
{
// Tries to parse the JSON from the buffer.
if (!TryParseJson(ref buffer, out jsonDocument))
{
// More data needs to be read to complete a document
break;
}

// Deserializes the JSON document into a request object or a collection of requests.
(model, collection) = DeserializeObjectOrArray(jsonDocument);
deserializationFailureResult = GetParsingError("Error during parsing/validation.");
}
catch (BadHttpRequestException e)
else
{
// Increments failure metric and logs the exception, then stops processing.
Metrics.JsonRpcRequestDeserializationFailures++;
if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}");
yield break;
}
catch (ConnectionResetException e)
{
// Logs exception, then stop processing.
if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}");
yield break;
}
catch (Exception ex)
{
// Handles general exceptions during parsing and validation.
// Sends an error response and stops the stopwatch.
Metrics.JsonRpcRequestDeserializationFailures++;
if (_logger.IsError) _logger.Error($"Error during parsing/validation.", ex);
JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message");
TraceResult(response);
stopwatch.Stop();
deserializationFailureResult = JsonRpcResult.Single(
RecordResponse(response, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false)));
// Deserializes the JSON document into a request object or a collection of requests.
(model, collection) = DeserializeObjectOrArray(jsonDocument);
}
}
catch (BadHttpRequestException e)
{
// Increments failure metric and logs the exception, then stops processing.
Metrics.JsonRpcRequestDeserializationFailures++;
if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}");
yield break;
}
catch (ConnectionResetException e)
{
// Logs exception, then stop processing.
if (_logger.IsTrace) _logger.Trace($"Connection reset.{Environment.NewLine}{e}");
yield break;
}
catch (Exception ex)
{
deserializationFailureResult = GetParsingError("Error during parsing/validation.", ex);
}

// Checks for deserialization failure and yields the result.
if (deserializationFailureResult.HasValue)
{
yield return deserializationFailureResult.Value;
break;
}
// Checks for deserialization failure and yields the result.
if (deserializationFailureResult.HasValue)
{
yield return deserializationFailureResult.Value;
break;
}

// Handles a single JSON RPC request.
if (model is not null)
{
if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model}");
// Handles a single JSON RPC request.
if (model is not null)
{
if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model}");

// Processes the individual request.
JsonRpcResult.Entry result = await HandleSingleRequest(model, context);
result.Response.AddDisposable(() => jsonDocument.Dispose());
// Processes the individual request.
JsonRpcResult.Entry result = await HandleSingleRequest(model, context);
result.Response.AddDisposable(() => jsonDocument.Dispose());

// Returns the result of the processed request.
yield return JsonRpcResult.Single(RecordResponse(result));
}
// Returns the result of the processed request.
yield return JsonRpcResult.Single(RecordResponse(result));
}

// Processes a collection of JSON RPC requests.
if (collection is not null)
{
if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests");

// Checks for authentication and batch size limit.
if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize)
{
if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}.");
JsonRpcErrorResponse? response = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded");
response.AddDisposable(() => jsonDocument.Dispose());

deserializationFailureResult = JsonRpcResult.Single(RecordResponse(response, RpcReport.Error));
collection.Dispose();
yield return deserializationFailureResult.Value;
break;
}

// Stops the stopwatch and yields the batch processing result.
stopwatch.Stop();
JsonRpcBatchResult jsonRpcBatchResult = new((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c));
jsonRpcBatchResult.AddDisposable(() => collection.Dispose());
yield return JsonRpcResult.Collection(jsonRpcBatchResult);
}
// Processes a collection of JSON RPC requests.
if (collection is not null)
{
if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests");

// Handles invalid requests.
if (model is null && collection is null)
// Checks for authentication and batch size limit.
if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize)
{
Metrics.JsonRpcInvalidRequests++;
JsonRpcErrorResponse errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request");
errorResponse.AddDisposable(() => jsonDocument.Dispose());

TraceResult(errorResponse);
stopwatch.Stop();
if (_logger.IsDebug) _logger.Debug($" Failed request handled in {stopwatch.Elapsed.TotalMilliseconds}ms");
deserializationFailureResult = JsonRpcResult.Single(RecordResponse(errorResponse, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false)));
if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}.");
JsonRpcErrorResponse? response = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded");
response.AddDisposable(() => jsonDocument.Dispose());

deserializationFailureResult = JsonRpcResult.Single(RecordResponse(response, RpcReport.Error));
collection.Dispose();
yield return deserializationFailureResult.Value;
break;
}

// Stops the stopwatch and yields the batch processing result.
stopwatch.Stop();
JsonRpcBatchResult jsonRpcBatchResult = new((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c));
jsonRpcBatchResult.AddDisposable(() => collection.Dispose());
yield return JsonRpcResult.Collection(jsonRpcBatchResult);
}

// Checks if the deserialization failed
if (deserializationFailureResult.HasValue)
// Handles invalid requests.
if (model is null && collection is null)
{
Metrics.JsonRpcInvalidRequests++;
JsonRpcErrorResponse errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request");
errorResponse.AddDisposable(() => jsonDocument.Dispose());

TraceResult(errorResponse);
stopwatch.Stop();
if (_logger.IsDebug) _logger.Debug($" Failed request handled in {stopwatch.Elapsed.TotalMilliseconds}ms");
deserializationFailureResult = JsonRpcResult.Single(RecordResponse(errorResponse, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false)));
yield return deserializationFailureResult.Value;
break;
}
}

// Checks if the read operation is completed.
if (readResult.IsCompleted)
{
if (buffer.Length > 0 && (buffer.IsSingleSegment ? buffer.FirstSpan : buffer.ToArray()).IndexOfAnyExcept(WhiteSpace()) >= 0)
{
Metrics.JsonRpcRequestDeserializationFailures++;
if (_logger.IsError) _logger.Error($"Error during parsing/validation. Incomplete request");
JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message");
TraceResult(response);
stopwatch.Stop();
deserializationFailureResult = JsonRpcResult.Single(
RecordResponse(response, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false)));
yield return deserializationFailureResult.Value;
}
// Checks if the deserialization failed
if (deserializationFailureResult.HasValue)
{
yield break;
}

break;
// Checks if the read operation is completed.
if (readResult.IsCompleted)
{
if (buffer.Length > 0 && HasNonWhitespace(buffer))
{
yield return GetParsingError("Error during parsing/validation. Incomplete request");
}

// Advances the reader to the next segment of the buffer.
reader.AdvanceTo(buffer.Start, buffer.End);
buffer = default;
}
}
finally
Expand All @@ -280,7 +271,29 @@ public async IAsyncEnumerable<JsonRpcResult> ProcessAsync(PipeReader reader, Jso
await reader.CompleteAsync();
}

private static ReadOnlySpan<byte> WhiteSpace() => " \n\r\t"u8;
private static bool HasNonWhitespace(ReadOnlySequence<byte> buffer)
{
static bool HasNonWhitespace(ReadOnlySpan<byte> span)
{
static ReadOnlySpan<byte> WhiteSpace() => " \n\r\t"u8;
return span.IndexOfAnyExcept(WhiteSpace()) >= 0;
}

if (buffer.IsSingleSegment)
{
return HasNonWhitespace(buffer.FirstSpan);
}

foreach (ReadOnlyMemory<byte> memory in buffer)
{
if (HasNonWhitespace(memory.Span))
{
return true;
}
}

return false;
}

private async IAsyncEnumerable<JsonRpcResult.Entry> IterateRequest(
ArrayPoolList<JsonRpcRequest> requests,
Expand Down Expand Up @@ -351,7 +364,7 @@ public async IAsyncEnumerable<JsonRpcResult> ProcessAsync(PipeReader reader, Jso

private static bool TryParseJson(ref ReadOnlySequence<byte> buffer, out JsonDocument jsonDocument)
{
Utf8JsonReader reader = new(buffer, isFinalBlock: false, default);
Utf8JsonReader reader = new(buffer);

if (JsonDocument.TryParseValue(ref reader, out jsonDocument))
{
Expand Down
22 changes: 6 additions & 16 deletions src/Nethermind/Nethermind.JsonRpc/JsonRpcService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,9 @@ private void LogRequest(string methodName, JsonElement providedParameters, Expec

if (providedParameter.ValueKind == JsonValueKind.Null || (providedParameter.ValueKind == JsonValueKind.String && providedParameter.ValueEquals(ReadOnlySpan<byte>.Empty)))
{
if (providedParameter.ValueKind == JsonValueKind.Null && expectedParameter.IsNullable)
{
return null;
}
else
{
return Type.Missing;
}
return providedParameter.ValueKind == JsonValueKind.Null && expectedParameter.IsNullable
? null
: Type.Missing;
}

object? executionParam;
Expand All @@ -309,14 +304,9 @@ private void LogRequest(string methodName, JsonElement providedParameters, Expec
if (providedParameter.ValueKind == JsonValueKind.String)
{
JsonConverter converter = EthereumJsonSerializer.JsonOptions.GetConverter(paramType);
if (converter.GetType().FullName.StartsWith("System."))
{
executionParam = JsonSerializer.Deserialize(providedParameter.GetString(), paramType, EthereumJsonSerializer.JsonOptions);
}
else
{
executionParam = providedParameter.Deserialize(paramType, EthereumJsonSerializer.JsonOptions);
}
executionParam = converter.GetType().FullName.StartsWith("System.")
? JsonSerializer.Deserialize(providedParameter.GetString(), paramType, EthereumJsonSerializer.JsonOptions)
: providedParameter.Deserialize(paramType, EthereumJsonSerializer.JsonOptions);
}
else
{
Expand Down
26 changes: 26 additions & 0 deletions src/Nethermind/Nethermind.JsonRpc/PipeReaderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Buffers;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

namespace Nethermind.JsonRpc;

public static class PipeReaderExtensions
{
public static async Task<ReadResult> ReadToEndAsync(this PipeReader reader, CancellationToken cancellationToken = default)
{
while (true)
{
ReadResult result = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
ReadOnlySequence<byte> buffer = result.Buffer;
if (result.IsCompleted || result.IsCanceled)
{
return result;
}
reader.AdvanceTo(buffer.Start, buffer.End);
}
}
}
Loading