Skip to content

Commit

Permalink
Merge pull request #44 from gregfurman/sync-upstream-2024-06-12
Browse files Browse the repository at this point in the history
Cherry-pick relevant commits from upstream
  • Loading branch information
gregfurman authored Jun 13, 2024
2 parents 4eb815f + 7b8b1aa commit 9e1621d
Show file tree
Hide file tree
Showing 19 changed files with 595 additions and 115 deletions.
104 changes: 59 additions & 45 deletions CHANGELOG.old.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.29.0 - 2024-06-10

### Added

- Go API: New APIs added for extracting struct based documentation for plugins, which can be used for executing custom
documentation templates for each registered component and/or bloblang plugin.
- Field `omit_empty` added to the `lines` scanner. (@mihaitodor)
- New scheme `gcm` added to the `encrypt_aes` and `decrypy_aes` Bloblang methods. (@abergmeier)
- New Bloblang method `pow`. (@mfamador)

## 4.28.0 - 2024-05-29

### Added
Expand Down Expand Up @@ -272,7 +282,8 @@ All notable changes to this project will be documented in this file.

### Fixed

- Restore message ordering support to `gcp_pubsub` output. This issue was introduced in 4.16.0 as a result of [#1836](https://github.com/benthosdev/benthos/pull/1836).
- Restore message ordering support to `gcp_pubsub` output. This issue was introduced in 4.16.0 as a result
of [#1836](https://github.com/redpanda-data/benthos/pull/1836).
- Specifying structured metadata values (non-strings) in unit test definitions should no longer cause linting errors.

### Changed
Expand Down Expand Up @@ -709,7 +720,10 @@ This is a major version release, for more information and guidance on how to mig

- The `sftp` output no longer opens files in both read and write mode.
- The `aws_sqs` input with `reset_visibility` set to `false` will no longer reset timeouts on pending messages during gracefully shutdown.
- The `schema_registry_decode` processor now handles AVRO logical types correctly. Details in [#1198](https://github.com/benthosdev/benthos/pull/1198) and [#1161](https://github.com/benthosdev/benthos/issues/1161) and also in https://github.com/linkedin/goavro/issues/242.
- The `schema_registry_decode` processor now handles AVRO logical types correctly. Details
in [#1198](https://github.com/redpanda-data/benthos/pull/1198)
and [#1161](https://github.com/redpanda-data/benthos/issues/1161) and also
in https://github.com/linkedin/goavro/issues/242.

### Changed

Expand Down Expand Up @@ -1150,10 +1164,10 @@ This is a major version release, for more information and guidance on how to mig
### Changed

- The following beta components have been promoted to stable:
+ `ristretto` cache
+ `csv` and `generate` inputs
+ `reject` output
+ `branch`, `jq` and `workflow` processors
+ `ristretto` cache
+ `csv` and `generate` inputs
+ `reject` output
+ `branch`, `jq` and `workflow` processors

## 3.44.1 - 2021-04-15

Expand Down Expand Up @@ -1835,36 +1849,36 @@ This is a major version release, for more information and guidance on how to mig
### Added

- New field `max_in_flight` added to the following outputs:
+ `amqp_0_9`
+ `cache`
+ `dynamodb`
+ `elasticsearch`
+ `gcp_pubsub`
+ `hdfs`
+ `http_client`
+ `kafka`
+ `kinesis`
+ `kinesis_firehose`
+ `mqtt`
+ `nanomsg`
+ `nats`
+ `nats_stream`
+ `nsq`
+ `redis_hash`
+ `redis_list`
+ `redis_pubsub`
+ `redis_streams`
+ `s3`
+ `sns`
+ `sqs`
+ `amqp_0_9`
+ `cache`
+ `dynamodb`
+ `elasticsearch`
+ `gcp_pubsub`
+ `hdfs`
+ `http_client`
+ `kafka`
+ `kinesis`
+ `kinesis_firehose`
+ `mqtt`
+ `nanomsg`
+ `nats`
+ `nats_stream`
+ `nsq`
+ `redis_hash`
+ `redis_list`
+ `redis_pubsub`
+ `redis_streams`
+ `s3`
+ `sns`
+ `sqs`
- Batching fields added to the following outputs:
+ `dynamodb`
+ `elasticsearch`
+ `http_client`
+ `kafka`
+ `kinesis`
+ `kinesis_firehose`
+ `sqs`
+ `dynamodb`
+ `elasticsearch`
+ `http_client`
+ `kafka`
+ `kinesis`
+ `kinesis_firehose`
+ `sqs`
- More TRACE level logs added throughout the pipeline.
- Operator `delete` added to `cache` processor.
- Operator `explode` added to `json` processor.
Expand Down Expand Up @@ -2017,14 +2031,14 @@ This is a major version release, for more information and guidance on how to mig
- Names of `process_dag` stages must now match the regexp `[a-zA-Z0-9_-]+`.
- Go API: buffer constructors now take a `types.Manager` argument in parity with other components.
- JSON dot paths within the following components have been updated to allow array-based operations:
+ `awk` processor
+ `json` processor
+ `process_field` processor
+ `process_map` processor
+ `check_field` condition
+ `json_field` function interpolation
+ `s3` input
+ `dynamodb` output
+ `awk` processor
+ `json` processor
+ `process_field` processor
+ `process_map` processor
+ `check_field` condition
+ `json_field` function interpolation
+ `s3` input
+ `dynamodb` output

### Fixed

Expand Down Expand Up @@ -3518,8 +3532,8 @@ This is a major version released due to a series of minor breaking changes, you
- New `batch` processor for combining payloads up to a number of bytes.
- New `conditional` processor, allows you to configure a chain of processors to only be run if the payload passes a `condition`.
- New `--stream` mode features:
+ POST verb for `/streams` path now supported.
+ New `--streams-dir` flag for parsing a directory of stream configs.
+ POST verb for `/streams` path now supported.
+ New `--streams-dir` flag for parsing a directory of stream configs.

### Changed

Expand Down
54 changes: 46 additions & 8 deletions internal/bloblang/query/methods_strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ var _ = registerSimpleMethod(
"encrypt_aes", "",
).InCategory(
MethodCategoryEncoding,
"Encrypts a string or byte array target according to a chosen AES encryption method and returns a string result. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `ofb`, `cbc`.",
"Encrypts a string or byte array target according to a chosen AES encryption method and returns a string result. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `gcm`, `ofb`, `cbc`.",
NewExampleSpec("",
`let key = "2b7e151628aed2a6abf7158809cf4f3c".decode("hex")
let vector = "f0f1f2f3f4f5f6f7f8f9fafbfcfdfeff".decode("hex")
Expand All @@ -290,7 +290,7 @@ root.encrypted = this.value.encrypt_aes("ctr", $key, $vector).encode("hex")`,
`{"encrypted":"84e9b31ff7400bdf80be7254"}`,
),
).
Param(ParamString("scheme", "The scheme to use for encryption, one of `ctr`, `ofb`, `cbc`.")).
Param(ParamString("scheme", "The scheme to use for encryption, one of `ctr`, `gcm`, `ofb`, `cbc`.")).
Param(ParamString("key", "A key to encrypt with.")).
Param(ParamString("iv", "An initialization vector / nonce.")),
func(args *ParsedParams) (simpleMethod, error) {
Expand All @@ -312,8 +312,16 @@ root.encrypted = this.value.encrypt_aes("ctr", $key, $vector).encode("hex")`,
return nil, err
}
iv := []byte(ivStr)
if len(iv) != block.BlockSize() {
return nil, errors.New("the key must match the initialisation vector size")

switch schemeStr {
case "ctr":
fallthrough
case "ofb":
fallthrough
case "cbc":
if len(iv) != block.BlockSize() {
return nil, errors.New("the key must match the initialisation vector size")
}
}

var schemeFn func([]byte) (string, error)
Expand All @@ -325,6 +333,16 @@ root.encrypted = this.value.encrypt_aes("ctr", $key, $vector).encode("hex")`,
stream.XORKeyStream(ciphertext, b)
return string(ciphertext), nil
}
case "gcm":
schemeFn = func(b []byte) (string, error) {
ciphertext := make([]byte, 0, len(b))
stream, err := cipher.NewGCM(block)
if err != nil {
return "", fmt.Errorf("creating gcm failed: %w", err)
}
ciphertext = stream.Seal(ciphertext, iv, b, nil)
return string(ciphertext), nil
}
case "ofb":
schemeFn = func(b []byte) (string, error) {
ciphertext := make([]byte, len(b))
Expand Down Expand Up @@ -369,7 +387,7 @@ var _ = registerSimpleMethod(
"decrypt_aes", "",
).InCategory(
MethodCategoryEncoding,
"Decrypts an encrypted string or byte array target according to a chosen AES encryption method and returns the result as a byte array. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `ofb`, `cbc`.",
"Decrypts an encrypted string or byte array target according to a chosen AES encryption method and returns the result as a byte array. The algorithms require a key and an initialization vector / nonce. Available schemes are: `ctr`, `gcm`, `ofb`, `cbc`.",
NewExampleSpec("",
`let key = "2b7e151628aed2a6abf7158809cf4f3c".decode("hex")
let vector = "f0f1f2f3f4f5f6f7f8f9fafbfcfdfeff".decode("hex")
Expand All @@ -378,7 +396,7 @@ root.decrypted = this.value.decode("hex").decrypt_aes("ctr", $key, $vector).stri
`{"decrypted":"hello world!"}`,
),
).
Param(ParamString("scheme", "The scheme to use for decryption, one of `ctr`, `ofb`, `cbc`.")).
Param(ParamString("scheme", "The scheme to use for decryption, one of `ctr`, `gcm`, `ofb`, `cbc`.")).
Param(ParamString("key", "A key to decrypt with.")).
Param(ParamString("iv", "An initialization vector / nonce.")),
func(args *ParsedParams) (simpleMethod, error) {
Expand All @@ -401,8 +419,15 @@ root.decrypted = this.value.decode("hex").decrypt_aes("ctr", $key, $vector).stri
return nil, err
}
iv := []byte(ivStr)
if len(iv) != block.BlockSize() {
return nil, errors.New("the key must match the initialisation vector size")
switch schemeStr {
case "ctr":
fallthrough
case "ofb":
fallthrough
case "cbc":
if len(iv) != block.BlockSize() {
return nil, errors.New("the key must match the initialisation vector size")
}
}

var schemeFn func([]byte) ([]byte, error)
Expand All @@ -414,6 +439,19 @@ root.decrypted = this.value.decode("hex").decrypt_aes("ctr", $key, $vector).stri
stream.XORKeyStream(plaintext, b)
return plaintext, nil
}
case "gcm":
schemeFn = func(b []byte) ([]byte, error) {
plaintext := make([]byte, 0, len(b))
stream, err := cipher.NewGCM(block)
if err != nil {
return nil, fmt.Errorf("creating gcm failed: %w", err)
}
plaintext, err = stream.Open(plaintext, iv, b, nil)
if err != nil {
return nil, fmt.Errorf("gcm decrypting failed: %w", err)
}
return plaintext, nil
}
case "ofb":
schemeFn = func(b []byte) ([]byte, error) {
plaintext := make([]byte, len(b))
Expand Down
40 changes: 40 additions & 0 deletions internal/bloblang/query/methods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,46 @@ func TestMethods(t *testing.T) {
),
output: `hello world!`,
},
"check aes-gcm encryption": {
input: methods(
methods(
literalFn("007c5e5b3e59df24a7c355584fc1518d"),
method("decode", "hex"),
),
method(
"encrypt_aes", "gcm",
methods(
literalFn("feffe9928665731c6d6a8f9467308308feffe9928665731c6d6a8f9467308308"),
method("decode", "hex"),
),
methods(
literalFn("54cc7dc2c37ec006bcc6d1da"),
method("decode", "hex"),
),
),
method("encode", "hex"),
),
output: `d50b9e252b70945d4240d351677eb10f937cdaef6f2822b6a3191654ba41b197`,
},
"check aes-gcm decryption": {
input: methods(
literalFn("d50b9e252b70945d4240d351677eb10f937cdaef6f2822b6a3191654ba41b197"),
method("decode", "hex"),
method(
"decrypt_aes", "gcm",
methods(
literalFn("feffe9928665731c6d6a8f9467308308feffe9928665731c6d6a8f9467308308"),
method("decode", "hex"),
),
methods(
literalFn("54cc7dc2c37ec006bcc6d1da"),
method("decode", "hex"),
),
),
method("encode", "hex"),
),
output: `007c5e5b3e59df24a7c355584fc1518d`,
},
"check aes-ofb encryption": {
input: methods(
literalFn("hello world!"),
Expand Down
32 changes: 22 additions & 10 deletions internal/impl/io/input_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func websocketInputSpec() *service.ConfigSpec {
service.NewURLField("url").
Description("The URL to connect to.").
Example("ws://localhost:4195/get/ws"),
service.NewURLField("proxy_url").
Description("An optional HTTP proxy URL.").
Advanced().Optional(),
service.NewStringField("open_message").
Description("An optional message to send to the server upon connection.").
Advanced().Optional(),
Expand Down Expand Up @@ -92,12 +95,13 @@ type websocketReader struct {

lock *sync.Mutex

client *websocket.Conn
urlParsed *url.URL
urlStr string
tlsEnabled bool
tlsConf *tls.Config
reqSigner func(f fs.FS, req *http.Request) error
client *websocket.Conn
urlParsed *url.URL
urlStr string
proxyURLParsed *url.URL
tlsEnabled bool
tlsConf *tls.Config
reqSigner func(f fs.FS, req *http.Request) error

openMsgType wsOpenMsgType
openMsg []byte
Expand All @@ -116,6 +120,11 @@ func newWebsocketReaderFromParsed(conf *service.ParsedConfig, mgr bundle.NewMana
if ws.urlStr, err = conf.FieldString("url"); err != nil {
return nil, err
}
if conf.Contains("proxy_url") {
if ws.proxyURLParsed, err = conf.FieldURL("proxy_url"); err != nil {
return nil, err
}
}
if ws.tlsConf, ws.tlsEnabled, err = conf.FieldTLSToggled("tls"); err != nil {
return nil, err
}
Expand Down Expand Up @@ -169,14 +178,17 @@ func (w *websocketReader) Connect(ctx context.Context) error {
}
}()

dialer := *websocket.DefaultDialer
if w.proxyURLParsed != nil {
dialer.Proxy = http.ProxyURL(w.proxyURLParsed)
}

if w.tlsEnabled {
dialer := websocket.Dialer{
TLSClientConfig: w.tlsConf,
}
dialer.TLSClientConfig = w.tlsConf
if client, res, err = dialer.Dial(w.urlStr, headers); err != nil {
return err
}
} else if client, res, err = websocket.DefaultDialer.Dial(w.urlStr, headers); err != nil {
} else if client, res, err = dialer.Dial(w.urlStr, headers); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 9e1621d

Please sign in to comment.