Skip to content

Commit

Permalink
add output cypher component
Browse files Browse the repository at this point in the history
Signed-off-by: Jem Davies <jemsot@gmail.com>
  • Loading branch information
jem-davies committed Sep 26, 2024
1 parent 431e8b5 commit 8ddfa45
Show file tree
Hide file tree
Showing 7 changed files with 458 additions and 0 deletions.
23 changes: 23 additions & 0 deletions cmd/bento/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
input:
stdin: {}

output:
cypher:
database: neo4j
uri: bolt://localhost:7687
query: |
CREATE (p:Person {name: $name}) RETURN p
values:
name: ${! json("name") }
batching:
count: 0
# byte_size: 0
# period: ""
# check: ""

## Return all nodes and relations:
# MATCH (n) OPTIONAL MATCH (n)-[r]->() RETURN n, r

## Delete all nodes and relations
# MATCH (n) DETACH DELETE n

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ require (
github.com/nats-io/nats-server/v2 v2.9.23 // indirect
github.com/nats-io/nats-streaming-server v0.24.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/neo4j/neo4j-go-driver/v5 v5.24.0
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,8 @@ github.com/nats-io/stan.go v0.10.2/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKue
github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw=
github.com/nats-io/stan.go v0.10.4/go.mod h1:3XJXH8GagrGqajoO/9+HgPyKV5MWsv7S5ccdda+pc6k=
github.com/ncw/swift v1.0.52/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/neo4j/neo4j-go-driver/v5 v5.24.0 h1:7MAFoB7L6f9heQUo/tJ5EnrrpVzm9ZBHgH8ew03h6Eo=
github.com/neo4j/neo4j-go-driver/v5 v5.24.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249 h1:NHrXEjTNQY7P0Zfx1aMrNhpgxHmow66XQtm0aQLY0AE=
github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249/go.mod h1:mpRZBD8SJ55OIICQ3iWH0Yz3cjzA61JdqMLoWXeB2+8=
Expand Down
155 changes: 155 additions & 0 deletions internal/impl/cypher/output_cypher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package cypher

import (
"context"

"github.com/neo4j/neo4j-go-driver/v5/neo4j"
"github.com/warpstreamlabs/bento/public/service"
)

func cypherOutputSpec() *service.ConfigSpec {
spec := service.NewConfigSpec().
Fields(
service.NewStringField("database").
Description("The name of the database to connect to.").
Example("neo4j"),
service.NewStringField("uri").
Description("The URI of the database.").
Example("bolt://localhost:7687"),
service.NewBoolField("noAuth").
Description("No Authentication currently implemented, defaults to true.").
Default(true),
service.NewStringField("query").
Description("The cypher query to execute.").
Example("CREATE (p:Person {name: $name}) RETURN p"),
service.NewInterpolatedStringMapField("values").
Description("A map of strings -> bloblang interpolations that form the values of the references in the query i.e. $name.").
Default(map[string]any{}).
Example(map[string]any{
"name": "Alice",
}),
service.NewIntField("max_in_flight").
Description("The maximum number of queries to run in parallel.").
Default(64),
)

spec = spec.Field(service.NewBatchPolicyField("batching")).
Version("1.0.0").
Example("Create Node",
`
Here we execute a cypher query that takes the value of $name from the interpolated field in the values map:`,
`
output:
cypher:
database: neo4j
uri: bolt://localhost:7687
query: |
CREATE (p:Person {name: $name}) RETURN p
values:
name: ${! json("name") }
batching:
count: 0
`,
)
return spec
}

func init() {
err := service.RegisterBatchOutput(
"cypher", cypherOutputSpec(),
func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPolicy service.BatchPolicy, maxInFlight int, err error) {
if batchPolicy, err = conf.FieldBatchPolicy("batching"); err != nil {
return
}
if maxInFlight, err = conf.FieldInt("max_in_flight"); err != nil {
return
}
out, err = NewCypherOutputFromConfig(conf, mgr)
return
})
if err != nil {
panic(err)
}
}

//----------------------------------------------------------------------------

type CypherOutput struct {
database string
uri string
noAuth bool
query string
values map[string]*service.InterpolatedString
driver neo4j.DriverWithContext
session neo4j.SessionWithContext
}

func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*CypherOutput, error) {
database, err := conf.FieldString("database")
if err != nil {
return nil, err
}
uri, err := conf.FieldString("uri")
if err != nil {
return nil, err
}
noAuth, err := conf.FieldBool("noAuth")
if err != nil {
return nil, err
}
query, err := conf.FieldString("query")
if err != nil {
return nil, err
}
values, _ := conf.FieldInterpolatedStringMap("values")

return &CypherOutput{database: database, uri: uri, noAuth: noAuth, query: query, values: values}, nil

}

func (cyp *CypherOutput) Connect(ctx context.Context) error {

driver, err := neo4j.NewDriverWithContext(cyp.uri, neo4j.NoAuth())
if err != nil {
return err
}
cyp.driver = driver

session := driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
cyp.session = session

return nil
}

func (cyp *CypherOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error {

values := make(map[string]any)

for _, msgPart := range batch {

for k, v := range cyp.values {
values[k] = v.String(msgPart)
}

_, err := neo4j.ExecuteQuery(ctx, cyp.driver,
cyp.query,
values,
neo4j.EagerResultTransformer,
neo4j.ExecuteQueryWithDatabase(cyp.database),
)

if err != nil {
panic(err)
}

values = nil
}

return nil
}

func (cyp *CypherOutput) Close(ctx context.Context) error {
cyp.driver.Close(ctx)
cyp.session.Close(ctx)
return nil
}
1 change: 1 addition & 0 deletions public/components/all/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
_ "github.com/warpstreamlabs/bento/public/components/confluent"
_ "github.com/warpstreamlabs/bento/public/components/couchbase"
_ "github.com/warpstreamlabs/bento/public/components/crypto"
_ "github.com/warpstreamlabs/bento/public/components/cypher"
_ "github.com/warpstreamlabs/bento/public/components/dgraph"
_ "github.com/warpstreamlabs/bento/public/components/discord"
_ "github.com/warpstreamlabs/bento/public/components/elasticsearch"
Expand Down
6 changes: 6 additions & 0 deletions public/components/cypher/package.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package crypto

import (
// Bring in the internal plugin definitions.
_ "github.com/warpstreamlabs/bento/internal/impl/cypher"
)
Loading

0 comments on commit 8ddfa45

Please sign in to comment.