From 18407d0c05481f7089cf1b7356cdb9af03794d07 Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Fri, 12 May 2023 15:12:54 +0200 Subject: [PATCH 1/4] feat: persist tracking value in storage --- receiver/sqlqueryreceiver/README.md | 14 +- receiver/sqlqueryreceiver/config.go | 7 +- receiver/sqlqueryreceiver/go.mod | 11 +- receiver/sqlqueryreceiver/go.sum | 19 +- receiver/sqlqueryreceiver/integration_test.go | 346 ++++++++++++++++-- receiver/sqlqueryreceiver/logs_receiver.go | 119 ++++-- receiver/sqlqueryreceiver/row_scanner.go | 4 + .../testdata/integration/init.sql | 2 +- 8 files changed, 439 insertions(+), 83 deletions(-) diff --git a/receiver/sqlqueryreceiver/README.md b/receiver/sqlqueryreceiver/README.md index ad915046152f..242931bf533d 100644 --- a/receiver/sqlqueryreceiver/README.md +++ b/receiver/sqlqueryreceiver/README.md @@ -24,6 +24,7 @@ The configuration supports the following top-level fields: e.g. _host=localhost port=5432 user=me password=s3cr3t sslmode=disable_ - `queries`(required): A list of queries, where a query is a sql statement and one or more `logs` and/or `metrics` sections (details below). - `collection_interval`(optional): The time interval between query executions. Defaults to _10s_. +- `storage` (optional, default `""`): The ID of a storage extension to be used to [track processed results](#tracking-processed-results). ### Queries @@ -35,10 +36,10 @@ are quite different. Additionally, each `query` section supports the following properties: -- `tracking_column` (optional, default "") Applies only to logs. In case of a parameterized query, +- `tracking_column` (optional, default `""`) Applies only to logs. In case of a parameterized query, defines the column to retrieve the value of the parameter on subsequent query runs. See the below section [Tracking processed results](#tracking-processed-results). -- `tracking_start_value` (optional, default 0) Applies only to logs. In case of a parameterized query, defines the initial value for the parameter. +- `tracking_start_value` (optional, default `""`) Applies only to logs. In case of a parameterized query, defines the initial value for the parameter. See the below section [Tracking processed results](#tracking-processed-results). Example: @@ -50,7 +51,7 @@ receivers: datasource: "host=localhost port=5432 user=postgres password=s3cr3t sslmode=disable" queries: - sql: "select * from my_logs where log_id > $1" - tracking_start_value: 10000 + tracking_start_value: "10000" tracking_column: log_id logs: - body_column: log_body @@ -73,9 +74,9 @@ the receiver will run the same query every collection interval, which can cause over and over again, unless there's an external actor removing the old rows from the `my_logs` table. To prevent reading the same rows on every collection interval, use a parameterized query like `select * from my_logs where id_column > ?`, -together with the `tracking_start_value` configuration property that specifies the initial value for the parameter. +together with the `tracking_start_value` and `tracking_column` configuration properties. The receiver will use the configured `tracking_start_value` as the value for the query parameter when running the query for the first time. -On each query run, the receiver will retrieve the last value from the `tracking_column` from the result set and use it as the value for the query parameter on next collection interval. To prevent duplicate log downloads, make sure to sort the query results in ascending order by the tracking_column value. +After each query run, the receiver will store the value of the `tracking_column` from the last row of the result set and use it as the value for the query parameter on next collection interval. To prevent duplicate log downloads, make sure to sort the query results in ascending order by the tracking_column value. Note that the notation for the parameter depends on the database backend. For example in MySQL this is `?`, in PostgreSQL this is `$1`, in Oracle this is any string identifier starting with a colon `:`, for example `:my_parameter`. @@ -107,9 +108,10 @@ receivers: sqlquery: driver: postgres datasource: "host=localhost port=5432 user=postgres password=s3cr3t sslmode=disable" + storage: file_storage queries: - sql: "select * from my_logs where log_id > $1" - tracking_start_value: 10000 + tracking_start_value: "10000" tracking_column: log_id logs: - body_column: log_body diff --git a/receiver/sqlqueryreceiver/config.go b/receiver/sqlqueryreceiver/config.go index c604ef705007..c8a157721718 100644 --- a/receiver/sqlqueryreceiver/config.go +++ b/receiver/sqlqueryreceiver/config.go @@ -26,9 +26,10 @@ import ( type Config struct { scraperhelper.ScraperControllerSettings `mapstructure:",squash"` - Driver string `mapstructure:"driver"` - DataSource string `mapstructure:"datasource"` - Queries []Query `mapstructure:"queries"` + Driver string `mapstructure:"driver"` + DataSource string `mapstructure:"datasource"` + Queries []Query `mapstructure:"queries"` + StorageID *component.ID `mapstructure:"storage"` } func (c Config) Validate() error { diff --git a/receiver/sqlqueryreceiver/go.mod b/receiver/sqlqueryreceiver/go.mod index f16b8bab3864..d43172cc553c 100644 --- a/receiver/sqlqueryreceiver/go.mod +++ b/receiver/sqlqueryreceiver/go.mod @@ -8,10 +8,13 @@ require ( github.com/docker/go-connections v0.4.0 github.com/go-sql-driver/mysql v1.7.1 github.com/lib/pq v1.10.9 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.77.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.77.0 github.com/sijms/go-ora/v2 v2.7.4 github.com/snowflakedb/gosnowflake v1.6.18 github.com/stretchr/testify v1.8.2 github.com/testcontainers/testcontainers-go v0.20.1 + go.opentelemetry.io/collector v0.77.0 go.opentelemetry.io/collector/component v0.77.0 go.opentelemetry.io/collector/confmap v0.77.0 go.opentelemetry.io/collector/consumer v0.77.0 @@ -28,6 +31,7 @@ require ( github.com/Azure/azure-storage-blob-go v0.15.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.5.2 // indirect + github.com/antonmedv/expr v1.12.5 // indirect github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect github.com/aws/aws-sdk-go-v2 v1.18.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 // indirect @@ -43,6 +47,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.27.11 // indirect github.com/aws/smithy-go v1.13.5 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/containerd v1.6.19 // indirect github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/danieljoos/wincred v1.1.2 // indirect @@ -78,6 +83,8 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mtibben/percent v0.2.1 // indirect + github.com/observiq/ctimefmt v1.0.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.77.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0-rc2 // indirect github.com/opencontainers/runc v1.1.5 // indirect @@ -87,7 +94,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/collector v0.77.0 // indirect go.opentelemetry.io/collector/exporter v0.77.0 // indirect go.opentelemetry.io/collector/featuregate v0.77.0 // indirect go.opentelemetry.io/otel v1.15.1 // indirect @@ -102,7 +108,8 @@ require ( golang.org/x/term v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect + gonum.org/v1/gonum v0.13.0 // indirect + google.golang.org/genproto v0.0.0-20230320184635-7606e756e683 // indirect google.golang.org/grpc v1.55.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/receiver/sqlqueryreceiver/go.sum b/receiver/sqlqueryreceiver/go.sum index a30d8cff4129..31c6773c20a5 100644 --- a/receiver/sqlqueryreceiver/go.sum +++ b/receiver/sqlqueryreceiver/go.sum @@ -32,6 +32,7 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= github.com/Microsoft/hcsshim v0.9.7 h1:mKNHW/Xvv1aFH87Jb6ERDzXTJTLPlmzfZ28VBFD/bfg= +github.com/Mottl/ctimefmt v0.0.0-20190803144728-fd2ac23a585a/go.mod h1:eyj2WSIdoPMPs2eNTLpSmM6Nzqo4V80/d6jHpnJ1SAI= github.com/SAP/go-hdb v1.2.6 h1:FLrDgSySez0Ye+e4g8pm0ciSj2xOdPFM0reY/fcNaS0= github.com/SAP/go-hdb v1.2.6/go.mod h1:LCziJVuENlInAw8+9sb1ZgQZKThaFRE6vUcLeTLUr/M= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= @@ -41,6 +42,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/antonmedv/expr v1.12.5 h1:Fq4okale9swwL3OeLLs9WD9H6GbgBLJyN/NUHRv+n0E= +github.com/antonmedv/expr v1.12.5/go.mod h1:FPC8iWArxls7axbVLsW+kpg1mz29A1b2M6jt+hZfDkU= github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6ICHXqG5hm0ZW5IHyeEJXoIJSOZeBLmWPNeIQ= github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -117,6 +120,7 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/checkpoint-restore/go-criu/v5 v5.3.0/go.mod h1:E/eQpaFtUKGOOSEBZgmKAcn+zUUwWxqcaKZlF54wK8E= github.com/cilium/ebpf v0.7.0/go.mod h1:/oI2+1shJiTGAMgl6/RgJr36Eo1jzrRcAWbcXO2usCA= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -384,7 +388,15 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= +github.com/observiq/ctimefmt v1.0.0 h1:r7vTJ+Slkrt9fZ67mkf+mA6zAdR5nGIJRMTzkUyvilk= +github.com/observiq/ctimefmt v1.0.0/go.mod h1:mxi62//WbSpG/roCO1c6MqZ7zQTvjVtYheqHN3eOjvc= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.77.0 h1:UU5IzATj3qbO6E3eY27pe5JrP/GiKOal58vWRNzqt94= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.77.0/go.mod h1:oVDNuwHckAtSasbtLsWUOC3ALPuIhnZt3J76U6m+2Ls= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.77.0 h1:oD1N4IGyW2HRQj9VHDjc3+dwzlSdnzyQWYteRDBBCSE= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.77.0/go.mod h1:eNiZXR4qNR6g61BNlntCVm7VudTOBrl6vWeChQDR6QQ= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.77.0 h1:UGgb2bYSrRTbCcLNEOc4ZCbKx2H1uwkKR+FpffQ7d0o= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.77.0/go.mod h1:nENVAU79yHGHoCe+pUHvYUy5LR+CHeFvhBkfRcNjRZ8= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0-rc2 h1:2zx/Stx4Wc5pIPDvIxHXvXtQFW/7XWJGmnM7r3wg034= @@ -685,7 +697,8 @@ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNq gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= gonum.org/v1/gonum v0.9.3/go.mod h1:TZumC3NeyVQskjXqmyWt4S3bINhy7B4eYwW69EbyX+0= -gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o= +gonum.org/v1/gonum v0.13.0 h1:a0T3bh+7fhRyqeNbiC3qVHYmkiQgit3wnNan/2c0HMM= +gonum.org/v1/gonum v0.13.0/go.mod h1:/WPYRckkfWrhWefxyYTfrTtQR0KH4iyHNuzxqXAKyAU= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= gonum.org/v1/plot v0.9.0/go.mod h1:3Pcqqmp6RHvJI72kgb8fThyUnav364FOsdDo2aGW5lY= @@ -698,8 +711,8 @@ google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= +google.golang.org/genproto v0.0.0-20230320184635-7606e756e683 h1:khxVcsk/FhnzxMKOyD+TDGwjbEOpcPuIpmafPGFmhMA= +google.golang.org/genproto v0.0.0-20230320184635-7606e756e683/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= diff --git a/receiver/sqlqueryreceiver/integration_test.go b/receiver/sqlqueryreceiver/integration_test.go index edc70c18dd25..ebd2feacceb9 100644 --- a/receiver/sqlqueryreceiver/integration_test.go +++ b/receiver/sqlqueryreceiver/integration_test.go @@ -19,11 +19,14 @@ package sqlqueryreceiver import ( "context" "fmt" + "io" "path/filepath" + "strings" "testing" "time" "github.com/docker/go-connections/nat" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" @@ -33,10 +36,208 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" ) -func TestPostgresIntegration(t *testing.T) { - externalPort := "15432" +func TestLogsTrackingWithoutStorageInPostgres(t *testing.T) { + // Start Postgres container. + externalPort := "15430" + startPostgresDbContainer(t, externalPort) + + // Start the SQL Query receiver. + receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort) + config.CollectionInterval = time.Second + config.Queries = []Query{ + { + SQL: "select * from simple_logs where id > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "id", + TrackingStartValue: "0", + }, + } + host := componenttest.NewNopHost() + err := receiver.Start(context.Background(), host) + require.NoError(t, err) + + // Verify there's 5 logs received. + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 3*time.Second, + 1*time.Second, + "failed to receive more than 0 logs", + ) + require.Equal(t, 5, consumer.LogRecordCount()) + testAllSimpleLogs(t, consumer.AllLogs()) + + // Stop the SQL Query receiver. + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + + // Start new SQL Query receiver with the same configuration. + receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort) + config.CollectionInterval = time.Second + config.Queries = []Query{ + { + SQL: "select * from simple_logs where id > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "id", + TrackingStartValue: "0", + }, + } + err = receiver.Start(context.Background(), host) + require.NoError(t, err) + + // Wait for some logs to come in. + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 3*time.Second, + 1*time.Second, + "failed to receive more than 0 logs", + ) + + // stop the SQL Query receiver + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + + // Verify that the same logs are collected again. + require.Equal(t, 5, consumer.LogRecordCount()) + testAllSimpleLogs(t, consumer.AllLogs()) +} + +func TestLogsTrackingWithStorageInPostgres(t *testing.T) { + // start Postgres container + externalPort := "15431" + container := startPostgresDbContainer(t, externalPort) + + // create a File Storage extension writing to a temporary directory in local filesystem + storageDir := t.TempDir() + storageExtension := storagetest.NewFileBackedStorageExtension("test", storageDir) + + // create SQL Query receiver configured with the File Storage extension + receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort) + config.CollectionInterval = time.Second + config.StorageID = &storageExtension.ID + config.Queries = []Query{ + { + SQL: "select * from simple_logs where id > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "id", + TrackingStartValue: "0", + }, + } + + // start the SQL Query receiver + host := storagetest.NewStorageHost().WithExtension(storageExtension.ID, storageExtension) + err := receiver.Start(context.Background(), host) + require.NoError(t, err) + + // Wait for logs to come in. + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 3*time.Second, + 1*time.Second, + "failed to receive more than 0 logs", + ) + + // stop the SQL Query receiver + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + + // verify there's 5 logs received + initialLogCount := 5 + require.Equal(t, initialLogCount, consumer.LogRecordCount()) + testAllSimpleLogs(t, consumer.AllLogs()) + + // start the SQL Query receiver again + receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort) + config.CollectionInterval = time.Second + config.StorageID = &storageExtension.ID + config.Queries = []Query{ + { + SQL: "select * from simple_logs where id > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "id", + TrackingStartValue: "0", + }, + } + err = receiver.Start(context.Background(), host) + require.NoError(t, err) + + // Wait for some logs to come in. + time.Sleep(3 * time.Second) + + // stop the SQL Query receiver + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + + // Verify that no new logs came in + require.Equal(t, 0, consumer.LogRecordCount()) + + // write a number of new logs to the database + newLogCount := 3 + insertPostgresSimpleLogs(t, container, initialLogCount, newLogCount) + + // start the SQL Query receiver again + receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort) + config.CollectionInterval = time.Second + config.StorageID = &storageExtension.ID + config.Queries = []Query{ + { + SQL: "select * from simple_logs where id > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "id", + TrackingStartValue: "0", + }, + } + err = receiver.Start(context.Background(), host) + require.NoError(t, err) + + // Wait for new logs to come in. + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 3*time.Second, + 1*time.Second, + "failed to receive more than 0 logs", + ) + + // Verify that the newly added logs were received. + require.Equal(t, newLogCount, consumer.LogRecordCount()) + printLogs(consumer.AllLogs()) +} + +func startPostgresDbContainer(t *testing.T, externalPort string) testcontainers.Container { internalPort := "5432" waitStrategy := wait.ForListeningPort(nat.Port(internalPort)).WithStartupTimeout(2 * time.Minute) req := testcontainers.ContainerRequest{ @@ -47,16 +248,74 @@ func TestPostgresIntegration(t *testing.T) { ExposedPorts: []string{externalPort + ":" + internalPort}, WaitingFor: waitStrategy, } - ctx := context.Background() - _, err := testcontainers.GenericContainer( - ctx, + container, err := testcontainers.GenericContainer( + context.Background(), testcontainers.GenericContainerRequest{ ContainerRequest: req, Started: true, }, ) require.NoError(t, err) + return container +} + +func createTestLogsReceiverForPostgres(t *testing.T, externalPort string) (*logsReceiver, *Config, *consumertest.LogsSink) { + factory := NewFactory() + config := factory.CreateDefaultConfig().(*Config) + config.CollectionInterval = time.Second + config.Driver = "postgres" + config.DataSource = fmt.Sprintf("host=localhost port=%s user=otel password=otel sslmode=disable", externalPort) + + consumer := &consumertest.LogsSink{} + receiverCreateSettings := receivertest.NewNopCreateSettings() + receiverCreateSettings.Logger = zap.NewExample() + receiver, err := factory.CreateLogsReceiver( + context.Background(), + receiverCreateSettings, + config, + consumer, + ) + require.NoError(t, err) + return receiver.(*logsReceiver), config, consumer +} + +func printLogs(allLogs []plog.Logs) { + for logIndex := 0; logIndex < len(allLogs); logIndex++ { + logs := allLogs[logIndex] + for resourceIndex := 0; resourceIndex < logs.ResourceLogs().Len(); resourceIndex++ { + resource := logs.ResourceLogs().At(resourceIndex) + for scopeIndex := 0; scopeIndex < resource.ScopeLogs().Len(); scopeIndex++ { + scope := resource.ScopeLogs().At(scopeIndex) + for recordIndex := 0; recordIndex < scope.LogRecords().Len(); recordIndex++ { + logRecord := scope.LogRecords().At(recordIndex) + fmt.Printf("log %v resource %v scope %v log %v body: %v\n", logIndex, resourceIndex, scopeIndex, recordIndex, logRecord.Body().Str()) + } + } + } + } +} + +func insertPostgresSimpleLogs(t *testing.T, container testcontainers.Container, existingLogId, newLogCount int) { + for newLogId := existingLogId + 1; newLogId <= existingLogId+newLogCount; newLogId++ { + query := fmt.Sprintf("insert into simple_logs (id, insert_time, body) values (%d, now(), 'another log %d');", newLogId, newLogId) + returnValue, returnMessageReader, err := container.Exec(context.Background(), []string{ + "psql", "-U", "otel", "-c", query, + }) + require.NoError(t, err) + returnMessageBuffer := new(strings.Builder) + _, err = io.Copy(returnMessageBuffer, returnMessageReader) + require.NoError(t, err) + returnMessage := returnMessageBuffer.String() + + assert.Equal(t, 0, returnValue) + assert.Contains(t, returnMessage, "INSERT 0 1") + } +} + +func TestPostgresIntegration(t *testing.T) { + externalPort := "15432" + startPostgresDbContainer(t, externalPort) factory := NewFactory() config := factory.CreateDefaultConfig().(*Config) @@ -137,28 +396,9 @@ func TestPostgresIntegration(t *testing.T) { }, }, }, - { - SQL: "select * from simple_logs where id > $1", - TrackingColumn: "id", - TrackingStartValue: "2", - Logs: []LogsCfg{ - { - BodyColumn: "body", - }, - }, - }, - { - SQL: "select * from simple_logs where insert_time > $1", - TrackingColumn: "insert_time", - TrackingStartValue: "2022-06-03 21:59:28+00", - Logs: []LogsCfg{ - { - BodyColumn: "body", - }, - }, - }, } consumer := &consumertest.MetricsSink{} + ctx := context.Background() receiver, err := factory.CreateMetricsReceiver( ctx, receivertest.NewNopCreateSettings(), @@ -182,14 +422,30 @@ func TestPostgresIntegration(t *testing.T) { testMovieMetrics(t, rms.At(0), genreKey) testPGTypeMetrics(t, rms.At(1)) - logsConsumer := &consumertest.LogsSink{} - logsReceiver, err := factory.CreateLogsReceiver( - ctx, - receivertest.NewNopCreateSettings(), - config, - logsConsumer, - ) - require.NoError(t, err) + logsReceiver, logsConfig, logsConsumer := createTestLogsReceiverForPostgres(t, externalPort) + logsConfig.Queries = []Query{ + { + SQL: "select * from simple_logs where id > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "id", + TrackingStartValue: "2", + }, + { + SQL: "select * from simple_logs where insert_time > $1", + Logs: []LogsCfg{ + { + BodyColumn: "body", + }, + }, + TrackingColumn: "insert_time", + TrackingStartValue: "2022-06-03 21:59:28+00", + }, + } + err = logsReceiver.Start(ctx, componenttest.NewNopHost()) require.NoError(t, err) require.Eventuallyf( @@ -197,10 +453,15 @@ func TestPostgresIntegration(t *testing.T) { func() bool { return logsConsumer.LogRecordCount() > 2 }, - 2*time.Minute, + 20*time.Second, 1*time.Second, "failed to receive more than 2 logs", ) + + // stop the SQL Query receiver + err = logsReceiver.Shutdown(context.Background()) + require.NoError(t, err) + testSimpleLogs(t, logsConsumer.AllLogs()) } @@ -588,6 +849,23 @@ func assertDoubleGaugeEquals(t *testing.T, expected float64, metric pmetric.Metr assert.InDelta(t, expected, metric.Gauge().DataPoints().At(0).DoubleValue(), 0.1) } +func testAllSimpleLogs(t *testing.T, logs []plog.Logs) { + assert.Equal(t, 1, len(logs)) + assert.Equal(t, 1, logs[0].ResourceLogs().Len()) + assert.Equal(t, 1, logs[0].ResourceLogs().At(0).ScopeLogs().Len()) + expectedEntries := []string{ + "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6197 4 \"-\" \"-\" 445af8e6c428303f -", + "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6205 5 \"-\" \"-\" 3285f43cd4baa202 -", + "- - - [03/Jun/2022:21:59:29 +0000] \"GET /api/health HTTP/1.1\" 200 6233 4 \"-\" \"-\" 579e8362d3185b61 -", + "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6207 5 \"-\" \"-\" 8c6ac61ae66e509f -", + "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6200 4 \"-\" \"-\" c163495861e873d8 -", + } + assert.Equal(t, len(expectedEntries), logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) + for i := range expectedEntries { + assert.Equal(t, expectedEntries[i], logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Body().Str()) + } +} + func testSimpleLogs(t *testing.T, logs []plog.Logs) { assert.Equal(t, 1, len(logs)) assert.Equal(t, 2, logs[0].ResourceLogs().Len()) diff --git a/receiver/sqlqueryreceiver/logs_receiver.go b/receiver/sqlqueryreceiver/logs_receiver.go index a628a23d77bb..1c7a8f50a3eb 100644 --- a/receiver/sqlqueryreceiver/logs_receiver.go +++ b/receiver/sqlqueryreceiver/logs_receiver.go @@ -20,8 +20,10 @@ import ( "fmt" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver" "go.uber.org/multierr" @@ -42,7 +44,8 @@ type logsReceiver struct { collectionIntervalTicker *time.Ticker shutdownRequested chan struct{} - id component.ID + id component.ID + storageClient storage.Client } func newLogsReceiver( @@ -64,28 +67,9 @@ func newLogsReceiver( id: settings.ID, } - receiver.createQueryReceivers() - return receiver, nil } -func (receiver *logsReceiver) createQueryReceivers() { - for i, query := range receiver.config.Queries { - if len(query.Logs) == 0 { - continue - } - id := fmt.Sprintf("query-%d: %s", i, query.SQL) - queryReceiver := newLogsQueryReceiver( - id, - query, - receiver.createConnection, - receiver.createClient, - receiver.settings.Logger, - ) - receiver.queryReceivers = append(receiver.queryReceivers, queryReceiver) - } -} - func (receiver *logsReceiver) Start(ctx context.Context, host component.Host) error { if receiver.isStarted { receiver.settings.Logger.Debug("requested start, but already started, ignoring.") @@ -94,8 +78,19 @@ func (receiver *logsReceiver) Start(ctx context.Context, host component.Host) er receiver.settings.Logger.Debug("starting...") receiver.isStarted = true + var err error + receiver.storageClient, err = adapter.GetStorageClient(ctx, host, receiver.config.StorageID, receiver.settings.ID) + if err != nil { + return fmt.Errorf("error connecting to storage: %w", err) + } + + err = receiver.createQueryReceivers() + if err != nil { + return err + } + for _, queryReceiver := range receiver.queryReceivers { - err := queryReceiver.start() + err := queryReceiver.start(ctx) if err != nil { return err } @@ -105,6 +100,26 @@ func (receiver *logsReceiver) Start(ctx context.Context, host component.Host) er return nil } +func (receiver *logsReceiver) createQueryReceivers() error { + receiver.queryReceivers = nil + for i, query := range receiver.config.Queries { + if len(query.Logs) == 0 { + continue + } + id := fmt.Sprintf("query-%d: %s", i, query.SQL) + queryReceiver := newLogsQueryReceiver( + id, + query, + receiver.createConnection, + receiver.createClient, + receiver.settings.Logger, + receiver.storageClient, + ) + receiver.queryReceivers = append(receiver.queryReceivers, queryReceiver) + } + return nil +} + func (receiver *logsReceiver) startCollecting() { receiver.collectionIntervalTicker = time.NewTicker(receiver.config.CollectionInterval) @@ -126,7 +141,7 @@ func (receiver *logsReceiver) collect() { go func(queryReceiver *logsQueryReceiver) { logs, err := queryReceiver.collect(context.Background()) if err != nil { - receiver.settings.Logger.Error("Error collecting logs", zap.Error(err), zap.String("query", queryReceiver.ID())) + receiver.settings.Logger.Error("error collecting logs", zap.Error(err), zap.String("query", queryReceiver.ID())) } if err := observability.RecordAcceptedLogs(int64(logs.LogRecordCount()), receiver.id.String(), queryReceiver.id); err != nil { @@ -159,10 +174,15 @@ func (receiver *logsReceiver) Shutdown(ctx context.Context) error { queryReceiver.shutdown(ctx) } + var errors error + if receiver.storageClient != nil { + errors = multierr.Append(errors, receiver.storageClient.Close(ctx)) + } + receiver.isStarted = false receiver.settings.Logger.Debug("stopped.") - return nil + return errors } func (receiver *logsReceiver) stopCollecting() { @@ -180,6 +200,9 @@ type logsQueryReceiver struct { db *sql.DB client dbClient trackingValue string + // TODO: Extract persistence into its own component + storageClient storage.Client + trackingValueStorageKey string } func newLogsQueryReceiver( @@ -188,15 +211,18 @@ func newLogsQueryReceiver( dbProviderFunc dbProviderFunc, clientProviderFunc clientProviderFunc, logger *zap.Logger, + storageClient storage.Client, ) *logsQueryReceiver { queryReceiver := &logsQueryReceiver{ - id: id, - query: query, - createDb: dbProviderFunc, - createClient: clientProviderFunc, - logger: logger, + id: id, + query: query, + createDb: dbProviderFunc, + createClient: clientProviderFunc, + logger: logger, + storageClient: storageClient, } queryReceiver.trackingValue = queryReceiver.query.TrackingStartValue + queryReceiver.trackingValueStorageKey = fmt.Sprintf("%s.%s", queryReceiver.id, "trackingValue") return queryReceiver } @@ -204,7 +230,7 @@ func (queryReceiver *logsQueryReceiver) ID() string { return queryReceiver.id } -func (queryReceiver *logsQueryReceiver) start() error { +func (queryReceiver *logsQueryReceiver) start(ctx context.Context) error { var err error queryReceiver.db, err = queryReceiver.createDb() if err != nil { @@ -212,9 +238,28 @@ func (queryReceiver *logsQueryReceiver) start() error { } queryReceiver.client = queryReceiver.createClient(dbWrapper{queryReceiver.db}, queryReceiver.query.SQL, queryReceiver.logger) + queryReceiver.trackingValue = queryReceiver.retrieveTrackingValue(ctx) + return nil } +// retrieveTrackingValue retrieves the tracking value from storage, if storage is configured. +// Otherwise, it returns the tracking value configured in `tracking_start_value`. +func (queryReceiver *logsQueryReceiver) retrieveTrackingValue(ctx context.Context) string { + trackingValueFromConfig := queryReceiver.query.TrackingStartValue + if queryReceiver.storageClient == nil { + return trackingValueFromConfig + } + + storedTrackingValueBytes, err := queryReceiver.storageClient.Get(ctx, queryReceiver.trackingValueStorageKey) + if err != nil || storedTrackingValueBytes == nil { + return trackingValueFromConfig + } + + return string(storedTrackingValueBytes) + +} + func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs, error) { logs := plog.NewLogs() @@ -238,19 +283,21 @@ func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs, errs = multierr.Append(errs, err) } if logsConfigIndex == 0 { - queryReceiver.storeTrackingValue(row) + queryReceiver.storeTrackingValue(ctx, row) } } } return logs, nil } -func (queryReceiver *logsQueryReceiver) storeTrackingValue(row stringMap) { +func (queryReceiver *logsQueryReceiver) storeTrackingValue(ctx context.Context, row stringMap) { if queryReceiver.query.TrackingColumn == "" { return } - currentTrackingColumnValueString := row[queryReceiver.query.TrackingColumn] - queryReceiver.trackingValue = currentTrackingColumnValueString + queryReceiver.trackingValue = row[queryReceiver.query.TrackingColumn] + if queryReceiver.storageClient != nil { + queryReceiver.storageClient.Set(ctx, queryReceiver.trackingValueStorageKey, []byte(queryReceiver.trackingValue)) + } } func rowToLog(row stringMap, config LogsCfg, logRecord plog.LogRecord) error { @@ -259,5 +306,9 @@ func rowToLog(row stringMap, config LogsCfg, logRecord plog.LogRecord) error { } func (queryReceiver *logsQueryReceiver) shutdown(ctx context.Context) error { - return nil + var errors error + // if queryReceiver.db != nil { + // errors = multierr.Append(errors, queryReceiver.db.Close()) + // } + return errors } diff --git a/receiver/sqlqueryreceiver/row_scanner.go b/receiver/sqlqueryreceiver/row_scanner.go index 042508655c8f..63b0c58fb68e 100644 --- a/receiver/sqlqueryreceiver/row_scanner.go +++ b/receiver/sqlqueryreceiver/row_scanner.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "reflect" + "time" "go.uber.org/multierr" ) @@ -41,6 +42,9 @@ func newRowScanner(colTypes []colType) *rowScanner { return "", errNullValueWarning } format := "%v" + if t, isTime := v.(time.Time); isTime { + return t.Format(time.RFC3339), nil + } if reflect.TypeOf(v).Kind() == reflect.Slice { // The Postgres driver returns a []uint8 (ascii string) for decimal and numeric types, // which we want to render as strings. e.g. "4.1" instead of "[52, 46, 49]". diff --git a/receiver/sqlqueryreceiver/testdata/integration/init.sql b/receiver/sqlqueryreceiver/testdata/integration/init.sql index f8967373d128..5f0ebfd2bed0 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/init.sql @@ -27,7 +27,7 @@ create table simple_logs insert_time timestamp, body text ); -grant select on simple_logs to otel; +grant select, insert on simple_logs to otel; insert into simple_logs (id, insert_time, body) values (1, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'), From 95e6c31820713b9848a0c87eb55c1a18ff645f69 Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Tue, 16 May 2023 09:53:23 +0200 Subject: [PATCH 2/4] docs: add link to a storage extension --- receiver/sqlqueryreceiver/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/receiver/sqlqueryreceiver/README.md b/receiver/sqlqueryreceiver/README.md index 242931bf533d..ef800e784b88 100644 --- a/receiver/sqlqueryreceiver/README.md +++ b/receiver/sqlqueryreceiver/README.md @@ -24,7 +24,9 @@ The configuration supports the following top-level fields: e.g. _host=localhost port=5432 user=me password=s3cr3t sslmode=disable_ - `queries`(required): A list of queries, where a query is a sql statement and one or more `logs` and/or `metrics` sections (details below). - `collection_interval`(optional): The time interval between query executions. Defaults to _10s_. -- `storage` (optional, default `""`): The ID of a storage extension to be used to [track processed results](#tracking-processed-results). +- `storage` (optional, default `""`): The ID of a [storage][storage_extension] extension to be used to [track processed results](#tracking-processed-results). + +[storage_extension]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage ### Queries From 353b38a9342478e68e3b45ac92d55af238ca7225 Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Tue, 16 May 2023 11:15:50 +0200 Subject: [PATCH 3/4] style: remove unused code --- receiver/sqlqueryreceiver/db_client.go | 3 --- receiver/sqlqueryreceiver/logs_receiver.go | 6 +----- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/receiver/sqlqueryreceiver/db_client.go b/receiver/sqlqueryreceiver/db_client.go index e2bd60efbf01..e0f817d13661 100644 --- a/receiver/sqlqueryreceiver/db_client.go +++ b/receiver/sqlqueryreceiver/db_client.go @@ -73,6 +73,3 @@ func (cl dbSQLClient) queryRows(ctx context.Context, args ...any) ([]stringMap, } return out, warnings } - -// getRowsSinceId(ctx context.Context, id int) -// getRowsSinceTimestamp(ctx context.Context, timestamp time.Time) diff --git a/receiver/sqlqueryreceiver/logs_receiver.go b/receiver/sqlqueryreceiver/logs_receiver.go index 1c7a8f50a3eb..545a2e5a99bf 100644 --- a/receiver/sqlqueryreceiver/logs_receiver.go +++ b/receiver/sqlqueryreceiver/logs_receiver.go @@ -306,9 +306,5 @@ func rowToLog(row stringMap, config LogsCfg, logRecord plog.LogRecord) error { } func (queryReceiver *logsQueryReceiver) shutdown(ctx context.Context) error { - var errors error - // if queryReceiver.db != nil { - // errors = multierr.Append(errors, queryReceiver.db.Close()) - // } - return errors + return nil } From 0a5ddd34182924e228e41fb15176c11965fa817c Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Tue, 16 May 2023 11:23:56 +0200 Subject: [PATCH 4/4] test: terminate DB containers after test This prevents a "busy port" error when running one of the tests repeatedly. --- receiver/sqlqueryreceiver/integration_test.go | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/receiver/sqlqueryreceiver/integration_test.go b/receiver/sqlqueryreceiver/integration_test.go index ebd2feacceb9..5ffa16289a2c 100644 --- a/receiver/sqlqueryreceiver/integration_test.go +++ b/receiver/sqlqueryreceiver/integration_test.go @@ -42,7 +42,8 @@ import ( func TestLogsTrackingWithoutStorageInPostgres(t *testing.T) { // Start Postgres container. externalPort := "15430" - startPostgresDbContainer(t, externalPort) + dbContainer := startPostgresDbContainer(t, externalPort) + defer dbContainer.Terminate(context.Background()) // Start the SQL Query receiver. receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort) @@ -121,7 +122,8 @@ func TestLogsTrackingWithoutStorageInPostgres(t *testing.T) { func TestLogsTrackingWithStorageInPostgres(t *testing.T) { // start Postgres container externalPort := "15431" - container := startPostgresDbContainer(t, externalPort) + dbContainer := startPostgresDbContainer(t, externalPort) + defer dbContainer.Terminate(context.Background()) // create a File Storage extension writing to a temporary directory in local filesystem storageDir := t.TempDir() @@ -200,7 +202,7 @@ func TestLogsTrackingWithStorageInPostgres(t *testing.T) { // write a number of new logs to the database newLogCount := 3 - insertPostgresSimpleLogs(t, container, initialLogCount, newLogCount) + insertPostgresSimpleLogs(t, dbContainer, initialLogCount, newLogCount) // start the SQL Query receiver again receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort) @@ -232,6 +234,10 @@ func TestLogsTrackingWithStorageInPostgres(t *testing.T) { "failed to receive more than 0 logs", ) + // stop the SQL Query receiver + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + // Verify that the newly added logs were received. require.Equal(t, newLogCount, consumer.LogRecordCount()) printLogs(consumer.AllLogs()) @@ -315,7 +321,8 @@ func insertPostgresSimpleLogs(t *testing.T, container testcontainers.Container, func TestPostgresIntegration(t *testing.T) { externalPort := "15432" - startPostgresDbContainer(t, externalPort) + dbContainer := startPostgresDbContainer(t, externalPort) + defer dbContainer.Terminate(context.Background()) factory := NewFactory() config := factory.CreateDefaultConfig().(*Config) @@ -484,15 +491,16 @@ func TestOracleDBIntegration(t *testing.T) { } ctx := context.Background() - container, err := testcontainers.GenericContainer( + dbContainer, err := testcontainers.GenericContainer( ctx, testcontainers.GenericContainerRequest{ ContainerRequest: req, Started: true, }, ) - require.NotNil(t, container) + require.NotNil(t, dbContainer) require.NoError(t, err) + defer dbContainer.Terminate(ctx) genreKey := "GENRE" factory := NewFactory() @@ -599,14 +607,16 @@ func TestMysqlIntegration(t *testing.T) { } ctx := context.Background() - _, err := testcontainers.GenericContainer( + dbContainer, err := testcontainers.GenericContainer( ctx, testcontainers.GenericContainerRequest{ ContainerRequest: req, Started: true, }, ) + require.NotNil(t, dbContainer) require.NoError(t, err) + defer dbContainer.Terminate(ctx) factory := NewFactory() config := factory.CreateDefaultConfig().(*Config)