Skip to content

Commit

Permalink
add output cypher component (#118)
Browse files Browse the repository at this point in the history
* add output cypher component

Signed-off-by: Jem Davies <jemsot@gmail.com>
  • Loading branch information
jem-davies committed Nov 7, 2024
1 parent fbf2827 commit d16dcd5
Show file tree
Hide file tree
Showing 7 changed files with 925 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ require (
github.com/nats-io/nats-server/v2 v2.9.23 // indirect
github.com/nats-io/nats-streaming-server v0.24.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/neo4j/neo4j-go-driver/v5 v5.24.0
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,8 @@ github.com/nats-io/stan.go v0.10.2/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKue
github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw=
github.com/nats-io/stan.go v0.10.4/go.mod h1:3XJXH8GagrGqajoO/9+HgPyKV5MWsv7S5ccdda+pc6k=
github.com/ncw/swift v1.0.52/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/neo4j/neo4j-go-driver/v5 v5.24.0 h1:7MAFoB7L6f9heQUo/tJ5EnrrpVzm9ZBHgH8ew03h6Eo=
github.com/neo4j/neo4j-go-driver/v5 v5.24.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249 h1:NHrXEjTNQY7P0Zfx1aMrNhpgxHmow66XQtm0aQLY0AE=
github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249/go.mod h1:mpRZBD8SJ55OIICQ3iWH0Yz3cjzA61JdqMLoWXeB2+8=
Expand Down
263 changes: 263 additions & 0 deletions internal/impl/cypher/output_cypher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
package cypher

import (
"context"
"fmt"
"regexp"
"strings"

"github.com/neo4j/neo4j-go-driver/v5/neo4j"
"github.com/warpstreamlabs/bento/public/service"
)

// Names of the top-level configuration fields of the cypher output.
const (
	cypherDatabase    = "database"
	cypherURI         = "uri"
	cypherNoAuth      = "no_auth"
	cypherBasicAuth   = "basic_auth"
	cypherQuery       = "query"
	cypherValues      = "values"
	cypherMaxInFlight = "max_in_flight"
	cypherBatching    = "batching"
)

// Names of the child fields nested under the basic_auth object field.
const (
	cypherUser     = "user"
	cypherPassword = "password"
	cypherRealm    = "realm"
)

// cypherOutputDescription is the long-form markdown description attached to
// the cypher output's config spec. It explains how `$`-prefixed parameters in
// the query are paired with the entries of the `values` map. The YAML sample
// keeps its entries indented beneath `values:` so that it reads as a map.
var cypherOutputDescription = `
## Executes a Cypher Query
The ` + "`" + `query` + "`" + ` field is expected to be a valid cypher query with 0 or more parameters with the ` + "`" + `$` + "`" + ` syntax:
` + "```" + `
    query: CREATE (p:Person {name: $name, age: $age}) RETURN p
` + "```" + `
The ` + "`" + `values` + "`" + ` field is expected to be a map where the keys are equal to the parameters in the query,
and the values are strings (bloblang interpolations are allowed):
` + "```" + `
    values:
      name: ${! json("name") }
      age: ${! json("age") }
` + "```"

// cypherOutputSpec builds the configuration spec of the cypher output.
//
// The spec declares the connection fields (database, uri), the authentication
// fields (no_auth, basic_auth), the query and its interpolated values map,
// max_in_flight and a batching policy, plus one worked config example.
func cypherOutputSpec() *service.ConfigSpec {
	spec := service.NewConfigSpec().
		Version("1.4.0").
		Categories("Services").
		Summary("Executes a Cypher Query").
		Description(cypherOutputDescription).
		Fields(
			service.NewStringField(cypherDatabase).
				Description("The name of the database to connect to.").
				Example("neo4j"),
			service.NewStringField(cypherURI).
				Description("The URL of the database engine.").
				Example("bolt://localhost:7687"),
			service.NewBoolField(cypherNoAuth).
				Description("Set to true to connect without authentication.").
				Default(false),
			service.NewObjectField(cypherBasicAuth, basicAuthSpec()...).
				Description("Basic Authentication fields"),
			service.NewStringField(cypherQuery).
				Description("The cypher query to execute.").
				Example("CREATE (p:Person {name: $name}) RETURN p"),
			service.NewInterpolatedStringMapField(cypherValues).
				Description("A map of strings -> bloblang interpolations that form the values of the references in the query i.e. $name.").
				Default(map[string]any{}).
				Example(map[string]any{
					"name": `${! json("name") }`,
				}),
			service.NewIntField(cypherMaxInFlight).
				Description("The maximum number of queries to run in parallel.").
				Default(64),
		)

	// The example config is YAML, so its nesting must be expressed through
	// indentation: cypher sits under output, and the component's fields sit
	// under cypher.
	spec = spec.Field(service.NewBatchPolicyField(cypherBatching)).
		Example("Create Node",
			`
Here we execute a cypher query that takes the value of $name from the interpolated field in the values map:`,
			`
output:
  cypher:
    database: neo4j
    uri: bolt://localhost:7687
    basic_auth:
      user: neo4j
      password: password
    query: |
      CREATE (p:Person {name: $name}) RETURN p
    values:
      name: ${! json("name") }
    batching:
      count: 0
`,
		)
	return spec
}

// basicAuthSpec lists the child fields of the basic_auth object: user,
// password (flagged as a secret) and realm. Each one defaults to the empty
// string.
func basicAuthSpec() []*service.ConfigField {
	userField := service.NewStringField(cypherUser).
		Default("").
		Description("The username for basic auth.")

	passwordField := service.NewStringField(cypherPassword).
		Default("").
		Secret().
		Description("The password for basic auth.")

	realmField := service.NewStringField(cypherRealm).
		Default("").
		Description("The realm for basic auth.")

	return []*service.ConfigField{userField, passwordField, realmField}
}

// init registers the "cypher" batch output with the service. Registration
// failure panics, since it happens at package load time.
func init() {
	err := service.RegisterBatchOutput(
		"cypher", cypherOutputSpec(),
		func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchOutput, service.BatchPolicy, int, error) {
			// Field names come from the shared constants so they cannot
			// drift from the names declared in cypherOutputSpec.
			batchPolicy, err := conf.FieldBatchPolicy(cypherBatching)
			if err != nil {
				return nil, service.BatchPolicy{}, 0, err
			}
			maxInFlight, err := conf.FieldInt(cypherMaxInFlight)
			if err != nil {
				return nil, service.BatchPolicy{}, 0, err
			}
			out, err := NewCypherOutputFromConfig(conf, mgr)
			if err != nil {
				// Return a literal nil so the interface result does not
				// wrap a typed nil *CypherOutput.
				return nil, service.BatchPolicy{}, 0, err
			}
			return out, batchPolicy, maxInFlight, nil
		})
	if err != nil {
		panic(err)
	}
}

//----------------------------------------------------------------------------

// CypherOutput is a batch output that executes a configured cypher query
// once per message, using parameters interpolated from that message.
type CypherOutput struct {
	// Connection target, taken from the parsed config.
	database string
	uri      string

	// Authentication: when noAuth is false, basicAuth holds the credentials
	// handed to the driver.
	noAuth    bool
	basicAuth CypherBasicAuth

	// Query text and the interpolated value for each of its parameters.
	query  string
	values map[string]*service.InterpolatedString

	// Driver and session handles, assigned by Connect.
	driver  neo4j.DriverWithContext
	session neo4j.SessionWithContext
}

// CypherBasicAuth carries the basic authentication credentials read from the
// basic_auth config object.
type CypherBasicAuth struct {
	user, password, realm string
}

// NewCypherOutputFromConfig builds a CypherOutput from a parsed config.
//
// It reads the database, uri, no_auth, query and values fields and checks
// that the query's parameters and the keys of the values map agree. Unless
// no_auth is set it also reads the basic_auth credentials. No connection is
// opened here; that is left to Connect. The mgr argument is currently unused.
//
// An error is returned when any field cannot be read or when the query and
// values do not match.
func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*CypherOutput, error) {
	database, err := conf.FieldString(cypherDatabase)
	if err != nil {
		return nil, err
	}
	uri, err := conf.FieldString(cypherURI)
	if err != nil {
		return nil, err
	}
	noAuth, err := conf.FieldBool(cypherNoAuth)
	if err != nil {
		return nil, err
	}
	query, err := conf.FieldString(cypherQuery)
	if err != nil {
		return nil, err
	}
	values, err := conf.FieldInterpolatedStringMap(cypherValues)
	if err != nil {
		return nil, err
	}

	if err := validateQueryAndValues(query, values); err != nil {
		return nil, err
	}

	out := &CypherOutput{
		database: database,
		uri:      uri,
		noAuth:   noAuth,
		query:    query,
		values:   values,
	}
	if noAuth {
		return out, nil
	}

	// Read each credential explicitly so that a malformed basic_auth object
	// is reported instead of silently yielding empty credentials.
	if out.basicAuth.user, err = conf.FieldString(cypherBasicAuth, cypherUser); err != nil {
		return nil, err
	}
	if out.basicAuth.password, err = conf.FieldString(cypherBasicAuth, cypherPassword); err != nil {
		return nil, err
	}
	if out.basicAuth.realm, err = conf.FieldString(cypherBasicAuth, cypherRealm); err != nil {
		return nil, err
	}
	return out, nil
}

// cypherParamPattern matches a `$`-prefixed parameter reference in a cypher
// query: a letter or underscore followed by letters, digits or underscores.
// It is compiled once at package scope rather than on every validation.
var cypherParamPattern = regexp.MustCompile(`\$[a-zA-Z_][a-zA-Z0-9_]*`)

// validateQueryAndValues checks that the parameters referenced in query and
// the keys of values are the same set of names.
//
// Names are compared exactly: a key `na` is not satisfied by `$name`, and a
// parameter such as `$first_name` is recognised as a whole. An error is
// returned for the first value key with no matching parameter, or the first
// parameter with no matching value key.
func validateQueryAndValues(query string, values map[string]*service.InterpolatedString) error {
	extractedVariables := cypherParamPattern.FindAllString(query, -1)

	// Set of parameter names (without the leading `$`) used by the query.
	queryParams := make(map[string]struct{}, len(extractedVariables))
	for _, param := range extractedVariables {
		queryParams[strings.TrimPrefix(param, "$")] = struct{}{}
	}

	for k := range values {
		if _, ok := queryParams[k]; !ok {
			return fmt.Errorf("value key: $%s, not found in query: %s", k, query)
		}
	}

	for _, param := range extractedVariables {
		if _, ok := values[strings.TrimPrefix(param, "$")]; !ok {
			return fmt.Errorf("query parameter: %s, not found in value keys", param)
		}
	}

	return nil
}

// Connect creates the neo4j driver for the configured URI, verifies that the
// server can be reached, and opens a write session.
//
// The driver authenticates with basic auth unless noAuth is set. An error is
// returned when the driver cannot be created or connectivity cannot be
// verified; in the latter case the new driver is closed before returning.
func (cyp *CypherOutput) Connect(ctx context.Context) error {
	token := neo4j.NoAuth()
	if !cyp.noAuth {
		token = neo4j.BasicAuth(cyp.basicAuth.user, cyp.basicAuth.password, cyp.basicAuth.realm)
	}

	driver, err := neo4j.NewDriverWithContext(cyp.uri, token)
	if err != nil {
		return err
	}

	// Creating a driver does not prove the server is reachable; verify now so
	// that a bad URI or bad credentials fail Connect and not the first write.
	if err := driver.VerifyConnectivity(ctx); err != nil {
		// Best-effort cleanup: the connectivity error is the one worth reporting.
		_ = driver.Close(ctx)
		return err
	}

	cyp.driver = driver
	cyp.session = driver.NewSession(ctx, neo4j.SessionConfig{
		AccessMode:   neo4j.AccessModeWrite,
		DatabaseName: cyp.database,
	})

	return nil
}

// WriteBatch executes the configured query once for each message in batch, in
// order, against the configured database.
//
// For every message the values map is interpolated into the query's
// parameters. The first interpolation or query error stops processing and is
// returned; queries already executed for earlier messages are not undone.
func (cyp *CypherOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
	for i, msgPart := range batch {
		params := make(map[string]any, len(cyp.values))

		for k, v := range cyp.values {
			// TryString reports interpolation failures rather than hiding them.
			resolved, err := v.TryString(msgPart)
			if err != nil {
				return fmt.Errorf("interpolating value %q for message %d: %w", k, i, err)
			}
			params[k] = resolved
		}

		_, err := neo4j.ExecuteQuery(ctx, cyp.driver,
			cyp.query,
			params,
			neo4j.EagerResultTransformer,
			neo4j.ExecuteQueryWithDatabase(cyp.database),
		)
		if err != nil {
			return err
		}
	}

	return nil
}

// Close releases the session and then the driver created by Connect.
//
// It is safe to call when Connect never succeeded: unset handles are skipped.
// Both handles are always closed; the first error encountered is returned.
func (cyp *CypherOutput) Close(ctx context.Context) error {
	var firstErr error

	// Close the session before the driver that owns it.
	if cyp.session != nil {
		if err := cyp.session.Close(ctx); err != nil {
			firstErr = err
		}
	}
	if cyp.driver != nil {
		if err := cyp.driver.Close(ctx); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}
Loading

0 comments on commit d16dcd5

Please sign in to comment.