Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Report discarded transactions to external service #69

Merged
merged 29 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d8fa72a
feat: Report discarded transactions to external endpoint
usmansaleem Aug 20, 2024
b5a998b
Merge remote-tracking branch 'upstream/main' into notify-dropped-txs
usmansaleem Aug 21, 2024
68fb015
Use PendingBlockHeader
usmansaleem Aug 21, 2024
da8a2cf
simplify discard condition
usmansaleem Aug 21, 2024
69710a0
feat: Make json-rpc call for discarded tx
usmansaleem Aug 27, 2024
957b3da
cli option for reporting rejected tx endpoint
usmansaleem Aug 27, 2024
d38b539
add spdx header
usmansaleem Aug 27, 2024
7a8d380
build: Add wiremock dependency
usmansaleem Sep 2, 2024
425f05c
Add unit test
usmansaleem Sep 4, 2024
0ce850d
Update json-rpc method and unit tests
usmansaleem Sep 4, 2024
5e01ae9
Adding JsonRpcManager
usmansaleem Sep 10, 2024
6fb16d6
Adding compileOnly okhttp dependency
usmansaleem Sep 10, 2024
4af7d4a
Updated JsonRPCManagerTests
usmansaleem Sep 10, 2024
5dcf745
Adding license header
usmansaleem Sep 10, 2024
3eb843d
Removing initial implementation of json rpc client
usmansaleem Sep 10, 2024
7a6a78a
Updating retry schedule logic
usmansaleem Sep 10, 2024
386008a
review suggestions
usmansaleem Sep 10, 2024
f32b379
Changing json-rpc ExecutorService to use virtual threads
usmansaleem Sep 10, 2024
140cb48
Adding comments to saveJsonToFile method
usmansaleem Sep 11, 2024
80de637
Use high-precision timestamp and a UUID to ensure uniqueness
usmansaleem Sep 11, 2024
3122a92
Use Duration and Instant instead of long
usmansaleem Sep 11, 2024
9199fdd
Updating logs entry in submitJsonRpcCall
usmansaleem Sep 11, 2024
45363cc
Update logs message. Move discarded json-rpc calls to separate direct…
usmansaleem Sep 11, 2024
a784018
Added unit test for processing json-rpc during start. Also introduced…
usmansaleem Sep 11, 2024
4cde321
Use Instant.now to generate timestamp
usmansaleem Sep 12, 2024
aabf838
Update changelog
usmansaleem Sep 12, 2024
c1975fa
Merge remote-tracking branch 'upstream/main' into notify-dropped-txs
usmansaleem Sep 12, 2024
900ba40
post merge
usmansaleem Sep 12, 2024
7ff3a55
Merge remote-tracking branch 'upstream/main' into notify-dropped-txs
usmansaleem Sep 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gradle/common-dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ dependencies {

testImplementation 'org.mockito:mockito-core'
testImplementation 'org.mockito:mockito-junit-jupiter'

testImplementation "org.wiremock:wiremock"
}
4 changes: 4 additions & 0 deletions gradle/dependency-management.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ dependencyManagement {
entry 'picocli-codegen'
}

dependency 'com.squareup.okhttp3:okhttp:4.12.0'

dependencySet(group: 'io.tmio', version: '2.4.2') {
entry 'tuweni-bytes'
entry 'tuweni-net'
Expand Down Expand Up @@ -157,5 +159,7 @@ dependencyManagement {
entry 'core'
entry 'crypto'
}

dependency "org.wiremock:wiremock:3.9.1"
}
}
3 changes: 3 additions & 0 deletions sequencer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ dependencies {

compileOnly 'io.vertx:vertx-core'

compileOnly 'com.squareup.okhttp3:okhttp'

implementation project(":native:compress")

implementation 'com.fasterxml.jackson.core:jackson-databind'
Expand All @@ -68,6 +70,7 @@ dependencies {
testImplementation "${besuArtifactGroup}.internal:core"
testImplementation "${besuArtifactGroup}.internal:rlp"
testImplementation "${besuArtifactGroup}.internal:core"
testImplementation "${besuArtifactGroup}:plugin-api"

// workaround for bug https://github.com/dnsjava/dnsjava/issues/329, remove when upgraded upstream
testImplementation 'dnsjava:dnsjava:3.6.1'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package net.consensys.linea.config;

import java.net.URI;

import com.google.common.base.MoreObjects;
import jakarta.validation.constraints.Positive;
import net.consensys.linea.plugins.LineaCliOptions;
Expand All @@ -39,6 +41,8 @@ public class LineaTransactionSelectorCliOptions implements LineaCliOptions {
public static final String UNPROFITABLE_RETRY_LIMIT = "--plugin-linea-unprofitable-retry-limit";
public static final int DEFAULT_UNPROFITABLE_RETRY_LIMIT = 10;

public static final String REJECTED_TX_ENDPOINT = "--plugin-linea-rejected-tx-endpoint";

@Positive
@CommandLine.Option(
names = {MAX_BLOCK_CALLDATA_SIZE},
Expand Down Expand Up @@ -82,6 +86,13 @@ public class LineaTransactionSelectorCliOptions implements LineaCliOptions {
"Max number of unprofitable transactions we retry on each block creation (default: ${DEFAULT-VALUE})")
private int unprofitableRetryLimit = DEFAULT_UNPROFITABLE_RETRY_LIMIT;

@CommandLine.Option(
names = {REJECTED_TX_ENDPOINT},
hidden = true,
paramLabel = "<URI>",
description = "Endpoint URI for reporting rejected transactions (default: ${DEFAULT-VALUE})")
private URI rejectedTxEndpoint = null;

private LineaTransactionSelectorCliOptions() {}

/**
Expand All @@ -107,6 +118,7 @@ public static LineaTransactionSelectorCliOptions fromConfig(
options.maxGasPerBlock = config.maxGasPerBlock();
options.unprofitableCacheSize = config.unprofitableCacheSize();
options.unprofitableRetryLimit = config.unprofitableRetryLimit();
options.rejectedTxEndpoint = config.rejectedTxEndpoint();
return options;
}

Expand All @@ -123,6 +135,7 @@ public LineaTransactionSelectorConfiguration toDomainObject() {
.maxGasPerBlock(maxGasPerBlock)
.unprofitableCacheSize(unprofitableCacheSize)
.unprofitableRetryLimit(unprofitableRetryLimit)
.rejectedTxEndpoint(rejectedTxEndpoint)
.build();
}

Expand All @@ -134,6 +147,7 @@ public String toString() {
.add(MAX_GAS_PER_BLOCK, maxGasPerBlock)
.add(UNPROFITABLE_CACHE_SIZE, unprofitableCacheSize)
.add(UNPROFITABLE_RETRY_LIMIT, unprofitableRetryLimit)
.add(REJECTED_TX_ENDPOINT, rejectedTxEndpoint)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package net.consensys.linea.config;

import java.net.URI;

import lombok.Builder;
import net.consensys.linea.plugins.LineaOptionsConfiguration;

Expand All @@ -25,5 +27,6 @@ public record LineaTransactionSelectorConfiguration(
int overLinesLimitCacheSize,
long maxGasPerBlock,
int unprofitableCacheSize,
int unprofitableRetryLimit)
int unprofitableRetryLimit,
URI rejectedTxEndpoint)
implements LineaOptionsConfiguration {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*
* Copyright Consensys Software Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package net.consensys.linea.jsonrpc;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.DirectoryStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

/** This class is responsible for managing JSON-RPC requests for reporting rejected transactions */
@Slf4j
public class JsonRpcManager {
private static final long INITIAL_RETRY_DELAY = 1000L;
fab-10 marked this conversation as resolved.
Show resolved Hide resolved
private static final int MAX_THREADS =
Math.min(32, Runtime.getRuntime().availableProcessors() * 2);
private static final long MAX_RETRY_DURATION = TimeUnit.HOURS.toMillis(2);
fab-10 marked this conversation as resolved.
Show resolved Hide resolved
private static final MediaType JSON = MediaType.get("application/json; charset=utf-8");

private final OkHttpClient client = new OkHttpClient();
private final ObjectMapper objectMapper = new ObjectMapper();
private final Map<Path, Long> fileStartTimes = new ConcurrentHashMap<>();

private final Path rejTxRpcDirectory;
private final URI rejectedTxEndpoint;
private final ExecutorService executorService;
private final ScheduledExecutorService retrySchedulerService;

/**
* Creates a new JSON-RPC manager.
*
* @param besuDataDir Path to Besu data directory. The json-rpc files will be stored here under
* rej-tx-rpc subdirectory.
* @param rejectedTxEndpoint The endpoint to send rejected transactions to
*/
public JsonRpcManager(final Path besuDataDir, final URI rejectedTxEndpoint) {
this.rejTxRpcDirectory = besuDataDir.resolve("rej_tx_rpc");
this.rejectedTxEndpoint = rejectedTxEndpoint;
this.executorService = Executors.newFixedThreadPool(MAX_THREADS);
this.retrySchedulerService = Executors.newScheduledThreadPool(1);
fab-10 marked this conversation as resolved.
Show resolved Hide resolved
}

/** Load existing JSON-RPC and submit them. */
public JsonRpcManager start() {
try {
// Create the rej-tx-rpc directory if it doesn't exist
Files.createDirectories(rejTxRpcDirectory);

// Load existing JSON files
loadExistingJsonFiles();
return this;
} catch (IOException e) {
log.error("Failed to create or access rej-tx-rpc directory", e);
throw new UncheckedIOException(e);
}
}

/** Shuts down the executor service and scheduler service. */
public void shutdown() {
executorService.shutdown();
retrySchedulerService.shutdown();
}

/**
* Submits a new JSON-RPC call.
*
* @param jsonContent The JSON content to submit
*/
public void submitNewJsonRpcCall(final String jsonContent) {
try {
final Path jsonFile = saveJsonToFile(jsonContent);
fileStartTimes.put(jsonFile, System.currentTimeMillis());
submitJsonRpcCall(jsonFile, INITIAL_RETRY_DELAY);
} catch (final IOException e) {
log.error("Failed to save JSON content", e);
}
}

private void loadExistingJsonFiles() {
fab-10 marked this conversation as resolved.
Show resolved Hide resolved
try {
final TreeSet<Path> sortedFiles = new TreeSet<>(Comparator.comparing(Path::getFileName));

try (DirectoryStream<Path> stream =
Files.newDirectoryStream(rejTxRpcDirectory, "rpc_*.json")) {
for (Path path : stream) {
sortedFiles.add(path);
}
}

for (Path path : sortedFiles) {
fileStartTimes.put(path, System.currentTimeMillis());
submitJsonRpcCall(path, INITIAL_RETRY_DELAY);
}

log.info("Loaded {} existing JSON files for rej-tx reporting", sortedFiles.size());
} catch (final IOException e) {
log.error("Failed to load existing JSON files", e);
}
}

private void submitJsonRpcCall(final Path jsonFile, final long nextDelay) {
executorService.submit(
() -> {
if (!Files.exists(jsonFile)) {
log.debug("json-rpc file no longer exists, skipping processing: {}", jsonFile);
fileStartTimes.remove(jsonFile);
return;
}
try {
final String jsonContent = new String(Files.readAllBytes(jsonFile));
final boolean success = sendJsonRpcCall(jsonContent);
if (success) {
Files.deleteIfExists(jsonFile);
fileStartTimes.remove(jsonFile);
} else {
log.error(
"Failed to send JSON-RPC call to {}, retrying: {}", rejectedTxEndpoint, jsonFile);
scheduleRetry(jsonFile, nextDelay);
}
} catch (final Exception e) {
log.error("Failed to process json-rpc file: {}", jsonFile, e);
fab-10 marked this conversation as resolved.
Show resolved Hide resolved
scheduleRetry(jsonFile, nextDelay);
}
});
}

private void scheduleRetry(final Path jsonFile, final long currentDelay) {
final Long startTime = fileStartTimes.get(jsonFile);
if (startTime == null) {
log.debug("No start time found for file: {}. Skipping retry.", jsonFile);
return;
}

// check if we're still within the maximum retry duration
if (System.currentTimeMillis() - startTime < MAX_RETRY_DURATION) {
// schedule a retry with exponential backoff
long nextDelay = Math.min(currentDelay * 2, TimeUnit.MINUTES.toMillis(1)); // Cap at 1 minute
retrySchedulerService.schedule(
() -> submitJsonRpcCall(jsonFile, nextDelay), currentDelay, TimeUnit.MILLISECONDS);
} else {
log.error("Exceeded maximum retry duration for rej-tx json-rpc file: {}", jsonFile);
fab-10 marked this conversation as resolved.
Show resolved Hide resolved
fileStartTimes.remove(jsonFile);
}
}

private boolean sendJsonRpcCall(final String jsonContent) {
final RequestBody body = RequestBody.create(jsonContent, JSON);
final Request request =
new Request.Builder().url(rejectedTxEndpoint.toString()).post(body).build();
try (final Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()) {
log.error("Unexpected response code from rejected-tx endpoint: {}", response.code());
return false;
}

// process the response body here ...
final String responseBody = response.body() != null ? response.body().string() : null;
if (responseBody == null) {
log.error("Unexpected empty response body from rejected-tx endpoint");
return false;
}

final JsonNode jsonNode = objectMapper.readTree(responseBody);
if (jsonNode == null) {
log.error("Failed to parse JSON response from rejected-tx endpoint: {}", responseBody);
return false;
}
if (jsonNode.has("error")) {
log.error("Error response from rejected-tx endpoint: {}", jsonNode.get("error"));
return false;
}
// Check for result
if (jsonNode.has("result")) {
String status = jsonNode.get("result").get("status").asText();
log.debug("Rejected-tx JSON-RPC call successful. Status: {}", status);
return true;
}

log.warn("Unexpected rejected-tx JSON-RPC response format: {}", responseBody);
return false;
} catch (final IOException e) {
log.error("Failed to send JSON-RPC call to rejected-tx endpoint {}", rejectedTxEndpoint, e);
return false;
}
}

private Path saveJsonToFile(final String jsonContent) throws IOException {
long timestamp = System.currentTimeMillis();
for (int attempt = 0; attempt < 100; attempt++) {
final String fileName = String.format("rpc_%d_%d.json", timestamp, attempt);
final Path filePath = rejTxRpcDirectory.resolve(fileName);
try {
return Files.writeString(filePath, jsonContent, StandardOpenOption.CREATE_NEW);
} catch (final FileAlreadyExistsException e) {
log.trace("File already exists {}, retrying.", filePath);
}
}
fab-10 marked this conversation as resolved.
Show resolved Hide resolved
throw new IOException("Failed to save JSON content after 100 attempts");
}
}
Loading
Loading