From 8ddfa453e199d3bd11e42a8404edf9f5c5d57d14 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Tue, 17 Sep 2024 15:16:49 +0300 Subject: [PATCH 01/12] add output cypher component Signed-off-by: Jem Davies --- cmd/bento/config.yaml | 23 ++ go.mod | 1 + go.sum | 2 + internal/impl/cypher/output_cypher.go | 155 +++++++++++++ public/components/all/package.go | 1 + public/components/cypher/package.go | 6 + website/docs/components/outputs/cypher.md | 270 ++++++++++++++++++++++ 7 files changed, 458 insertions(+) create mode 100644 cmd/bento/config.yaml create mode 100644 internal/impl/cypher/output_cypher.go create mode 100644 public/components/cypher/package.go create mode 100644 website/docs/components/outputs/cypher.md diff --git a/cmd/bento/config.yaml b/cmd/bento/config.yaml new file mode 100644 index 000000000..cd926ff50 --- /dev/null +++ b/cmd/bento/config.yaml @@ -0,0 +1,23 @@ +input: + stdin: {} + +output: + cypher: + database: neo4j + uri: bolt://localhost:7687 + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 + # byte_size: 0 + # period: "" + # check: "" + +## Return all nodes and relations: +# MATCH (n) OPTIONAL MATCH (n)-[r]->() RETURN n, r + +## Delete all nodes and relations +# MATCH (n) DETACH DELETE n + diff --git a/go.mod b/go.mod index 9838201d4..951c549dc 100644 --- a/go.mod +++ b/go.mod @@ -296,6 +296,7 @@ require ( github.com/nats-io/nats-server/v2 v2.9.23 // indirect github.com/nats-io/nats-streaming-server v0.24.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/neo4j/neo4j-go-driver/v5 v5.24.0 github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc5 // indirect diff --git a/go.sum b/go.sum index 6e66c1c06..a0a050b6c 100644 --- a/go.sum +++ b/go.sum @@ -932,6 +932,8 @@ github.com/nats-io/stan.go v0.10.2/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKue github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw= github.com/nats-io/stan.go v0.10.4/go.mod h1:3XJXH8GagrGqajoO/9+HgPyKV5MWsv7S5ccdda+pc6k= github.com/ncw/swift v1.0.52/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= +github.com/neo4j/neo4j-go-driver/v5 v5.24.0 h1:7MAFoB7L6f9heQUo/tJ5EnrrpVzm9ZBHgH8ew03h6Eo= +github.com/neo4j/neo4j-go-driver/v5 v5.24.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249 h1:NHrXEjTNQY7P0Zfx1aMrNhpgxHmow66XQtm0aQLY0AE= github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249/go.mod h1:mpRZBD8SJ55OIICQ3iWH0Yz3cjzA61JdqMLoWXeB2+8= diff --git a/internal/impl/cypher/output_cypher.go b/internal/impl/cypher/output_cypher.go new file mode 100644 index 000000000..2a1ca325e --- /dev/null +++ b/internal/impl/cypher/output_cypher.go @@ -0,0 +1,155 @@ +package cypher + +import ( + "context" + + "github.com/neo4j/neo4j-go-driver/v5/neo4j" + "github.com/warpstreamlabs/bento/public/service" +) + +func cypherOutputSpec() *service.ConfigSpec { + spec := service.NewConfigSpec(). + Fields( + service.NewStringField("database"). + Description("The name of the database to connect to."). + Example("neo4j"), + service.NewStringField("uri"). + Description("The URI of the database."). + Example("bolt://localhost:7687"), + service.NewBoolField("noAuth"). + Description("No Authentication currently implemented, defaults to true."). + Default(true), + service.NewStringField("query"). + Description("The cypher query to execute."). + Example("CREATE (p:Person {name: $name}) RETURN p"), + service.NewInterpolatedStringMapField("values"). + Description("A map of strings -> bloblang interpolations that form the values of the references in the query i.e. $name."). + Default(map[string]any{}). + Example(map[string]any{ + "name": "Alice", + }), + service.NewIntField("max_in_flight"). + Description("The maximum number of queries to run in parallel."). + Default(64), + ) + + spec = spec.Field(service.NewBatchPolicyField("batching")). + Version("1.0.0"). + Example("Create Node", + ` +Here we execute a cypher query that takes the value of $name from the interpolated field in the values map:`, + ` +output: + cypher: + database: neo4j + uri: bolt://localhost:7687 + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 +`, + ) + return spec +} + +func init() { + err := service.RegisterBatchOutput( + "cypher", cypherOutputSpec(), + func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPolicy service.BatchPolicy, maxInFlight int, err error) { + if batchPolicy, err = conf.FieldBatchPolicy("batching"); err != nil { + return + } + if maxInFlight, err = conf.FieldInt("max_in_flight"); err != nil { + return + } + out, err = NewCypherOutputFromConfig(conf, mgr) + return + }) + if err != nil { + panic(err) + } +} + +//---------------------------------------------------------------------------- + +type CypherOutput struct { + database string + uri string + noAuth bool + query string + values map[string]*service.InterpolatedString + driver neo4j.DriverWithContext + session neo4j.SessionWithContext +} + +func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*CypherOutput, error) { + database, err := conf.FieldString("database") + if err != nil { + return nil, err + } + uri, err := conf.FieldString("uri") + if err != nil { + return nil, err + } + noAuth, err := conf.FieldBool("noAuth") + if err != nil { + return nil, err + } + query, err := conf.FieldString("query") + if err != nil { + return nil, err + } + values, _ := conf.FieldInterpolatedStringMap("values") + + return &CypherOutput{database: database, uri: uri, noAuth: noAuth, query: query, values: values}, nil + +} + +func (cyp *CypherOutput) Connect(ctx context.Context) error { + + driver, err := neo4j.NewDriverWithContext(cyp.uri, neo4j.NoAuth()) + if err != nil { + return err + } + cyp.driver = driver + + session := driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) + cyp.session = session + + return nil +} + +func (cyp *CypherOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error { + + values := make(map[string]any) + + for _, msgPart := range batch { + + for k, v := range cyp.values { + values[k] = v.String(msgPart) + } + + _, err := neo4j.ExecuteQuery(ctx, cyp.driver, + cyp.query, + values, + neo4j.EagerResultTransformer, + neo4j.ExecuteQueryWithDatabase(cyp.database), + ) + + if err != nil { + panic(err) + } + + values = nil + } + + return nil +} + +func (cyp *CypherOutput) Close(ctx context.Context) error { + cyp.driver.Close(ctx) + cyp.session.Close(ctx) + return nil +} diff --git a/public/components/all/package.go b/public/components/all/package.go index 9abaceeb1..b819e09e5 100644 --- a/public/components/all/package.go +++ b/public/components/all/package.go @@ -17,6 +17,7 @@ import ( _ "github.com/warpstreamlabs/bento/public/components/confluent" _ "github.com/warpstreamlabs/bento/public/components/couchbase" _ "github.com/warpstreamlabs/bento/public/components/crypto" + _ "github.com/warpstreamlabs/bento/public/components/cypher" _ "github.com/warpstreamlabs/bento/public/components/dgraph" _ "github.com/warpstreamlabs/bento/public/components/discord" _ "github.com/warpstreamlabs/bento/public/components/elasticsearch" diff --git a/public/components/cypher/package.go b/public/components/cypher/package.go new file mode 100644 index 000000000..e388adca7 --- /dev/null +++ b/public/components/cypher/package.go @@ -0,0 +1,6 @@ +package crypto + +import ( + // Bring in the internal plugin definitions. + _ "github.com/warpstreamlabs/bento/internal/impl/cypher" +) diff --git a/website/docs/components/outputs/cypher.md b/website/docs/components/outputs/cypher.md new file mode 100644 index 000000000..98134769b --- /dev/null +++ b/website/docs/components/outputs/cypher.md @@ -0,0 +1,270 @@ +--- +title: cypher +slug: cypher +type: output +status: experimental +--- + + + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +:::caution EXPERIMENTAL +This component is experimental and therefore subject to change or removal outside of major version releases. +::: + +Introduced in version 1.0.0. + + + + + + +```yml +# Common config fields, showing default values +output: + label: "" + cypher: + database: neo4j # No default (required) + uri: bolt://localhost:7687 # No default (required) + noAuth: true + query: 'CREATE (p:Person {name: $name}) RETURN p' # No default (required) + values: {} + max_in_flight: 64 + batching: + count: 0 + byte_size: 0 + period: "" + check: "" +``` + + + + +```yml +# All config fields, showing default values +output: + label: "" + cypher: + database: neo4j # No default (required) + uri: bolt://localhost:7687 # No default (required) + noAuth: true + query: 'CREATE (p:Person {name: $name}) RETURN p' # No default (required) + values: {} + max_in_flight: 64 + batching: + count: 0 + byte_size: 0 + period: "" + check: "" + processors: [] # No default (optional) +``` + + + + +## Examples + + + + + + +Here we execute a cypher query that takes the value of $name from the interpolated field in the values map: + +```yaml +output: + cypher: + database: neo4j + uri: bolt://localhost:7687 + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 +``` + + + + +## Fields + +### `database` + +The name of the database to connect to. + + +Type: `string` + +```yml +# Examples + +database: neo4j +``` + +### `uri` + +The URI of the database. + + +Type: `string` + +```yml +# Examples + +uri: bolt://localhost:7687 +``` + +### `noAuth` + +No Authentication currently implemented, defaults to true. + + +Type: `bool` +Default: `true` + +### `query` + +The cypher query to execute. + + +Type: `string` + +```yml +# Examples + +query: 'CREATE (p:Person {name: $name}) RETURN p' +``` + +### `values` + +A map of strings -> bloblang interpolations that form the values of the references in the query i.e. $name. +This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). + + +Type: `object` +Default: `{}` + +```yml +# Examples + +values: + name: Alice +``` + +### `max_in_flight` + +The maximum number of queries to run in parallel. + + +Type: `int` +Default: `64` + +### `batching` + +Allows you to configure a [batching policy](/docs/configuration/batching). + + +Type: `object` + +```yml +# Examples + +batching: + byte_size: 5000 + count: 0 + period: 1s + +batching: + count: 10 + period: 1s + +batching: + check: this.contains("END BATCH") + count: 0 + period: 1m +``` + +### `batching.count` + +A number of messages at which the batch should be flushed. If `0` disables count based batching. + + +Type: `int` +Default: `0` + +### `batching.byte_size` + +An amount of bytes at which the batch should be flushed. If `0` disables size based batching. + + +Type: `int` +Default: `0` + +### `batching.period` + +A period in which an incomplete batch should be flushed regardless of its size. + + +Type: `string` +Default: `""` + +```yml +# Examples + +period: 1s + +period: 1m + +period: 500ms +``` + +### `batching.check` + +A [Bloblang query](/docs/guides/bloblang/about/) that should return a boolean value indicating whether a message should end a batch. + + +Type: `string` +Default: `""` + +```yml +# Examples + +check: this.type == "end_of_transaction" +``` + +### `batching.processors` + +A list of [processors](/docs/components/processors/about) to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. + + +Type: `array` + +```yml +# Examples + +processors: + - archive: + format: concatenate + +processors: + - archive: + format: lines + +processors: + - archive: + format: json_array +``` + + From 6d777468b74edeeba4a2cf51cb2eae3cc6890703 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Thu, 26 Sep 2024 22:59:14 +0100 Subject: [PATCH 02/12] add basic_auth to cypher output Signed-off-by: Jem Davies --- cmd/bento/config.yaml | 4 ++ internal/impl/cypher/output_cypher.go | 58 ++++++++++++++++++++++----- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/cmd/bento/config.yaml b/cmd/bento/config.yaml index cd926ff50..d4223fb3d 100644 --- a/cmd/bento/config.yaml +++ b/cmd/bento/config.yaml @@ -5,6 +5,10 @@ output: cypher: database: neo4j uri: bolt://localhost:7687 + noAuth: true + # basic_auth: + # user: neo4j + # password: password query: | CREATE (p:Person {name: $name}) RETURN p values: diff --git a/internal/impl/cypher/output_cypher.go b/internal/impl/cypher/output_cypher.go index 2a1ca325e..e8f46eb82 100644 --- a/internal/impl/cypher/output_cypher.go +++ b/internal/impl/cypher/output_cypher.go @@ -18,7 +18,9 @@ func cypherOutputSpec() *service.ConfigSpec { Example("bolt://localhost:7687"), service.NewBoolField("noAuth"). Description("No Authentication currently implemented, defaults to true."). - Default(true), + Default(false), + service.NewObjectField("basic_auth", basicAuthSpec()...). + Description("basic auth"), service.NewStringField("query"). Description("The cypher query to execute."). Example("CREATE (p:Person {name: $name}) RETURN p"), @@ -54,6 +56,21 @@ output: return spec } +func basicAuthSpec() []*service.ConfigField { + return []*service.ConfigField{ + service.NewStringField("user"). + Default(""). + Description("The username for basic auth."), + service.NewStringField("password"). + Default(""). + Secret(). + Description("The password for basic auth."), + service.NewStringField("realm"). + Default(""). + Description("The realm for basic auth."), + } +} + func init() { err := service.RegisterBatchOutput( "cypher", cypherOutputSpec(), @@ -75,13 +92,20 @@ func init() { //---------------------------------------------------------------------------- type CypherOutput struct { - database string - uri string - noAuth bool - query string - values map[string]*service.InterpolatedString - driver neo4j.DriverWithContext - session neo4j.SessionWithContext + database string + uri string + noAuth bool + basic_auth CypherBasicAuth + query string + values map[string]*service.InterpolatedString + driver neo4j.DriverWithContext + session neo4j.SessionWithContext +} + +type CypherBasicAuth struct { + user string + password string + realm string } func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*CypherOutput, error) { @@ -103,16 +127,30 @@ func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resource } values, _ := conf.FieldInterpolatedStringMap("values") - return &CypherOutput{database: database, uri: uri, noAuth: noAuth, query: query, values: values}, nil + if !noAuth { + basicAuthMap, _ := conf.FieldStringMap("basic_auth") + basicAuth := CypherBasicAuth{user: basicAuthMap["user"], password: basicAuthMap["password"], realm: basicAuthMap["realm"]} + return &CypherOutput{database: database, uri: uri, noAuth: noAuth, basic_auth: basicAuth, query: query, values: values}, nil + } + return &CypherOutput{database: database, uri: uri, noAuth: noAuth, query: query, values: values}, nil } func (cyp *CypherOutput) Connect(ctx context.Context) error { - driver, err := neo4j.NewDriverWithContext(cyp.uri, neo4j.NoAuth()) + var driver neo4j.DriverWithContext + var err error + + if cyp.noAuth { + driver, err = neo4j.NewDriverWithContext(cyp.uri, neo4j.NoAuth()) + } else { + driver, err = neo4j.NewDriverWithContext(cyp.uri, neo4j.BasicAuth(cyp.basic_auth.user, cyp.basic_auth.password, cyp.basic_auth.realm)) + } + if err != nil { return err } + cyp.driver = driver session := driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) From 6641275cb52de5feafff0ac2dca86721cfcb479a Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Thu, 26 Sep 2024 23:15:04 +0100 Subject: [PATCH 03/12] fix for linter, make docs Signed-off-by: Jem Davies --- internal/impl/cypher/output_cypher.go | 20 +++++----- website/docs/components/outputs/cypher.md | 48 +++++++++++++++++++++-- 2 files changed, 55 insertions(+), 13 deletions(-) diff --git a/internal/impl/cypher/output_cypher.go b/internal/impl/cypher/output_cypher.go index e8f46eb82..c141fe644 100644 --- a/internal/impl/cypher/output_cypher.go +++ b/internal/impl/cypher/output_cypher.go @@ -92,14 +92,14 @@ func init() { //---------------------------------------------------------------------------- type CypherOutput struct { - database string - uri string - noAuth bool - basic_auth CypherBasicAuth - query string - values map[string]*service.InterpolatedString - driver neo4j.DriverWithContext - session neo4j.SessionWithContext + database string + uri string + noAuth bool + basicAuth CypherBasicAuth + query string + values map[string]*service.InterpolatedString + driver neo4j.DriverWithContext + session neo4j.SessionWithContext } type CypherBasicAuth struct { @@ -130,7 +130,7 @@ func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resource if !noAuth { basicAuthMap, _ := conf.FieldStringMap("basic_auth") basicAuth := CypherBasicAuth{user: basicAuthMap["user"], password: basicAuthMap["password"], realm: basicAuthMap["realm"]} - return &CypherOutput{database: database, uri: uri, noAuth: noAuth, basic_auth: basicAuth, query: query, values: values}, nil + return &CypherOutput{database: database, uri: uri, noAuth: noAuth, basicAuth: basicAuth, query: query, values: values}, nil } return &CypherOutput{database: database, uri: uri, noAuth: noAuth, query: query, values: values}, nil @@ -144,7 +144,7 @@ func (cyp *CypherOutput) Connect(ctx context.Context) error { if cyp.noAuth { driver, err = neo4j.NewDriverWithContext(cyp.uri, neo4j.NoAuth()) } else { - driver, err = neo4j.NewDriverWithContext(cyp.uri, neo4j.BasicAuth(cyp.basic_auth.user, cyp.basic_auth.password, cyp.basic_auth.realm)) + driver, err = neo4j.NewDriverWithContext(cyp.uri, neo4j.BasicAuth(cyp.basicAuth.user, cyp.basicAuth.password, cyp.basicAuth.realm)) } if err != nil { diff --git a/website/docs/components/outputs/cypher.md b/website/docs/components/outputs/cypher.md index 98134769b..08f2e1783 100644 --- a/website/docs/components/outputs/cypher.md +++ b/website/docs/components/outputs/cypher.md @@ -35,7 +35,11 @@ output: cypher: database: neo4j # No default (required) uri: bolt://localhost:7687 # No default (required) - noAuth: true + noAuth: false + basic_auth: + user: "" + password: "" + realm: "" query: 'CREATE (p:Person {name: $name}) RETURN p' # No default (required) values: {} max_in_flight: 64 @@ -56,7 +60,11 @@ output: cypher: database: neo4j # No default (required) uri: bolt://localhost:7687 # No default (required) - noAuth: true + noAuth: false + basic_auth: + user: "" + password: "" + realm: "" query: 'CREATE (p:Person {name: $name}) RETURN p' # No default (required) values: {} max_in_flight: 64 @@ -132,7 +140,41 @@ No Authentication currently implemented, defaults to true. Type: `bool` -Default: `true` +Default: `false` + +### `basic_auth` + +basic auth + + +Type: `object` + +### `basic_auth.user` + +The username for basic auth. + + +Type: `string` +Default: `""` + +### `basic_auth.password` + +The password for basic auth. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: + + +Type: `string` +Default: `""` + +### `basic_auth.realm` + +The realm for basic auth. + + +Type: `string` +Default: `""` ### `query` From b9b05f5b30ebc0f23aea55a32874f169881faccc Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Sat, 28 Sep 2024 20:32:57 +0100 Subject: [PATCH 04/12] add integration tests, add check for query and values Signed-off-by: Jem Davies --- cmd/bento/config.yaml | 27 --- internal/impl/cypher/output_cypher.go | 66 +++-- internal/impl/cypher/output_cypher_test.go | 270 +++++++++++++++++++++ website/docs/components/outputs/cypher.md | 27 ++- 4 files changed, 335 insertions(+), 55 deletions(-) delete mode 100644 cmd/bento/config.yaml create mode 100644 internal/impl/cypher/output_cypher_test.go diff --git a/cmd/bento/config.yaml b/cmd/bento/config.yaml deleted file mode 100644 index d4223fb3d..000000000 --- a/cmd/bento/config.yaml +++ /dev/null @@ -1,27 +0,0 @@ -input: - stdin: {} - -output: - cypher: - database: neo4j - uri: bolt://localhost:7687 - noAuth: true - # basic_auth: - # user: neo4j - # password: password - query: | - CREATE (p:Person {name: $name}) RETURN p - values: - name: ${! json("name") } - batching: - count: 0 - # byte_size: 0 - # period: "" - # check: "" - -## Return all nodes and relations: -# MATCH (n) OPTIONAL MATCH (n)-[r]->() RETURN n, r - -## Delete all nodes and relations -# MATCH (n) DETACH DELETE n - diff --git a/internal/impl/cypher/output_cypher.go b/internal/impl/cypher/output_cypher.go index c141fe644..40e91451c 100644 --- a/internal/impl/cypher/output_cypher.go +++ b/internal/impl/cypher/output_cypher.go @@ -2,6 +2,9 @@ package cypher import ( "context" + "fmt" + "regexp" + "strings" "github.com/neo4j/neo4j-go-driver/v5/neo4j" "github.com/warpstreamlabs/bento/public/service" @@ -13,14 +16,14 @@ func cypherOutputSpec() *service.ConfigSpec { service.NewStringField("database"). Description("The name of the database to connect to."). Example("neo4j"), - service.NewStringField("uri"). - Description("The URI of the database."). + service.NewStringField("url"). + Description("The URL of the database engine."). Example("bolt://localhost:7687"), - service.NewBoolField("noAuth"). - Description("No Authentication currently implemented, defaults to true."). + service.NewBoolField("no_auth"). + Description("Set to true to connect without authentication."). Default(false), service.NewObjectField("basic_auth", basicAuthSpec()...). - Description("basic auth"), + Description("Basic Authentication fields"), service.NewStringField("query"). Description("The cypher query to execute."). Example("CREATE (p:Person {name: $name}) RETURN p"), @@ -28,7 +31,7 @@ func cypherOutputSpec() *service.ConfigSpec { Description("A map of strings -> bloblang interpolations that form the values of the references in the query i.e. $name."). Default(map[string]any{}). Example(map[string]any{ - "name": "Alice", + "name": `${! json("name") }`, }), service.NewIntField("max_in_flight"). Description("The maximum number of queries to run in parallel."). @@ -44,7 +47,10 @@ Here we execute a cypher query that takes the value of $name from the interpolat output: cypher: database: neo4j - uri: bolt://localhost:7687 + url: bolt://localhost:7687 + basic_auth: + user: neo4j + password: password query: | CREATE (p:Person {name: $name}) RETURN p values: @@ -93,7 +99,7 @@ func init() { type CypherOutput struct { database string - uri string + url string noAuth bool basicAuth CypherBasicAuth query string @@ -113,11 +119,11 @@ func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resource if err != nil { return nil, err } - uri, err := conf.FieldString("uri") + url, err := conf.FieldString("url") if err != nil { return nil, err } - noAuth, err := conf.FieldBool("noAuth") + noAuth, err := conf.FieldBool("no_auth") if err != nil { return nil, err } @@ -125,15 +131,43 @@ func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resource if err != nil { return nil, err } - values, _ := conf.FieldInterpolatedStringMap("values") + values, err := conf.FieldInterpolatedStringMap("values") + if err != nil { + return nil, err + } + + err = validateQueryAndValues(query, values) + if err != nil { + return nil, err + } if !noAuth { basicAuthMap, _ := conf.FieldStringMap("basic_auth") basicAuth := CypherBasicAuth{user: basicAuthMap["user"], password: basicAuthMap["password"], realm: basicAuthMap["realm"]} - return &CypherOutput{database: database, uri: uri, noAuth: noAuth, basicAuth: basicAuth, query: query, values: values}, nil + return &CypherOutput{database: database, url: url, noAuth: noAuth, basicAuth: basicAuth, query: query, values: values}, nil + } + + return &CypherOutput{database: database, url: url, noAuth: noAuth, query: query, values: values}, nil +} + +func validateQueryAndValues(query string, values map[string]*service.InterpolatedString) error { + + for k := range values { + if !strings.Contains(query, "$"+k) { + return fmt.Errorf("value key: $%s, not found in query: %s", k, query) + } } - return &CypherOutput{database: database, uri: uri, noAuth: noAuth, query: query, values: values}, nil + re := regexp.MustCompile(`\$\b[a-zA-Z][a-zA-Z0-9]*\b`) + extractedVariables := re.FindAllString(query, -1) + + for _, param := range extractedVariables { + if _, ok := values[param[1:]]; !ok { + return fmt.Errorf("query parameter: %s, not found in value keys", param) + } + } + + return nil } func (cyp *CypherOutput) Connect(ctx context.Context) error { @@ -142,9 +176,9 @@ func (cyp *CypherOutput) Connect(ctx context.Context) error { var err error if cyp.noAuth { - driver, err = neo4j.NewDriverWithContext(cyp.uri, neo4j.NoAuth()) + driver, err = neo4j.NewDriverWithContext(cyp.url, neo4j.NoAuth()) } else { - driver, err = neo4j.NewDriverWithContext(cyp.uri, neo4j.BasicAuth(cyp.basicAuth.user, cyp.basicAuth.password, cyp.basicAuth.realm)) + driver, err = neo4j.NewDriverWithContext(cyp.url, neo4j.BasicAuth(cyp.basicAuth.user, cyp.basicAuth.password, cyp.basicAuth.realm)) } if err != nil { @@ -177,7 +211,7 @@ func (cyp *CypherOutput) WriteBatch(ctx context.Context, batch service.MessageBa ) if err != nil { - panic(err) + return err } values = nil diff --git a/internal/impl/cypher/output_cypher_test.go b/internal/impl/cypher/output_cypher_test.go new file mode 100644 index 000000000..2112410e6 --- /dev/null +++ b/internal/impl/cypher/output_cypher_test.go @@ -0,0 +1,270 @@ +package cypher + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/neo4j/neo4j-go-driver/v5/neo4j" + "github.com/neo4j/neo4j-go-driver/v5/neo4j/dbtype" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/internal/component/testutil" + "github.com/warpstreamlabs/bento/internal/manager/mock" + "github.com/warpstreamlabs/bento/internal/message" +) + +func setupNeo4j(env []string) (string, error) { + pool, err := dockertest.NewPool("") + if err != nil { + return "", err + } + pool.MaxWait = time.Second * 60 + + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "neo4j", + Tag: "latest", + Env: env, + }) + if err != nil { + return "", err + } + + neo4jDockerAddress := fmt.Sprintf("bolt://localhost:%s", resource.GetPort("7687/tcp")) + _ = resource.Expire(900) + + return neo4jDockerAddress, nil +} + +func TestCypherOutputNoAuth(t *testing.T) { + env := []string{"NEO4J_AUTH=none"} + localURL, err := setupNeo4j(env) + + require.NoError(t, err) + + confStr := fmt.Sprintf(` +cypher: + database: neo4j + url: %s + no_auth: true + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 +`, localURL) + + conf, err := testutil.OutputFromYAML(confStr) + require.NoError(t, err) + + s, err := mock.NewManager().NewOutput(conf) + + require.NoError(t, err) + + sendChan := make(chan message.Transaction) + resChan := make(chan error) + if err = s.Consume(sendChan); err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + s.TriggerCloseNow() + + ctx, done := context.WithTimeout(context.Background(), time.Second*10) + assert.NoError(t, s.WaitForClose(ctx)) + done() + }) + + inputs := []string{ + `{"name":"Alice"}`, `{"name":"Bob"}`, `{"name":"Carol"}`, `{"name":"Dan"}`, + } + + for _, input := range inputs { + testMsg := message.QuickBatch([][]byte{[]byte(input)}) + select { + case sendChan <- message.NewTransaction(testMsg, resChan): + case <-time.After(time.Second * 60): + t.Fatal("Action timed out") + } + + select { + case res := <-resChan: + if res != nil { + t.Fatal(res) + } + case <-time.After(time.Second * 60): + t.Fatal("Action timed out") + } + } + + ctx := context.Background() + driver, _ := neo4j.NewDriverWithContext(localURL, neo4j.NoAuth()) + + results, _ := neo4j.ExecuteQuery(ctx, driver, + "match (n) return n", + map[string]any{}, + neo4j.EagerResultTransformer, + neo4j.ExecuteQueryWithDatabase("neo4j"), + ) + + var listOfNamesFromDB []string + listOfNamesToCheck := []string{"Alice", "Bob", "Carol", "Dan"} + + for _, record := range results.Records { + dictionary := record.AsMap() + x := dictionary["n"].(dbtype.Node) + listOfNamesFromDB = append(listOfNamesFromDB, x.Props["name"].(string)) + } + + assert.Equal(t, listOfNamesFromDB, listOfNamesToCheck) +} + +func TestCypherOutputBasicAuth(t *testing.T) { + env := []string{"NEO4J_AUTH=neo4j/sparkling_brazilian_orange_456"} + localURL, err := setupNeo4j(env) + + require.NoError(t, err) + + confStr := fmt.Sprintf(` +cypher: + database: neo4j + url: %s + basic_auth: + user: neo4j + password: sparkling_brazilian_orange_456 + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 +`, localURL) + + conf, err := testutil.OutputFromYAML(confStr) + require.NoError(t, err) + + s, err := mock.NewManager().NewOutput(conf) + + require.NoError(t, err) + + sendChan := make(chan message.Transaction) + resChan := make(chan error) + if err = s.Consume(sendChan); err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + s.TriggerCloseNow() + + ctx, done := context.WithTimeout(context.Background(), time.Second*10) + assert.NoError(t, s.WaitForClose(ctx)) + done() + }) + + inputs := []string{ + `{"name":"Alice"}`, `{"name":"Bob"}`, `{"name":"Carol"}`, `{"name":"Dan"}`, + } + + for _, input := range inputs { + testMsg := message.QuickBatch([][]byte{[]byte(input)}) + select { + case sendChan <- message.NewTransaction(testMsg, resChan): + case <-time.After(time.Second * 60): + t.Fatal("Action timed out") + } + + select { + case res := <-resChan: + if res != nil { + t.Fatal(res) + } + case <-time.After(time.Second * 60): + t.Fatal("Action timed out") + } + } + + ctx := context.Background() + driver, _ := neo4j.NewDriverWithContext(localURL, neo4j.BasicAuth("neo4j", "sparkling_brazilian_orange_456", "")) + + results, _ := neo4j.ExecuteQuery(ctx, driver, + "match (n) return n", + map[string]any{}, + neo4j.EagerResultTransformer, + neo4j.ExecuteQueryWithDatabase("neo4j"), + ) + + var listOfNamesFromDB []string + listOfNamesToCheck := []string{"Alice", "Bob", "Carol", "Dan"} + + for _, record := range results.Records { + dictionary := record.AsMap() + x := dictionary["n"].(dbtype.Node) + listOfNamesFromDB = append(listOfNamesFromDB, x.Props["name"].(string)) + } + + assert.Equal(t, listOfNamesFromDB, listOfNamesToCheck) +} + +func TestCypherOutputMissingValue(t *testing.T) { + env := []string{"NEO4J_AUTH=neo4j/sparkling_brazilian_orange_456"} + localURL, err := setupNeo4j(env) + + require.NoError(t, err) + + confStr := fmt.Sprintf(` +cypher: + database: neo4j + url: %s + basic_auth: + user: neo4j + password: sparkling_brazilian_orange_456 + query: | + CREATE (p:Person {name: $name, age: $age}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 +`, localURL) + + conf, err := testutil.OutputFromYAML(confStr) + + require.NoError(t, err) + + _, err = mock.NewManager().NewOutput(conf) + + require.ErrorContains(t, err, "failed to init output : query parameter: $age, not found in value keys") +} + +func TestCypherOutputMissingParam(t *testing.T) { + env := []string{"NEO4J_AUTH=neo4j/sparkling_brazilian_orange_456"} + localURL, err := setupNeo4j(env) + + require.NoError(t, err) + + confStr := fmt.Sprintf(` +cypher: + database: neo4j + url: %s + basic_auth: + user: neo4j + password: sparkling_brazilian_orange_456 + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + age: ${! json("name") } + batching: + count: 0 +`, localURL) + + conf, err := testutil.OutputFromYAML(confStr) + + require.NoError(t, err) + + _, err = mock.NewManager().NewOutput(conf) + + require.ErrorContains(t, err, "failed to init output : value key: $age, not found in query: CREATE (p:Person {name: $name}) RETURN p") +} diff --git a/website/docs/components/outputs/cypher.md b/website/docs/components/outputs/cypher.md index 08f2e1783..45e6ed88c 100644 --- a/website/docs/components/outputs/cypher.md +++ b/website/docs/components/outputs/cypher.md @@ -34,8 +34,8 @@ output: label: "" cypher: database: neo4j # No default (required) - uri: bolt://localhost:7687 # No default (required) - noAuth: false + url: bolt://localhost:7687 # No default (required) + no_auth: false basic_auth: user: "" password: "" @@ -59,8 +59,8 @@ output: label: "" cypher: database: neo4j # No default (required) - uri: bolt://localhost:7687 # No default (required) - noAuth: false + url: bolt://localhost:7687 # No default (required) + no_auth: false basic_auth: user: "" password: "" @@ -94,7 +94,10 @@ Here we execute a cypher query that takes the value of $name from the interpolat output: cypher: database: neo4j - uri: bolt://localhost:7687 + url: bolt://localhost:7687 + basic_auth: + user: neo4j + password: password query: | CREATE (p:Person {name: $name}) RETURN p values: @@ -121,9 +124,9 @@ Type: `string` database: neo4j ``` -### `uri` +### `url` -The URI of the database. +The URL of the database engine. Type: `string` @@ -131,12 +134,12 @@ Type: `string` ```yml # Examples -uri: bolt://localhost:7687 +url: bolt://localhost:7687 ``` -### `noAuth` +### `no_auth` -No Authentication currently implemented, defaults to true. +Set to true to connect without authentication. Type: `bool` @@ -144,7 +147,7 @@ Default: `false` ### `basic_auth` -basic auth +Basic Authentication fields Type: `object` @@ -202,7 +205,7 @@ Default: `{}` # Examples values: - name: Alice + name: ${! json("name") } ``` ### `max_in_flight` From e2e8bafad1818fe327c1ba38690c3ef6b105e771 Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Wed, 2 Oct 2024 21:09:41 +0100 Subject: [PATCH 05/12] refactor tests Signed-off-by: Jem Davies --- internal/impl/cypher/output_cypher_test.go | 125 ++++++++++++++++----- 1 file changed, 100 insertions(+), 25 deletions(-) diff --git a/internal/impl/cypher/output_cypher_test.go b/internal/impl/cypher/output_cypher_test.go index 2112410e6..9000fa4e7 100644 --- a/internal/impl/cypher/output_cypher_test.go +++ b/internal/impl/cypher/output_cypher_test.go @@ -11,9 +11,11 @@ import ( "github.com/ory/dockertest/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/warpstreamlabs/bento/internal/component/output" "github.com/warpstreamlabs/bento/internal/component/testutil" "github.com/warpstreamlabs/bento/internal/manager/mock" "github.com/warpstreamlabs/bento/internal/message" + "github.com/warpstreamlabs/bento/public/service/integration" ) func setupNeo4j(env []string) (string, error) { @@ -38,35 +40,25 @@ func setupNeo4j(env []string) (string, error) { return neo4jDockerAddress, nil } -func TestCypherOutputNoAuth(t *testing.T) { - env := []string{"NEO4J_AUTH=none"} - localURL, err := setupNeo4j(env) - - require.NoError(t, err) - - confStr := fmt.Sprintf(` -cypher: - database: neo4j - url: %s - no_auth: true - query: | - CREATE (p:Person {name: $name}) RETURN p - values: - name: ${! json("name") } - batching: - count: 0 -`, localURL) +func createCypherOutputFromYaml(template string) (s output.Streamed, err error) { - conf, err := testutil.OutputFromYAML(confStr) - require.NoError(t, err) + conf, err := testutil.OutputFromYAML(template) + if err != nil { + return nil, err + } - s, err := mock.NewManager().NewOutput(conf) + s, err = mock.NewManager().NewOutput(conf) + if err != nil { + return nil, err + } - require.NoError(t, err) + return s, nil +} +func sendMessages(t *testing.T, s output.Streamed) { sendChan := make(chan message.Transaction) resChan := make(chan error) - if err = s.Consume(sendChan); err != nil { + if err := s.Consume(sendChan); err != nil { t.Fatal(err) } @@ -99,9 +91,18 @@ cypher: t.Fatal("Action timed out") } } +} + +func checkNeo4j(localURL string, basicAuth bool) (listOfNodeNames []string) { ctx := context.Background() - driver, _ := neo4j.NewDriverWithContext(localURL, neo4j.NoAuth()) + + var driver neo4j.DriverWithContext + if basicAuth { + driver, _ = neo4j.NewDriverWithContext(localURL, neo4j.BasicAuth("neo4j", "sparkling_brazilian_orange_456", "")) + } else { + driver, _ = neo4j.NewDriverWithContext(localURL, neo4j.NoAuth()) + } results, _ := neo4j.ExecuteQuery(ctx, driver, "match (n) return n", @@ -111,7 +112,6 @@ cypher: ) var listOfNamesFromDB []string - listOfNamesToCheck := []string{"Alice", "Bob", "Carol", "Dan"} for _, record := range results.Records { dictionary := record.AsMap() @@ -119,6 +119,81 @@ cypher: listOfNamesFromDB = append(listOfNamesFromDB, x.Props["name"].(string)) } + return listOfNamesFromDB +} + +func TestIntegrationCypherOutput(t *testing.T) { + integration.CheckSkip(t) + t.Parallel() + + t.Run("cypher_output_no_auth", func(t *testing.T) { + localURL, err := setupNeo4j([]string{"NEO4J_AUTH=none"}) + require.NoError(t, err) + testCypherOutputNoAuth(t, localURL) + }) + + t.Run("cypher_output_basic_auth", func(t *testing.T) { + localURL, err := setupNeo4j([]string{"NEO4J_AUTH=neo4j/sparkling_brazilian_orange_456"}) + require.NoError(t, err) + testCypherOutputBasicAuth(t, localURL) + }) +} + +func testCypherOutputNoAuth(t *testing.T, localURL string) { + template := fmt.Sprintf(` +cypher: + database: neo4j + url: %s + no_auth: true + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 + `, localURL) + + s, err := createCypherOutputFromYaml(template) + require.NoError(t, err) + + // send the messages + sendMessages(&testing.T{}, s) + + // check output + listOfNamesFromDB := checkNeo4j(localURL, false) + + listOfNamesToCheck := []string{"Alice", "Bob", "Carol", "Dan"} + + assert.Equal(t, listOfNamesFromDB, listOfNamesToCheck) +} + +func testCypherOutputBasicAuth(t *testing.T, localURL string) { + template := fmt.Sprintf(` +cypher: + database: neo4j + url: %s + basic_auth: + user: neo4j + password: sparkling_brazilian_orange_456 + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 + `, localURL) + + s, err := createCypherOutputFromYaml(template) + require.NoError(t, err) + + // send the messages + sendMessages(&testing.T{}, s) + + // check output + listOfNamesFromDB := checkNeo4j(localURL, true) + + listOfNamesToCheck := []string{"Alice", "Bob", "Carol", "Dan"} + assert.Equal(t, listOfNamesFromDB, listOfNamesToCheck) } From 5941b6eb5776956f6fa106f58718726c579e5aae Mon Sep 17 00:00:00 2001 From: Jem Davies Date: Wed, 2 Oct 2024 21:54:17 +0100 Subject: [PATCH 06/12] refactor tests, update for feedback on PR Signed-off-by: Jem Davies --- internal/impl/cypher/output_cypher.go | 85 +++++++++++++------ internal/impl/cypher/output_cypher_test.go | 94 +--------------------- website/docs/components/outputs/cypher.md | 30 +++++-- 3 files changed, 89 insertions(+), 120 deletions(-) diff --git a/internal/impl/cypher/output_cypher.go b/internal/impl/cypher/output_cypher.go index 40e91451c..b02ee21ce 100644 --- a/internal/impl/cypher/output_cypher.go +++ b/internal/impl/cypher/output_cypher.go @@ -10,35 +10,72 @@ import ( "github.com/warpstreamlabs/bento/public/service" ) +const ( + cypherDatabase = "database" + cypherURI = "uri" + cypherNoAuth = "no_auth" + cypherBasicAuth = "basic_auth" + cypherQuery = "query" + cypherValues = "values" + cypherMaxInFlight = "max_in_flight" + cypherBatching = "batching" + + // Basic Auth + cypherUser = "user" + cypherPassword = "password" + cypherRealm = "realm" +) + +var cypherOutputDescription string = ` +## Executes a Cypher Query + +The ` + "`" + `query` + "`" + ` field is expected to be a valid cypher query with 0 or more parameters with the $ syntax: +` + "```" + ` + query: CREATE (p:Person {name: $name, age: $age}) RETURN p +` + "```" + ` + +The ` + "`" + `values` + "`" + ` field is expected to be a map where the keys are equal to the parameters in the query, +and the values are strings (bloblang interpolations are allowed): + +` + "```" + ` + values: + name: ${! json("name") } + age: ${! json("age") } +` + "```" + func cypherOutputSpec() *service.ConfigSpec { spec := service.NewConfigSpec(). + Version("1.3.0"). + Categories("Services"). + Summary("Executes a Cypher Query"). + Description(cypherOutputDescription). Fields( - service.NewStringField("database"). + service.NewStringField(cypherDatabase). Description("The name of the database to connect to."). Example("neo4j"), - service.NewStringField("url"). + service.NewStringField(cypherURI). Description("The URL of the database engine."). Example("bolt://localhost:7687"), - service.NewBoolField("no_auth"). + service.NewBoolField(cypherNoAuth). Description("Set to true to connect without authentication."). Default(false), - service.NewObjectField("basic_auth", basicAuthSpec()...). + service.NewObjectField(cypherBasicAuth, basicAuthSpec()...). Description("Basic Authentication fields"), - service.NewStringField("query"). + service.NewStringField(cypherQuery). Description("The cypher query to execute."). Example("CREATE (p:Person {name: $name}) RETURN p"), - service.NewInterpolatedStringMapField("values"). + service.NewInterpolatedStringMapField(cypherValues). Description("A map of strings -> bloblang interpolations that form the values of the references in the query i.e. $name."). Default(map[string]any{}). Example(map[string]any{ "name": `${! json("name") }`, }), - service.NewIntField("max_in_flight"). + service.NewIntField(cypherMaxInFlight). Description("The maximum number of queries to run in parallel."). Default(64), ) - spec = spec.Field(service.NewBatchPolicyField("batching")). + spec = spec.Field(service.NewBatchPolicyField(cypherBatching)). Version("1.0.0"). Example("Create Node", ` @@ -47,7 +84,7 @@ Here we execute a cypher query that takes the value of $name from the interpolat output: cypher: database: neo4j - url: bolt://localhost:7687 + uri: bolt://localhost:7687 basic_auth: user: neo4j password: password @@ -64,14 +101,14 @@ output: func basicAuthSpec() []*service.ConfigField { return []*service.ConfigField{ - service.NewStringField("user"). + service.NewStringField(cypherUser). Default(""). Description("The username for basic auth."), - service.NewStringField("password"). + service.NewStringField(cypherPassword). Default(""). Secret(). Description("The password for basic auth."), - service.NewStringField("realm"). + service.NewStringField(cypherRealm). Default(""). Description("The realm for basic auth."), } @@ -99,7 +136,7 @@ func init() { type CypherOutput struct { database string - url string + uri string noAuth bool basicAuth CypherBasicAuth query string @@ -115,23 +152,23 @@ type CypherBasicAuth struct { } func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*CypherOutput, error) { - database, err := conf.FieldString("database") + database, err := conf.FieldString(cypherDatabase) if err != nil { return nil, err } - url, err := conf.FieldString("url") + uri, err := conf.FieldString(cypherURI) if err != nil { return nil, err } - noAuth, err := conf.FieldBool("no_auth") + noAuth, err := conf.FieldBool(cypherNoAuth) if err != nil { return nil, err } - query, err := conf.FieldString("query") + query, err := conf.FieldString(cypherQuery) if err != nil { return nil, err } - values, err := conf.FieldInterpolatedStringMap("values") + values, err := conf.FieldInterpolatedStringMap(cypherValues) if err != nil { return nil, err } @@ -142,12 +179,12 @@ func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resource } if !noAuth { - basicAuthMap, _ := conf.FieldStringMap("basic_auth") - basicAuth := CypherBasicAuth{user: basicAuthMap["user"], password: basicAuthMap["password"], realm: basicAuthMap["realm"]} - return &CypherOutput{database: database, url: url, noAuth: noAuth, basicAuth: basicAuth, query: query, values: values}, nil + basicAuthMap, _ := conf.FieldStringMap(cypherBasicAuth) + basicAuth := CypherBasicAuth{user: basicAuthMap[cypherUser], password: basicAuthMap[cypherPassword], realm: basicAuthMap[cypherRealm]} + return &CypherOutput{database: database, uri: uri, noAuth: noAuth, basicAuth: basicAuth, query: query, values: values}, nil } - return &CypherOutput{database: database, url: url, noAuth: noAuth, query: query, values: values}, nil + return &CypherOutput{database: database, uri: uri, noAuth: noAuth, query: query, values: values}, nil } func validateQueryAndValues(query string, values map[string]*service.InterpolatedString) error { @@ -176,9 +213,9 @@ func (cyp *CypherOutput) Connect(ctx context.Context) error { var err error if cyp.noAuth { - driver, err = neo4j.NewDriverWithContext(cyp.url, neo4j.NoAuth()) + driver, err = neo4j.NewDriverWithContext(cyp.uri, neo4j.NoAuth()) } else { - driver, err = neo4j.NewDriverWithContext(cyp.url, neo4j.BasicAuth(cyp.basicAuth.user, cyp.basicAuth.password, cyp.basicAuth.realm)) + driver, err = neo4j.NewDriverWithContext(cyp.uri, neo4j.BasicAuth(cyp.basicAuth.user, cyp.basicAuth.password, cyp.basicAuth.realm)) } if err != nil { diff --git a/internal/impl/cypher/output_cypher_test.go b/internal/impl/cypher/output_cypher_test.go index 9000fa4e7..9944d89b8 100644 --- a/internal/impl/cypher/output_cypher_test.go +++ b/internal/impl/cypher/output_cypher_test.go @@ -143,7 +143,7 @@ func testCypherOutputNoAuth(t *testing.T, localURL string) { template := fmt.Sprintf(` cypher: database: neo4j - url: %s + uri: %s no_auth: true query: | CREATE (p:Person {name: $name}) RETURN p @@ -171,7 +171,7 @@ func testCypherOutputBasicAuth(t *testing.T, localURL string) { template := fmt.Sprintf(` cypher: database: neo4j - url: %s + uri: %s basic_auth: user: neo4j password: sparkling_brazilian_orange_456 @@ -197,92 +197,6 @@ cypher: assert.Equal(t, listOfNamesFromDB, listOfNamesToCheck) } -func TestCypherOutputBasicAuth(t *testing.T) { - env := []string{"NEO4J_AUTH=neo4j/sparkling_brazilian_orange_456"} - localURL, err := setupNeo4j(env) - - require.NoError(t, err) - - confStr := fmt.Sprintf(` -cypher: - database: neo4j - url: %s - basic_auth: - user: neo4j - password: sparkling_brazilian_orange_456 - query: | - CREATE (p:Person {name: $name}) RETURN p - values: - name: ${! json("name") } - batching: - count: 0 -`, localURL) - - conf, err := testutil.OutputFromYAML(confStr) - require.NoError(t, err) - - s, err := mock.NewManager().NewOutput(conf) - - require.NoError(t, err) - - sendChan := make(chan message.Transaction) - resChan := make(chan error) - if err = s.Consume(sendChan); err != nil { - t.Fatal(err) - } - - t.Cleanup(func() { - s.TriggerCloseNow() - - ctx, done := context.WithTimeout(context.Background(), time.Second*10) - assert.NoError(t, s.WaitForClose(ctx)) - done() - }) - - inputs := []string{ - `{"name":"Alice"}`, `{"name":"Bob"}`, `{"name":"Carol"}`, `{"name":"Dan"}`, - } - - for _, input := range inputs { - testMsg := message.QuickBatch([][]byte{[]byte(input)}) - select { - case sendChan <- message.NewTransaction(testMsg, resChan): - case <-time.After(time.Second * 60): - t.Fatal("Action timed out") - } - - select { - case res := <-resChan: - if res != nil { - t.Fatal(res) - } - case <-time.After(time.Second * 60): - t.Fatal("Action timed out") - } - } - - ctx := context.Background() - driver, _ := neo4j.NewDriverWithContext(localURL, neo4j.BasicAuth("neo4j", "sparkling_brazilian_orange_456", "")) - - results, _ := neo4j.ExecuteQuery(ctx, driver, - "match (n) return n", - map[string]any{}, - neo4j.EagerResultTransformer, - neo4j.ExecuteQueryWithDatabase("neo4j"), - ) - - var listOfNamesFromDB []string - listOfNamesToCheck := []string{"Alice", "Bob", "Carol", "Dan"} - - for _, record := range results.Records { - dictionary := record.AsMap() - x := dictionary["n"].(dbtype.Node) - listOfNamesFromDB = append(listOfNamesFromDB, x.Props["name"].(string)) - } - - assert.Equal(t, listOfNamesFromDB, listOfNamesToCheck) -} - func TestCypherOutputMissingValue(t *testing.T) { env := []string{"NEO4J_AUTH=neo4j/sparkling_brazilian_orange_456"} localURL, err := setupNeo4j(env) @@ -292,7 +206,7 @@ func TestCypherOutputMissingValue(t *testing.T) { confStr := fmt.Sprintf(` cypher: database: neo4j - url: %s + uri: %s basic_auth: user: neo4j password: sparkling_brazilian_orange_456 @@ -322,7 +236,7 @@ func TestCypherOutputMissingParam(t *testing.T) { confStr := fmt.Sprintf(` cypher: database: neo4j - url: %s + uri: %s basic_auth: user: neo4j password: sparkling_brazilian_orange_456 diff --git a/website/docs/components/outputs/cypher.md b/website/docs/components/outputs/cypher.md index 45e6ed88c..795290b29 100644 --- a/website/docs/components/outputs/cypher.md +++ b/website/docs/components/outputs/cypher.md @@ -3,6 +3,7 @@ title: cypher slug: cypher type: output status: experimental +categories: ["Services"] ---