From d27c64834df80b1901f19029e23ab57674d1eb27 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Wed, 5 Jun 2024 20:09:02 +0100 Subject: [PATCH 01/15] Fix typo in the public API docs Signed-off-by: Mihai Todor --- public/service/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/public/service/service.go b/public/service/service.go index 39e50adb4..f1bda4ebe 100644 --- a/public/service/service.go +++ b/public/service/service.go @@ -133,7 +133,7 @@ func CLIOptSetMainSchemaFrom(fn func() *ConfigSchema) CLIOptFunc { } } -// CLIOptOnConfigParsed sets a closure function to be called when a main +// CLIOptOnConfigParse sets a closure function to be called when a main // configuration file load has occurred. // // If an error is returned this will be treated by the CLI the same as any other From 58b2011d182d44bf1d6291885ee760bedb4b3bfc Mon Sep 17 00:00:00 2001 From: Andreas Bergmeier Date: Wed, 5 Jun 2024 22:36:54 +0200 Subject: [PATCH 02/15] Enable gcm mode in encrypt_aes Add a test with values from Go standard library test suite. --- internal/bloblang/query/methods_strings.go | 26 ++++++++++++++++++---- internal/bloblang/query/methods_test.go | 21 +++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/internal/bloblang/query/methods_strings.go b/internal/bloblang/query/methods_strings.go index d407b6dd6..d2b219441 100644 --- a/internal/bloblang/query/methods_strings.go +++ b/internal/bloblang/query/methods_strings.go @@ -281,7 +281,7 @@ var _ = registerSimpleMethod( "encrypt_aes", "", ).InCategory( MethodCategoryEncoding, - "Encrypts a string or byte array target according to a chosen AES encryption method and returns a string result. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `ofb`, `cbc`.", + "Encrypts a string or byte array target according to a chosen AES encryption method and returns a string result. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `gcm`, `ofb`, `cbc`.", NewExampleSpec("", `let key = "2b7e151628aed2a6abf7158809cf4f3c".decode("hex") let vector = "f0f1f2f3f4f5f6f7f8f9fafbfcfdfeff".decode("hex") @@ -290,7 +290,7 @@ root.encrypted = this.value.encrypt_aes("ctr", $key, $vector).encode("hex")`, `{"encrypted":"84e9b31ff7400bdf80be7254"}`, ), ). - Param(ParamString("scheme", "The scheme to use for encryption, one of `ctr`, `ofb`, `cbc`.")). + Param(ParamString("scheme", "The scheme to use for encryption, one of `ctr`, `gcm`, `ofb`, `cbc`.")). Param(ParamString("key", "A key to encrypt with.")). Param(ParamString("iv", "An initialization vector / nonce.")), func(args *ParsedParams) (simpleMethod, error) { @@ -312,8 +312,16 @@ root.encrypted = this.value.encrypt_aes("ctr", $key, $vector).encode("hex")`, return nil, err } iv := []byte(ivStr) - if len(iv) != block.BlockSize() { - return nil, errors.New("the key must match the initialisation vector size") + + switch schemeStr { + case "ctr": + fallthrough + case "ofb": + fallthrough + case "cbc": + if len(iv) != block.BlockSize() { + return nil, errors.New("the key must match the initialisation vector size") + } } var schemeFn func([]byte) (string, error) @@ -325,6 +333,16 @@ root.encrypted = this.value.encrypt_aes("ctr", $key, $vector).encode("hex")`, stream.XORKeyStream(ciphertext, b) return string(ciphertext), nil } + case "gcm": + schemeFn = func(b []byte) (string, error) { + ciphertext := make([]byte, 0, len(b)) + stream, err := cipher.NewGCM(block) + if err != nil { + return "", fmt.Errorf("creating gcm failed: %w", err) + } + ciphertext = stream.Seal(ciphertext, iv, b, nil) + return string(ciphertext), nil + } case "ofb": schemeFn = func(b []byte) (string, error) { ciphertext := make([]byte, len(b)) diff --git a/internal/bloblang/query/methods_test.go b/internal/bloblang/query/methods_test.go index 7090e75c4..23faa8410 100644 --- a/internal/bloblang/query/methods_test.go +++ b/internal/bloblang/query/methods_test.go @@ -1690,6 +1690,27 @@ func TestMethods(t *testing.T) { ), output: `hello world!`, }, + "check aes-gcm encryption": { + input: methods( + methods( + literalFn("007c5e5b3e59df24a7c355584fc1518d"), + method("decode", "hex"), + ), + method( + "encrypt_aes", "gcm", + methods( + literalFn("feffe9928665731c6d6a8f9467308308feffe9928665731c6d6a8f9467308308"), + method("decode", "hex"), + ), + methods( + literalFn("54cc7dc2c37ec006bcc6d1da"), + method("decode", "hex"), + ), + ), + method("encode", "hex"), + ), + output: `d50b9e252b70945d4240d351677eb10f937cdaef6f2822b6a3191654ba41b197`, + }, "check aes-ofb encryption": { input: methods( literalFn("hello world!"), From 0398dccb8cf715fe566e20ad756ac54a28716fa8 Mon Sep 17 00:00:00 2001 From: Andreas Bergmeier Date: Wed, 5 Jun 2024 22:51:33 +0200 Subject: [PATCH 03/15] Enable gcm mode in decrypt_aes Add test for reversing encryption --- internal/bloblang/query/methods_strings.go | 28 ++++++++++++++++++---- internal/bloblang/query/methods_test.go | 19 +++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/internal/bloblang/query/methods_strings.go b/internal/bloblang/query/methods_strings.go index d2b219441..8759efca8 100644 --- a/internal/bloblang/query/methods_strings.go +++ b/internal/bloblang/query/methods_strings.go @@ -387,7 +387,7 @@ var _ = registerSimpleMethod( "decrypt_aes", "", ).InCategory( MethodCategoryEncoding, - "Decrypts an encrypted string or byte array target according to a chosen AES encryption method and returns the result as a byte array. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `ofb`, `cbc`.", + "Decrypts an encrypted string or byte array target according to a chosen AES encryption method and returns the result as a byte array. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `gcm`, `ofb`, `cbc`.", NewExampleSpec("", `let key = "2b7e151628aed2a6abf7158809cf4f3c".decode("hex") let vector = "f0f1f2f3f4f5f6f7f8f9fafbfcfdfeff".decode("hex") @@ -396,7 +396,7 @@ root.decrypted = this.value.decode("hex").decrypt_aes("ctr", $key, $vector).stri `{"decrypted":"hello world!"}`, ), ). - Param(ParamString("scheme", "The scheme to use for decryption, one of `ctr`, `ofb`, `cbc`.")). + Param(ParamString("scheme", "The scheme to use for decryption, one of `ctr`, `gcm`, `ofb`, `cbc`.")). Param(ParamString("key", "A key to decrypt with.")). Param(ParamString("iv", "An initialization vector / nonce.")), func(args *ParsedParams) (simpleMethod, error) { @@ -419,8 +419,15 @@ root.decrypted = this.value.decode("hex").decrypt_aes("ctr", $key, $vector).stri return nil, err } iv := []byte(ivStr) - if len(iv) != block.BlockSize() { - return nil, errors.New("the key must match the initialisation vector size") + switch schemeStr { + case "ctr": + fallthrough + case "ofb": + fallthrough + case "cbc": + if len(iv) != block.BlockSize() { + return nil, errors.New("the key must match the initialisation vector size") + } } var schemeFn func([]byte) ([]byte, error) @@ -432,6 +439,19 @@ root.decrypted = this.value.decode("hex").decrypt_aes("ctr", $key, $vector).stri stream.XORKeyStream(plaintext, b) return plaintext, nil } + case "gcm": + schemeFn = func(b []byte) ([]byte, error) { + plaintext := make([]byte, 0, len(b)) + stream, err := cipher.NewGCM(block) + if err != nil { + return nil, fmt.Errorf("creating gcm failed: %w", err) + } + plaintext, err = stream.Open(plaintext, iv, b, nil) + if err != nil { + return nil, fmt.Errorf("gcm decrypting failed: %w", err) + } + return plaintext, nil + } case "ofb": schemeFn = func(b []byte) ([]byte, error) { plaintext := make([]byte, len(b)) diff --git a/internal/bloblang/query/methods_test.go b/internal/bloblang/query/methods_test.go index 23faa8410..224e6d4c7 100644 --- a/internal/bloblang/query/methods_test.go +++ b/internal/bloblang/query/methods_test.go @@ -1711,6 +1711,25 @@ func TestMethods(t *testing.T) { ), output: `d50b9e252b70945d4240d351677eb10f937cdaef6f2822b6a3191654ba41b197`, }, + "check aes-gcm decryption": { + input: methods( + literalFn("d50b9e252b70945d4240d351677eb10f937cdaef6f2822b6a3191654ba41b197"), + method("decode", "hex"), + method( + "decrypt_aes", "gcm", + methods( + literalFn("feffe9928665731c6d6a8f9467308308feffe9928665731c6d6a8f9467308308"), + method("decode", "hex"), + ), + methods( + literalFn("54cc7dc2c37ec006bcc6d1da"), + method("decode", "hex"), + ), + ), + method("encode", "hex"), + ), + output: `007c5e5b3e59df24a7c355584fc1518d`, + }, "check aes-ofb encryption": { input: methods( literalFn("hello world!"), From 9089949692b9d8dbabe7aee1ee248c55bd77ad5b Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Sun, 9 Jun 2024 15:00:37 +0200 Subject: [PATCH 04/15] Add `omit_empty` field to the `lines` scanner --- CHANGELOG.old.md | 92 +++++++++++++----------- internal/impl/pure/scanner_lines.go | 29 ++++++-- internal/impl/pure/scanner_lines_test.go | 47 +++++++++--- 3 files changed, 111 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.old.md b/CHANGELOG.old.md index 5c03c50b2..29cb2e7aa 100644 --- a/CHANGELOG.old.md +++ b/CHANGELOG.old.md @@ -3,6 +3,12 @@ Changelog All notable changes to this project will be documented in this file. +## v4.28.2 - TBD + +### Added + +- Field `omit_empty` added to the `lines` scanner. + ## 4.28.0 - 2024-05-29 ### Added @@ -1150,10 +1156,10 @@ This is a major version release, for more information and guidance on how to mig ### Changed - The following beta components have been promoted to stable: - + `ristretto` cache - + `csv` and `generate` inputs - + `reject` output - + `branch`, `jq` and `workflow` processors + + `ristretto` cache + + `csv` and `generate` inputs + + `reject` output + + `branch`, `jq` and `workflow` processors ## 3.44.1 - 2021-04-15 @@ -1835,36 +1841,36 @@ This is a major version release, for more information and guidance on how to mig ### Added - New field `max_in_flight` added to the following outputs: - + `amqp_0_9` - + `cache` - + `dynamodb` - + `elasticsearch` - + `gcp_pubsub` - + `hdfs` - + `http_client` - + `kafka` - + `kinesis` - + `kinesis_firehose` - + `mqtt` - + `nanomsg` - + `nats` - + `nats_stream` - + `nsq` - + `redis_hash` - + `redis_list` - + `redis_pubsub` - + `redis_streams` - + `s3` - + `sns` - + `sqs` + + `amqp_0_9` + + `cache` + + `dynamodb` + + `elasticsearch` + + `gcp_pubsub` + + `hdfs` + + `http_client` + + `kafka` + + `kinesis` + + `kinesis_firehose` + + `mqtt` + + `nanomsg` + + `nats` + + `nats_stream` + + `nsq` + + `redis_hash` + + `redis_list` + + `redis_pubsub` + + `redis_streams` + + `s3` + + `sns` + + `sqs` - Batching fields added to the following outputs: - + `dynamodb` - + `elasticsearch` - + `http_client` - + `kafka` - + `kinesis` - + `kinesis_firehose` - + `sqs` + + `dynamodb` + + `elasticsearch` + + `http_client` + + `kafka` + + `kinesis` + + `kinesis_firehose` + + `sqs` - More TRACE level logs added throughout the pipeline. - Operator `delete` added to `cache` processor. - Operator `explode` added to `json` processor. @@ -2017,14 +2023,14 @@ This is a major version release, for more information and guidance on how to mig - Names of `process_dag` stages must now match the regexp `[a-zA-Z0-9_-]+`. - Go API: buffer constructors now take a `types.Manager` argument in parity with other components. - JSON dot paths within the following components have been updated to allow array-based operations: - + `awk` processor - + `json` processor - + `process_field` processor - + `process_map` processor - + `check_field` condition - + `json_field` function interpolation - + `s3` input - + `dynamodb` output + + `awk` processor + + `json` processor + + `process_field` processor + + `process_map` processor + + `check_field` condition + + `json_field` function interpolation + + `s3` input + + `dynamodb` output ### Fixed @@ -3518,8 +3524,8 @@ This is a major version released due to a series of minor breaking changes, you - New `batch` processor for combining payloads up to a number of bytes. - New `conditional` processor, allows you to configure a chain of processors to only be run if the payload passes a `condition`. - New `--stream` mode features: - + POST verb for `/streams` path now supported. - + New `--streams-dir` flag for parsing a directory of stream configs. + + POST verb for `/streams` path now supported. + + New `--streams-dir` flag for parsing a directory of stream configs. ### Changed diff --git a/internal/impl/pure/scanner_lines.go b/internal/impl/pure/scanner_lines.go index ae4ad8670..fe7bff141 100644 --- a/internal/impl/pure/scanner_lines.go +++ b/internal/impl/pure/scanner_lines.go @@ -12,6 +12,7 @@ import ( const ( slFieldCustomDelimiter = "custom_delimiter" slFieldMaxBufferSize = "max_buffer_size" + slFieldOmitEmpty = "omit_empty" ) func linesScannerSpec() *service.ConfigSpec { @@ -25,6 +26,9 @@ func linesScannerSpec() *service.ConfigSpec { service.NewIntField(slFieldMaxBufferSize). Description("Set the maximum buffer size for storing line data, this limits the maximum size that a line can be without causing an error."). Default(bufio.MaxScanTokenSize), + service.NewBoolField(slFieldOmitEmpty). + Description("Omit empty lines."). + Default(false), ) } @@ -48,12 +52,16 @@ func linesScannerFromParsed(conf *service.ParsedConfig) (l *linesScanner, err er if l.maxScanTokenSize, err = conf.FieldInt(slFieldMaxBufferSize); err != nil { return } + if l.omitEmpty, err = conf.FieldBool(slFieldOmitEmpty); err != nil { + return + } return } type linesScanner struct { maxScanTokenSize int customDelim string + omitEmpty bool } func (l *linesScanner) Create(rdr io.ReadCloser, aFn service.AckFunc, details *service.ScannerSourceDetails) (service.BatchScanner, error) { @@ -85,8 +93,9 @@ func (l *linesScanner) Create(rdr io.ReadCloser, aFn service.AckFunc, details *s } return service.AutoAggregateBatchScannerAcks(&linesReaderStream{ - buf: scanner, - r: rdr, + buf: scanner, + r: rdr, + omitEmpty: l.omitEmpty, }, aFn), nil } @@ -95,13 +104,21 @@ func (l *linesScanner) Close(context.Context) error { } type linesReaderStream struct { - buf *bufio.Scanner - r io.ReadCloser + buf *bufio.Scanner + r io.ReadCloser + omitEmpty bool } func (l *linesReaderStream) NextBatch(ctx context.Context) (service.MessageBatch, error) { - scanned := l.buf.Scan() - if scanned { + for { + if !l.buf.Scan() { + break + } + + if l.omitEmpty && len(l.buf.Bytes()) == 0 { + continue + } + bytesCopy := make([]byte, len(l.buf.Bytes())) copy(bytesCopy, l.buf.Bytes()) return service.MessageBatch{service.NewMessage(bytesCopy)}, nil diff --git a/internal/impl/pure/scanner_lines_test.go b/internal/impl/pure/scanner_lines_test.go index e0a9c095b..4407802a8 100644 --- a/internal/impl/pure/scanner_lines_test.go +++ b/internal/impl/pure/scanner_lines_test.go @@ -3,6 +3,7 @@ package pure_test import ( "bytes" "context" + "fmt" "io" "testing" @@ -56,18 +57,48 @@ test: func TestLinesScannerSuite(t *testing.T) { confSpec := service.NewConfigSpec().Field(service.NewScannerField("test")) - pConf, err := confSpec.ParseYAML(` + + tests := []struct { + name string + input string + want []string + omitEmpty bool + }{ + { + name: "parses input", + input: "firstXsecondXthird", + want: []string{"first", "second", "third"}, + omitEmpty: false, + }, + { + name: "parses input and emits empty line", + input: "firstXsecondXXthird", + want: []string{"first", "second", "", "third"}, + omitEmpty: false, + }, + { + name: "parses input and omits empty line", + input: "firstXsecondXXthird", + want: []string{"first", "second", "third"}, + omitEmpty: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pConf, err := confSpec.ParseYAML(fmt.Sprintf(` test: lines: custom_delimiter: 'X' max_buffer_size: 200 -`, nil) - require.NoError(t, err) + omit_empty: %t +`, tt.omitEmpty), nil) + require.NoError(t, err) - rdr, err := pConf.FieldScanner("test") - require.NoError(t, err) + rdr, err := pConf.FieldScanner("test") + require.NoError(t, err) - testutil.ScannerTestSuite(t, rdr, nil, []byte(`firstXsecondXthird`), "first", "second", "third") - - testutil.ScannerTestSuite(t, rdr, nil, []byte(`firstXsecondXXthird`), "first", "second", "", "third") + testutil.ScannerTestSuite(t, rdr, nil, []byte(tt.input), tt.want...) + }) + } } From 83ea34c28f2c2632ced02d9e6bd6e5cbcc1b9e28 Mon Sep 17 00:00:00 2001 From: Marco Amador Date: Tue, 4 Jun 2024 13:46:58 +0100 Subject: [PATCH 05/15] Add `pow` method --- internal/impl/pure/bloblang_numbers.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/internal/impl/pure/bloblang_numbers.go b/internal/impl/pure/bloblang_numbers.go index 7784399cb..fd725cf5b 100644 --- a/internal/impl/pure/bloblang_numbers.go +++ b/internal/impl/pure/bloblang_numbers.go @@ -173,4 +173,26 @@ root.outs = this.ins.map_each(ele -> ele.abs()) }); err != nil { panic(err) } + + if err := bloblang.RegisterMethodV2("pow", + bloblang.NewPluginSpec(). + Category(query.MethodCategoryNumbers). + Description(`Returns the number raised to the specified exponent.`). + Example("", `root.new_value = this.value * 10.pow(-2)`, + [2]string{`{"value":2}`, `{"new_value":0.02}`}). + Example("", `root.new_value = this.value.pow(-2)`, + [2]string{`{"value":2}`, `{"new_value":0.25}`}). + Param(bloblang.NewFloat64Param("exponent"). + Description("The exponent you want to raise to the power of.")), + func(args *bloblang.ParsedParams) (bloblang.Method, error) { + return bloblang.Float64Method(func(input float64) (any, error) { + exp, err := args.GetFloat64("exponent") + if err != nil { + return nil, err + } + return math.Pow(input, exp), nil + }), nil + }); err != nil { + panic(err) + } } From e5d5b5c817768233c4357b1023b0a9d57042e7a3 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Mon, 10 Jun 2024 15:26:14 +0200 Subject: [PATCH 06/15] Update CHANGELOG --- CHANGELOG.old.md | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.old.md b/CHANGELOG.old.md index 29cb2e7aa..5c00dd14b 100644 --- a/CHANGELOG.old.md +++ b/CHANGELOG.old.md @@ -3,11 +3,15 @@ Changelog All notable changes to this project will be documented in this file. -## v4.28.2 - TBD +## 4.29.0 - 2024-06-10 ### Added -- Field `omit_empty` added to the `lines` scanner. +- Go API: New APIs added for extracting struct based documentation for plugins, which can be used for executing custom + documentation templates for each registered component and/or bloblang plugin. +- Field `omit_empty` added to the `lines` scanner. (@mihaitodor) +- New scheme `gcm` added to the `encrypt_aes` and `decrypy_aes` Bloblang methods. (@abergmeier) +- New Bloblang method `pow`. (@mfamador) ## 4.28.0 - 2024-05-29 @@ -278,7 +282,8 @@ All notable changes to this project will be documented in this file. ### Fixed -- Restore message ordering support to `gcp_pubsub` output. This issue was introduced in 4.16.0 as a result of [#1836](https://github.com/benthosdev/benthos/pull/1836). +- Restore message ordering support to `gcp_pubsub` output. This issue was introduced in 4.16.0 as a result + of [#1836](https://github.com/redpanda-data/benthos/pull/1836). - Specifying structured metadata values (non-strings) in unit test definitions should no longer cause linting errors. ### Changed @@ -715,7 +720,10 @@ This is a major version release, for more information and guidance on how to mig - The `sftp` output no longer opens files in both read and write mode. - The `aws_sqs` input with `reset_visibility` set to `false` will no longer reset timeouts on pending messages during gracefully shutdown. -- The `schema_registry_decode` processor now handles AVRO logical types correctly. Details in [#1198](https://github.com/benthosdev/benthos/pull/1198) and [#1161](https://github.com/benthosdev/benthos/issues/1161) and also in https://github.com/linkedin/goavro/issues/242. +- The `schema_registry_decode` processor now handles AVRO logical types correctly. Details + in [#1198](https://github.com/redpanda-data/benthos/pull/1198) + and [#1161](https://github.com/redpanda-data/benthos/issues/1161) and also + in https://github.com/linkedin/goavro/issues/242. ### Changed From 7543128cde5231509e7428fa65b1b7ba3fd5f1ea Mon Sep 17 00:00:00 2001 From: Marco Amador Date: Mon, 10 Jun 2024 14:30:26 +0100 Subject: [PATCH 07/15] Add `pi` function --- internal/impl/pure/bloblang_numbers.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/internal/impl/pure/bloblang_numbers.go b/internal/impl/pure/bloblang_numbers.go index fd725cf5b..766a1b21a 100644 --- a/internal/impl/pure/bloblang_numbers.go +++ b/internal/impl/pure/bloblang_numbers.go @@ -195,4 +195,22 @@ root.outs = this.ins.map_each(ele -> ele.abs()) }); err != nil { panic(err) } + + //------------------------------------------------------------------------------ + + if err := bloblang.RegisterFunctionV2("pi", + bloblang.NewPluginSpec(). + Category(query.FunctionCategoryGeneral). + Description(`Returns the value of the mathematical constant Pi.`). + Example("", `root.radians = this.degrees * (pi() / 180)`, + [2]string{`{"degrees":45}`, `{"radians":0.78540}`}). + Example("", `root.degrees = this.radians * (180 / pi())`, + [2]string{`{"radians":0.78540}`, `{"degrees":45}`}), + func(args *bloblang.ParsedParams) (bloblang.Function, error) { + return func() (any, error) { + return math.Pi, nil + }, nil + }); err != nil { + panic(err) + } } From 77836d177c64a0b08e3c977c43407d9b23361d16 Mon Sep 17 00:00:00 2001 From: Marco Amador Date: Mon, 10 Jun 2024 14:30:52 +0100 Subject: [PATCH 08/15] Add `sin` method --- internal/impl/pure/bloblang_numbers.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/internal/impl/pure/bloblang_numbers.go b/internal/impl/pure/bloblang_numbers.go index 766a1b21a..d96229358 100644 --- a/internal/impl/pure/bloblang_numbers.go +++ b/internal/impl/pure/bloblang_numbers.go @@ -196,6 +196,24 @@ root.outs = this.ins.map_each(ele -> ele.abs()) panic(err) } + if err := bloblang.RegisterMethodV2("sin", + bloblang.NewPluginSpec(). + Category(query.MethodCategoryNumbers). + Description(`Calculates the sine of a given angle specified in radians.`). + Example("", `root.new_value = (this.value * (pi() / 180)).sin()`, + [2]string{`{"value":45}`, `{"new_value":0.707}`}). + Example("", `root.new_value = (this.value * (pi() / 180)).sin()`, + [2]string{`{"value":0}`, `{"new_value":0}`}). + Example("", `root.new_value = (this.value * (pi() / 180)).sin()`, + [2]string{`{"value":90}`, `{"new_value":1}`}), + func(args *bloblang.ParsedParams) (bloblang.Method, error) { + return bloblang.Float64Method(func(input float64) (any, error) { + return math.Sin(input), nil + }), nil + }); err != nil { + panic(err) + } + //------------------------------------------------------------------------------ if err := bloblang.RegisterFunctionV2("pi", From b3d51015fe73ba7e3e3375f25a8c272d70e18306 Mon Sep 17 00:00:00 2001 From: Marco Amador Date: Mon, 10 Jun 2024 14:31:10 +0100 Subject: [PATCH 09/15] Add `cos` method --- internal/impl/pure/bloblang_numbers.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/internal/impl/pure/bloblang_numbers.go b/internal/impl/pure/bloblang_numbers.go index d96229358..da62a3daf 100644 --- a/internal/impl/pure/bloblang_numbers.go +++ b/internal/impl/pure/bloblang_numbers.go @@ -214,6 +214,24 @@ root.outs = this.ins.map_each(ele -> ele.abs()) panic(err) } + if err := bloblang.RegisterMethodV2("cos", + bloblang.NewPluginSpec(). + Category(query.MethodCategoryNumbers). + Description(`Calculates the cosine of a given angle specified in radians.`). + Example("", `root.new_value = (this.value * (pi() / 180)).cos()`, + [2]string{`{"value":45}`, `{"new_value":0.707}`}). + Example("", `root.new_value = (this.value * (pi() / 180)).cos()`, + [2]string{`{"value":0}`, `{"new_value":1}`}). + Example("", `root.new_value = (this.value * (pi() / 180)).cos()`, + [2]string{`{"value":180}`, `{"new_value":-1}`}), + func(args *bloblang.ParsedParams) (bloblang.Method, error) { + return bloblang.Float64Method(func(input float64) (any, error) { + return math.Cos(input), nil + }), nil + }); err != nil { + panic(err) + } + //------------------------------------------------------------------------------ if err := bloblang.RegisterFunctionV2("pi", From d926cf0d46b21f50667688918080b3cb3f9efcb2 Mon Sep 17 00:00:00 2001 From: Marco Amador Date: Mon, 10 Jun 2024 14:31:28 +0100 Subject: [PATCH 10/15] Add `tan` method --- internal/impl/pure/bloblang_numbers.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/internal/impl/pure/bloblang_numbers.go b/internal/impl/pure/bloblang_numbers.go index da62a3daf..ac7225f28 100644 --- a/internal/impl/pure/bloblang_numbers.go +++ b/internal/impl/pure/bloblang_numbers.go @@ -232,6 +232,24 @@ root.outs = this.ins.map_each(ele -> ele.abs()) panic(err) } + if err := bloblang.RegisterMethodV2("tan", + bloblang.NewPluginSpec(). + Category(query.MethodCategoryNumbers). + Description(`Calculates the tangent of a given angle specified in radians.`). + Example("", `root.new_value = (this.value * (pi() / 180)).tan()`, + [2]string{`{"value":0}`, `{"new_value":0}`}). + Example("", `root.new_value = (this.value * (pi() / 180)).tan()`, + [2]string{`{"value":45}`, `{"new_value":1}`}). + Example("", `root.new_value = (this.value * (pi() / 180)).tan()`, + [2]string{`{"value":180}`, `{"new_value":0.32491}`}), + func(args *bloblang.ParsedParams) (bloblang.Method, error) { + return bloblang.Float64Method(func(input float64) (any, error) { + return math.Tan(input), nil + }), nil + }); err != nil { + panic(err) + } + //------------------------------------------------------------------------------ if err := bloblang.RegisterFunctionV2("pi", From f6144bf4b88bf5197298628cc09efb244b933b8b Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Tue, 11 Jun 2024 16:47:14 +0100 Subject: [PATCH 11/15] Add public APIs for custom stream builder schemas --- internal/transaction/result_store.go | 8 ++++ public/service/message.go | 45 ++++++++++++++++++++++ public/service/message_test.go | 27 ++++++++++++++ public/service/stream_builder.go | 56 ++++++++++++++++------------ public/service/stream_schema.go | 9 +++++ 5 files changed, 122 insertions(+), 23 deletions(-) diff --git a/internal/transaction/result_store.go b/internal/transaction/result_store.go index c4adc9b71..291bf83da 100644 --- a/internal/transaction/result_store.go +++ b/internal/transaction/result_store.go @@ -78,6 +78,14 @@ func NewResultStore() ResultStore { //------------------------------------------------------------------------------ +// AddResultStoreMsg sets a result store within the context of the provided +// message that allows a roundtrip.Writer or any other component to propagate a +// resulting message back to the origin. +func AddResultStoreMsg(p *message.Part, store ResultStore) *message.Part { + ctx := message.GetContext(p) + return message.WithContext(context.WithValue(ctx, ResultStoreKey, store), p) +} + // AddResultStore sets a result store within the context of the provided message // that allows a roundtrip.Writer or any other component to propagate a // resulting message back to the origin. diff --git a/public/service/message.go b/public/service/message.go index be4f1c9f8..927e059de 100644 --- a/public/service/message.go +++ b/public/service/message.go @@ -618,6 +618,51 @@ func (b MessageBatch) InterpolatedBytes(index int, i *InterpolatedString) []byte return bRes } +// SyncResponseStore represents a store of data that holds a relationship to an +// input message. Any processor or output has the potential to add data to a +// store. +type SyncResponseStore struct { + s transaction.ResultStore +} + +// Read the contents of the response store. Any output or processor that +// registers a synchronous response will result in a single batch of messages +// being added to the store, and therefore more than one resulting batch is +// possible. +func (s *SyncResponseStore) Read() []MessageBatch { + ibb := s.s.Get() + bb := make([]MessageBatch, len(ibb)) + for i, ib := range ibb { + bb[i] = make(MessageBatch, len(ib)) + for j, m := range ib { + bb[i][j] = NewInternalMessage(m) + } + } + return bb +} + +// WithSyncResponseStore returns a modified message and a response store +// associated with it. If the message is sent through a processing pipeline or +// output there is the potential for sync response components to add messages to +// the store, which can be consumed once an acknowledgement is received. +func (m *Message) WithSyncResponseStore() (*Message, *SyncResponseStore) { + resStore := transaction.NewResultStore() + + newM := m.Copy() + newM.part = transaction.AddResultStoreMsg(m.part, resStore) + + return newM, &SyncResponseStore{s: resStore} +} + +// AddSyncResponse attempts to add this individual message, in its exact current +// condition, to the synchronous response destined for the original source input +// of this data. Synchronous responses aren't supported by all inputs, and so +// it's possible that attempting to mark a message as ready for a synchronous +// response will return an error. +func (m *Message) AddSyncResponse() error { + return transaction.SetAsResponse(message.Batch{m.part}) +} + // AddSyncResponse attempts to add this batch of messages, in its exact current // condition, to the synchronous response destined for the original source input // of this data. Synchronous responses aren't supported by all inputs, and so diff --git a/public/service/message_test.go b/public/service/message_test.go index 56c51a079..3739a86da 100644 --- a/public/service/message_test.go +++ b/public/service/message_test.go @@ -490,3 +490,30 @@ func BenchmarkMessageMappingOld(b *testing.B) { }, resI) } } + +func TestSyncResponse(t *testing.T) { + msgA := NewMessage([]byte("hello world a")) + + msgB, storeB := msgA.WithSyncResponseStore() + msgB.SetBytes([]byte("hello world b")) + + require.Error(t, msgA.AddSyncResponse()) + require.NoError(t, msgB.AddSyncResponse()) + + msgC := msgB.Copy() + msgC.SetBytes([]byte("hello world c")) + require.NoError(t, msgC.AddSyncResponse()) + + resBatches := storeB.Read() + require.Len(t, resBatches, 2) + require.Len(t, resBatches[0], 1) + require.Len(t, resBatches[1], 1) + + data, err := resBatches[0][0].AsBytes() + require.NoError(t, err) + assert.Equal(t, "hello world b", string(data)) + + data, err = resBatches[1][0].AsBytes() + require.NoError(t, err) + assert.Equal(t, "hello world c", string(data)) +} diff --git a/public/service/stream_builder.go b/public/service/stream_builder.go index 5695f6e82..db67684ad 100644 --- a/public/service/stream_builder.go +++ b/public/service/stream_builder.go @@ -7,7 +7,6 @@ import ( "log/slog" "net/http" "os" - "sync/atomic" "github.com/Jeffail/gabs/v2" "github.com/gofrs/uuid" @@ -66,6 +65,7 @@ type StreamBuilder struct { apiMut manager.APIReg customLogger log.Modular + configSpec docs.FieldSpecs env *Environment lintingDisabled bool envVarLookupFn func(string) (string, bool) @@ -75,14 +75,18 @@ type StreamBuilder struct { func NewStreamBuilder() *StreamBuilder { httpConf := api.NewConfig() httpConf.Enabled = false + + tmpSpec := config.Spec() + tmpSpec.SetDefault(false, "http", "enabled") + return &StreamBuilder{ - engineVersion: cli.Version, http: httpConf, buffer: buffer.NewConfig(), resources: manager.NewResourceConfig(), metrics: metrics.NewConfig(), tracer: tracer.NewConfig(), logger: log.NewConfig(), + configSpec: tmpSpec, env: globalEnvironment, envVarLookupFn: os.LookupEnv, } @@ -104,6 +108,20 @@ func (s *StreamBuilder) SetEngineVersion(ev string) { s.engineVersion = ev } +// SetSchema overrides the default config schema used when linting and parsing +// full configs with the SetYAML method. Other XYAML methods will not use this +// schema as they parse individual component configs rather than a larger +// configuration. +// +// This method is useful as a mechanism for modifying the default top-level +// settings, such as metrics types and so on. +func (s *StreamBuilder) SetSchema(schema *ConfigSchema) { + if s.engineVersion == "" { + s.engineVersion = schema.version + } + s.configSpec = schema.fields +} + // DisableLinting configures the stream builder to no longer lint YAML configs, // allowing you to add snippets of config to the builder without failing on // linting rules. @@ -509,7 +527,7 @@ func (s *StreamBuilder) SetYAML(conf string) error { return err } - spec := configSpec() + spec := s.configSpec if err := s.lintYAMLSpec(spec, node); err != nil { return err } @@ -528,19 +546,6 @@ func (s *StreamBuilder) SetYAML(conf string) error { return nil } -var builderConfigSpec atomic.Pointer[docs.FieldSpecs] - -func configSpec() docs.FieldSpecs { - spec := builderConfigSpec.Load() - if spec == nil { - tmpSpec := config.Spec() - tmpSpec.SetDefault(false, "http", "enabled") - spec = &tmpSpec - builderConfigSpec.Store(spec) - } - return *spec -} - // SetFields modifies the config by setting one or more fields identified by a // dot path to a value. The argument must be a variadic list of pairs, where the // first element is a string containing the target field dot path, and the @@ -566,7 +571,7 @@ func (s *StreamBuilder) SetFields(pathValues ...any) error { sanitConf.RemoveDeprecated = false sanitConf.DocsProvider = s.env.internal - if err := configSpec().SanitiseYAML(&rootNode, sanitConf); err != nil { + if err := s.configSpec.SanitiseYAML(&rootNode, sanitConf); err != nil { return err } @@ -579,12 +584,12 @@ func (s *StreamBuilder) SetFields(pathValues ...any) error { if !ok { return fmt.Errorf("variadic pair element %v should be a string, got a %T", i, pathValues[i]) } - if err := configSpec().SetYAMLPath(s.env.internal, &rootNode, &valueNode, gabs.DotPathToSlice(pathString)...); err != nil { + if err := s.configSpec.SetYAMLPath(s.env.internal, &rootNode, &valueNode, gabs.DotPathToSlice(pathString)...); err != nil { return err } } - spec := configSpec() + spec := s.configSpec if err := s.lintYAMLSpec(spec, &rootNode); err != nil { return err } @@ -724,7 +729,7 @@ func (s *StreamBuilder) AsYAML() (string, error) { sanitConf.RemoveDeprecated = false sanitConf.DocsProvider = s.env.internal - if err := configSpec().SanitiseYAML(&node, sanitConf); err != nil { + if err := s.configSpec.SanitiseYAML(&node, sanitConf); err != nil { return "", err } @@ -767,7 +772,7 @@ func (s *StreamBuilder) WalkComponents(fn func(w *WalkedComponent) error) error sanitConf.RemoveDeprecated = false sanitConf.DocsProvider = s.env.internal - spec := configSpec() + spec := s.configSpec if err := spec.SanitiseYAML(&node, sanitConf); err != nil { return err } @@ -847,6 +852,11 @@ func (s *StreamBuilder) buildWithEnv(env *bundle.Environment) (*Stream, error) { } } + engVer := s.engineVersion + if engVer == "" { + engVer = cli.Version + } + // This temporary manager is a very lazy way of instantiating a manager that // restricts the bloblang and component environments to custom plugins. // Ideally we would break out the constructor for our general purpose @@ -854,7 +864,7 @@ func (s *StreamBuilder) buildWithEnv(env *bundle.Environment) (*Stream, error) { // resource constructors until after this metrics exporter is initialised. tmpMgr, err := manager.New( manager.NewResourceConfig(), - manager.OptSetEngineVersion(s.engineVersion), + manager.OptSetEngineVersion(engVer), manager.OptSetLogger(logger), manager.OptSetEnvironment(env), manager.OptSetBloblangEnvironment(s.env.getBloblangParserEnv()), @@ -883,7 +893,7 @@ func (s *StreamBuilder) buildWithEnv(env *bundle.Environment) (*Stream, error) { sanitConf.RemoveTypeField = true sanitConf.ScrubSecrets = true sanitConf.DocsProvider = env - _ = configSpec().SanitiseYAML(&sanitNode, sanitConf) + _ = s.configSpec.SanitiseYAML(&sanitNode, sanitConf) } if apiType, err = api.New("", "", s.http, sanitNode, logger, stats); err != nil { return nil, fmt.Errorf("unable to create stream HTTP server due to: %w. Tip: you can disable the server with `http.enabled` set to `false`, or override the configured server with SetHTTPMux", err) diff --git a/public/service/stream_schema.go b/public/service/stream_schema.go index b51e0edb6..9a9936c6c 100644 --- a/public/service/stream_schema.go +++ b/public/service/stream_schema.go @@ -185,6 +185,15 @@ func (s *ConfigSchema) SetVersion(version, dateBuilt string) *ConfigSchema { return s } +// SetFieldDefault attempts to change the default value of a field in the config +// spec, which is the value used when the field is omitted from the config. +// +// This method does NOT support walking into arrays, nor component configs +// themselves. +func (s *ConfigSchema) SetFieldDefault(value any, path ...string) { + s.fields.SetDefault(value, path...) +} + // Field adds a field to the main config of a schema. func (s *ConfigSchema) Field(f *ConfigField) *ConfigSchema { s.fields = append(s.fields, f.field) From d661ed37b2d28b417776c38e898d68a13900f529 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Wed, 12 Jun 2024 20:19:21 +0100 Subject: [PATCH 12/15] Compact blobl docs examples --- internal/impl/pure/bloblang_numbers.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/internal/impl/pure/bloblang_numbers.go b/internal/impl/pure/bloblang_numbers.go index ac7225f28..8e569b634 100644 --- a/internal/impl/pure/bloblang_numbers.go +++ b/internal/impl/pure/bloblang_numbers.go @@ -201,10 +201,8 @@ root.outs = this.ins.map_each(ele -> ele.abs()) Category(query.MethodCategoryNumbers). Description(`Calculates the sine of a given angle specified in radians.`). Example("", `root.new_value = (this.value * (pi() / 180)).sin()`, - [2]string{`{"value":45}`, `{"new_value":0.707}`}). - Example("", `root.new_value = (this.value * (pi() / 180)).sin()`, - [2]string{`{"value":0}`, `{"new_value":0}`}). - Example("", `root.new_value = (this.value * (pi() / 180)).sin()`, + [2]string{`{"value":45}`, `{"new_value":0.707}`}, + [2]string{`{"value":0}`, `{"new_value":0}`}, [2]string{`{"value":90}`, `{"new_value":1}`}), func(args *bloblang.ParsedParams) (bloblang.Method, error) { return bloblang.Float64Method(func(input float64) (any, error) { @@ -219,10 +217,8 @@ root.outs = this.ins.map_each(ele -> ele.abs()) Category(query.MethodCategoryNumbers). Description(`Calculates the cosine of a given angle specified in radians.`). Example("", `root.new_value = (this.value * (pi() / 180)).cos()`, - [2]string{`{"value":45}`, `{"new_value":0.707}`}). - Example("", `root.new_value = (this.value * (pi() / 180)).cos()`, - [2]string{`{"value":0}`, `{"new_value":1}`}). - Example("", `root.new_value = (this.value * (pi() / 180)).cos()`, + [2]string{`{"value":45}`, `{"new_value":0.707}`}, + [2]string{`{"value":0}`, `{"new_value":1}`}, [2]string{`{"value":180}`, `{"new_value":-1}`}), func(args *bloblang.ParsedParams) (bloblang.Method, error) { return bloblang.Float64Method(func(input float64) (any, error) { @@ -237,10 +233,8 @@ root.outs = this.ins.map_each(ele -> ele.abs()) Category(query.MethodCategoryNumbers). Description(`Calculates the tangent of a given angle specified in radians.`). Example("", `root.new_value = (this.value * (pi() / 180)).tan()`, - [2]string{`{"value":0}`, `{"new_value":0}`}). - Example("", `root.new_value = (this.value * (pi() / 180)).tan()`, - [2]string{`{"value":45}`, `{"new_value":1}`}). - Example("", `root.new_value = (this.value * (pi() / 180)).tan()`, + [2]string{`{"value":0}`, `{"new_value":0}`}, + [2]string{`{"value":45}`, `{"new_value":1}`}, [2]string{`{"value":180}`, `{"new_value":0.32491}`}), func(args *bloblang.ParsedParams) (bloblang.Method, error) { return bloblang.Float64Method(func(input float64) (any, error) { From 922981d0860ba342cbada8f129883947386bb7b4 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Sat, 9 Dec 2023 20:45:51 +0000 Subject: [PATCH 13/15] Add proxy_url field to the websocket components Note: The `HTTP_PROXY`, `HTTPS_PROXY` and `NO_PROXY` environment variables can be used as fallback when this field isn't set. Details here: . Signed-off-by: Mihai Todor --- internal/impl/io/input_websocket.go | 32 +++++++++++++++++++--------- internal/impl/io/output_websocket.go | 30 +++++++++++++++++--------- 2 files changed, 42 insertions(+), 20 deletions(-) diff --git a/internal/impl/io/input_websocket.go b/internal/impl/io/input_websocket.go index aa86addf3..388a79bc8 100644 --- a/internal/impl/io/input_websocket.go +++ b/internal/impl/io/input_websocket.go @@ -40,6 +40,9 @@ func websocketInputSpec() *service.ConfigSpec { service.NewURLField("url"). Description("The URL to connect to."). Example("ws://localhost:4195/get/ws"), + service.NewURLField("proxy_url"). + Description("An optional HTTP proxy URL."). + Advanced().Optional(), service.NewStringField("open_message"). Description("An optional message to send to the server upon connection."). Advanced().Optional(), @@ -92,12 +95,13 @@ type websocketReader struct { lock *sync.Mutex - client *websocket.Conn - urlParsed *url.URL - urlStr string - tlsEnabled bool - tlsConf *tls.Config - reqSigner func(f fs.FS, req *http.Request) error + client *websocket.Conn + urlParsed *url.URL + urlStr string + proxyURLParsed *url.URL + tlsEnabled bool + tlsConf *tls.Config + reqSigner func(f fs.FS, req *http.Request) error openMsgType wsOpenMsgType openMsg []byte @@ -116,6 +120,11 @@ func newWebsocketReaderFromParsed(conf *service.ParsedConfig, mgr bundle.NewMana if ws.urlStr, err = conf.FieldString("url"); err != nil { return nil, err } + if conf.Contains("proxy_url") { + if ws.proxyURLParsed, err = conf.FieldURL("proxy_url"); err != nil { + return nil, err + } + } if ws.tlsConf, ws.tlsEnabled, err = conf.FieldTLSToggled("tls"); err != nil { return nil, err } @@ -169,14 +178,17 @@ func (w *websocketReader) Connect(ctx context.Context) error { } }() + dialer := *websocket.DefaultDialer + if w.proxyURLParsed != nil { + dialer.Proxy = http.ProxyURL(w.proxyURLParsed) + } + if w.tlsEnabled { - dialer := websocket.Dialer{ - TLSClientConfig: w.tlsConf, - } + dialer.TLSClientConfig = w.tlsConf if client, res, err = dialer.Dial(w.urlStr, headers); err != nil { return err } - } else if client, res, err = websocket.DefaultDialer.Dial(w.urlStr, headers); err != nil { + } else if client, res, err = dialer.Dial(w.urlStr, headers); err != nil { return err } diff --git a/internal/impl/io/output_websocket.go b/internal/impl/io/output_websocket.go index 6af96388c..d248fba96 100644 --- a/internal/impl/io/output_websocket.go +++ b/internal/impl/io/output_websocket.go @@ -26,6 +26,7 @@ func websocketOutputSpec() *service.ConfigSpec { Categories("Network"). Summary("Sends messages to an HTTP server via a websocket connection."). Field(service.NewURLField("url").Description("The URL to connect to.")). + Field(service.NewURLField("proxy_url").Description("An optional HTTP proxy URL.").Advanced().Optional()). Field(service.NewTLSToggledField("tls")) for _, f := range service.NewHTTPRequestAuthSignerFields() { @@ -63,12 +64,13 @@ type websocketWriter struct { lock *sync.Mutex - client *websocket.Conn - urlParsed *url.URL - urlStr string - tlsEnabled bool - tlsConf *tls.Config - reqSigner func(f fs.FS, req *http.Request) error + client *websocket.Conn + urlParsed *url.URL + urlStr string + proxyURLParsed *url.URL + tlsEnabled bool + tlsConf *tls.Config + reqSigner func(f fs.FS, req *http.Request) error } func newWebsocketWriterFromParsed(conf *service.ParsedConfig, mgr bundle.NewManagement) (*websocketWriter, error) { @@ -85,6 +87,11 @@ func newWebsocketWriterFromParsed(conf *service.ParsedConfig, mgr bundle.NewMana if ws.urlStr, err = conf.FieldString("url"); err != nil { return nil, err } + if conf.Contains("proxy_url") { + if ws.proxyURLParsed, err = conf.FieldURL("proxy_url"); err != nil { + return nil, err + } + } if ws.tlsConf, ws.tlsEnabled, err = conf.FieldTLSToggled("tls"); err != nil { return nil, err } @@ -130,14 +137,17 @@ func (w *websocketWriter) Connect(ctx context.Context) error { } }() + dialer := *websocket.DefaultDialer + if w.proxyURLParsed != nil { + dialer.Proxy = http.ProxyURL(w.proxyURLParsed) + } + if w.tlsEnabled { - dialer := websocket.Dialer{ - TLSClientConfig: w.tlsConf, - } + dialer.TLSClientConfig = w.tlsConf if client, res, err = dialer.Dial(w.urlStr, headers); err != nil { return err } - } else if client, res, err = websocket.DefaultDialer.Dial(w.urlStr, headers); err != nil { + } else if client, res, err = dialer.Dial(w.urlStr, headers); err != nil { return err } From fe90bdb980b54566f391682ab418ac3627056dca Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Thu, 13 Jun 2024 10:28:08 +0100 Subject: [PATCH 14/15] Update blobl numbers examples --- internal/impl/pure/bloblang_numbers.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/impl/pure/bloblang_numbers.go b/internal/impl/pure/bloblang_numbers.go index 8e569b634..a9e55a7d6 100644 --- a/internal/impl/pure/bloblang_numbers.go +++ b/internal/impl/pure/bloblang_numbers.go @@ -201,7 +201,7 @@ root.outs = this.ins.map_each(ele -> ele.abs()) Category(query.MethodCategoryNumbers). Description(`Calculates the sine of a given angle specified in radians.`). Example("", `root.new_value = (this.value * (pi() / 180)).sin()`, - [2]string{`{"value":45}`, `{"new_value":0.707}`}, + [2]string{`{"value":45}`, `{"new_value":0.7071067811865475}`}, [2]string{`{"value":0}`, `{"new_value":0}`}, [2]string{`{"value":90}`, `{"new_value":1}`}), func(args *bloblang.ParsedParams) (bloblang.Method, error) { @@ -217,7 +217,7 @@ root.outs = this.ins.map_each(ele -> ele.abs()) Category(query.MethodCategoryNumbers). Description(`Calculates the cosine of a given angle specified in radians.`). Example("", `root.new_value = (this.value * (pi() / 180)).cos()`, - [2]string{`{"value":45}`, `{"new_value":0.707}`}, + [2]string{`{"value":45}`, `{"new_value":0.7071067811865476}`}, [2]string{`{"value":0}`, `{"new_value":1}`}, [2]string{`{"value":180}`, `{"new_value":-1}`}), func(args *bloblang.ParsedParams) (bloblang.Method, error) { @@ -232,10 +232,10 @@ root.outs = this.ins.map_each(ele -> ele.abs()) bloblang.NewPluginSpec(). Category(query.MethodCategoryNumbers). Description(`Calculates the tangent of a given angle specified in radians.`). - Example("", `root.new_value = (this.value * (pi() / 180)).tan()`, - [2]string{`{"value":0}`, `{"new_value":0}`}, - [2]string{`{"value":45}`, `{"new_value":1}`}, - [2]string{`{"value":180}`, `{"new_value":0.32491}`}), + Example("", `root.new_value = "%f".format((this.value * (pi() / 180)).tan())`, + [2]string{`{"value":0}`, `{"new_value":"0.000000"}`}, + [2]string{`{"value":45}`, `{"new_value":"1.000000"}`}, + [2]string{`{"value":180}`, `{"new_value":"-0.000000"}`}), func(args *bloblang.ParsedParams) (bloblang.Method, error) { return bloblang.Float64Method(func(input float64) (any, error) { return math.Tan(input), nil @@ -251,9 +251,9 @@ root.outs = this.ins.map_each(ele -> ele.abs()) Category(query.FunctionCategoryGeneral). Description(`Returns the value of the mathematical constant Pi.`). Example("", `root.radians = this.degrees * (pi() / 180)`, - [2]string{`{"degrees":45}`, `{"radians":0.78540}`}). + [2]string{`{"degrees":45}`, `{"radians":0.7853981633974483}`}). Example("", `root.degrees = this.radians * (180 / pi())`, - [2]string{`{"radians":0.78540}`, `{"degrees":45}`}), + [2]string{`{"radians":0.78540}`, `{"degrees":45.00010522957486}`}), func(args *bloblang.ParsedParams) (bloblang.Function, error) { return func() (any, error) { return math.Pi, nil From 7b8b1aa00243a375466bd59eda45d46346527d4e Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Wed, 12 Jun 2024 21:51:14 +0200 Subject: [PATCH 15/15] Update docs --- website/docs/components/inputs/websocket.md | 8 ++ website/docs/components/outputs/websocket.md | 8 ++ website/docs/components/scanners/lines.md | 9 ++ website/docs/guides/bloblang/functions.md | 21 +++++ website/docs/guides/bloblang/methods.md | 93 +++++++++++++++++++- 5 files changed, 135 insertions(+), 4 deletions(-) diff --git a/website/docs/components/inputs/websocket.md b/website/docs/components/inputs/websocket.md index bc5c8bb9c..e2458fc6b 100644 --- a/website/docs/components/inputs/websocket.md +++ b/website/docs/components/inputs/websocket.md @@ -43,6 +43,7 @@ input: label: "" websocket: url: ws://localhost:4195/get/ws # No default (required) + proxy_url: "" # No default (optional) open_message: "" # No default (optional) open_message_type: binary auto_replay_nacks: true @@ -93,6 +94,13 @@ Type: `string` url: ws://localhost:4195/get/ws ``` +### `proxy_url` + +An optional HTTP proxy URL. + + +Type: `string` + ### `open_message` An optional message to send to the server upon connection. diff --git a/website/docs/components/outputs/websocket.md b/website/docs/components/outputs/websocket.md index e1f942c97..681cddbea 100644 --- a/website/docs/components/outputs/websocket.md +++ b/website/docs/components/outputs/websocket.md @@ -42,6 +42,7 @@ output: label: "" websocket: url: "" # No default (required) + proxy_url: "" # No default (optional) tls: enabled: false skip_cert_verify: false @@ -77,6 +78,13 @@ output: The URL to connect to. +Type: `string` + +### `proxy_url` + +An optional HTTP proxy URL. + + Type: `string` ### `tls` diff --git a/website/docs/components/scanners/lines.md b/website/docs/components/scanners/lines.md index d49ae1ebf..da7a0f25a 100644 --- a/website/docs/components/scanners/lines.md +++ b/website/docs/components/scanners/lines.md @@ -21,6 +21,7 @@ Split an input stream into a message per line of data. lines: custom_delimiter: "" # No default (optional) max_buffer_size: 65536 + omit_empty: false ``` ## Fields @@ -40,4 +41,12 @@ Set the maximum buffer size for storing line data, this limits the maximum size Type: `int` Default: `65536` +### `omit_empty` + +Omit empty lines. + + +Type: `bool` +Default: `false` + diff --git a/website/docs/guides/bloblang/functions.md b/website/docs/guides/bloblang/functions.md index 0e3c8aae2..cbb52e1b1 100644 --- a/website/docs/guides/bloblang/functions.md +++ b/website/docs/guides/bloblang/functions.md @@ -176,6 +176,27 @@ It is also possible to specify an optional custom alphabet after the length para root.id = nanoid(54, "abcde") ``` +### `pi` + +Returns the value of the mathematical constant Pi. + +#### Examples + + +```coffee +root.radians = this.degrees * (pi() / 180) + +# In: {"degrees":45} +# Out: {"radians":0.7853981633974483} +``` + +```coffee +root.degrees = this.radians * (180 / pi()) + +# In: {"radians":0.78540} +# Out: {"degrees":45.00010522957486} +``` + ### `random_int` diff --git a/website/docs/guides/bloblang/methods.md b/website/docs/guides/bloblang/methods.md index f02a325ef..d9a3aeb8d 100644 --- a/website/docs/guides/bloblang/methods.md +++ b/website/docs/guides/bloblang/methods.md @@ -897,6 +897,26 @@ root.new_value = this.value.ceil() # Out: {"new_value":-5} ``` +### `cos` + +Calculates the cosine of a given angle specified in radians. + +#### Examples + + +```coffee +root.new_value = (this.value * (pi() / 180)).cos() + +# In: {"value":45} +# Out: {"new_value":0.7071067811865476} + +# In: {"value":0} +# Out: {"new_value":1} + +# In: {"value":180} +# Out: {"new_value":-1} +``` + ### `float32` @@ -1155,6 +1175,31 @@ root.new_value = [10,this.value].min() # Out: {"new_value":10} ``` +### `pow` + +Returns the number raised to the specified exponent. + +#### Parameters + +**`exponent`** <float> The exponent you want to raise to the power of. + +#### Examples + + +```coffee +root.new_value = this.value * 10.pow(-2) + +# In: {"value":2} +# Out: {"new_value":0.02} +``` + +```coffee +root.new_value = this.value.pow(-2) + +# In: {"value":2} +# Out: {"new_value":0.25} +``` + ### `round` Rounds numbers to the nearest integer, rounding half away from zero. If the resulting value fits within a 64-bit integer then that is returned, otherwise a new floating point number is returned. @@ -1172,6 +1217,46 @@ root.new_value = this.value.round() # Out: {"new_value":6} ``` +### `sin` + +Calculates the sine of a given angle specified in radians. + +#### Examples + + +```coffee +root.new_value = (this.value * (pi() / 180)).sin() + +# In: {"value":45} +# Out: {"new_value":0.7071067811865475} + +# In: {"value":0} +# Out: {"new_value":0} + +# In: {"value":90} +# Out: {"new_value":1} +``` + +### `tan` + +Calculates the tangent of a given angle specified in radians. + +#### Examples + + +```coffee +root.new_value = "%f".format((this.value * (pi() / 180)).tan()) + +# In: {"value":0} +# Out: {"new_value":"0.000000"} + +# In: {"value":45} +# Out: {"new_value":"1.000000"} + +# In: {"value":180} +# Out: {"new_value":"-0.000000"} +``` + ### `uint16` @@ -3153,11 +3238,11 @@ root.result = this.compressed.decode("base64").decompress("lz4").string() ### `decrypt_aes` -Decrypts an encrypted string or byte array target according to a chosen AES encryption method and returns the result as a byte array. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `ofb`, `cbc`. +Decrypts an encrypted string or byte array target according to a chosen AES encryption method and returns the result as a byte array. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `gcm`, `ofb`, `cbc`. #### Parameters -**`scheme`** <string> The scheme to use for decryption, one of `ctr`, `ofb`, `cbc`. +**`scheme`** <string> The scheme to use for decryption, one of `ctr`, `gcm`, `ofb`, `cbc`. **`key`** <string> A key to decrypt with. **`iv`** <string> An initialization vector / nonce. @@ -3200,11 +3285,11 @@ root.encoded = content().encode("ascii85") ### `encrypt_aes` -Encrypts a string or byte array target according to a chosen AES encryption method and returns a string result. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `ofb`, `cbc`. +Encrypts a string or byte array target according to a chosen AES encryption method and returns a string result. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `gcm`, `ofb`, `cbc`. #### Parameters -**`scheme`** <string> The scheme to use for encryption, one of `ctr`, `ofb`, `cbc`. +**`scheme`** <string> The scheme to use for encryption, one of `ctr`, `gcm`, `ofb`, `cbc`. **`key`** <string> A key to encrypt with. **`iv`** <string> An initialization vector / nonce.