diff --git a/cmd/bento/config.yaml b/cmd/bento/config.yaml new file mode 100644 index 000000000..cd926ff50 --- /dev/null +++ b/cmd/bento/config.yaml @@ -0,0 +1,23 @@ +input: + stdin: {} + +output: + cypher: + database: neo4j + uri: bolt://localhost:7687 + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 + # byte_size: 0 + # period: "" + # check: "" + +## Return all nodes and relations: +# MATCH (n) OPTIONAL MATCH (n)-[r]->() RETURN n, r + +## Delete all nodes and relations +# MATCH (n) DETACH DELETE n + diff --git a/go.mod b/go.mod index 9838201d4..951c549dc 100644 --- a/go.mod +++ b/go.mod @@ -296,6 +296,7 @@ require ( github.com/nats-io/nats-server/v2 v2.9.23 // indirect github.com/nats-io/nats-streaming-server v0.24.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/neo4j/neo4j-go-driver/v5 v5.24.0 github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc5 // indirect diff --git a/go.sum b/go.sum index 6e66c1c06..a0a050b6c 100644 --- a/go.sum +++ b/go.sum @@ -932,6 +932,8 @@ github.com/nats-io/stan.go v0.10.2/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKue github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw= github.com/nats-io/stan.go v0.10.4/go.mod h1:3XJXH8GagrGqajoO/9+HgPyKV5MWsv7S5ccdda+pc6k= github.com/ncw/swift v1.0.52/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= +github.com/neo4j/neo4j-go-driver/v5 v5.24.0 h1:7MAFoB7L6f9heQUo/tJ5EnrrpVzm9ZBHgH8ew03h6Eo= +github.com/neo4j/neo4j-go-driver/v5 v5.24.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249 h1:NHrXEjTNQY7P0Zfx1aMrNhpgxHmow66XQtm0aQLY0AE= github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249/go.mod h1:mpRZBD8SJ55OIICQ3iWH0Yz3cjzA61JdqMLoWXeB2+8= diff --git a/internal/impl/cypher/output_cypher.go b/internal/impl/cypher/output_cypher.go new file mode 100644 index 000000000..2a1ca325e --- /dev/null +++ b/internal/impl/cypher/output_cypher.go @@ -0,0 +1,155 @@ +package cypher + +import ( + "context" + + "github.com/neo4j/neo4j-go-driver/v5/neo4j" + "github.com/warpstreamlabs/bento/public/service" +) + +func cypherOutputSpec() *service.ConfigSpec { + spec := service.NewConfigSpec(). + Fields( + service.NewStringField("database"). + Description("The name of the database to connect to."). + Example("neo4j"), + service.NewStringField("uri"). + Description("The URI of the database."). + Example("bolt://localhost:7687"), + service.NewBoolField("noAuth"). + Description("No Authentication currently implemented, defaults to true."). + Default(true), + service.NewStringField("query"). + Description("The cypher query to execute."). + Example("CREATE (p:Person {name: $name}) RETURN p"), + service.NewInterpolatedStringMapField("values"). + Description("A map of strings -> bloblang interpolations that form the values of the references in the query i.e. $name."). + Default(map[string]any{}). + Example(map[string]any{ + "name": "Alice", + }), + service.NewIntField("max_in_flight"). + Description("The maximum number of queries to run in parallel."). + Default(64), + ) + + spec = spec.Field(service.NewBatchPolicyField("batching")). + Version("1.0.0"). + Example("Create Node", + ` +Here we execute a cypher query that takes the value of $name from the interpolated field in the values map:`, + ` +output: + cypher: + database: neo4j + uri: bolt://localhost:7687 + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 +`, + ) + return spec +} + +func init() { + err := service.RegisterBatchOutput( + "cypher", cypherOutputSpec(), + func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPolicy service.BatchPolicy, maxInFlight int, err error) { + if batchPolicy, err = conf.FieldBatchPolicy("batching"); err != nil { + return + } + if maxInFlight, err = conf.FieldInt("max_in_flight"); err != nil { + return + } + out, err = NewCypherOutputFromConfig(conf, mgr) + return + }) + if err != nil { + panic(err) + } +} + +//---------------------------------------------------------------------------- + +type CypherOutput struct { + database string + uri string + noAuth bool + query string + values map[string]*service.InterpolatedString + driver neo4j.DriverWithContext + session neo4j.SessionWithContext +} + +func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*CypherOutput, error) { + database, err := conf.FieldString("database") + if err != nil { + return nil, err + } + uri, err := conf.FieldString("uri") + if err != nil { + return nil, err + } + noAuth, err := conf.FieldBool("noAuth") + if err != nil { + return nil, err + } + query, err := conf.FieldString("query") + if err != nil { + return nil, err + } + values, _ := conf.FieldInterpolatedStringMap("values") + + return &CypherOutput{database: database, uri: uri, noAuth: noAuth, query: query, values: values}, nil + +} + +func (cyp *CypherOutput) Connect(ctx context.Context) error { + + driver, err := neo4j.NewDriverWithContext(cyp.uri, neo4j.NoAuth()) + if err != nil { + return err + } + cyp.driver = driver + + session := driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) + cyp.session = session + + return nil +} + +func (cyp *CypherOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error { + + values := make(map[string]any) + + for _, msgPart := range batch { + + for k, v := range cyp.values { + values[k] = v.String(msgPart) + } + + _, err := neo4j.ExecuteQuery(ctx, cyp.driver, + cyp.query, + values, + neo4j.EagerResultTransformer, + neo4j.ExecuteQueryWithDatabase(cyp.database), + ) + + if err != nil { + panic(err) + } + + values = nil + } + + return nil +} + +func (cyp *CypherOutput) Close(ctx context.Context) error { + cyp.driver.Close(ctx) + cyp.session.Close(ctx) + return nil +} diff --git a/public/components/all/package.go b/public/components/all/package.go index 9abaceeb1..b819e09e5 100644 --- a/public/components/all/package.go +++ b/public/components/all/package.go @@ -17,6 +17,7 @@ import ( _ "github.com/warpstreamlabs/bento/public/components/confluent" _ "github.com/warpstreamlabs/bento/public/components/couchbase" _ "github.com/warpstreamlabs/bento/public/components/crypto" + _ "github.com/warpstreamlabs/bento/public/components/cypher" _ "github.com/warpstreamlabs/bento/public/components/dgraph" _ "github.com/warpstreamlabs/bento/public/components/discord" _ "github.com/warpstreamlabs/bento/public/components/elasticsearch" diff --git a/public/components/cypher/package.go b/public/components/cypher/package.go new file mode 100644 index 000000000..e388adca7 --- /dev/null +++ b/public/components/cypher/package.go @@ -0,0 +1,6 @@ +package crypto + +import ( + // Bring in the internal plugin definitions. + _ "github.com/warpstreamlabs/bento/internal/impl/cypher" +) diff --git a/website/docs/components/outputs/cypher.md b/website/docs/components/outputs/cypher.md new file mode 100644 index 000000000..98134769b --- /dev/null +++ b/website/docs/components/outputs/cypher.md @@ -0,0 +1,270 @@ +--- +title: cypher +slug: cypher +type: output +status: experimental +--- + + + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +:::caution EXPERIMENTAL +This component is experimental and therefore subject to change or removal outside of major version releases. +::: + +Introduced in version 1.0.0. + + + + + + +```yml +# Common config fields, showing default values +output: + label: "" + cypher: + database: neo4j # No default (required) + uri: bolt://localhost:7687 # No default (required) + noAuth: true + query: 'CREATE (p:Person {name: $name}) RETURN p' # No default (required) + values: {} + max_in_flight: 64 + batching: + count: 0 + byte_size: 0 + period: "" + check: "" +``` + + + + +```yml +# All config fields, showing default values +output: + label: "" + cypher: + database: neo4j # No default (required) + uri: bolt://localhost:7687 # No default (required) + noAuth: true + query: 'CREATE (p:Person {name: $name}) RETURN p' # No default (required) + values: {} + max_in_flight: 64 + batching: + count: 0 + byte_size: 0 + period: "" + check: "" + processors: [] # No default (optional) +``` + + + + +## Examples + + + + + + +Here we execute a cypher query that takes the value of $name from the interpolated field in the values map: + +```yaml +output: + cypher: + database: neo4j + uri: bolt://localhost:7687 + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 +``` + + + + +## Fields + +### `database` + +The name of the database to connect to. + + +Type: `string` + +```yml +# Examples + +database: neo4j +``` + +### `uri` + +The URI of the database. + + +Type: `string` + +```yml +# Examples + +uri: bolt://localhost:7687 +``` + +### `noAuth` + +No Authentication currently implemented, defaults to true. + + +Type: `bool` +Default: `true` + +### `query` + +The cypher query to execute. + + +Type: `string` + +```yml +# Examples + +query: 'CREATE (p:Person {name: $name}) RETURN p' +``` + +### `values` + +A map of strings -> bloblang interpolations that form the values of the references in the query i.e. $name. +This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). + + +Type: `object` +Default: `{}` + +```yml +# Examples + +values: + name: Alice +``` + +### `max_in_flight` + +The maximum number of queries to run in parallel. + + +Type: `int` +Default: `64` + +### `batching` + +Allows you to configure a [batching policy](/docs/configuration/batching). + + +Type: `object` + +```yml +# Examples + +batching: + byte_size: 5000 + count: 0 + period: 1s + +batching: + count: 10 + period: 1s + +batching: + check: this.contains("END BATCH") + count: 0 + period: 1m +``` + +### `batching.count` + +A number of messages at which the batch should be flushed. If `0` disables count based batching. + + +Type: `int` +Default: `0` + +### `batching.byte_size` + +An amount of bytes at which the batch should be flushed. If `0` disables size based batching. + + +Type: `int` +Default: `0` + +### `batching.period` + +A period in which an incomplete batch should be flushed regardless of its size. + + +Type: `string` +Default: `""` + +```yml +# Examples + +period: 1s + +period: 1m + +period: 500ms +``` + +### `batching.check` + +A [Bloblang query](/docs/guides/bloblang/about/) that should return a boolean value indicating whether a message should end a batch. + + +Type: `string` +Default: `""` + +```yml +# Examples + +check: this.type == "end_of_transaction" +``` + +### `batching.processors` + +A list of [processors](/docs/components/processors/about) to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. + + +Type: `array` + +```yml +# Examples + +processors: + - archive: + format: concatenate + +processors: + - archive: + format: lines + +processors: + - archive: + format: json_array +``` + +