Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new CLI option to limit the number of requests in a single RPC batch request #4965

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 23.1.1

### Breaking Changes
- Add a new CLI option to limit the number of requests in a single RPC batch request. Default=1 [#4965](https://github.com/hyperledger/besu/pull/4965)

- Changed JsonRpc http service to return the error -32602 (Invalid params) with a 200 http status code

Expand Down
8 changes: 8 additions & 0 deletions besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,13 @@ static class JsonRPCHttpOptionGroup {
split = ",",
arity = "1..*")
private final List<String> rpcHttpTlsCipherSuites = new ArrayList<>();

@CommandLine.Option(
names = {"--rpc-http-max-batch-size"},
paramLabel = MANDATORY_INTEGER_FORMAT_HELP,
description =
"Specifies the maximum number of requests in a single RPC batch request via RPC. -1 specifies no limit (default: ${DEFAULT-VALUE})")
private final Integer rpcHttpMaxBatchSize = DEFAULT_HTTP_MAX_BATCH_SIZE;
}

// JSON-RPC Websocket Options
Expand Down Expand Up @@ -2378,6 +2385,7 @@ && rpcHttpAuthenticationCredentialsFile() == null
jsonRPCHttpOptionGroup.rpcHttpAuthenticationAlgorithm);
jsonRpcConfiguration.setTlsConfiguration(rpcHttpTlsConfiguration());
jsonRpcConfiguration.setHttpTimeoutSec(unstableRPCOptions.getHttpTimeoutSec());
jsonRpcConfiguration.setMaxBatchSize(jsonRPCHttpOptionGroup.rpcHttpMaxBatchSize);
return jsonRpcConfiguration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public interface DefaultCommandValues {
int DEFAULT_P2P_PEER_LOWER_BOUND = 25;
/** The constant DEFAULT_HTTP_MAX_CONNECTIONS. */
int DEFAULT_HTTP_MAX_CONNECTIONS = 80;
/** The constant DEFAULT_HTTP_MAX_BATCH_SIZE. */
int DEFAULT_HTTP_MAX_BATCH_SIZE = 1;
/** The constant DEFAULT_WS_MAX_CONNECTIONS. */
int DEFAULT_WS_MAX_CONNECTIONS = 80;
/** The constant DEFAULT_WS_MAX_FRAME_SIZE. */
Expand Down
15 changes: 15 additions & 0 deletions besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,21 @@ public void p2pPeerUpperBound_without_p2pPeerLowerBound_shouldSetLowerBoundEqual
verify(mockRunnerBuilder).build();
}

@Test
public void rpcHttpMaxBatchSizeOptionMustBeUsed() {
final int rpcHttpMaxBatchSize = 1;
parseCommand("--rpc-http-max-batch-size", Integer.toString(rpcHttpMaxBatchSize));

verify(mockRunnerBuilder).jsonRpcConfiguration(jsonRpcConfigArgumentCaptor.capture());
verify(mockRunnerBuilder).build();

assertThat(jsonRpcConfigArgumentCaptor.getValue().getMaxBatchSize())
.isEqualTo(rpcHttpMaxBatchSize);

assertThat(commandOutput.toString(UTF_8)).isEmpty();
assertThat(commandErrorOutput.toString(UTF_8)).isEmpty();
}

@Test
public void maxpeersSet_p2pPeerLowerBoundSet() {

Expand Down
1 change: 1 addition & 0 deletions besu/src/test/resources/everything_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ rpc-http-authentication-jwt-algorithm="RS256"
rpc-ws-authentication-jwt-algorithm="RS256"
rpc-http-tls-protocols=["TLSv1.2,TlSv1.1"]
rpc-http-tls-cipher-suites=["TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384","TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"]
rpc-http-max-batch-size=1
rpc-max-logs-range=100

# PRIVACY TLS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.api.handlers;

import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
Expand Down Expand Up @@ -47,7 +48,9 @@ public static Handler<RoutingContext> jsonRpcParser() {
}

public static Handler<RoutingContext> jsonRpcExecutor(
final JsonRpcExecutor jsonRpcExecutor, final Tracer tracer) {
return JsonRpcExecutorHandler.handler(jsonRpcExecutor, tracer);
final JsonRpcExecutor jsonRpcExecutor,
final Tracer tracer,
final JsonRpcConfiguration jsonRpcConfiguration) {
return JsonRpcExecutorHandler.handler(jsonRpcExecutor, tracer, jsonRpcConfiguration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError.INVALID_REQUEST;

import org.hyperledger.besu.ethereum.api.jsonrpc.JsonResponseStreamer;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.context.ContextKey;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequest;
Expand All @@ -26,6 +27,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponseType;

import java.io.IOException;
import java.security.InvalidParameterException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -64,71 +66,27 @@ public class JsonRpcExecutorHandler {
private JsonRpcExecutorHandler() {}

public static Handler<RoutingContext> handler(
final JsonRpcExecutor jsonRpcExecutor, final Tracer tracer) {
final JsonRpcExecutor jsonRpcExecutor,
final Tracer tracer,
final JsonRpcConfiguration jsonRpcConfiguration) {
return ctx -> {
HttpServerResponse response = ctx.response();
try {
Optional<User> user = ContextKey.AUTHENTICATED_USER.extractFrom(ctx, Optional::empty);
Context spanContext = ctx.get(SPAN_CONTEXT);
response = response.putHeader("Content-Type", APPLICATION_JSON);

if (ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name())) {
JsonObject jsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name());
lazyTraceLogger(jsonRequest::toString);
JsonRpcResponse jsonRpcResponse =
jsonRpcExecutor.execute(
user,
tracer,
spanContext,
() -> !ctx.response().closed(),
jsonRequest,
req -> req.mapTo(JsonRpcRequest.class));
response.setStatusCode(status(jsonRpcResponse).code());
if (jsonRpcResponse.getType() == JsonRpcResponseType.NONE) {
response.end();
} else {
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
// underlying output stream lifecycle is managed by the json object writer
lazyTraceLogger(() -> JSON_OBJECT_MAPPER.writeValueAsString(jsonRpcResponse));
JSON_OBJECT_WRITER.writeValue(streamer, jsonRpcResponse);
}
}
} else if (ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name())) {
JsonArray batchJsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name());
lazyTraceLogger(batchJsonRequest::toString);
List<JsonRpcResponse> jsonRpcBatchResponses = new ArrayList<>();
if (isJsonObjectRequest(ctx)) {
final JsonRpcResponse jsonRpcResponse =
executeJsonObjectRequest(jsonRpcExecutor, tracer, ctx);
handleJsonObjectResponse(response, jsonRpcResponse, ctx);
} else if (isJsonArrayRequest(ctx)) {
final List<JsonRpcResponse> jsonRpcBatchResponses;
try {
for (int i = 0; i < batchJsonRequest.size(); i++) {
final JsonObject jsonRequest;
try {
jsonRequest = batchJsonRequest.getJsonObject(i);
} catch (ClassCastException e) {
jsonRpcBatchResponses.add(new JsonRpcErrorResponse(null, INVALID_REQUEST));
continue;
}
jsonRpcBatchResponses.add(
jsonRpcExecutor.execute(
user,
tracer,
spanContext,
() -> !ctx.response().closed(),
jsonRequest,
req -> req.mapTo(JsonRpcRequest.class)));
}
} catch (RuntimeException e) {
jsonRpcBatchResponses =
executeJsonArrayRequest(jsonRpcExecutor, tracer, ctx, jsonRpcConfiguration);
handleJsonArrayResponse(response, jsonRpcBatchResponses, ctx);
} catch (final InvalidParameterException e) {
handleJsonRpcError(ctx, null, JsonRpcError.EXCEEDS_RPC_MAX_BATCH_SIZE);
} catch (final RuntimeException e) {
response.setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
return;
}
final JsonRpcResponse[] completed =
jsonRpcBatchResponses.stream()
.filter(jsonRpcResponse -> jsonRpcResponse.getType() != JsonRpcResponseType.NONE)
.toArray(JsonRpcResponse[]::new);
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
// underlying output stream lifecycle is managed by the json object writer
lazyTraceLogger(() -> JSON_OBJECT_MAPPER.writeValueAsString(completed));
JSON_OBJECT_WRITER.writeValue(streamer, completed);
}
} else {
handleJsonRpcError(ctx, null, JsonRpcError.PARSE_ERROR);
Expand All @@ -142,8 +100,102 @@ public static Handler<RoutingContext> handler(
};
}

private static boolean isJsonObjectRequest(final RoutingContext ctx) {
return ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name());
}

private static boolean isJsonArrayRequest(final RoutingContext ctx) {
return ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name());
}

private static JsonRpcResponse executeRequest(
final JsonRpcExecutor jsonRpcExecutor,
final Tracer tracer,
final JsonObject jsonRequest,
final RoutingContext ctx) {
final Optional<User> user = ContextKey.AUTHENTICATED_USER.extractFrom(ctx, Optional::empty);
final Context spanContext = ctx.get(SPAN_CONTEXT);
return jsonRpcExecutor.execute(
user,
tracer,
spanContext,
() -> !ctx.response().closed(),
jsonRequest,
req -> req.mapTo(JsonRpcRequest.class));
}

private static JsonRpcResponse executeJsonObjectRequest(
final JsonRpcExecutor jsonRpcExecutor, final Tracer tracer, final RoutingContext ctx) {
final JsonObject jsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name());
lazyTraceLogger(jsonRequest::toString);
return executeRequest(jsonRpcExecutor, tracer, jsonRequest, ctx);
}

private static List<JsonRpcResponse> executeJsonArrayRequest(
final JsonRpcExecutor jsonRpcExecutor,
final Tracer tracer,
final RoutingContext ctx,
final JsonRpcConfiguration jsonRpcConfiguration)
throws InvalidParameterException {
final JsonArray batchJsonRequest = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_ARRAY.name());
lazyTraceLogger(batchJsonRequest::toString);
final List<JsonRpcResponse> jsonRpcBatchResponses = new ArrayList<>();

if (jsonRpcConfiguration.getMaxBatchSize() > 0
&& batchJsonRequest.size() > jsonRpcConfiguration.getMaxBatchSize()) {
throw new InvalidParameterException();
}

for (int i = 0; i < batchJsonRequest.size(); i++) {
final JsonObject jsonRequest;
try {
jsonRequest = batchJsonRequest.getJsonObject(i);
} catch (final ClassCastException e) {
jsonRpcBatchResponses.add(new JsonRpcErrorResponse(null, INVALID_REQUEST));
continue;
}
jsonRpcBatchResponses.add(executeRequest(jsonRpcExecutor, tracer, jsonRequest, ctx));
}
return jsonRpcBatchResponses;
}

private static void handleJsonObjectResponse(
final HttpServerResponse response,
final JsonRpcResponse jsonRpcResponse,
final RoutingContext ctx)
throws IOException {
response.setStatusCode(status(jsonRpcResponse).code());
if (jsonRpcResponse.getType() == JsonRpcResponseType.NONE) {
response.end();
} else {
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
// underlying output stream lifecycle is managed by the json object writer
lazyTraceLogger(() -> JSON_OBJECT_MAPPER.writeValueAsString(jsonRpcResponse));
JSON_OBJECT_WRITER.writeValue(streamer, jsonRpcResponse);
}
}
}

private static void handleJsonArrayResponse(
final HttpServerResponse response,
final List<JsonRpcResponse> jsonRpcBatchResponses,
final RoutingContext ctx)
throws IOException {
final JsonRpcResponse[] completed =
jsonRpcBatchResponses.stream()
.filter(jsonRpcResponse -> jsonRpcResponse.getType() != JsonRpcResponseType.NONE)
.toArray(JsonRpcResponse[]::new);
try (final JsonResponseStreamer streamer =
new JsonResponseStreamer(response, ctx.request().remoteAddress())) {
// underlying output stream lifecycle is managed by the json object writer
lazyTraceLogger(() -> JSON_OBJECT_MAPPER.writeValueAsString(completed));
JSON_OBJECT_WRITER.writeValue(streamer, completed);
}
}

private static String getRpcMethodName(final RoutingContext ctx) {
if (ctx.data().containsKey(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name())) {
if (isJsonObjectRequest(ctx)) {
final JsonObject jsonObject = ctx.get(ContextKey.REQUEST_BODY_AS_JSON_OBJECT.name());
return jsonObject.getString("method");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class JsonRpcConfiguration {
public static final int DEFAULT_JSON_RPC_PORT = 8545;
public static final int DEFAULT_ENGINE_JSON_RPC_PORT = 8551;
public static final int DEFAULT_MAX_ACTIVE_CONNECTIONS = 80;
public static final int DEFAULT_MAX_BATCH_SIZE = 1;

private boolean enabled;
private int port;
Expand All @@ -51,6 +52,7 @@ public class JsonRpcConfiguration {
private Optional<TlsConfiguration> tlsConfiguration = Optional.empty();
private long httpTimeoutSec = TimeoutOptions.defaultOptions().getTimeoutSeconds();
private int maxActiveConnections;
private int maxBatchSize;

public static JsonRpcConfiguration createDefault() {
final JsonRpcConfiguration config = new JsonRpcConfiguration();
Expand All @@ -60,6 +62,7 @@ public static JsonRpcConfiguration createDefault() {
config.setRpcApis(DEFAULT_RPC_APIS);
config.httpTimeoutSec = TimeoutOptions.defaultOptions().getTimeoutSeconds();
config.setMaxActiveConnections(DEFAULT_MAX_ACTIVE_CONNECTIONS);
config.setMaxBatchSize(DEFAULT_MAX_BATCH_SIZE);
return config;
}

Expand Down Expand Up @@ -249,4 +252,12 @@ public int getMaxActiveConnections() {
public void setMaxActiveConnections(final int maxActiveConnections) {
this.maxActiveConnections = maxActiveConnections;
}

public int getMaxBatchSize() {
return maxBatchSize;
}

public void setMaxBatchSize(final int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ private Router buildRouter() {
authenticationService.get(),
config.getNoAuthRpcApis()),
rpcMethods),
tracer),
tracer,
config),
false);
} else {
mainRoute.blockingHandler(
Expand All @@ -359,7 +360,8 @@ private Router buildRouter() {
new TimedJsonRpcProcessor(
new TracedJsonRpcProcessor(new BaseJsonRpcProcessor()), requestTimer),
rpcMethods),
tracer),
tracer,
config),
false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ private Router buildRouter() {
authenticationService.get(),
config.getNoAuthRpcApis()),
rpcMethods),
tracer),
tracer,
config),
false);
} else {
mainRoute.blockingHandler(
Expand All @@ -462,7 +463,8 @@ private Router buildRouter() {
new TimedJsonRpcProcessor(
new TracedJsonRpcProcessor(new BaseJsonRpcProcessor()), requestTimer),
rpcMethods),
tracer),
tracer,
config),
false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public enum JsonRpcError {
TRANSACTION_UPFRONT_COST_EXCEEDS_BALANCE(-32004, "Upfront cost exceeds account balance"),
EXCEEDS_BLOCK_GAS_LIMIT(-32005, "Transaction gas limit exceeds block gas limit"),
EXCEEDS_RPC_MAX_BLOCK_RANGE(-32005, "Requested range exceeds maximum RPC range limit"),
EXCEEDS_RPC_MAX_BATCH_SIZE(-32005, "Number of requests exceeds max batch size"),
NONCE_TOO_HIGH(-32006, "Nonce too high"),
TX_SENDER_NOT_AUTHORIZED(-32007, "Sender account not authorized to send transactions"),
CHAIN_HEAD_WORLD_STATE_NOT_AVAILABLE(-32008, "Initial sync is still in progress"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ private void startService(final BlockchainSetupUtil blockchainSetupUtil) throws
final NatService natService = new NatService(Optional.empty());

config.setPort(0);
config.setMaxBatchSize(10);
service =
new JsonRpcHttpService(
vertx,
Expand Down
Loading