@@ -80,7 +80,11 @@ private enum State {
8080 // Transaction state
8181 private boolean explicitTransaction = false ;
8282
83- // Current result set for streaming
83+ /**
84+ * Current result set for streaming results.
85+ * Thread-safety: This class is designed to handle a single connection in a dedicated thread.
86+ * All state variables are accessed only by the executor thread and do not require synchronization.
87+ */
8488 private ResultSet currentResultSet ;
8589 private List <String > currentFields ;
8690 private Result firstResult ; // Buffered first result for field name extraction
@@ -275,16 +279,7 @@ private void handleHello(final HelloMessage message) throws IOException {
275279
276280 // Try to authenticate
277281 if ("basic" .equals (scheme ) && principal != null && credentials != null ) {
278- try {
279- user = server .getSecurity ().authenticate (principal , credentials , databaseName );
280- if (user == null ) {
281- sendFailure (BoltException .AUTHENTICATION_ERROR , "Invalid credentials" );
282- state = State .FAILED ;
283- return ;
284- }
285- } catch (final ServerSecurityException e ) {
286- sendFailure (BoltException .AUTHENTICATION_ERROR , e .getMessage ());
287- state = State .FAILED ;
282+ if (!authenticateUser (principal , credentials )) {
288283 return ;
289284 }
290285 } else if ("none" .equals (scheme )) {
@@ -294,16 +289,7 @@ private void handleHello(final HelloMessage message) throws IOException {
294289 return ;
295290 } else if (principal != null && credentials != null ) {
296291 // Try basic auth even without explicit scheme
297- try {
298- user = server .getSecurity ().authenticate (principal , credentials , databaseName );
299- if (user == null ) {
300- sendFailure (BoltException .AUTHENTICATION_ERROR , "Invalid credentials" );
301- state = State .FAILED ;
302- return ;
303- }
304- } catch (final ServerSecurityException e ) {
305- sendFailure (BoltException .AUTHENTICATION_ERROR , e .getMessage ());
306- state = State .FAILED ;
292+ if (!authenticateUser (principal , credentials )) {
307293 return ;
308294 }
309295 }
@@ -331,19 +317,8 @@ private void handleLogon(final LogonMessage message) throws IOException {
331317 final String principal = message .getPrincipal ();
332318 final String credentials = message .getCredentials ();
333319
334- if (principal != null && credentials != null ) {
335- try {
336- user = server .getSecurity ().authenticate (principal , credentials , databaseName );
337- if (user == null ) {
338- sendFailure (BoltException .AUTHENTICATION_ERROR , "Invalid credentials" );
339- state = State .FAILED ;
340- return ;
341- }
342- } catch (final ServerSecurityException e ) {
343- sendFailure (BoltException .AUTHENTICATION_ERROR , e .getMessage ());
344- state = State .FAILED ;
345- return ;
346- }
320+ if (!authenticateUser (principal , credentials )) {
321+ return ;
347322 }
348323
349324 sendSuccess (Map .of ());
@@ -370,6 +345,15 @@ private void handleGoodbye() {
370345 * Handle RESET message - reset to initial state.
371346 */
372347 private void handleReset () throws IOException {
348+ // Close any open result set
349+ if (currentResultSet != null ) {
350+ try {
351+ currentResultSet .close ();
352+ } catch (final Exception e ) {
353+ // Ignore
354+ }
355+ }
356+
373357 // Rollback any open transaction
374358 if (explicitTransaction && database != null ) {
375359 try {
@@ -430,7 +414,8 @@ private void handleRun(final RunMessage message) throws IOException {
430414 // Build success response with query metadata
431415 final Map <String , Object > metadata = new LinkedHashMap <>();
432416 metadata .put ("fields" , currentFields );
433- metadata .put ("t_first" , 0L ); // Time to first record (placeholder)
417+ // TODO: Implement actual time to first record calculation for accurate performance monitoring
418+ metadata .put ("t_first" , 0L );
434419
435420 sendSuccess (metadata );
436421 state = explicitTransaction ? State .TX_STREAMING : State .STREAMING ;
@@ -503,8 +488,15 @@ private void handlePull(final PullMessage message) throws IOException {
503488 // Build success metadata
504489 final Map <String , Object > metadata = new LinkedHashMap <>();
505490 if (!hasMore ) {
506- metadata .put ("type" , "r" ); // Read-only query type
507- metadata .put ("t_last" , 0L ); // Time to last record
491+ // TODO: Determine query type dynamically (r=read, w=write, rw=read-write, s=schema)
492+ metadata .put ("type" , "r" );
493+ // TODO: Implement actual time to last record calculation for accurate performance metrics
494+ metadata .put ("t_last" , 0L );
495+ try {
496+ currentResultSet .close ();
497+ } catch (final Exception e ) {
498+ // Ignore
499+ }
508500 currentResultSet = null ;
509501 currentFields = null ;
510502 firstResult = null ;
@@ -541,6 +533,11 @@ private void handleDiscard(final DiscardMessage message) throws IOException {
541533 while (currentResultSet .hasNext ()) {
542534 currentResultSet .next ();
543535 }
536+ try {
537+ currentResultSet .close ();
538+ } catch (final Exception e ) {
539+ // Ignore
540+ }
544541 }
545542
546543 currentResultSet = null ;
@@ -641,6 +638,13 @@ private void handleRollback() throws IOException {
641638 }
642639
643640 try {
641+ if (currentResultSet != null ) {
642+ try {
643+ currentResultSet .close ();
644+ } catch (final Exception e ) {
645+ // Ignore
646+ }
647+ }
644648 if (database != null ) {
645649 database .rollback ();
646650 }
@@ -673,7 +677,7 @@ private void handleRoute(final RouteMessage message) throws IOException {
673677 final String address = host + ":" + port ;
674678
675679 final Map <String , Object > rt = new LinkedHashMap <>();
676- rt .put ("ttl" , 300L ); // 5 minute TTL
680+ rt .put ("ttl" , GlobalConfiguration . BOLT_ROUTING_TTL . getValueAsLong ());
677681 rt .put ("db" , message .getDatabase () != null ? message .getDatabase () : databaseName );
678682
679683 final List <Map <String , Object >> servers = new ArrayList <>();
@@ -710,7 +714,8 @@ private boolean ensureDatabase() throws IOException {
710714 }
711715
712716 if (databaseName == null || databaseName .isEmpty ()) {
713- // Try to get default database or first available
717+ // TODO: Consider making default database selection configurable or requiring explicit database name
718+ // to avoid unpredictable behavior in multi-database environments
714719 final Collection <String > databases = server .getDatabaseNames ();
715720 if (databases .isEmpty ()) {
716721 sendFailure (BoltException .DATABASE_ERROR , "No database available" );
@@ -761,6 +766,34 @@ private String generateBookmark() {
761766 return "arcade:tx:" + System .currentTimeMillis ();
762767 }
763768
769+ /**
770+ * Authenticate user with provided credentials.
771+ *
772+ * @param principal the username
773+ * @param credentials the password
774+ * @return true if authentication succeeded, false otherwise (failure already sent)
775+ * @throws IOException if sending failure message fails
776+ */
777+ private boolean authenticateUser (final String principal , final String credentials ) throws IOException {
778+ if (principal == null || credentials == null ) {
779+ return false ;
780+ }
781+
782+ try {
783+ user = server .getSecurity ().authenticate (principal , credentials , databaseName );
784+ if (user == null ) {
785+ sendFailure (BoltException .AUTHENTICATION_ERROR , "Invalid credentials" );
786+ state = State .FAILED ;
787+ return false ;
788+ }
789+ return true ;
790+ } catch (final ServerSecurityException e ) {
791+ sendFailure (BoltException .AUTHENTICATION_ERROR , e .getMessage ());
792+ state = State .FAILED ;
793+ return false ;
794+ }
795+ }
796+
764797 /**
765798 * Send a SUCCESS response message.
766799 */
@@ -809,6 +842,15 @@ private void sendMessage(final BoltMessage message) throws IOException {
809842 * Cleanup resources when connection closes.
810843 */
811844 private void cleanup () {
845+ try {
846+ if (currentResultSet != null ) {
847+ currentResultSet .close ();
848+ currentResultSet = null ;
849+ }
850+ } catch (final Exception e ) {
851+ // Ignore
852+ }
853+
812854 try {
813855 if (explicitTransaction && database != null ) {
814856 database .rollback ();
@@ -817,6 +859,15 @@ private void cleanup() {
817859 // Ignore
818860 }
819861
862+ try {
863+ if (database != null ) {
864+ database .close ();
865+ database = null ;
866+ }
867+ } catch (final Exception e ) {
868+ // Ignore
869+ }
870+
820871 try {
821872 socket .close ();
822873 } catch (final Exception e ) {
0 commit comments