From d16dcd5f521cbddee6fb9219228af4ff0c58e059 Mon Sep 17 00:00:00 2001 From: Jem Davies <131159520+jem-davies@users.noreply.github.com> Date: Wed, 6 Nov 2024 00:06:42 +0000 Subject: [PATCH] add output cypher component (#118) * add output cypher component Signed-off-by: Jem Davies --- go.mod | 1 + go.sum | 2 + internal/impl/cypher/output_cypher.go | 263 ++++++++++++++++ internal/impl/cypher/output_cypher_test.go | 319 ++++++++++++++++++++ public/components/all/package.go | 1 + public/components/cypher/package.go | 6 + website/docs/components/outputs/cypher.md | 333 +++++++++++++++++++++ 7 files changed, 925 insertions(+) create mode 100644 internal/impl/cypher/output_cypher.go create mode 100644 internal/impl/cypher/output_cypher_test.go create mode 100644 public/components/cypher/package.go create mode 100644 website/docs/components/outputs/cypher.md diff --git a/go.mod b/go.mod index bfc69d357..43d461a48 100644 --- a/go.mod +++ b/go.mod @@ -311,6 +311,7 @@ require ( github.com/nats-io/nats-server/v2 v2.9.23 // indirect github.com/nats-io/nats-streaming-server v0.24.6 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/neo4j/neo4j-go-driver/v5 v5.24.0 github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc5 // indirect diff --git a/go.sum b/go.sum index ffb5e77fd..70bd57b55 100644 --- a/go.sum +++ b/go.sum @@ -1581,6 +1581,8 @@ github.com/nats-io/stan.go v0.10.2/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKue github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw= github.com/nats-io/stan.go v0.10.4/go.mod h1:3XJXH8GagrGqajoO/9+HgPyKV5MWsv7S5ccdda+pc6k= github.com/ncw/swift v1.0.52/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= +github.com/neo4j/neo4j-go-driver/v5 v5.24.0 h1:7MAFoB7L6f9heQUo/tJ5EnrrpVzm9ZBHgH8ew03h6Eo= +github.com/neo4j/neo4j-go-driver/v5 v5.24.0/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k= 
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249 h1:NHrXEjTNQY7P0Zfx1aMrNhpgxHmow66XQtm0aQLY0AE= github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249/go.mod h1:mpRZBD8SJ55OIICQ3iWH0Yz3cjzA61JdqMLoWXeB2+8= diff --git a/internal/impl/cypher/output_cypher.go b/internal/impl/cypher/output_cypher.go new file mode 100644 index 000000000..99fa6881a --- /dev/null +++ b/internal/impl/cypher/output_cypher.go @@ -0,0 +1,263 @@ +package cypher + +import ( + "context" + "fmt" + "regexp" + "strings" + + "github.com/neo4j/neo4j-go-driver/v5/neo4j" + "github.com/warpstreamlabs/bento/public/service" +) + +const ( + cypherDatabase = "database" + cypherURI = "uri" + cypherNoAuth = "no_auth" + cypherBasicAuth = "basic_auth" + cypherQuery = "query" + cypherValues = "values" + cypherMaxInFlight = "max_in_flight" + cypherBatching = "batching" + + // Basic Auth + cypherUser = "user" + cypherPassword = "password" + cypherRealm = "realm" +) + +var cypherOutputDescription string = ` +## Executes a Cypher Query + +The ` + "`" + `query` + "`" + ` field is expected to be a valid cypher query with 0 or more parameters with the ` + "`" + `$` + "`" + ` syntax: +` + "```" + ` + query: CREATE (p:Person {name: $name, age: $age}) RETURN p +` + "```" + ` + +The ` + "`" + `values` + "`" + ` field is expected to be a map where the keys are equal to the parameters in the query, +and the values are strings (bloblang interpolations are allowed): + +` + "```" + ` + values: + name: ${! json("name") } + age: ${! json("age") } +` + "```" + +func cypherOutputSpec() *service.ConfigSpec { + spec := service.NewConfigSpec(). + Version("1.4.0"). + Categories("Services"). + Summary("Executes a Cypher Query"). + Description(cypherOutputDescription). + Fields( + service.NewStringField(cypherDatabase). + Description("The name of the database to connect to."). 
+ Example("neo4j"), + service.NewStringField(cypherURI). + Description("The URL of the database engine."). + Example("bolt://localhost:7687"), + service.NewBoolField(cypherNoAuth). + Description("Set to true to connect without authentication."). + Default(false), + service.NewObjectField(cypherBasicAuth, basicAuthSpec()...). + Description("Basic Authentication fields"), + service.NewStringField(cypherQuery). + Description("The cypher query to execute."). + Example("CREATE (p:Person {name: $name}) RETURN p"), + service.NewInterpolatedStringMapField(cypherValues). + Description("A map of strings -> bloblang interpolations that form the values of the references in the query i.e. $name."). + Default(map[string]any{}). + Example(map[string]any{ + "name": `${! json("name") }`, + }), + service.NewIntField(cypherMaxInFlight). + Description("The maximum number of queries to run in parallel."). + Default(64), + ) + + spec = spec.Field(service.NewBatchPolicyField(cypherBatching)). + Example("Create Node", + ` +Here we execute a cypher query that takes the value of $name from the interpolated field in the values map:`, + ` +output: + cypher: + database: neo4j + uri: bolt://localhost:7687 + basic_auth: + user: neo4j + password: password + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 +`, + ) + return spec +} + +func basicAuthSpec() []*service.ConfigField { + return []*service.ConfigField{ + service.NewStringField(cypherUser). + Default(""). + Description("The username for basic auth."), + service.NewStringField(cypherPassword). + Default(""). + Secret(). + Description("The password for basic auth."), + service.NewStringField(cypherRealm). + Default(""). 
+			Description("The realm for basic auth."),
+	}
+}
+
+func init() {
+	err := service.RegisterBatchOutput(
+		"cypher", cypherOutputSpec(),
+		func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchOutput, service.BatchPolicy, int, error) {
+			batchPolicy, err := conf.FieldBatchPolicy(cypherBatching)
+			if err != nil {
+				return nil, service.BatchPolicy{}, 0, err
+			}
+			maxInFlight, err := conf.FieldInt(cypherMaxInFlight)
+			if err != nil {
+				return nil, service.BatchPolicy{}, 0, err
+			}
+			// Return a literal nil on failure: assigning a nil *CypherOutput to
+			// the interface result would produce a non-nil interface value.
+			out, err := NewCypherOutputFromConfig(conf, mgr)
+			if err != nil {
+				return nil, service.BatchPolicy{}, 0, err
+			}
+			return out, batchPolicy, maxInFlight, nil
+		})
+	if err != nil {
+		panic(err)
+	}
+}
+
+//----------------------------------------------------------------------------
+
+// cypherParamPattern matches `$name` style parameter references in a query.
+// Names start with a letter or underscore and continue with letters, digits
+// or underscores.
+var cypherParamPattern = regexp.MustCompile(`\$[a-zA-Z_][a-zA-Z0-9_]*`)
+
+// CypherOutput is a batch output that executes a parameterised Cypher query
+// once for every message of a batch, using the neo4j driver.
+type CypherOutput struct {
+	database  string
+	uri       string
+	noAuth    bool
+	basicAuth CypherBasicAuth
+	query     string
+	values    map[string]*service.InterpolatedString
+	// driver is nil until Connect has succeeded.
+	driver neo4j.DriverWithContext
+}
+
+// CypherBasicAuth holds the credentials passed to neo4j.BasicAuth.
+type CypherBasicAuth struct {
+	user     string
+	password string
+	realm    string
+}
+
+// NewCypherOutputFromConfig builds a CypherOutput from a parsed config. It
+// returns an error when a field cannot be read or when the query parameters
+// and the keys of the values map do not line up (see validateQueryAndValues).
+// No connection is made here; that happens in Connect.
+func NewCypherOutputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*CypherOutput, error) {
+	database, err := conf.FieldString(cypherDatabase)
+	if err != nil {
+		return nil, err
+	}
+	uri, err := conf.FieldString(cypherURI)
+	if err != nil {
+		return nil, err
+	}
+	noAuth, err := conf.FieldBool(cypherNoAuth)
+	if err != nil {
+		return nil, err
+	}
+	query, err := conf.FieldString(cypherQuery)
+	if err != nil {
+		return nil, err
+	}
+	values, err := conf.FieldInterpolatedStringMap(cypherValues)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := validateQueryAndValues(query, values); err != nil {
+		return nil, err
+	}
+
+	out := &CypherOutput{
+		database: database,
+		uri:      uri,
+		noAuth:   noAuth,
+		query:    query,
+		values:   values,
+	}
+	if noAuth {
+		return out, nil
+	}
+
+	// Credentials are only read when authentication is enabled.
+	basicAuthMap, err := conf.FieldStringMap(cypherBasicAuth)
+	if err != nil {
+		return nil, err
+	}
+	out.basicAuth = CypherBasicAuth{
+		user:     basicAuthMap[cypherUser],
+		password: basicAuthMap[cypherPassword],
+		realm:    basicAuthMap[cypherRealm],
+	}
+	return out, nil
+}
+
+// validateQueryAndValues checks that every key of values is referenced in the
+// query as `$key`, and that every `$param` found in the query has a matching
+// key in values. The first mismatch found is returned as an error.
+func validateQueryAndValues(query string, values map[string]*service.InterpolatedString) error {
+	for k := range values {
+		if !queryReferencesParam(query, k) {
+			return fmt.Errorf("value key: $%s, not found in query: %s", k, query)
+		}
+	}
+
+	for _, param := range cypherParamPattern.FindAllString(query, -1) {
+		// param includes the leading '$'; the values map is keyed without it.
+		if _, ok := values[param[1:]]; !ok {
+			return fmt.Errorf("query parameter: %s, not found in value keys", param)
+		}
+	}
+
+	return nil
+}
+
+// queryReferencesParam reports whether query contains `$key` as a complete
+// parameter name, i.e. not merely as the prefix of a longer name such as
+// `$keyword`.
+func queryReferencesParam(query, key string) bool {
+	ref := "$" + key
+	rest := query
+	for {
+		idx := strings.Index(rest, ref)
+		if idx < 0 {
+			return false
+		}
+		rest = rest[idx+len(ref):]
+		if rest == "" || !isParamNameByte(rest[0]) {
+			return true
+		}
+	}
+}
+
+// isParamNameByte reports whether c may appear inside a parameter name as
+// matched by cypherParamPattern.
+func isParamNameByte(c byte) bool {
+	return c == '_' ||
+		(c >= 'a' && c <= 'z') ||
+		(c >= 'A' && c <= 'Z') ||
+		(c >= '0' && c <= '9')
+}
+
+// Connect creates the neo4j driver for the configured URI and credentials and
+// verifies that the server can be reached. On failure the driver is closed
+// again and the error is returned.
+func (cyp *CypherOutput) Connect(ctx context.Context) error {
+	auth := neo4j.NoAuth()
+	if !cyp.noAuth {
+		auth = neo4j.BasicAuth(cyp.basicAuth.user, cyp.basicAuth.password, cyp.basicAuth.realm)
+	}
+
+	driver, err := neo4j.NewDriverWithContext(cyp.uri, auth)
+	if err != nil {
+		return err
+	}
+
+	// Creating a driver does not by itself prove the server is reachable, so
+	// check explicitly before reporting the output as connected.
+	if err := driver.VerifyConnectivity(ctx); err != nil {
+		_ = driver.Close(ctx) // best effort; the connectivity error is the one reported
+		return fmt.Errorf("verifying connectivity to %s: %w", cyp.uri, err)
+	}
+
+	// Queries are issued through neo4j.ExecuteQuery directly on the driver, so
+	// no session is opened or stored here.
+	cyp.driver = driver
+	return nil
+}
+
+// WriteBatch executes the configured query once per message, in batch order,
+// with parameters resolved from the values interpolations. It stops at the
+// first failure and returns that error; queries for earlier messages of the
+// batch have already been executed at that point.
+// NOTE(review): if the caller retries the whole batch after an error, those
+// earlier messages would be written again - confirm whether that matters.
+func (cyp *CypherOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
+	if cyp.driver == nil {
+		return service.ErrNotConnected
+	}
+
+	for i, msg := range batch {
+		params := make(map[string]any, len(cyp.values))
+		for k, v := range cyp.values {
+			s, err := v.TryString(msg)
+			if err != nil {
+				return fmt.Errorf("interpolating value %q for message %d: %w", k, i, err)
+			}
+			params[k] = s
+		}
+
+		_, err := neo4j.ExecuteQuery(ctx, cyp.driver,
+			cyp.query,
+			params,
+			neo4j.EagerResultTransformer,
+			neo4j.ExecuteQueryWithDatabase(cyp.database),
+		)
+		if err != nil {
+			return fmt.Errorf("executing cypher query for message %d: %w", i, err)
+		}
+	}
+
+	return nil
+}
+
+// Close closes the driver if Connect created one and returns the driver's
+// close error. It is a no-op when Connect never succeeded.
+func (cyp *CypherOutput) Close(ctx context.Context) error {
+	if cyp.driver == nil {
+		return nil
+	}
+	return cyp.driver.Close(ctx)
+}
diff --git a/internal/impl/cypher/output_cypher_test.go b/internal/impl/cypher/output_cypher_test.go
new file mode 100644
index 000000000..df3270505
--- /dev/null
+++ 
b/internal/impl/cypher/output_cypher_test.go
@@ -0,0 +1,319 @@
+package cypher
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/neo4j/neo4j-go-driver/v5/neo4j"
+	"github.com/neo4j/neo4j-go-driver/v5/neo4j/dbtype"
+	"github.com/ory/dockertest/v3"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/warpstreamlabs/bento/internal/component/output"
+	"github.com/warpstreamlabs/bento/internal/component/testutil"
+	"github.com/warpstreamlabs/bento/internal/manager/mock"
+	"github.com/warpstreamlabs/bento/internal/message"
+	"github.com/warpstreamlabs/bento/public/service/integration"
+)
+
+// setupNeo4j starts a neo4j container with the given environment and returns
+// its bolt address. The container is purged when the test finishes.
+func setupNeo4j(t *testing.T, env []string) string {
+	t.Helper()
+
+	pool, err := dockertest.NewPool("")
+	require.NoError(t, err)
+
+	pool.MaxWait = time.Second * 60
+
+	resource, err := pool.RunWithOptions(&dockertest.RunOptions{
+		Repository: "neo4j",
+		Tag:        "latest",
+		Env:        env,
+	})
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		assert.NoError(t, pool.Purge(resource))
+	})
+
+	return fmt.Sprintf("bolt://localhost:%s", resource.GetPort("7687/tcp"))
+}
+
+// createCypherOutputFromYaml parses an output config from YAML and builds the
+// output with a mock manager.
+func createCypherOutputFromYaml(template string) (s output.Streamed, err error) {
+	conf, err := testutil.OutputFromYAML(template)
+	if err != nil {
+		return nil, err
+	}
+	return mock.NewManager().NewOutput(conf)
+}
+
+// sendTransactions feeds each element of batches to s as one transaction and
+// waits for its acknowledgement before sending the next. The output is closed
+// when the test finishes.
+func sendTransactions(t *testing.T, s output.Streamed, batches [][][]byte) {
+	t.Helper()
+
+	sendChan := make(chan message.Transaction)
+	resChan := make(chan error)
+	require.NoError(t, s.Consume(sendChan))
+
+	t.Cleanup(func() {
+		s.TriggerCloseNow()
+
+		ctx, done := context.WithTimeout(context.Background(), time.Second*10)
+		defer done()
+		assert.NoError(t, s.WaitForClose(ctx))
+	})
+
+	for _, parts := range batches {
+		select {
+		case sendChan <- message.NewTransaction(message.QuickBatch(parts), resChan):
+		case <-time.After(time.Second * 60):
+			t.Fatal("Action timed out")
+		}
+
+		select {
+		case res := <-resChan:
+			require.NoError(t, res)
+		case <-time.After(time.Second * 60):
+			t.Fatal("Action timed out")
+		}
+	}
+}
+
+// sendMessages sends four single-message transactions (Alice, Bob, Carol, Dan).
+func sendMessages(t *testing.T, s output.Streamed) {
+	t.Helper()
+	sendTransactions(t, s, [][][]byte{
+		{[]byte(`{"name":"Alice"}`)},
+		{[]byte(`{"name":"Bob"}`)},
+		{[]byte(`{"name":"Carol"}`)},
+		{[]byte(`{"name":"Dan"}`)},
+	})
+}
+
+// sendBatches sends the same four messages as two transactions of two messages.
+func sendBatches(t *testing.T, s output.Streamed) {
+	t.Helper()
+	sendTransactions(t, s, [][][]byte{
+		{[]byte(`{"name":"Alice"}`), []byte(`{"name":"Bob"}`)},
+		{[]byte(`{"name":"Carol"}`), []byte(`{"name":"Dan"}`)},
+	})
+}
+
+// queryNodeNames connects to the database at localURL, runs `match (n) return n`
+// and returns the "name" property of every node. Any driver, query or
+// unexpected-shape problem is returned as an error.
+func queryNodeNames(ctx context.Context, localURL string, basicAuth bool) ([]string, error) {
+	auth := neo4j.NoAuth()
+	if basicAuth {
+		auth = neo4j.BasicAuth("neo4j", "sparkling_brazilian_orange_456", "")
+	}
+
+	driver, err := neo4j.NewDriverWithContext(localURL, auth)
+	if err != nil {
+		return nil, fmt.Errorf("creating driver: %w", err)
+	}
+	defer func() { _ = driver.Close(ctx) }()
+
+	results, err := neo4j.ExecuteQuery(ctx, driver,
+		"match (n) return n",
+		map[string]any{},
+		neo4j.EagerResultTransformer,
+		neo4j.ExecuteQueryWithDatabase("neo4j"),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("querying nodes: %w", err)
+	}
+
+	var names []string
+	for _, record := range results.Records {
+		raw := record.AsMap()["n"]
+		node, ok := raw.(dbtype.Node)
+		if !ok {
+			return nil, fmt.Errorf("record value n has type %T, want dbtype.Node", raw)
+		}
+		name, ok := node.Props["name"].(string)
+		if !ok {
+			return nil, fmt.Errorf("node property name has type %T, want string", node.Props["name"])
+		}
+		names = append(names, name)
+	}
+	return names, nil
+}
+
+// checkNeo4j returns the names of all nodes stored in the database at
+// localURL. It panics with the underlying error when the database cannot be
+// queried, because its signature carries no *testing.T to report through.
+func checkNeo4j(localURL string, basicAuth bool) (listOfNodeNames []string) {
+	names, err := queryNodeNames(context.Background(), localURL, basicAuth)
+	if err != nil {
+		panic(fmt.Sprintf("checking neo4j contents: %v", err))
+	}
+	return names
+}
+
+func TestIntegrationCypherOutput(t *testing.T) {
+	integration.CheckSkip(t)
+	t.Parallel()
+
+	t.Run("cypher_output_no_auth", func(t *testing.T) {
+		localURL := setupNeo4j(t, []string{"NEO4J_AUTH=none"})
+		testCypherOutputNoAuth(t, localURL)
+	})
+
+	t.Run("cypher_output_basic_auth", func(t *testing.T) {
+		localURL := setupNeo4j(t, []string{"NEO4J_AUTH=neo4j/sparkling_brazilian_orange_456"})
+		testCypherOutputBasicAuth(t, localURL)
+	})
+
+	t.Run("cypher_output_with_batching", func(t *testing.T) {
+		localURL := setupNeo4j(t, []string{"NEO4J_AUTH=none"})
+		testCypherOutputWithBatching(t, localURL)
+	})
+}
+
+func testCypherOutputNoAuth(t *testing.T, localURL string) {
+	template := fmt.Sprintf(`
+cypher:
+  database: neo4j
+  uri: %s
+  no_auth: true
+  query: |
+    CREATE (p:Person {name: $name}) RETURN p
+  values:
+    name: ${! json("name") }
+  batching:
+    count: 0
+`, localURL)
+
+	s, err := createCypherOutputFromYaml(template)
+	require.NoError(t, err)
+
+	sendMessages(t, s)
+
+	// check output
+	listOfNamesFromDB, err := queryNodeNames(context.Background(), localURL, false)
+	require.NoError(t, err)
+	listOfNamesToCheck := []string{"Alice", "Bob", "Carol", "Dan"}
+
+	// The query has no ORDER BY, so compare without relying on row order.
+	assert.ElementsMatch(t, listOfNamesToCheck, listOfNamesFromDB)
+}
+
+func testCypherOutputBasicAuth(t *testing.T, localURL string) {
+	template := fmt.Sprintf(`
+cypher:
+  database: neo4j
+  uri: %s
+  basic_auth:
+    user: neo4j
+    password: sparkling_brazilian_orange_456
+  query: |
+    CREATE (p:Person {name: $name}) RETURN p
+  values:
+    name: ${! 
json("name") }
+  batching:
+    count: 0
+`, localURL)
+
+	s, err := createCypherOutputFromYaml(template)
+	require.NoError(t, err)
+
+	sendMessages(t, s)
+
+	// check output
+	listOfNamesFromDB := checkNeo4j(localURL, true)
+	listOfNamesToCheck := []string{"Alice", "Bob", "Carol", "Dan"}
+
+	// The query has no ORDER BY, so compare without relying on row order.
+	assert.ElementsMatch(t, listOfNamesToCheck, listOfNamesFromDB)
+}
+
+func testCypherOutputWithBatching(t *testing.T, localURL string) {
+	template := fmt.Sprintf(`
+cypher:
+  database: neo4j
+  uri: %s
+  no_auth: true
+  query: |
+    CREATE (p:Person {name: $name}) RETURN p
+  values:
+    name: ${! json("name") }
+  batching:
+    count: 2
+`, localURL)
+
+	s, err := createCypherOutputFromYaml(template)
+	require.NoError(t, err)
+
+	sendBatches(t, s)
+
+	// check output
+	listOfNamesFromDB := checkNeo4j(localURL, false)
+	listOfNamesToCheck := []string{"Alice", "Bob", "Carol", "Dan"}
+
+	// The query has no ORDER BY, so compare without relying on row order.
+	assert.ElementsMatch(t, listOfNamesToCheck, listOfNamesFromDB)
+}
+
+// TestCypherOutputMissingValue checks that a query parameter without a
+// matching values key is rejected when the output is constructed. Validation
+// happens before any connection is attempted, so no database is started.
+func TestCypherOutputMissingValue(t *testing.T) {
+	conf, err := testutil.OutputFromYAML(`
+cypher:
+  database: neo4j
+  uri: bolt://localhost:7687
+  basic_auth:
+    user: neo4j
+    password: sparkling_brazilian_orange_456
+  query: |
+    CREATE (p:Person {name: $name, age: $age}) RETURN p
+  values:
+    name: ${! json("name") }
+  batching:
+    count: 0
+`)
+	require.NoError(t, err)
+
+	_, err = mock.NewManager().NewOutput(conf)
+
+	// Assert on the component's own message, not on the wrapping prefix added
+	// by the manager.
+	require.ErrorContains(t, err, "query parameter: $age, not found in value keys")
+}
+
+// TestCypherOutputMissingParam checks that a values key that is not referenced
+// by the query is rejected when the output is constructed. Validation happens
+// before any connection is attempted, so no database is started.
+func TestCypherOutputMissingParam(t *testing.T) {
+	conf, err := testutil.OutputFromYAML(`
+cypher:
+  database: neo4j
+  uri: bolt://localhost:7687
+  basic_auth:
+    user: neo4j
+    password: sparkling_brazilian_orange_456
+  query: |
+    CREATE (p:Person {name: $name}) RETURN p
+  values:
+    name: ${! json("name") }
+    age: ${! json("name") }
+  batching:
+    count: 0
+`)
+	require.NoError(t, err)
+
+	_, err = mock.NewManager().NewOutput(conf)
+
+	// Assert on the component's own message, not on the wrapping prefix added
+	// by the manager.
+	require.ErrorContains(t, err, "value key: $age, not found in query: CREATE (p:Person {name: $name}) RETURN p")
+}
diff --git a/public/components/all/package.go b/public/components/all/package.go
index 9abaceeb1..b819e09e5 100644
--- a/public/components/all/package.go
+++ b/public/components/all/package.go
@@ -17,6 +17,7 @@ import (
 	_ "github.com/warpstreamlabs/bento/public/components/confluent"
 	_ "github.com/warpstreamlabs/bento/public/components/couchbase"
 	_ "github.com/warpstreamlabs/bento/public/components/crypto"
+	_ "github.com/warpstreamlabs/bento/public/components/cypher"
 	_ "github.com/warpstreamlabs/bento/public/components/dgraph"
 	_ "github.com/warpstreamlabs/bento/public/components/discord"
 	_ "github.com/warpstreamlabs/bento/public/components/elasticsearch"
diff --git a/public/components/cypher/package.go b/public/components/cypher/package.go
new file mode 100644
index 000000000..e388adca7
--- /dev/null
+++ b/public/components/cypher/package.go
@@ -0,0 +1,6 @@
+package cypher
+
+import (
+	// Bring in the internal plugin definitions.
+	_ "github.com/warpstreamlabs/bento/internal/impl/cypher"
+)
diff --git a/website/docs/components/outputs/cypher.md b/website/docs/components/outputs/cypher.md
new file mode 100644
index 000000000..98de8bc0a
--- /dev/null
+++ b/website/docs/components/outputs/cypher.md
@@ -0,0 +1,333 @@
+---
+title: cypher
+slug: cypher
+type: output
+status: experimental
+categories: ["Services"]
+---
+
+
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+:::caution EXPERIMENTAL
+This component is experimental and therefore subject to change or removal outside of major version releases.
+:::
+Executes a Cypher Query
+
+Introduced in version 1.4.0.
+ + + + + + +```yml +# Common config fields, showing default values +output: + label: "" + cypher: + database: neo4j # No default (required) + uri: bolt://localhost:7687 # No default (required) + no_auth: false + basic_auth: + user: "" + password: "" + realm: "" + query: 'CREATE (p:Person {name: $name}) RETURN p' # No default (required) + values: {} + max_in_flight: 64 + batching: + count: 0 + byte_size: 0 + period: "" + check: "" +``` + + + + +```yml +# All config fields, showing default values +output: + label: "" + cypher: + database: neo4j # No default (required) + uri: bolt://localhost:7687 # No default (required) + no_auth: false + basic_auth: + user: "" + password: "" + realm: "" + query: 'CREATE (p:Person {name: $name}) RETURN p' # No default (required) + values: {} + max_in_flight: 64 + batching: + count: 0 + byte_size: 0 + period: "" + check: "" + processors: [] # No default (optional) +``` + + + + +## Executes a Cypher Query + +The `query` field is expected to be a valid cypher query with 0 or more parameters with the `$` syntax: +``` + query: CREATE (p:Person {name: $name, age: $age}) RETURN p +``` + +The `values` field is expected to be a map where the keys are equal to the parameters in the query, +and the values are strings (bloblang interpolations are allowed): + +``` + values: + name: ${! json("name") } + age: ${! json("age") } +``` + +## Examples + + + + + + +Here we execute a cypher query that takes the value of $name from the interpolated field in the values map: + +```yaml +output: + cypher: + database: neo4j + uri: bolt://localhost:7687 + basic_auth: + user: neo4j + password: password + query: | + CREATE (p:Person {name: $name}) RETURN p + values: + name: ${! json("name") } + batching: + count: 0 +``` + + + + +## Fields + +### `database` + +The name of the database to connect to. + + +Type: `string` + +```yml +# Examples + +database: neo4j +``` + +### `uri` + +The URL of the database engine. 
+ + +Type: `string` + +```yml +# Examples + +uri: bolt://localhost:7687 +``` + +### `no_auth` + +Set to true to connect without authentication. + + +Type: `bool` +Default: `false` + +### `basic_auth` + +Basic Authentication fields + + +Type: `object` + +### `basic_auth.user` + +The username for basic auth. + + +Type: `string` +Default: `""` + +### `basic_auth.password` + +The password for basic auth. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: + + +Type: `string` +Default: `""` + +### `basic_auth.realm` + +The realm for basic auth. + + +Type: `string` +Default: `""` + +### `query` + +The cypher query to execute. + + +Type: `string` + +```yml +# Examples + +query: 'CREATE (p:Person {name: $name}) RETURN p' +``` + +### `values` + +A map of strings -> bloblang interpolations that form the values of the references in the query i.e. $name. +This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). + + +Type: `object` +Default: `{}` + +```yml +# Examples + +values: + name: ${! json("name") } +``` + +### `max_in_flight` + +The maximum number of queries to run in parallel. + + +Type: `int` +Default: `64` + +### `batching` + +Allows you to configure a [batching policy](/docs/configuration/batching). + + +Type: `object` + +```yml +# Examples + +batching: + byte_size: 5000 + count: 0 + period: 1s + +batching: + count: 10 + period: 1s + +batching: + check: this.contains("END BATCH") + count: 0 + period: 1m +``` + +### `batching.count` + +A number of messages at which the batch should be flushed. If `0` disables count based batching. + + +Type: `int` +Default: `0` + +### `batching.byte_size` + +An amount of bytes at which the batch should be flushed. If `0` disables size based batching. 
+ + +Type: `int` +Default: `0` + +### `batching.period` + +A period in which an incomplete batch should be flushed regardless of its size. + + +Type: `string` +Default: `""` + +```yml +# Examples + +period: 1s + +period: 1m + +period: 500ms +``` + +### `batching.check` + +A [Bloblang query](/docs/guides/bloblang/about/) that should return a boolean value indicating whether a message should end a batch. + + +Type: `string` +Default: `""` + +```yml +# Examples + +check: this.type == "end_of_transaction" +``` + +### `batching.processors` + +A list of [processors](/docs/components/processors/about) to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. + + +Type: `array` + +```yml +# Examples + +processors: + - archive: + format: concatenate + +processors: + - archive: + format: lines + +processors: + - archive: + format: json_array +``` + +