Skip to content

Commit

Permalink
Add GqlStatusObject support as notifications to ResultSummary (#588)
Browse files Browse the repository at this point in the history
Introduces GQL-compliant status objects to the ResultSummary.
  • Loading branch information
StephenCathcart authored Jul 24, 2024
1 parent a8c8342 commit f49845b
Show file tree
Hide file tree
Showing 24 changed files with 1,026 additions and 86 deletions.
37 changes: 22 additions & 15 deletions neo4j/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,22 @@ type ServerAddress = config.ServerAddress

func defaultConfig() *Config {
return &Config{
AddressResolver: nil,
MaxTransactionRetryTime: 30 * time.Second,
MaxConnectionPoolSize: 100,
MaxConnectionLifetime: 1 * time.Hour,
ConnectionAcquisitionTimeout: 1 * time.Minute,
ConnectionLivenessCheckTimeout: pool.DefaultConnectionLivenessCheckTimeout,
SocketConnectTimeout: 5 * time.Second,
SocketKeepalive: true,
RootCAs: nil,
UserAgent: UserAgent,
FetchSize: FetchDefault,
NotificationsMinSeverity: notifications.DefaultLevel,
NotificationsDisabledCategories: notifications.NotificationDisabledCategories{},
TelemetryDisabled: false,
ReadBufferSize: bolt.DefaultReadBufferSize,
AddressResolver: nil,
MaxTransactionRetryTime: 30 * time.Second,
MaxConnectionPoolSize: 100,
MaxConnectionLifetime: 1 * time.Hour,
ConnectionAcquisitionTimeout: 1 * time.Minute,
ConnectionLivenessCheckTimeout: pool.DefaultConnectionLivenessCheckTimeout,
SocketConnectTimeout: 5 * time.Second,
SocketKeepalive: true,
RootCAs: nil,
UserAgent: UserAgent,
FetchSize: FetchDefault,
NotificationsMinSeverity: notifications.DefaultLevel,
NotificationsDisabledCategories: notifications.NotificationDisabledCategories{},
NotificationsDisabledClassifications: notifications.NotificationDisabledClassifications{},
TelemetryDisabled: false,
ReadBufferSize: bolt.DefaultReadBufferSize,
}
}

Expand Down Expand Up @@ -91,6 +92,12 @@ func validateAndNormaliseConfig(config *Config) error {
config.SocketConnectTimeout = 0
}

// Check notifications have not been configured with both categories and classifications.
if len(config.NotificationsDisabledCategories.DisabledCategories()) > 0 &&
len(config.NotificationsDisabledClassifications.DisabledClassifications()) > 0 {
return &UsageError{Message: "Notifications cannot be disabled for both categories and classifications at the same time."}
}

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions neo4j/config/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ type Config struct {
// By default, the server's settings are used.
// Disabling categories allows the server to skip analysis for those, which can speed up query execution.
NotificationsDisabledCategories notifications.NotificationDisabledCategories
// NotificationsDisabledClassifications is identical to NotificationsDisabledCategories.
// This alternative is provided for a consistent naming with neo4j.GqlStatusObject Classification.
//
// NotificationsDisabledClassifications is part of the GQL compliant notifications preview feature
// (see README on what it means in terms of support and compatibility guarantees)
NotificationsDisabledClassifications notifications.NotificationDisabledClassifications
// By default, if the server requests it, the driver will automatically transmit anonymous usage
// statistics to the server it is connected to.
//
Expand Down
13 changes: 13 additions & 0 deletions neo4j/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package neo4j

import (
"github.com/neo4j/neo4j-go-driver/v5/neo4j/notifications"
"math"
"testing"
"time"
Expand Down Expand Up @@ -121,4 +122,16 @@ func TestValidateAndNormaliseConfig(rt *testing.T) {
t.Errorf("SocketConnectTimeout should be set to (0 * time.Nanosecond) when negative")
}
})

rt.Run("Configure both NotificationsDisabledCategories and NotificationsDisabledCategories", func(t *testing.T) {
config := defaultConfig()

config.NotificationsDisabledCategories = notifications.DisableCategories(notifications.Deprecation)
config.NotificationsDisabledClassifications = notifications.DisableClassifications(notifications.Deprecation)

err := validateAndNormaliseConfig(config)
if err == nil {
t.Errorf("NotificationsDisabledCategories and NotificationsDisabledCategories are configured but no error returned")
}
})
}
58 changes: 58 additions & 0 deletions neo4j/db/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ type ProfiledPlan struct {
Time int64
}

// StreamSummary is part of the GQL compliant notifications preview feature
// (see README on what it means in terms of support and compatibility guarantees)
type StreamSummary struct {
HadRecord bool
HadKey bool
}

type Notification struct {
Code string
Title string
Expand All @@ -99,6 +106,55 @@ type Notification struct {
Category string
}

// GqlStatusObject represents a GqlStatusObject generated when executing a statement.
// A GqlStatusObject can be visualized in a client pinpointing problems or other information about the statement.
// Contrary to failures or errors, GqlStatusObjects do not affect the execution of the statement.
//
// GqlStatusObject is part of the GQL compliant notifications preview feature
// (see README on what it means in terms of support and compatibility guarantees)
type GqlStatusObject struct {
// Deprecated: for backward compatibility with Notification.Code only.
Code string
// Deprecated: for backward compatibility with Notification.Title only.
Title string
// Deprecated: for backward compatibility with Notification.Description only.
Description string
// GqlStatus returns the GQLSTATUS.
// The following GQLSTATUS codes denote codes that the driver will use for
// polyfilling (when connected to an old, non-GQL-aware server). Further, they may be used by servers
// during the transition-phase to GQLSTATUS-awareness.
// - "01N42" (warning - unknown warning)
// - "02N42" (no data - unknown subcondition)
// - "03N42" (informational - unknown notification)
// - "05N42" (general processing exception - unknown error)
//
// This means these codes are not guaranteed to be stable and may change in future versions.
GqlStatus string
// StatusDescription returns a description of the status.
StatusDescription string
// Position returns the position in the statement where this status points to.
// Not all statuses have a unique position to point to and in that case the position would be set to nil.
//
// Only Notifications (see IsNotification) have a meaningful position.
Position *InputPosition
// Classification returns the mapped Classification of this status.
// If the Classification is not a known value, Classification returns notifications.UnknownClassification.
//
// Only Notifications (see IsNotification) have a meaningful classification.
Classification string
// Severity returns the mapped Severity of this status.
// If the Severity is not a known value, Severity returns notifications.UnknownSeverity.
//
// Only Notifications (see IsNotification) have a meaningful severity.
Severity string
// DiagnosticRecord returns further information about the status for diagnostic purposes.
DiagnosticRecord map[string]any
// IsNotification returns true if this status is a Notification.
//
// Only some statuses are Notifications.
IsNotification bool
}

// InputPosition contains information about a specific position in a statement
type InputPosition struct {
// Offset contains the character offset referred to by this position; offset numbers start at 0.
Expand Down Expand Up @@ -127,7 +183,9 @@ type Summary struct {
Plan *Plan
ProfiledPlan *ProfiledPlan
Notifications []Notification
GqlStatusObjects []GqlStatusObject
Database string
ContainsSystemUpdates *bool
ContainsUpdates *bool
StreamSummary StreamSummary
}
4 changes: 4 additions & 0 deletions neo4j/driver_with_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,10 @@ func (sum *fakeSummary) Notifications() []Notification {
panic("implement me")
}

func (sum *fakeSummary) GqlStatusObjects() []GqlStatusObject {
panic("implement me")
}

func (sum *fakeSummary) ResultAvailableAfter() time.Duration {
return sum.resultAvailableAfter
}
Expand Down
2 changes: 2 additions & 0 deletions neo4j/internal/bolt/bolt3.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ func (b *bolt3) receiveNext(ctx context.Context) (*db.Record, *db.Summary, error
switch message := res.(type) {
case *db.Record:
message.Keys = b.currStream.keys
b.currStream.hadRecord = true
return message, nil, nil
case *success:
// End of stream, parse summary
Expand Down Expand Up @@ -652,6 +653,7 @@ func (b *bolt3) receiveNext(ctx context.Context) (*db.Record, *db.Summary, error
sum.Minor = b.minor
sum.ServerName = b.serverName
sum.TFirst = b.currStream.tfirst
sum.StreamSummary = b.currStream.ToSummary()
b.currStream.sum = sum
b.currStream = nil
return nil, sum, nil
Expand Down
66 changes: 66 additions & 0 deletions neo4j/internal/bolt/bolt3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,4 +1000,70 @@ func TestBolt3(outer *testing.T) {
}
})
}

outer.Run("StreamSummary tests", func(t *testing.T) {
// Test where both HadRecord and HadKey are false - omitted result
t.Run("StreamSummary omitted result", func(t *testing.T) {
bolt, cleanup := connectToServer(t, func(srv *bolt3server) {
srv.accept(3)
// Simulate a run that returns no keys and no records
srv.waitForRun()
srv.send(msgSuccess, map[string]any{"fields": []any{}})
srv.send(msgSuccess, map[string]any{})
})
defer cleanup()
defer bolt.Close(context.Background())

stream, _ := bolt.Run(context.Background(), idb.Command{Cypher: "MATCH (n) RETURN n"}, idb.TxConfig{Mode: idb.ReadMode})
summary, err := bolt.Consume(context.Background(), stream)
AssertNoError(t, err)

// Validate StreamSummary fields
AssertFalse(t, summary.StreamSummary.HadRecord)
AssertFalse(t, summary.StreamSummary.HadKey)
})

// Test where both HadRecord and HadKey are true - success result
t.Run("StreamSummary success result", func(t *testing.T) {
bolt, cleanup := connectToServer(t, func(srv *bolt3server) {
srv.accept(3)
// Simulate a run that returns keys and records
srv.waitForRun()
srv.send(msgSuccess, map[string]any{"fields": []any{"name"}})
srv.send(msgRecord, []any{"John Doe"})
srv.send(msgSuccess, map[string]any{})
})
defer cleanup()
defer bolt.Close(context.Background())

stream, _ := bolt.Run(context.Background(), idb.Command{Cypher: "MATCH (n) RETURN n"}, idb.TxConfig{Mode: idb.ReadMode})
summary, err := bolt.Consume(context.Background(), stream)
AssertNoError(t, err)

// Validate StreamSummary fields
AssertTrue(t, summary.StreamSummary.HadRecord)
AssertTrue(t, summary.StreamSummary.HadKey)
})

// Test where HadRecord is false but HadKey is true - no data result
t.Run("StreamSummary no data result", func(t *testing.T) {
bolt, cleanup := connectToServer(t, func(srv *bolt3server) {
srv.accept(3)
// Simulate a run that returns keys but no records
srv.waitForRun()
srv.send(msgSuccess, map[string]any{"fields": []any{"name"}})
srv.send(msgSuccess, map[string]any{})
})
defer cleanup()
defer bolt.Close(context.Background())

stream, _ := bolt.Run(context.Background(), idb.Command{Cypher: "MATCH (n) RETURN n"}, idb.TxConfig{Mode: idb.ReadMode})
summary, err := bolt.Consume(context.Background(), stream)
AssertNoError(t, err)

// Validate StreamSummary fields
AssertFalse(t, summary.StreamSummary.HadRecord)
AssertTrue(t, summary.StreamSummary.HadKey)
})
})
}
4 changes: 4 additions & 0 deletions neo4j/internal/bolt/bolt4.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,9 @@ func (b *bolt4) discardResponseHandler(stream *stream) responseHandler {
func (b *bolt4) pullResponseHandler(stream *stream) responseHandler {
return responseHandler{
onRecord: func(record *db.Record) {
if record != nil {
stream.hadRecord = true
}
if stream.discarding {
stream.emptyRecords()
} else {
Expand Down Expand Up @@ -1181,6 +1184,7 @@ func (b *bolt4) extractSummary(success *success, stream *stream) *db.Summary {
summary.Minor = b.minor
summary.ServerName = b.serverName
summary.TFirst = stream.tfirst
summary.StreamSummary = stream.ToSummary()
return summary
}

Expand Down
66 changes: 66 additions & 0 deletions neo4j/internal/bolt/bolt4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1579,4 +1579,70 @@ func TestBolt4(outer *testing.T) {
}
})
}

outer.Run("StreamSummary tests", func(t *testing.T) {
// Test where both HadRecord and HadKey are false - omitted result
t.Run("StreamSummary omitted result", func(t *testing.T) {
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {
srv.accept(4)
// Simulate a run that returns no keys and no records
srv.waitForRun(nil)
srv.send(msgSuccess, map[string]any{"fields": []any{}})
srv.send(msgSuccess, map[string]any{})
})
defer cleanup()
defer bolt.Close(context.Background())

stream, _ := bolt.Run(context.Background(), idb.Command{Cypher: "MATCH (n) RETURN n"}, idb.TxConfig{Mode: idb.ReadMode})
summary, err := bolt.Consume(context.Background(), stream)
AssertNoError(t, err)

// Validate StreamSummary fields
AssertFalse(t, summary.StreamSummary.HadRecord)
AssertFalse(t, summary.StreamSummary.HadKey)
})

// Test where both HadRecord and HadKey are true - success result
t.Run("StreamSummary success result", func(t *testing.T) {
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {
srv.accept(4)
// Simulate a run that returns keys and records
srv.waitForRun(nil)
srv.send(msgSuccess, map[string]any{"fields": []any{"name"}})
srv.send(msgRecord, []any{"John Doe"})
srv.send(msgSuccess, map[string]any{})
})
defer cleanup()
defer bolt.Close(context.Background())

stream, _ := bolt.Run(context.Background(), idb.Command{Cypher: "MATCH (n) RETURN n"}, idb.TxConfig{Mode: idb.ReadMode})
summary, err := bolt.Consume(context.Background(), stream)
AssertNoError(t, err)

// Validate StreamSummary fields
AssertTrue(t, summary.StreamSummary.HadRecord)
AssertTrue(t, summary.StreamSummary.HadKey)
})

// Test where HadRecord is false but HadKey is true - no data result
t.Run("StreamSummary no data result", func(t *testing.T) {
bolt, cleanup := connectToServer(t, func(srv *bolt4server) {
srv.accept(4)
// Simulate a run that returns keys but no records
srv.waitForRun(nil)
srv.send(msgSuccess, map[string]any{"fields": []any{"name"}})
srv.send(msgSuccess, map[string]any{})
})
defer cleanup()
defer bolt.Close(context.Background())

stream, _ := bolt.Run(context.Background(), idb.Command{Cypher: "MATCH (n) RETURN n"}, idb.TxConfig{Mode: idb.ReadMode})
summary, err := bolt.Consume(context.Background(), stream)
AssertNoError(t, err)

// Validate StreamSummary fields
AssertFalse(t, summary.StreamSummary.HadRecord)
AssertTrue(t, summary.StreamSummary.HadKey)
})
})
}
Loading

0 comments on commit f49845b

Please sign in to comment.