Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 85 additions & 31 deletions bolt/src/main/java/com/arcadedb/bolt/BoltNetworkExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,22 @@

import com.arcadedb.Constants;
import com.arcadedb.GlobalConfiguration;
import com.arcadedb.bolt.message.*;
import com.arcadedb.bolt.message.BeginMessage;
import com.arcadedb.bolt.message.BoltMessage;
import com.arcadedb.bolt.message.DiscardMessage;
import com.arcadedb.bolt.message.FailureMessage;
import com.arcadedb.bolt.message.HelloMessage;
import com.arcadedb.bolt.message.IgnoredMessage;
import com.arcadedb.bolt.message.LogonMessage;
import com.arcadedb.bolt.message.PullMessage;
import com.arcadedb.bolt.message.RecordMessage;
import com.arcadedb.bolt.message.RouteMessage;
import com.arcadedb.bolt.message.RunMessage;
import com.arcadedb.bolt.message.SuccessMessage;
import com.arcadedb.bolt.packstream.PackStreamReader;
import com.arcadedb.bolt.packstream.PackStreamWriter;
import com.arcadedb.bolt.structure.BoltStructureMapper;
import com.arcadedb.database.Database;
import com.arcadedb.database.DatabaseInternal;
import com.arcadedb.exception.CommandParsingException;
import com.arcadedb.log.LogManager;
import com.arcadedb.query.sql.executor.Result;
Expand All @@ -38,9 +48,17 @@
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;

import static com.arcadedb.query.opencypher.executor.steps.FinalProjectionStep.PROJECTION_NAME_METADATA;

/**
* Handles a single BOLT protocol connection.
* Implements the BOLT server state machine and processes client messages.
Expand Down Expand Up @@ -72,11 +90,11 @@ private enum State {
private final boolean debug;
private final BoltNetworkListener listener; // For notifying when connection closes

private State state = State.DISCONNECTED;
private int protocolVersion;
private State state = State.DISCONNECTED;
private int protocolVersion;
private ServerSecurityUser user;
private Database database;
private String databaseName;
private Database database;
private String databaseName;

// Transaction state
private boolean explicitTransaction = false;
Expand All @@ -86,15 +104,16 @@ private enum State {
* Thread-safety: This class is designed to handle a single connection in a dedicated thread.
* All state variables are accessed only by the executor thread and do not require synchronization.
*/
private ResultSet currentResultSet;
private List<String> currentFields;
private Result firstResult; // Buffered first result for field name extraction
private int recordsStreamed;
private long queryStartTime; // Nanosecond timestamp when query execution started
private long firstRecordTime; // Nanosecond timestamp when first record was retrieved
private boolean isWriteOperation; // Whether the current query performs writes

public BoltNetworkExecutor(final ArcadeDBServer server, final Socket socket, final BoltNetworkListener listener) throws IOException {
private ResultSet currentResultSet;
private List<String> currentFields;
private Result firstResult; // Buffered first result for field name extraction
private int recordsStreamed;
private long queryStartTime; // Nanosecond timestamp when query execution started
private long firstRecordTime; // Nanosecond timestamp when first record was retrieved
private boolean isWriteOperation; // Whether the current query performs writes

public BoltNetworkExecutor(final ArcadeDBServer server, final Socket socket, final BoltNetworkListener listener)
throws IOException {
super("BOLT-" + socket.getRemoteSocketAddress());
this.server = server;
this.socket = socket;
Expand Down Expand Up @@ -128,7 +147,8 @@ public void run() {
final Object value = reader.readValue();

if (!(value instanceof PackStreamReader.StructureValue structure)) {
sendFailure(BoltException.PROTOCOL_ERROR, "Expected structure, got: " + (value != null ? value.getClass().getSimpleName() : "null"));
sendFailure(BoltException.PROTOCOL_ERROR,
"Expected structure, got: " + (value != null ? value.getClass().getSimpleName() : "null"));
continue;
}

Expand Down Expand Up @@ -195,7 +215,8 @@ private boolean performHandshake() throws IOException {
break;
}
}
if (protocolVersion != 0) break;
if (protocolVersion != 0)
break;
}

// Send selected version
Expand Down Expand Up @@ -487,11 +508,7 @@ private void handlePull(final PullMessage message) throws IOException {

// First, return the buffered first result if present
if (firstResult != null && (n < 0 || count < n)) {
final List<Object> values = new ArrayList<>();
for (final String field : currentFields) {
final Object value = firstResult.getProperty(field);
values.add(BoltStructureMapper.toPackStreamValue(value));
}
final List<Object> values = extractRecordValues(firstResult);
sendRecord(values);
count++;
recordsStreamed++;
Expand All @@ -501,12 +518,7 @@ private void handlePull(final PullMessage message) throws IOException {
// Then continue with the rest of the result set
while (currentResultSet.hasNext() && (n < 0 || count < n)) {
final Result record = currentResultSet.next();
final List<Object> values = new ArrayList<>();

for (final String field : currentFields) {
final Object value = record.getProperty(field);
values.add(BoltStructureMapper.toPackStreamValue(value));
}
final List<Object> values = extractRecordValues(record);

sendRecord(values);
count++;
Expand Down Expand Up @@ -794,6 +806,9 @@ private boolean ensureDatabase() throws IOException {
/**
* Extract field names from result set by peeking at the first result.
* The first result is buffered and will be returned first during PULL.
* <p>
* For single-element results (e.g., RETURN n), the projection name is stored
* in metadata by FinalProjectionStep and used here to preserve field names.
*/
private List<String> extractFieldNames(final ResultSet resultSet) {
if (resultSet == null) {
Expand All @@ -804,13 +819,50 @@ private List<String> extractFieldNames(final ResultSet resultSet) {
if (resultSet.hasNext()) {
firstResult = resultSet.next();
firstRecordTime = System.nanoTime(); // Capture time when first record is available

// Check if this is an unwrapped element with a projection name in metadata
// This happens for queries like "MATCH (n) RETURN n" where the vertex is
// returned directly but we need to preserve the field name "n" for Bolt protocol
if (firstResult.isElement()) {
final Object projectionName = firstResult.getMetadata(PROJECTION_NAME_METADATA);
if (projectionName instanceof String name) {
return List.of(name);
}
}

final Set<String> propertyNames = firstResult.getPropertyNames();
return propertyNames != null ? new ArrayList<>(propertyNames) : List.of();
}

return List.of();
}

/**
* Extract values from a result for sending as a BOLT RECORD.
* Handles both projection results and element results.
* <p>
* For element results (e.g., RETURN n where n is a vertex), the whole element
* is returned as a single value, converted to BoltNode/BoltRelationship.
*/
private List<Object> extractRecordValues(final Result result) {
final List<Object> values = new ArrayList<>();

// Check if this is an unwrapped element result
// (single vertex/edge returned directly from RETURN clause)
if (result.isElement() && result.getMetadata(PROJECTION_NAME_METADATA) != null) {
// Return the element as a single value
values.add(BoltStructureMapper.toPackStreamValue(result.getElement().orElse(null)));
} else {
// Standard projection result - extract each field
for (final String field : currentFields) {
final Object value = result.getProperty(field);
values.add(BoltStructureMapper.toPackStreamValue(value));
}
}

return values;
}

/**
* Determine if a Cypher query contains write operations.
* Uses ArcadeDB's query analyzer for accurate detection.
Expand All @@ -827,7 +879,7 @@ private boolean isWriteQuery(final String query) {
// Log at FINE level to avoid spam for complex but valid queries
LogManager.instance().log(this, Level.FINE,
"Query analysis failed for: " + (query.length() > 100 ? query.substring(0, 100) + "..." : query) +
" - assuming write operation", e);
" - assuming write operation", e);
return true;
}
}
Expand All @@ -842,9 +894,11 @@ private String generateBookmark() {
/**
* Authenticate user with provided credentials.
*
* @param principal the username
* @param principal the username
* @param credentials the password
*
* @return true if authentication succeeded, false otherwise (failure already sent)
*
* @throws IOException if sending failure message fails
*/
private boolean authenticateUser(final String principal, final String credentials) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,18 @@
};
}

/**
* Metadata key for storing the projection name when unwrapping single-element results.
* This allows wire protocols (like Bolt) to correctly format responses with field names.
*/
public static final String PROJECTION_NAME_METADATA = "_projectionName";

Check notice on line 120 in engine/src/main/java/com/arcadedb/query/opencypher/executor/steps/FinalProjectionStep.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

engine/src/main/java/com/arcadedb/query/opencypher/executor/steps/FinalProjectionStep.java#L120

Fields should be declared at the top of the class, before any method declarations, constructors, initializers or inner classes.

/**
* Filters the result to only include the requested properties.
* When the result contains a single property that is a Document (vertex/edge),
* returns it as an element result directly, matching the behavior of the
* Gremlin-based Cypher engine.
* Gremlin-based Cypher engine. The original projection name is stored in metadata
* for wire protocols that need field names.
*/
private ResultInternal filterResult(final Result inputResult) {
// When returning a single variable that resolves to an element (vertex/edge),
Expand All @@ -128,7 +135,10 @@
if (inputResult.hasProperty(singleProp)) {
final Object value = inputResult.getProperty(singleProp);
if (value instanceof Document doc) {
return new ResultInternal(doc);
final ResultInternal result = new ResultInternal(doc);
// Store the original projection name for wire protocols (Bolt, HTTP, etc.)
result.setMetadata(PROJECTION_NAME_METADATA, singleProp);
return result;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;

import static com.arcadedb.query.opencypher.executor.steps.FinalProjectionStep.PROJECTION_NAME_METADATA;
import static org.assertj.core.api.Assertions.assertThat;

/**
Expand Down Expand Up @@ -150,4 +151,20 @@ void returnCountIsProjection() {
assertThat(results).hasSize(1);
assertThat(results.getFirst().isProjection()).as("Count result should be a projection").isTrue();
}

@Test
void singleNodeHasProjectionNameMetadata() {
// RETURN n should have _projectionName metadata for wire protocols
final ResultSet result = database.query("opencypher", "MATCH (n:Person) RETURN n");
final List<Result> results = new ArrayList<>();
while (result.hasNext()) {
results.add(result.next());
}

assertThat(results).hasSize(2);
for (final Result r : results) {
assertThat(r.isElement()).isTrue();
assertThat(r.getMetadata(PROJECTION_NAME_METADATA)).isEqualTo("n");
}
}
}
18 changes: 10 additions & 8 deletions postgresw/src/test/java/com/arcadedb/postgres/PostgresWJdbcIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,19 +321,21 @@ void cypher() throws Exception {

for (int i = 0; i < 100; i++) {
st.execute("{opencypher} MATCH (n) DETACH DELETE n;");
st.execute("{opencypher} CREATE (james:PersonVertex {name: \"James\", height: 1.9});");
st.execute("{opencypher} CREATE (henry:PersonVertex {name: \"Henry\"});");
st.execute("""
{opencypher} CREATE (james:PersonVertex {name: "James", height: 1.9});""");
st.execute("""
{opencypher} CREATE (henry:PersonVertex {name: "Henry"});""");

var rs = st.executeQuery("{opencypher} MATCH (person:PersonVertex) RETURN person.name, person.height;");
var rs = st.executeQuery("{opencypher} MATCH (person:PersonVertex) RETURN person.name AS name, person.height AS height;");

int numberOfPeople = 0;
while (rs.next()) {
assertThat(rs.getString(1)).isNotNull();
assertThat(rs.getString("name")).isNotNull();

if (rs.getString(1).equals("James"))
assertThat(rs.getFloat(2)).isEqualTo(1.9F);
else if (rs.getString(1).equals("Henry"))
assertThat(rs.getString(2)).isNull();
if (rs.getString("name").equals("James"))
assertThat(rs.getFloat("height")).isEqualTo(1.9F);
else if (rs.getString("name").equals("Henry"))
assertThat(rs.getString("height")).isNull();
else
fail("");

Expand Down
Loading