diff --git a/CHANGELOG.md b/CHANGELOG.md index 862c7ef39a..7ac8fa0285 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file. - New experimental `gcp_vertex_ai_embeddings` processor. (@rockwotj) - New experimental `aws_bedrock_embeddings` processor. (@rockwotj) - New experimental `cohere_chat` and `cohere_embeddings` processors. (@rockwotj) +- New experimental `questdb` output. (@sklarsa) ## 4.36.0 - 2024-09-11 diff --git a/docs/modules/components/pages/outputs/questdb.adoc b/docs/modules/components/pages/outputs/questdb.adoc new file mode 100644 index 0000000000..115f1613fd --- /dev/null +++ b/docs/modules/components/pages/outputs/questdb.adoc @@ -0,0 +1,535 @@ += questdb +:type: output +:status: experimental +:categories: ["Services"] + + + +//// + THIS FILE IS AUTOGENERATED! + + To make changes, edit the corresponding source file under: + + https://github.com/redpanda-data/connect/tree/main/internal/impl/. + + And: + + https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl +//// + +// © 2024 Redpanda Data Inc. 
+ + +component_type_dropdown::[] + + +Pushes messages to a QuestDB table + + +[tabs] +====== +Common:: ++ +-- + +```yml +# Common config fields, showing default values +output: + label: "" + questdb: + max_in_flight: 64 + batching: + count: 0 + byte_size: 0 + period: "" + check: "" + address: localhost:9000 # No default (required) + username: "" # No default (optional) + password: "" # No default (optional) + token: "" # No default (optional) + table: trades # No default (required) + designated_timestamp_field: "" # No default (optional) + designated_timestamp_unit: auto + timestamp_string_fields: [] # No default (optional) + timestamp_string_format: Jan _2 15:04:05.000000Z0700 + symbols: [] # No default (optional) + doubles: [] # No default (optional) + error_on_empty_messages: false +``` + +-- +Advanced:: ++ +-- + +```yml +# All config fields, showing default values +output: + label: "" + questdb: + max_in_flight: 64 + batching: + count: 0 + byte_size: 0 + period: "" + check: "" + processors: [] # No default (optional) + tls: + enabled: false + skip_cert_verify: false + enable_renegotiation: false + root_cas: "" + root_cas_file: "" + client_certs: [] + address: localhost:9000 # No default (required) + username: "" # No default (optional) + password: "" # No default (optional) + token: "" # No default (optional) + retry_timeout: "" # No default (optional) + request_timeout: "" # No default (optional) + request_min_throughput: 0 # No default (optional) + table: trades # No default (required) + designated_timestamp_field: "" # No default (optional) + designated_timestamp_unit: auto + timestamp_string_fields: [] # No default (optional) + timestamp_string_format: Jan _2 15:04:05.000000Z0700 + symbols: [] # No default (optional) + doubles: [] # No default (optional) + error_on_empty_messages: false +``` + +-- +====== + +Important: We recommend that the dedupe feature is enabled on the QuestDB server. 
Please visit https://questdb.io/docs/ for more information about deploying, configuring, and using QuestDB. + +== Performance + +This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field `max_in_flight`. + +This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more xref:configuration:batching.adoc[in this doc]. + +== Fields + +=== `max_in_flight` + +The maximum number of messages to have in flight at a given time. Increase this to improve throughput. + + +*Type*: `int` + +*Default*: `64` + +=== `batching` + +Allows you to configure a xref:configuration:batching.adoc[batching policy]. + + +*Type*: `object` + + +```yml +# Examples + +batching: + byte_size: 5000 + count: 0 + period: 1s + +batching: + count: 10 + period: 1s + +batching: + check: this.contains("END BATCH") + count: 0 + period: 1m +``` + +=== `batching.count` + +A number of messages at which the batch should be flushed. If `0` disables count based batching. + + +*Type*: `int` + +*Default*: `0` + +=== `batching.byte_size` + +An amount of bytes at which the batch should be flushed. If `0` disables size based batching. + + +*Type*: `int` + +*Default*: `0` + +=== `batching.period` + +A period in which an incomplete batch should be flushed regardless of its size. + + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +period: 1s + +period: 1m + +period: 500ms +``` + +=== `batching.check` + +A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch. + + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +check: this.type == "end_of_transaction" +``` + +=== `batching.processors` + +A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. 
This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. + + +*Type*: `array` + + +```yml +# Examples + +processors: + - archive: + format: concatenate + +processors: + - archive: + format: lines + +processors: + - archive: + format: json_array +``` + +=== `tls` + +Custom TLS settings can be used to override system defaults. + + +*Type*: `object` + + +=== `tls.enabled` + +Whether custom TLS settings are enabled. + + +*Type*: `bool` + +*Default*: `false` + +=== `tls.skip_cert_verify` + +Whether to skip server side certificate verification. + + +*Type*: `bool` + +*Default*: `false` + +=== `tls.enable_renegotiation` + +Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message `local error: tls: no renegotiation`. + + +*Type*: `bool` + +*Default*: `false` +Requires version 3.45.0 or newer + +=== `tls.root_cas` + +An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +root_cas: |- + -----BEGIN CERTIFICATE----- + ... + -----END CERTIFICATE----- +``` + +=== `tls.root_cas_file` + +An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. 
+ + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +root_cas_file: ./root_cas.pem +``` + +=== `tls.client_certs` + +A list of client certificates to use. For each certificate either the fields `cert` and `key`, or `cert_file` and `key_file` should be specified, but not both. + + +*Type*: `array` + +*Default*: `[]` + +```yml +# Examples + +client_certs: + - cert: foo + key: bar + +client_certs: + - cert_file: ./example.pem + key_file: ./example.key +``` + +=== `tls.client_certs[].cert` + +A plain text certificate to use. + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].key` + +A plain text certificate key to use. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].cert_file` + +The path of a certificate to use. + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].key_file` + +The path of a certificate key to use. + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].password` + +A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete `pbeWithMD5AndDES-CBC` algorithm is not supported for the PKCS#8 format. + +Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. 
+==== + + + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +password: foo + +password: ${KEY_PASSWORD} +``` + +=== `address` + +Address of the QuestDB server's HTTP port (excluding protocol) + + +*Type*: `string` + + +```yml +# Examples + +address: localhost:9000 +``` + +=== `username` + +Username for HTTP basic auth +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + + +=== `password` + +Password for HTTP basic auth +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + + +=== `token` + +Bearer token for HTTP auth (takes precedence over basic auth username & password) +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + + +=== `retry_timeout` + +The time to continue retrying after a failed HTTP request. The interval between retries is an exponential backoff starting at 10ms and doubling after each failed attempt up to a maximum of 1 second. + + +*Type*: `string` + + +=== `request_timeout` + +The time to wait for a response from the server. This is in addition to the calculation derived from the request_min_throughput parameter. + + +*Type*: `string` + + +=== `request_min_throughput` + +Minimum expected throughput in bytes per second for HTTP requests. If the throughput is lower than this value, the connection will time out. This is used to calculate an additional timeout on top of request_timeout. This is useful for large requests. You can set this value to 0 to disable this logic. 
+ + +*Type*: `int` + + +=== `table` + +Destination table + + +*Type*: `string` + + +```yml +# Examples + +table: trades +``` + +=== `designated_timestamp_field` + +Name of the designated timestamp field + + +*Type*: `string` + + +=== `designated_timestamp_unit` + +Designated timestamp field units + + +*Type*: `string` + +*Default*: `"auto"` + +=== `timestamp_string_fields` + +String fields with textual timestamps + + +*Type*: `array` + + +=== `timestamp_string_format` + +Timestamp format, used when parsing timestamp string fields. Specified in golang's time.Parse layout + + +*Type*: `string` + +*Default*: `"Jan _2 15:04:05.000000Z0700"` + +=== `symbols` + +Columns that should be the SYMBOL type (string values default to STRING) + + +*Type*: `array` + + +=== `doubles` + +Columns that should be double type, (int is default) + + +*Type*: `array` + + +=== `error_on_empty_messages` + +Mark a message as errored if it is empty after field validation + + +*Type*: `bool` + +*Default*: `false` + + diff --git a/go.mod b/go.mod index f8ee0071e4..f235a1eda1 100644 --- a/go.mod +++ b/go.mod @@ -92,6 +92,7 @@ require ( github.com/prometheus/common v0.55.0 github.com/pusher/pusher-http-go v4.0.1+incompatible github.com/qdrant/go-client v1.11.1 + github.com/questdb/go-questdb-client/v3 v3.2.0 github.com/r3labs/diff/v3 v3.0.1 github.com/rabbitmq/amqp091-go v1.10.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 @@ -278,7 +279,7 @@ require ( github.com/itchyny/gojq v0.12.16 // indirect github.com/itchyny/timefmt-go v0.1.6 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect - github.com/jackc/pgconn v1.14.3 // indirect + github.com/jackc/pgconn v1.14.3 github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.3.3 // indirect diff --git a/go.sum b/go.sum index c7c5055e22..9efc0606b2 100644 --- a/go.sum +++ b/go.sum @@ -1011,6 +1011,8 @@ github.com/pusher/pusher-http-go v4.0.1+incompatible 
h1:4u6tomPG1WhHaST7Wi9mw83Y github.com/pusher/pusher-http-go v4.0.1+incompatible/go.mod h1:XAv1fxRmVTI++2xsfofDhg7whapsLRG/gH/DXbF3a18= github.com/qdrant/go-client v1.11.1 h1:kla7n21wSEWWZLrvpttTOnCppDm6jluYDZEFe2kJ8zs= github.com/qdrant/go-client v1.11.1/go.mod h1:zFa6t5Y3Oqecoa0aSsGWhMqQWq3x3kTPvm0sMf5qplw= +github.com/questdb/go-questdb-client/v3 v3.2.0 h1:rFlkc3tD+vNucd4dkNv2xN5xqcFJGwqxt3F5p2H8zrg= +github.com/questdb/go-questdb-client/v3 v3.2.0/go.mod h1:kXoftTVQZlksdJ9tsHQRWfdWO5Kyl4bZuKotyyeWa3c= github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc h1:hK577yxEJ2f5s8w2iy2KimZmgrdAUZUNftE1ESmg2/Q= github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc/go.mod h1:OQt6Zo5B3Zs+C49xul8kcHo+fZ1mCLPvd0LFxiZ2DHc= github.com/r3labs/diff/v3 v3.0.1 h1:CBKqf3XmNRHXKmdU7mZP1w7TV0pDyVCis1AUHtA4Xtg= diff --git a/internal/impl/aws/enterprise/processor_bedrock_embeddings.go b/internal/impl/aws/enterprise/processor_bedrock_embeddings.go index 949340cbf5..3aef321c9f 100644 --- a/internal/impl/aws/enterprise/processor_bedrock_embeddings.go +++ b/internal/impl/aws/enterprise/processor_bedrock_embeddings.go @@ -18,6 +18,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" amzn "github.com/aws/aws-sdk-go-v2/aws" + "github.com/redpanda-data/connect/v4/internal/impl/aws" "github.com/redpanda-data/connect/v4/internal/impl/aws/config" ) diff --git a/internal/impl/cohere/chat_processor.go b/internal/impl/cohere/chat_processor.go index dd0a13cb15..32a10bdf93 100644 --- a/internal/impl/cohere/chat_processor.go +++ b/internal/impl/cohere/chat_processor.go @@ -17,6 +17,7 @@ import ( cohere "github.com/cohere-ai/cohere-go/v2" "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/connect/v4/internal/impl/confluent/sr" ) diff --git a/internal/impl/questdb/integration_test.go b/internal/impl/questdb/integration_test.go new file mode 100644 index 0000000000..e204cf9863 --- /dev/null +++ 
b/internal/impl/questdb/integration_test.go @@ -0,0 +1,107 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package questdb + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "testing" + "time" + + "github.com/jackc/pgconn" + qdb "github.com/questdb/go-questdb-client/v3" + + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/public/service/integration" +) + +func TestIntegrationQuestDB(t *testing.T) { + ctx := context.Background() + + integration.CheckSkip(t) + t.Parallel() + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + pool.MaxWait = time.Minute * 3 + resource, err := pool.Run("questdb/questdb", "8.0.0", []string{ + "JAVA_OPTS=-Xms512m -Xmx512m", + }) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, pool.Purge(resource)) + }) + + if err = pool.Retry(func() error { + clientConfStr := fmt.Sprintf("http::addr=localhost:%v", resource.GetPort("9000/tcp")) + sender, err := qdb.LineSenderFromConf(ctx, clientConfStr) + if err != nil { + return err + } + defer sender.Close(ctx) + err = sender.Table("ping").Int64Column("test", 42).AtNow(ctx) + if err != nil { + return err + } + return sender.Flush(ctx) + }); err != nil { + t.Fatalf("Could not connect to docker resource: %s", err) + } + + _ = resource.Expire(900) + + template := ` +output: + questdb: + address: 
"localhost:$PORT" + table: $ID +` + queryGetFn := func(ctx context.Context, testID, messageID string) (string, []string, error) { + pgConn, err := pgconn.Connect(ctx, fmt.Sprintf("postgresql://admin:quest@localhost:%v", resource.GetPort("8812/tcp"))) + require.NoError(t, err) + defer pgConn.Close(ctx) + + result := pgConn.ExecParams(ctx, fmt.Sprintf("SELECT content, id FROM '%v' WHERE id=%v", testID, messageID), nil, nil, nil, nil) + + result.NextRow() + id, err := strconv.Atoi(string(result.Values()[1])) + assert.NoError(t, err) + data := map[string]any{ + "content": string(result.Values()[0]), + "id": id, + } + + assert.False(t, result.NextRow()) + + outputBytes, err := json.Marshal(data) + require.NoError(t, err) + return string(outputBytes), nil, nil + } + + suite := integration.StreamTests( + integration.StreamTestOutputOnlySendSequential(10, queryGetFn), + integration.StreamTestOutputOnlySendBatch(10, queryGetFn), + ) + suite.Run( + t, template, + integration.StreamTestOptPort(resource.GetPort("9000/tcp")), + ) +} diff --git a/internal/impl/questdb/output.go b/internal/impl/questdb/output.go new file mode 100644 index 0000000000..c405ff1772 --- /dev/null +++ b/internal/impl/questdb/output.go @@ -0,0 +1,520 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package questdb + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "time" + + qdb "github.com/questdb/go-questdb-client/v3" + "github.com/redpanda-data/benthos/v4/public/service" +) + +func questdbOutputConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + Summary("Pushes messages to a QuestDB table"). + Description("Important: We recommend that the dedupe feature is enabled on the QuestDB server. "+ + "Please visit https://questdb.io/docs/ for more information about deploying, configuring, and using QuestDB."+ + service.OutputPerformanceDocs(true, true)). + Categories("Services"). + Fields( + service.NewOutputMaxInFlightField(), + service.NewBatchPolicyField("batching"), + service.NewTLSToggledField("tls"), + service.NewStringField("address"). + Description("Address of the QuestDB server's HTTP port (excluding protocol)"). + Example("localhost:9000"), + service.NewStringField("username"). + Description("Username for HTTP basic auth"). + Optional(). + Secret(), + service.NewStringField("password"). + Description("Password for HTTP basic auth"). + Optional(). + Secret(), + service.NewStringField("token"). + Description("Bearer token for HTTP auth (takes precedence over basic auth username & password)"). + Optional(). + Secret(), + service.NewDurationField("retry_timeout"). + Description("The time to continue retrying after a failed HTTP request. The interval between retries is an exponential "+ + "backoff starting at 10ms and doubling after each failed attempt up to a maximum of 1 second."). + Optional(). + Advanced(), + service.NewDurationField("request_timeout"). + Description("The time to wait for a response from the server. This is in addition to the calculation "+ + "derived from the request_min_throughput parameter."). + Optional(). + Advanced(), + service.NewIntField("request_min_throughput"). + Description("Minimum expected throughput in bytes per second for HTTP requests. 
If the throughput is lower than this value, "+ + "the connection will time out. This is used to calculate an additional timeout on top of request_timeout. This is useful for large requests. "+ + "You can set this value to 0 to disable this logic."). + Optional(). + Advanced(), + service.NewStringField("table"). + Description("Destination table"). + Example("trades"), + service.NewStringField("designated_timestamp_field"). + Description("Name of the designated timestamp field"). + Optional(), + service.NewStringField("designated_timestamp_unit"). + Description("Designated timestamp field units"). + Default("auto"). + LintRule(`root = if ["nanos","micros","millis","seconds","auto"].contains(this) != true { [ "valid options are \"nanos\", \"micros\", \"millis\", \"seconds\", \"auto\"" ] }`). + Optional(), + service.NewStringListField("timestamp_string_fields"). + Description("String fields with textual timestamps"). + Optional(), + service.NewStringField("timestamp_string_format"). + Description("Timestamp format, used when parsing timestamp string fields. Specified in golang's time.Parse layout"). + Default(time.StampMicro+"Z0700"). + Optional(), + service.NewStringListField("symbols"). + Description("Columns that should be the SYMBOL type (string values default to STRING)"). + Optional(), + service.NewStringListField("doubles"). + Description("Columns that should be double type, (int is default)"). + Optional(), + service.NewBoolField("error_on_empty_messages"). + Description("Mark a message as errored if it is empty after field validation"). + Optional(). 
+ Default(false), + ) +} + +type questdbWriter struct { + log *service.Logger + + pool *qdb.LineSenderPool + transport *http.Transport + + address string + symbols map[string]bool + doubles map[string]bool + table string + designatedTimestampField string + designatedTimestampUnit timestampUnit + timestampStringFormat string + timestampStringFields map[string]bool + errorOnEmptyMessages bool +} + +func fromConf(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPol service.BatchPolicy, mif int, err error) { + + if batchPol, err = conf.FieldBatchPolicy("batching"); err != nil { + return + } + + if mif, err = conf.FieldMaxInFlight(); err != nil { + return + } + + // We force the use of HTTP connections (instead of TCP) and + // disable the QuestDB LineSender[s] auto flush to force the client + // to send data over the wire only once, when a MessageBatch has been + // completely processed. + opts := []qdb.LineSenderOption{ + qdb.WithHttp(), + qdb.WithAutoFlushDisabled(), + } + + // Now, we process options for and construct the LineSenderPool + // which is used to send data to QuestDB using Influx Line Protocol + + var addr string + if addr, err = conf.FieldString("address"); err != nil { + return + } + opts = append(opts, qdb.WithAddress(addr)) + + if conf.Contains("retry_timeout") { + var retryTimeout time.Duration + if retryTimeout, err = conf.FieldDuration("retry_timeout"); err != nil { + return + } + opts = append(opts, qdb.WithRetryTimeout(retryTimeout)) + } + + if conf.Contains("request_timeout") { + var requestTimeout time.Duration + if requestTimeout, err = conf.FieldDuration("request_timeout"); err != nil { + return + } + opts = append(opts, qdb.WithRequestTimeout(requestTimeout)) + } + + if conf.Contains("request_min_throughput") { + var requestMinThroughput int + if requestMinThroughput, err = conf.FieldInt("request_min_throughput"); err != nil { + return + } + opts = append(opts, qdb.WithMinThroughput(requestMinThroughput)) + 
} + + if conf.Contains("token") { + var token string + if token, err = conf.FieldString("token"); err != nil { + return + } + opts = append(opts, qdb.WithBearerToken(token)) + } + + if conf.Contains("username") && conf.Contains("password") { + var username, password string + if username, err = conf.FieldString("username"); err != nil { + return + } + if password, err = conf.FieldString("password"); err != nil { + return + } + opts = append(opts, qdb.WithBasicAuth(username, password)) + + } + + // Use a common http transport with user-defined TLS config + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + MaxConnsPerHost: 0, + MaxIdleConns: 64, + MaxIdleConnsPerHost: 64, + IdleConnTimeout: 120 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + } + + tlsConf, tlsEnabled, err := conf.FieldTLSToggled("tls") + if err != nil { + return + } + + if tlsEnabled { + opts = append(opts, qdb.WithTls()) + transport.TLSClientConfig = tlsConf + } + + opts = append(opts, qdb.WithHttpTransport(transport)) + + // Allocate the QuestDBWriter which wraps the LineSenderPool + w := &questdbWriter{ + address: addr, + log: mgr.Logger(), + symbols: map[string]bool{}, + doubles: map[string]bool{}, + timestampStringFields: map[string]bool{}, + transport: transport, + } + out = w + w.pool, err = qdb.PoolFromOptions(opts...) + if err != nil { + return + } + + // Apply pool-level options + // todo: is this the correct interpretation of max-in-flight? + qdb.WithMaxSenders(mif)(w.pool) + + // Configure the questdbWriter with additional options + + if w.table, err = conf.FieldString("table"); err != nil { + return + } + + // Symbols, doubles, and timestampStringFields are stored in maps + // for fast lookup. 
+ var symbols []string + if conf.Contains("symbols") { + if symbols, err = conf.FieldStringList("symbols"); err != nil { + return + } + for _, s := range symbols { + w.symbols[s] = true + } + } + + var doubles []string + if conf.Contains("doubles") { + if doubles, err = conf.FieldStringList("doubles"); err != nil { + return + } + for _, d := range doubles { + w.doubles[d] = true + } + } + + var timestampStringFields []string + if conf.Contains("timestamp_string_fields") { + if timestampStringFields, err = conf.FieldStringList("timestamp_string_fields"); err != nil { + return + } + for _, f := range timestampStringFields { + w.timestampStringFields[f] = true + } + } + + if conf.Contains("designated_timestamp_field") { + if w.designatedTimestampField, err = conf.FieldString("designated_timestamp_field"); err != nil { + return + } + } + + var designatedTimestampUnit string + if conf.Contains("designated_timestamp_unit") { + if designatedTimestampUnit, err = conf.FieldString("designated_timestamp_unit"); err != nil { + return + } + + // perform validation on timestamp units here in case the user doesn't lint the config + w.designatedTimestampUnit = timestampUnit(designatedTimestampUnit) + if !w.designatedTimestampUnit.IsValid() { + err = fmt.Errorf("%v is not a valid timestamp unit", designatedTimestampUnit) + return + } + } + + if conf.Contains("timestamp_string_format") { + if w.timestampStringFormat, err = conf.FieldString("timestamp_string_format"); err != nil { + return + } + } + + if w.errorOnEmptyMessages, err = conf.FieldBool("error_on_empty_messages"); err != nil { + return + } + + return +} + +func (q *questdbWriter) Connect(ctx context.Context) error { + // No connections are required to initialize a LineSenderPool, + // so nothing to do here. Each LineSender has its own http client + // that will use the network only when flushing messages to the server. 
+ return nil +} + +func (q *questdbWriter) parseTimestamp(v any) (time.Time, error) { + switch val := v.(type) { + case string: + t, err := time.Parse(q.timestampStringFormat, val) + if err != nil { + q.log.Errorf("could not parse timestamp field %v", err) + } + return t, err + case json.Number: + intVal, err := val.Int64() + if err != nil { + q.log.Errorf("numerical timestamps must be int64: %v", err) + } + return q.designatedTimestampUnit.From(intVal), err + default: + err := fmt.Errorf("unsupported type %T for designated timestamp: %v", v, v) + q.log.Error(err.Error()) + return time.Time{}, err + } +} + +func (q *questdbWriter) WriteBatch(ctx context.Context, batch service.MessageBatch) (err error) { + sender, err := q.pool.Sender(ctx) + if err != nil { + return err + } + + err = batch.WalkWithBatchedErrors(func(i int, m *service.Message) (err error) { + // QuestDB's LineSender constructs ILP messages using a buffer, so message + // components must be written in the correct order, otherwise the sender will + // return an error. This order is: + // 1. Table Name + // 2. Symbols (key/value pairs) + // 3. Columns (key/value pairs) + // 4. Timestamp [optional] + // + // Before writing any column, we call Table(), which is guaranteed to run once. + // hasTable flag is used for that. 
+ var hasTable bool + + q.log.Tracef("Writing message %v", i) + + jVal, err := m.AsStructured() + if err != nil { + err = fmt.Errorf("unable to parse JSON: %v", err) + m.SetError(err) + return err + } + jObj, ok := jVal.(map[string]any) + if !ok { + err = fmt.Errorf("expected JSON object, found '%T'", jVal) + m.SetError(err) + return err + } + + // Stage 1: Handle all symbols, which must be written to the buffer first + for s := range q.symbols { + v, found := jObj[s] + if found { + if !hasTable { + sender.Table(q.table) + hasTable = true + } + switch val := v.(type) { + case string: + sender.Symbol(s, val) + default: + sender.Symbol(s, fmt.Sprintf("%v", val)) + } + } + } + + // Stage 2: Handle columns + for k, v := range jObj { + // Skip designated timestamp field (will process this in the 3rd stage) + if q.designatedTimestampField == k { + continue + } + + // Skip symbols (already processed in 1st stage) + if _, isSymbol := q.symbols[k]; isSymbol { + continue + } + + // For all non-timestamp fields, process values by JSON types since we are working + // with structured messages + switch val := v.(type) { + case string: + // Check if the field is a timestamp and process accordingly + if _, isTimestampField := q.timestampStringFields[k]; isTimestampField { + timestamp, err := q.parseTimestamp(v) + if err == nil { + if !hasTable { + sender.Table(q.table) + hasTable = true + } + sender.TimestampColumn(k, timestamp) + } else { + q.log.Errorf("%v", err) + } + continue + } + + if !hasTable { + sender.Table(q.table) + hasTable = true + } + sender.StringColumn(k, val) + case bool: + if !hasTable { + sender.Table(q.table) + hasTable = true + } + sender.BoolColumn(k, val) + case json.Number: + // For json numbers, assume int unless column is explicitly marked as a double + if _, isDouble := q.doubles[k]; isDouble { + floatVal, err := val.Float64() + if err != nil { + q.log.Errorf("could not parse %v into a double: %v", val, err) + } + + if !hasTable { + 
sender.Table(q.table) + hasTable = true + } + sender.Float64Column(k, floatVal) + } else { + intVal, err := val.Int64() + if err != nil { + q.log.Errorf("could not parse %v into an integer: %v", val, err) + } + + if !hasTable { + sender.Table(q.table) + hasTable = true + } + sender.Int64Column(k, intVal) + } + case float64: + // float64 is only needed if BENTHOS_USE_NUMBER=false + if !hasTable { + sender.Table(q.table) + hasTable = true + } + sender.Float64Column(k, float64(val)) + default: + q.log.Errorf("unsupported type %T for field %v", v, k) + } + } + + // Stage 3: Handle designated timestamp and finalize the buffered message + var designatedTimestamp time.Time + if q.designatedTimestampField != "" { + val, found := jObj[q.designatedTimestampField] + if found { + designatedTimestamp, err = q.parseTimestamp(val) + if err != nil { + q.log.Errorf("unable to parse designated timestamp: %v", val) + } + } + } + + if !hasTable { + if q.errorOnEmptyMessages { + err = errors.New("empty message, skipping send to QuestDB") + m.SetError(err) + return err + } + q.log.Warn("empty message, skipping send to QuestDB") + return nil + } + + if !designatedTimestamp.IsZero() { + err = sender.At(ctx, designatedTimestamp) + } else { + err = sender.AtNow(ctx) + } + + if err != nil { + m.SetError(err) + } + return err + }) + + // This will flush the sender, no need to call sender.Flush at the end of the method + releaseErr := sender.Close(ctx) + if releaseErr != nil { + if err != nil { + err = fmt.Errorf("%v %w", err, releaseErr) + } else { + err = releaseErr + } + } + + return err +} + +func (q *questdbWriter) Close(ctx context.Context) error { + return q.pool.Close(ctx) +} + +func init() { + if err := service.RegisterBatchOutput( + "questdb", + questdbOutputConfig(), + fromConf, + ); err != nil { + panic(err) + } +} diff --git a/internal/impl/questdb/output_test.go b/internal/impl/questdb/output_test.go new file mode 100644 index 0000000000..e5b837fe64 --- /dev/null +++ 
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package questdb

import (
	"bufio"
	"context"
	"fmt"
	"math"
	"net"
	"net/http"
	"testing"
	"time"

	"github.com/redpanda-data/benthos/v4/public/service"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestTimestampConversions exercises timestampUnit.From, in particular the
// magnitude-based unit detection of the "auto" unit: each Min/Max pair sits
// on one side of a power-of-ten boundary (1e10, 1e13, 1e16) that separates
// seconds, millis, micros and nanos interpretations.
func TestTimestampConversions(t *testing.T) {
	t.Parallel()

	testCases := []struct {
		name         string
		value        int64
		unit         timestampUnit
		expectedTime time.Time
	}{
		{
			name:         "autoSecondsMin",
			value:        0,
			unit:         auto,
			expectedTime: time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC),
		},
		{
			name:         "autoSecondsMax",
			value:        9999999999,
			unit:         auto,
			expectedTime: time.Date(2286, 11, 20, 17, 46, 39, 0, time.UTC),
		},
		{
			name:         "autoMillisMin",
			value:        10000000000,
			unit:         auto,
			expectedTime: time.Date(1970, 4, 26, 17, 46, 40, 0, time.UTC),
		},
		{
			name:         "autoMillisMax",
			value:        9999999999999,
			unit:         auto,
			expectedTime: time.Date(2286, 11, 20, 17, 46, 39, 999000000, time.UTC),
		},
		{
			name:         "autoMicrosMin",
			value:        10000000000000,
			unit:         auto,
			expectedTime: time.Date(1970, 4, 26, 17, 46, 40, 0, time.UTC),
		},
		{
			name:         "autoMicrosMax",
			value:        9999999999999999,
			unit:         auto,
			expectedTime: time.Date(2286, 11, 20, 17, 46, 39, 999999000, time.UTC),
		},
		{
			name:         "autoNanosMin",
			value:        10000000000000000,
			unit:         auto,
			expectedTime: time.Date(1970, 4, 26, 17, 46, 40, 0, time.UTC),
		},
		{
			// math.MaxInt64 nanoseconds is the largest instant representable
			// by time.Unix(0, v): 2262-04-11T23:47:16.854775807Z.
			name:         "autoNanosMax",
			value:        math.MaxInt64,
			unit:         auto,
			expectedTime: time.Date(2262, 4, 11, 23, 47, 16, 854775807, time.UTC),
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equal(t, tc.expectedTime, tc.unit.From(tc.value))
		})
	}
}

// TestFromConf parses a representative YAML config and verifies that every
// field is propagated onto the constructed *questdbWriter, including the
// conversion of list fields into membership (set) maps.
func TestFromConf(t *testing.T) {
	t.Parallel()

	configSpec := questdbOutputConfig()
	conf := `
table: test
address: "localhost:9000"
designated_timestamp_field: myDesignatedTimestamp
designated_timestamp_unit: nanos
timestamp_string_fields:
  - fieldA
  - fieldB
timestamp_string_format: 2006-01-02T15:04:05Z07:00 # rfc3339
symbols:
  - mySymbolA
  - mySymbolB
`
	parsed, err := configSpec.ParseYAML(conf, nil)
	require.NoError(t, err)

	out, _, _, err := fromConf(parsed, service.MockResources())
	require.NoError(t, err)

	w, ok := out.(*questdbWriter)
	require.True(t, ok)

	assert.Equal(t, "test", w.table)
	assert.Equal(t, "myDesignatedTimestamp", w.designatedTimestampField)
	assert.Equal(t, nanos, w.designatedTimestampUnit)
	assert.Equal(t, map[string]bool{"fieldA": true, "fieldB": true}, w.timestampStringFields)
	assert.Equal(t, time.RFC3339, w.timestampStringFormat)
	assert.Equal(t, map[string]bool{"mySymbolA": true, "mySymbolB": true}, w.symbols)

}

// TestValidationErrorsFromConf checks that invalid configs are rejected with
// a descriptive error. Depending on the field, the failure may surface at
// YAML-parse time (missing required fields) or inside fromConf (semantic
// validation such as the timestamp unit), so both paths are accepted.
func TestValidationErrorsFromConf(t *testing.T) {
	t.Parallel()

	testCases := []struct {
		name                string
		conf                string
		expectedErrContains string
	}{
		{
			name:                "no address",
			conf:                "table: test",
			expectedErrContains: "field 'address' is required",
		},
		{
			name:                "no table",
			conf:                `address: "localhost:9000"`,
			expectedErrContains: "field 'table' is required",
		},
		{
			name: "invalid timestamp unit",
			conf: `
address: "localhost:9000"
table: test
designated_timestamp_unit: hello`,
			expectedErrContains: "is not a valid timestamp unit",
		},
	}

	for _, tc := range testCases {
		configSpec := questdbOutputConfig()

		t.Run(tc.name, func(t *testing.T) {
			cfg, err := configSpec.ParseYAML(tc.conf, nil)
			if err != nil {
				// Required-field violations already fail here.
				assert.ErrorContains(t, err, tc.expectedErrContains)
				return
			}

			_, _, _, err = fromConf(cfg, service.MockResources())
			assert.ErrorContains(t, err, tc.expectedErrContains)

		})
	}
}

// TestOptionsOnWrite runs end-to-end writes against a mock QuestDB HTTP
// endpoint and asserts on the raw ILP (InfluxDB line protocol) lines the
// sender emits, one scenario per config option (symbols, designated
// timestamp, timestamp units, timestamp string fields, bools, doubles).
// Each scenario uses its own table name (tc.name) so lines are attributable.
func TestOptionsOnWrite(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	sentMsgs := make(chan string, 4) // Arbitrary buffer size, > max number of test messages
	t.Cleanup(func() { close(sentMsgs) })

	// Set up mock QuestDB http server: it forwards each received ILP line
	// to sentMsgs for the assertions below.
	listener, err := net.Listen("tcp", ":0")
	require.NoError(t, err)
	s := http.Server{
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			scanner := bufio.NewScanner(r.Body)
			for scanner.Scan() {
				sentMsgs <- scanner.Text()
			}
			assert.NoError(t, scanner.Err())
			w.WriteHeader(200)
		}),
	}
	t.Cleanup(func() {
		_ = s.Shutdown(ctx)
	})
	go func() {
		_ = s.Serve(listener)
	}()

	testCases := []struct {
		name          string
		extraConf     string
		payload       []string
		expectedLines []string
	}{
		{
			name:      "withSymbols",
			extraConf: "symbols: ['hello']",
			payload:   []string{`{"hello": "world", "test": 1}`},
			// Symbols are appended to the table token; ints get the "i" suffix.
			expectedLines: []string{"withSymbols,hello=world test=1i"},
		},
		{
			name:      "withDesignatedTimestamp",
			extraConf: "designated_timestamp_field: timestamp",
			payload:   []string{`{"hello": "world", "timestamp": 1}`},
			expectedLines: []string{
				// Default unit auto-detects 1 as seconds -> 1e9 ns trailer.
				`withDesignatedTimestamp hello="world" 1000000000`,
			},
		},
		{
			name:      "withTimestampUnit",
			extraConf: "designated_timestamp_field: timestamp\ndesignated_timestamp_unit: nanos",
			payload:   []string{`{"hello": "world", "timestamp": 1}`},
			expectedLines: []string{
				`withTimestampUnit hello="world" 1`,
			},
		},
		{
			name:      "withTimestampStringFields",
			extraConf: "timestamp_string_fields: ['timestamp']\ntimestamp_string_format: 2006-02-01",
			payload:   []string{`{"timestamp": "1970-01-02"}`},
			expectedLines: []string{
				// NOTE(review): format 2006-02-01 is year-DAY-month, so
				// "1970-01-02" parses as Feb 1 1970 = 2678400s; the "t"
				// suffix marks a microsecond timestamp column.
				`withTimestampStringFields timestamp=2678400000000t`,
			},
		},
		{
			// extraConf here is irrelevant to the payload; the case only
			// checks boolean column encoding ("t"/"f").
			name:      "withBoolValue",
			extraConf: "timestamp_string_fields: ['timestamp']\ntimestamp_string_format: 2006-02-01",
			payload:   []string{`{"hello": true}`},
			expectedLines: []string{
				`withBoolValue hello=t`,
			},
		},
		{
			name:      "withDoubles",
			extraConf: "doubles: ['hello']",
			payload:   []string{`{"hello": 1.23}`},
			expectedLines: []string{
				`withDoubles hello=1.23`,
			},
		},
	}

	for _, tc := range testCases {
		conf := fmt.Sprintf("address: 'localhost:%d'\n", listener.Addr().(*net.TCPAddr).Port)
		conf += fmt.Sprintf("table: '%s'\n", tc.name)
		conf += tc.extraConf

		configSpec := questdbOutputConfig()

		cfg, err := configSpec.ParseYAML(conf, nil)
		require.NoError(t, err)
		w, _, _, err := fromConf(cfg, service.MockResources())
		require.NoError(t, err)

		qdbWriter := w.(*questdbWriter)
		batch := service.MessageBatch{}
		for _, msg := range tc.payload {
			batch = append(batch, service.NewMessage([]byte(msg)))
		}
		assert.NoError(t, qdbWriter.WriteBatch(ctx, batch))
		for _, l := range tc.expectedLines {
			assert.Equal(t, l, <-sentMsgs)
		}
	}
}
// timestampUnit enumerates the supported resolutions for numeric timestamp
// values. The special "auto" unit infers the resolution from the magnitude
// of the value itself.
type timestampUnit string

const (
	nanos   timestampUnit = "nanos"
	micros  timestampUnit = "micros"
	millis  timestampUnit = "millis"
	seconds timestampUnit = "seconds"
	auto    timestampUnit = "auto"
)

// guessTimestampUnits infers the resolution of a raw timestamp from its
// magnitude, using power-of-ten cut points between plausible ranges.
func guessTimestampUnits(timestamp int64) timestampUnit {
	switch {
	case timestamp < 10000000000:
		return seconds
	case timestamp < 10000000000000:
		// 11/20/2286, 5:46:40 PM in millis and 4/26/1970, 5:46:40 PM in micros
		return millis
	case timestamp < 10000000000000000:
		return micros
	default:
		return nanos
	}
}

// IsValid reports whether t is one of the recognized timestamp units.
func (t timestampUnit) IsValid() bool {
	switch t {
	case nanos, micros, millis, seconds, auto:
		return true
	default:
		return false
	}
}

// From converts value, interpreted at resolution t, into a UTC time.Time.
// For "auto" the resolution is first inferred via guessTimestampUnits.
// It panics on an unrecognized unit; callers are expected to have checked
// IsValid beforehand.
func (t timestampUnit) From(value int64) time.Time {
	switch t {
	case nanos:
		return time.Unix(0, value).UTC()
	case micros:
		return time.UnixMicro(value).UTC()
	case millis:
		return time.UnixMilli(value).UTC()
	case seconds:
		return time.Unix(value, 0).UTC()
	case auto:
		return guessTimestampUnits(value).From(value).UTC()
	default:
		panic("unsupported timestampUnit: " + t)
	}
}
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package questdb pulls the questdb output plugin into a build purely
// through its import side effects (the internal package's init registers
// the component).
package questdb

import (
	// Bring in the internal plugin definitions; the blank import runs
	// their init-time registration.
	_ "github.com/redpanda-data/connect/v4/internal/impl/questdb"
)