Skip to content

Commit d039e85

Browse files
feat: implement code review recommendations for BOLT protocol
- Replace string-based write query detection with QueryEngine.analyze() - Add BOLT_DEFAULT_DATABASE and BOLT_MAX_CONNECTIONS configurations - Implement accurate timing metrics (t_first, t_last) using nanoTime - Determine query type dynamically (read vs write) - Improve exception handling with null checks and fallback messages - Add connection limiting with concurrent tracking - Add 9 new integration tests covering: * Database selection and switching * Concurrent sessions (5 threads) * Invalid database handling * Connection pooling stress test * Transaction isolation * Large parameter maps (50 params) * NULL parameter handling * Multiple error recovery Co-authored-by: Luca Garulli <lvca@users.noreply.github.com>
1 parent 69a720e commit d039e85

File tree

4 files changed

+300
-43
lines changed

4 files changed

+300
-43
lines changed

bolt/src/main/java/com/arcadedb/bolt/BoltNetworkExecutor.java

Lines changed: 76 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,12 @@ private enum State {
6565
INTERRUPTED
6666
}
6767

68-
private final ArcadeDBServer server;
69-
private final Socket socket;
70-
private final BoltChunkedInput input;
71-
private final BoltChunkedOutput output;
72-
private final boolean debug;
68+
private final ArcadeDBServer server;
69+
private final Socket socket;
70+
private final BoltChunkedInput input;
71+
private final BoltChunkedOutput output;
72+
private final boolean debug;
73+
private final BoltNetworkListener listener; // For notifying when connection closes
7374

7475
private State state = State.DISCONNECTED;
7576
private int protocolVersion;
@@ -89,11 +90,15 @@ private enum State {
8990
private List<String> currentFields;
9091
private Result firstResult; // Buffered first result for field name extraction
9192
private int recordsStreamed;
93+
private long queryStartTime; // Nanosecond timestamp when query execution started
94+
private long firstRecordTime; // Nanosecond timestamp when first record was retrieved
95+
private boolean isWriteOperation; // Whether the current query performs writes
9296

93-
public BoltNetworkExecutor(final ArcadeDBServer server, final Socket socket) throws IOException {
97+
public BoltNetworkExecutor(final ArcadeDBServer server, final Socket socket, final BoltNetworkListener listener) throws IOException {
9498
super("BOLT-" + socket.getRemoteSocketAddress());
9599
this.server = server;
96100
this.socket = socket;
101+
this.listener = listener;
97102
this.input = new BoltChunkedInput(socket.getInputStream());
98103
this.output = new BoltChunkedOutput(socket.getOutputStream());
99104
this.debug = GlobalConfiguration.BOLT_DEBUG.getValueAsBoolean();
@@ -406,9 +411,15 @@ private void handleRun(final RunMessage message) throws IOException {
406411
LogManager.instance().log(this, Level.INFO, "BOLT executing: %s with params %s", query, params);
407412
}
408413

409-
// Determine if this is a write query by checking for write keywords
414+
// Start timing for performance metrics
415+
queryStartTime = System.nanoTime();
416+
firstRecordTime = 0;
417+
418+
// Determine if this is a write query using the query analyzer
419+
isWriteOperation = isWriteQuery(query);
420+
410421
// Use command() for writes, query() for reads
411-
if (isWriteQuery(query)) {
422+
if (isWriteOperation) {
412423
currentResultSet = database.command("opencypher", query, params);
413424
} else {
414425
currentResultSet = database.query("opencypher", query, params);
@@ -419,18 +430,26 @@ private void handleRun(final RunMessage message) throws IOException {
419430
// Build success response with query metadata
420431
final Map<String, Object> metadata = new LinkedHashMap<>();
421432
metadata.put("fields", currentFields);
422-
// TODO: Implement actual time to first record calculation for accurate performance monitoring
423-
metadata.put("t_first", 0L);
433+
434+
// Calculate time to first record if we already have one buffered
435+
if (firstResult != null && firstRecordTime > 0) {
436+
final long tFirstMs = (firstRecordTime - queryStartTime) / 1_000_000;
437+
metadata.put("t_first", tFirstMs);
438+
} else {
439+
metadata.put("t_first", 0L);
440+
}
424441

425442
sendSuccess(metadata);
426443
state = explicitTransaction ? State.TX_STREAMING : State.STREAMING;
427444

428445
} catch (final CommandParsingException e) {
429-
sendFailure(BoltException.SYNTAX_ERROR, e.getMessage());
446+
final String message = e.getMessage() != null ? e.getMessage() : "Query parsing error";
447+
sendFailure(BoltException.SYNTAX_ERROR, message);
430448
state = State.FAILED;
431449
} catch (final Exception e) {
432450
LogManager.instance().log(this, Level.WARNING, "BOLT query error", e);
433-
sendFailure(BoltException.DATABASE_ERROR, e.getMessage());
451+
final String message = e.getMessage() != null ? e.getMessage() : "Database error";
452+
sendFailure(BoltException.DATABASE_ERROR, message);
434453
state = State.FAILED;
435454
}
436455
}
@@ -493,10 +512,14 @@ private void handlePull(final PullMessage message) throws IOException {
493512
// Build success metadata
494513
final Map<String, Object> metadata = new LinkedHashMap<>();
495514
if (!hasMore) {
496-
// TODO: Determine query type dynamically (r=read, w=write, rw=read-write, s=schema)
497-
metadata.put("type", "r");
498-
// TODO: Implement actual time to last record calculation for accurate performance metrics
499-
metadata.put("t_last", 0L);
515+
// Determine query type based on whether it performed writes
516+
// r=read, w=write (for simplicity, we use binary classification)
517+
metadata.put("type", isWriteOperation ? "w" : "r");
518+
519+
// Calculate time to last record
520+
final long tLastMs = (System.nanoTime() - queryStartTime) / 1_000_000;
521+
metadata.put("t_last", tLastMs);
522+
500523
try {
501524
currentResultSet.close();
502525
} catch (final Exception e) {
@@ -513,7 +536,8 @@ private void handlePull(final PullMessage message) throws IOException {
513536

514537
} catch (final Exception e) {
515538
LogManager.instance().log(this, Level.WARNING, "BOLT PULL error", e);
516-
sendFailure(BoltException.DATABASE_ERROR, e.getMessage());
539+
final String message = e.getMessage() != null ? e.getMessage() : "Error fetching records";
540+
sendFailure(BoltException.DATABASE_ERROR, message);
517541
state = State.FAILED;
518542
}
519543
}
@@ -589,7 +613,8 @@ private void handleBegin(final BeginMessage message) throws IOException {
589613
state = State.TX_READY;
590614

591615
} catch (final Exception e) {
592-
sendFailure(BoltException.TRANSACTION_ERROR, e.getMessage());
616+
final String message = e.getMessage() != null ? e.getMessage() : "Transaction error";
617+
sendFailure(BoltException.TRANSACTION_ERROR, message);
593618
state = State.FAILED;
594619
}
595620
}
@@ -622,7 +647,8 @@ private void handleCommit() throws IOException {
622647
state = State.READY;
623648

624649
} catch (final Exception e) {
625-
sendFailure(BoltException.TRANSACTION_ERROR, e.getMessage());
650+
final String message = e.getMessage() != null ? e.getMessage() : "Commit error";
651+
sendFailure(BoltException.TRANSACTION_ERROR, message);
626652
state = State.FAILED;
627653
}
628654
}
@@ -662,7 +688,8 @@ private void handleRollback() throws IOException {
662688
state = State.READY;
663689

664690
} catch (final Exception e) {
665-
sendFailure(BoltException.TRANSACTION_ERROR, e.getMessage());
691+
final String message = e.getMessage() != null ? e.getMessage() : "Rollback error";
692+
sendFailure(BoltException.TRANSACTION_ERROR, message);
666693
state = State.FAILED;
667694
}
668695
}
@@ -719,15 +746,19 @@ private boolean ensureDatabase() throws IOException {
719746
}
720747

721748
if (databaseName == null || databaseName.isEmpty()) {
722-
// TODO: Consider making default database selection configurable or requiring explicit database name
723-
// to avoid unpredictable behavior in multi-database environments
724-
final Collection<String> databases = server.getDatabaseNames();
725-
if (databases.isEmpty()) {
726-
sendFailure(BoltException.DATABASE_ERROR, "No database available");
727-
state = State.FAILED;
728-
return false;
749+
// Try to use configured default database
750+
databaseName = GlobalConfiguration.BOLT_DEFAULT_DATABASE.getValueAsString();
751+
752+
if (databaseName == null || databaseName.isEmpty()) {
753+
// If no default configured, use the first available database
754+
final Collection<String> databases = server.getDatabaseNames();
755+
if (databases.isEmpty()) {
756+
sendFailure(BoltException.DATABASE_ERROR, "No database available");
757+
state = State.FAILED;
758+
return false;
759+
}
760+
databaseName = databases.iterator().next();
729761
}
730-
databaseName = databases.iterator().next();
731762
}
732763

733764
try {
@@ -739,7 +770,8 @@ private boolean ensureDatabase() throws IOException {
739770
}
740771
return true;
741772
} catch (final Exception e) {
742-
sendFailure(BoltException.DATABASE_ERROR, "Cannot open database: " + databaseName + " - " + e.getMessage());
773+
final String message = e.getMessage() != null ? e.getMessage() : "Unknown error";
774+
sendFailure(BoltException.DATABASE_ERROR, "Cannot open database: " + databaseName + " - " + message);
743775
state = State.FAILED;
744776
return false;
745777
}
@@ -757,6 +789,7 @@ private List<String> extractFieldNames(final ResultSet resultSet) {
757789
// Peek at first result to get field names
758790
if (resultSet.hasNext()) {
759791
firstResult = resultSet.next();
792+
firstRecordTime = System.nanoTime(); // Capture time when first record is available
760793
final Set<String> propertyNames = firstResult.getPropertyNames();
761794
return propertyNames != null ? new ArrayList<>(propertyNames) : List.of();
762795
}
@@ -766,20 +799,20 @@ private List<String> extractFieldNames(final ResultSet resultSet) {
766799

767800
/**
768801
* Determine if a Cypher query contains write operations.
769-
* This checks for common write keywords in the query.
802+
* Uses ArcadeDB's query analyzer for accurate detection.
770803
*/
771804
private boolean isWriteQuery(final String query) {
772805
if (query == null || query.isEmpty()) {
773806
return false;
774807
}
775-
final String normalized = query.toUpperCase().trim();
776-
// Check for write keywords - these indicate modifying operations
777-
return normalized.contains("CREATE") ||
778-
normalized.contains("DELETE") ||
779-
normalized.contains("SET ") ||
780-
normalized.contains("REMOVE") ||
781-
normalized.contains("MERGE") ||
782-
normalized.contains("DETACH");
808+
try {
809+
// Use the query engine's analyzer to determine if the query is idempotent (read-only)
810+
return !database.getQueryEngine("opencypher").analyze(query).isIdempotent();
811+
} catch (final Exception e) {
812+
// If analysis fails, assume it's a write operation to be safe
813+
LogManager.instance().log(this, Level.WARNING, "Failed to analyze query, assuming write operation: " + query, e);
814+
return true;
815+
}
783816
}
784817

785818
/**
@@ -897,6 +930,11 @@ private void cleanup() {
897930
// Ignore
898931
}
899932

933+
// Notify listener that this connection is closed
934+
if (listener != null) {
935+
listener.removeConnection(this);
936+
}
937+
900938
if (debug) {
901939
LogManager.instance().log(this, Level.INFO, "BOLT connection closed");
902940
}

bolt/src/main/java/com/arcadedb/bolt/BoltNetworkListener.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package com.arcadedb.bolt;
2020

21+
import com.arcadedb.GlobalConfiguration;
2122
import com.arcadedb.exception.ArcadeDBException;
2223
import com.arcadedb.log.LogManager;
2324
import com.arcadedb.server.ArcadeDBServer;
@@ -31,6 +32,8 @@
3132
import java.net.ServerSocket;
3233
import java.net.Socket;
3334
import java.net.SocketException;
35+
import java.util.Set;
36+
import java.util.concurrent.ConcurrentHashMap;
3437
import java.util.logging.Level;
3538

3639
/**
@@ -40,10 +43,12 @@
4043
public class BoltNetworkListener extends Thread {
4144
private static final int BOLT_PROTOCOL_VERSION = 4; // BOLT v4.4
4245

43-
private final ArcadeDBServer server;
44-
private final ServerSocketFactory socketFactory;
45-
private ServerSocket serverSocket;
46-
private volatile boolean active = true;
46+
private final ArcadeDBServer server;
47+
private final ServerSocketFactory socketFactory;
48+
private ServerSocket serverSocket;
49+
private volatile boolean active = true;
50+
private final Set<BoltNetworkExecutor> activeConnections = ConcurrentHashMap.newKeySet();
51+
private final int maxConnections;
4752

4853
public BoltNetworkListener(final ArcadeDBServer server,
4954
final ServerSocketFactory socketFactory,
@@ -53,6 +58,7 @@ public BoltNetworkListener(final ArcadeDBServer server,
5358

5459
this.server = server;
5560
this.socketFactory = socketFactory;
61+
this.maxConnections = GlobalConfiguration.BOLT_MAX_CONNECTIONS.getValueAsInteger();
5662

5763
listen(hostName, hostPortRange);
5864
start();
@@ -65,11 +71,25 @@ public void run() {
6571
try {
6672
final Socket socket = serverSocket.accept();
6773

74+
// Check connection limit
75+
if (maxConnections > 0 && activeConnections.size() >= maxConnections) {
76+
LogManager.instance().log(this, Level.WARNING,
77+
"BOLT connection limit reached (%d connections), rejecting new connection from %s",
78+
maxConnections, socket.getRemoteSocketAddress());
79+
try {
80+
socket.close();
81+
} catch (final IOException e) {
82+
// Ignore
83+
}
84+
continue;
85+
}
86+
6887
socket.setPerformancePreferences(0, 2, 1);
6988
socket.setTcpNoDelay(true);
7089

7190
// Create a new executor for this connection
72-
final BoltNetworkExecutor connection = new BoltNetworkExecutor(server, socket);
91+
final BoltNetworkExecutor connection = new BoltNetworkExecutor(server, socket, this);
92+
activeConnections.add(connection);
7393
connection.start();
7494

7595
} catch (final Exception e) {
@@ -87,6 +107,13 @@ public void run() {
87107
}
88108
}
89109

110+
/**
111+
* Called by BoltNetworkExecutor when a connection is closed.
112+
*/
113+
void removeConnection(final BoltNetworkExecutor connection) {
114+
activeConnections.remove(connection);
115+
}
116+
90117
public void close() {
91118
this.active = false;
92119

0 commit comments

Comments
 (0)