From 2c462ccb8f2d60deb4e2357206b5eda3f795be39 Mon Sep 17 00:00:00 2001 From: Michal Grandys Date: Sun, 29 Sep 2024 00:32:39 +0200 Subject: [PATCH] [receiver/sqlquery] sqlserver integration tests, improved log functionality coverage --- receiver/sqlqueryreceiver/integration_test.go | 780 +++++++++--------- .../testdata/integration/oracle/init.sql | 20 +- .../testdata/integration/sqlserver/Dockerfile | 12 + .../integration/sqlserver/configure-db.sh | 4 + .../integration/sqlserver/entrypoint.sh | 13 + .../integration/sqlserver/expected.yaml | 42 + .../testdata/integration/sqlserver/init.sql | 51 ++ 7 files changed, 543 insertions(+), 379 deletions(-) create mode 100644 receiver/sqlqueryreceiver/testdata/integration/sqlserver/Dockerfile create mode 100644 receiver/sqlqueryreceiver/testdata/integration/sqlserver/configure-db.sh create mode 100644 receiver/sqlqueryreceiver/testdata/integration/sqlserver/entrypoint.sh create mode 100644 receiver/sqlqueryreceiver/testdata/integration/sqlserver/expected.yaml create mode 100644 receiver/sqlqueryreceiver/testdata/integration/sqlserver/init.sql diff --git a/receiver/sqlqueryreceiver/integration_test.go b/receiver/sqlqueryreceiver/integration_test.go index d1326ca7b1b3..05ed7ac2ee38 100644 --- a/receiver/sqlqueryreceiver/integration_test.go +++ b/receiver/sqlqueryreceiver/integration_test.go @@ -5,8 +5,8 @@ package sqlqueryreceiver import ( "context" + "database/sql" "fmt" - "io" "path/filepath" "runtime" "strings" @@ -36,305 +36,381 @@ const ( postgresqlPort = "5432" oraclePort = "1521" mysqlPort = "3306" + sqlServerPort = "1433" ) -func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) { - // Start Postgres container. - externalPort := "15430" - dbContainer := startPostgresDbContainer(t, externalPort) - defer func() { - require.NoError(t, dbContainer.Terminate(context.Background())) - }() - - // Start the SQL Query receiver. - receiverCreateSettings := receivertest.NewNopSettings() - receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings) - config.CollectionInterval = time.Second - config.Queries = []sqlquery.Query{ - { - SQL: "select * from simple_logs where id > $1", - Logs: []sqlquery.LogsCfg{ - { - BodyColumn: "body", - AttributeColumns: []string{"attribute"}, - }, +type DbEngine struct { + Port string + SqlParameter string + CheckCompatibility func(t *testing.T) + ConnectionString func(host string, externalPort nat.Port) string + Driver string + CurrentTimestampFunction string + ConvertColumnName func(string) string + ContainerRequest testcontainers.ContainerRequest +} + +var ( + Postgres = DbEngine{ + Port: postgresqlPort, + SqlParameter: "$1", + CheckCompatibility: func(t *testing.T) { + // No compatibility checks needed for Postgres + }, + ConnectionString: func(host string, externalPort nat.Port) string { + return fmt.Sprintf("host=%s port=%s user=otel password=otel sslmode=disable", host, externalPort.Port()) + }, + Driver: "postgres", + CurrentTimestampFunction: "now()", + ConvertColumnName: func(name string) string { return name }, + ContainerRequest: testcontainers.ContainerRequest{ + Image: "postgres:9.6.24", + Env: map[string]string{ + "POSTGRES_USER": "root", + "POSTGRES_PASSWORD": "otel", + "POSTGRES_DB": "otel", }, - TrackingColumn: "id", - TrackingStartValue: "0", + Files: []testcontainers.ContainerFile{{ + HostFilePath: filepath.Join("testdata", "integration", "postgresql", "init.sql"), + ContainerFilePath: "/docker-entrypoint-initdb.d/init.sql", + FileMode: 700, + }}, + ExposedPorts: []string{postgresqlPort}, + WaitingFor: wait.ForListeningPort(postgresqlPort). + WithStartupTimeout(2 * time.Minute), }, } - host := componenttest.NewNopHost() - err := receiver.Start(context.Background(), host) - require.NoError(t, err) - - // Verify there's 5 logs received. - require.Eventuallyf( - t, - func() bool { - return consumer.LogRecordCount() > 0 + MySql = DbEngine{ + Port: mysqlPort, + SqlParameter: "?", + CheckCompatibility: func(t *testing.T) { + // No compatibility checks needed for MySql }, - 1*time.Minute, - 1*time.Second, - "failed to receive more than 0 logs", - ) - require.Equal(t, 5, consumer.LogRecordCount()) - testAllSimpleLogs(t, consumer.AllLogs()) - - // Stop the SQL Query receiver. - err = receiver.Shutdown(context.Background()) - require.NoError(t, err) - - // Start new SQL Query receiver with the same configuration. - receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings) - config.CollectionInterval = time.Second - config.Queries = []sqlquery.Query{ - { - SQL: "select * from simple_logs where id > $1", - Logs: []sqlquery.LogsCfg{ - { - BodyColumn: "body", - AttributeColumns: []string{"attribute"}, - }, + ConnectionString: func(host string, externalPort nat.Port) string { + return fmt.Sprintf("otel:otel@tcp(%s:%s)/otel", host, externalPort.Port()) + }, + Driver: "mysql", + CurrentTimestampFunction: "now()", + ConvertColumnName: func(name string) string { return name }, + ContainerRequest: testcontainers.ContainerRequest{ + Image: "mysql:8.0.33", + Env: map[string]string{ + "MYSQL_USER": "otel", + "MYSQL_PASSWORD": "otel", + "MYSQL_ROOT_PASSWORD": "otel", + "MYSQL_DATABASE": "otel", }, - TrackingColumn: "id", - TrackingStartValue: "0", + Files: []testcontainers.ContainerFile{{ + HostFilePath: filepath.Join("testdata", "integration", "mysql", "init.sql"), + ContainerFilePath: "/docker-entrypoint-initdb.d/init.sql", + FileMode: 700, + }}, + ExposedPorts: []string{mysqlPort}, + WaitingFor: wait.ForListeningPort(mysqlPort).WithStartupTimeout(2 * time.Minute), }, } - err = receiver.Start(context.Background(), host) - require.NoError(t, err) - - // Wait for some logs to come in. - require.Eventuallyf( - t, - func() bool { - return consumer.LogRecordCount() > 0 + Oracle = DbEngine{ + Port: oraclePort, + SqlParameter: ":1", + CheckCompatibility: func(t *testing.T) { + if runtime.GOARCH == "arm64" { + t.Skip("Incompatible with arm64") + } + t.Skip("Skipping the test until https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27577 is fixed") }, - 1*time.Minute, - 1*time.Second, - "failed to receive more than 0 logs", - ) - - // stop the SQL Query receiver - err = receiver.Shutdown(context.Background()) - require.NoError(t, err) - - // Verify that the same logs are collected again. - require.Equal(t, 5, consumer.LogRecordCount()) - testAllSimpleLogs(t, consumer.AllLogs()) -} - -func TestPostgresIntegrationLogsTrackingByTimestampColumnWithoutStorage(t *testing.T) { - // Start Postgres container. - externalPort := "15432" - dbContainer := startPostgresDbContainer(t, externalPort) - defer func() { - require.NoError(t, dbContainer.Terminate(context.Background())) - }() - - // Start the SQL Query receiver. - receiverCreateSettings := receivertest.NewNopSettings() - receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings) - config.CollectionInterval = 100 * time.Millisecond - config.Queries = []sqlquery.Query{ - { - SQL: "select * from simple_logs where insert_time > $1 order by insert_time asc", - Logs: []sqlquery.LogsCfg{ - { - BodyColumn: "body", - AttributeColumns: []string{"attribute"}, - }, + ConnectionString: func(host string, externalPort nat.Port) string { + return fmt.Sprintf("oracle://otel:p@ssw%%25rd@%s:%s/XE", host, externalPort.Port()) + }, + Driver: "oracle", + CurrentTimestampFunction: "SYSTIMESTAMP", + ConvertColumnName: func(name string) string { return strings.ToUpper(name) }, + ContainerRequest: testcontainers.ContainerRequest{ + FromDockerfile: testcontainers.FromDockerfile{ + Context: filepath.Join("testdata", "integration", "oracle"), + Dockerfile: "Dockerfile.oracledb", }, - TrackingColumn: "insert_time", - TrackingStartValue: "2022-06-03 21:00:00+00", + ExposedPorts: []string{oraclePort}, + WaitingFor: wait.NewHealthStrategy().WithStartupTimeout(30 * time.Minute), }, } - host := componenttest.NewNopHost() - err := receiver.Start(context.Background(), host) - require.NoError(t, err) - - // Verify there's 5 logs received. - require.Eventuallyf( - t, - func() bool { - return consumer.LogRecordCount() > 0 + SqlServer = DbEngine{ + Port: sqlServerPort, + SqlParameter: "@p1", + CheckCompatibility: func(t *testing.T) { + if runtime.GOARCH == "arm64" { + t.Skip("Incompatible with arm64") + } + t.Skip("Skipping the test until https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27577 is fixed") }, - 1*time.Minute, - 500*time.Millisecond, - "failed to receive more than 0 logs", - ) - require.Equal(t, 5, consumer.LogRecordCount()) - testAllSimpleLogs(t, consumer.AllLogs()) - - // Stop the SQL Query receiver. - err = receiver.Shutdown(context.Background()) - require.NoError(t, err) -} - -func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) { - // start Postgres container - externalPort := "15431" - dbContainer := startPostgresDbContainer(t, externalPort) - defer func() { - require.NoError(t, dbContainer.Terminate(context.Background())) - }() - - // create a File Storage extension writing to a temporary directory in local filesystem - storageDir := t.TempDir() - storageExtension := storagetest.NewFileBackedStorageExtension("test", storageDir) - - // create SQL Query receiver configured with the File Storage extension - receiverCreateSettings := receivertest.NewNopSettings() - receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings) - config.CollectionInterval = time.Second - config.StorageID = &storageExtension.ID - config.Queries = []sqlquery.Query{ - { - SQL: "select * from simple_logs where id > $1", - Logs: []sqlquery.LogsCfg{ - { - BodyColumn: "body", - AttributeColumns: []string{"attribute"}, - }, + ConnectionString: func(host string, externalPort nat.Port) string { + return fmt.Sprintf("sqlserver://otel:YourStrong%%21Passw0rd@%s:%s?database=otel", host, externalPort.Port()) + }, + Driver: "sqlserver", + CurrentTimestampFunction: "GETDATE()", + ConvertColumnName: func(name string) string { return name }, + ContainerRequest: testcontainers.ContainerRequest{ + FromDockerfile: testcontainers.FromDockerfile{ + Context: filepath.Join("testdata", "integration", "sqlserver"), + Dockerfile: "Dockerfile", }, - TrackingColumn: "id", - TrackingStartValue: "0", + Env: map[string]string{ + "ACCEPT_EULA": "Y", + "SA_PASSWORD": "YourStrong!Passw0rd", + }, + ExposedPorts: []string{sqlServerPort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(sqlServerPort), + wait.ForLog("Initiation of otel database is complete"), + ).WithDeadline(5 * time.Minute), }, } +) - // start the SQL Query receiver - host := storagetest.NewStorageHost().WithExtension(storageExtension.ID, storageExtension) - err := receiver.Start(context.Background(), host) - require.NoError(t, err) +func TestIntegrationLogsTrackingWithStorage(t *testing.T) { + tests := []struct { + name string + engine DbEngine + }{ + {name: "Postgres", engine: Postgres}, + {name: "MySQL", engine: MySql}, + {name: "SqlServer", engine: SqlServer}, + {name: "Oracle", engine: Oracle}, + } - // Wait for logs to come in. - require.Eventuallyf( - t, - func() bool { - return consumer.LogRecordCount() > 0 - }, - 1*time.Minute, - 1*time.Second, - "failed to receive more than 0 logs", - ) + for _, tt := range tests { + trackingColumn := tt.engine.ConvertColumnName("id") + trackingStartValue := "0" + t.Run(tt.name, func(t *testing.T) { + tt.engine.CheckCompatibility(t) + dbContainer, dbHost, externalPort := startDbContainerWithConfig(t, tt.engine) + defer func() { + require.NoError(t, dbContainer.Terminate(context.Background())) + }() + + storageDir := t.TempDir() + storageExtension := storagetest.NewFileBackedStorageExtension("test", storageDir) + + receiverCreateSettings := receivertest.NewNopSettings() + receiver, config, consumer := createTestLogsReceiver(t, tt.engine.Driver, tt.engine.ConnectionString(dbHost, externalPort), receiverCreateSettings) + config.CollectionInterval = time.Second + config.Telemetry.Logs.Query = true + config.StorageID = &storageExtension.ID + config.Queries = []sqlquery.Query{ + { + SQL: fmt.Sprintf("select * from simple_logs where %s > %s", trackingColumn, tt.engine.SqlParameter), + Logs: []sqlquery.LogsCfg{ + { + BodyColumn: tt.engine.ConvertColumnName("body"), + AttributeColumns: []string{tt.engine.ConvertColumnName("attribute")}, + }, + }, + TrackingColumn: trackingColumn, + TrackingStartValue: trackingStartValue, + }, + } - // stop the SQL Query receiver - err = receiver.Shutdown(context.Background()) - require.NoError(t, err) + host := storagetest.NewStorageHost().WithExtension(storageExtension.ID, storageExtension) + err := receiver.Start(context.Background(), host) + require.NoError(t, err) - // verify there's 5 logs received - initialLogCount := 5 - require.Equal(t, initialLogCount, consumer.LogRecordCount()) - testAllSimpleLogs(t, consumer.AllLogs()) - - // start the SQL Query receiver again - receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings) - config.CollectionInterval = time.Second - config.StorageID = &storageExtension.ID - config.Queries = []sqlquery.Query{ - { - SQL: "select * from simple_logs where id > $1", - Logs: []sqlquery.LogsCfg{ + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 1*time.Minute, + 1*time.Second, + "failed to receive more than 0 logs", + ) + + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + + initialLogCount := 5 + require.Equal(t, initialLogCount, consumer.LogRecordCount()) + testAllSimpleLogs(t, consumer.AllLogs(), tt.engine.ConvertColumnName("attribute")) + + receiver, config, consumer = createTestLogsReceiver(t, tt.engine.Driver, tt.engine.ConnectionString(dbHost, externalPort), receiverCreateSettings) + config.CollectionInterval = time.Second + config.Telemetry.Logs.Query = true + config.StorageID = &storageExtension.ID + config.Queries = []sqlquery.Query{ { - BodyColumn: "body", - AttributeColumns: []string{"attribute"}, + SQL: fmt.Sprintf("select * from simple_logs where %s > %s", trackingColumn, tt.engine.SqlParameter), + Logs: []sqlquery.LogsCfg{ + { + BodyColumn: tt.engine.ConvertColumnName("body"), + AttributeColumns: []string{tt.engine.ConvertColumnName("attribute")}, + }, + }, + TrackingColumn: trackingColumn, + TrackingStartValue: trackingStartValue, }, - }, - TrackingColumn: "id", - TrackingStartValue: "0", - }, - } - err = receiver.Start(context.Background(), host) - require.NoError(t, err) + } + err = receiver.Start(context.Background(), host) + require.NoError(t, err) - // Wait for some logs to come in. - time.Sleep(5 * time.Second) + time.Sleep(5 * time.Second) - // stop the SQL Query receiver - err = receiver.Shutdown(context.Background()) - require.NoError(t, err) + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + + require.Equal(t, 0, consumer.LogRecordCount()) - // Verify that no new logs came in - require.Equal(t, 0, consumer.LogRecordCount()) - - // write a number of new logs to the database - newLogCount := 3 - insertPostgresSimpleLogs(t, dbContainer, initialLogCount, newLogCount) - - // start the SQL Query receiver again - receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings) - config.CollectionInterval = time.Second - config.StorageID = &storageExtension.ID - config.Queries = []sqlquery.Query{ - { - SQL: "select * from simple_logs where id > $1", - Logs: []sqlquery.LogsCfg{ + newLogCount := 3 + insertSimpleLogs(t, tt.engine, dbContainer, initialLogCount, newLogCount) + + receiver, config, consumer = createTestLogsReceiver(t, tt.engine.Driver, tt.engine.ConnectionString(dbHost, externalPort), receiverCreateSettings) + config.CollectionInterval = time.Second + config.Telemetry.Logs.Query = true + config.StorageID = &storageExtension.ID + config.Queries = []sqlquery.Query{ { - BodyColumn: "body", - AttributeColumns: []string{"attribute"}, + SQL: fmt.Sprintf("select * from simple_logs where %s > %s", trackingColumn, tt.engine.SqlParameter), + Logs: []sqlquery.LogsCfg{ + { + BodyColumn: tt.engine.ConvertColumnName("body"), + AttributeColumns: []string{tt.engine.ConvertColumnName("attribute")}, + }, + }, + TrackingColumn: trackingColumn, + TrackingStartValue: trackingStartValue, }, - }, - TrackingColumn: "id", - TrackingStartValue: "0", - }, - } - err = receiver.Start(context.Background(), host) - require.NoError(t, err) + } + err = receiver.Start(context.Background(), host) + require.NoError(t, err) - // Wait for new logs to come in. - require.Eventuallyf( - t, - func() bool { - return consumer.LogRecordCount() > 0 - }, - 1*time.Minute, - 1*time.Second, - "failed to receive more than 0 logs", - ) + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 1*time.Minute, + 1*time.Second, + "failed to receive more than 0 logs", + ) - // stop the SQL Query receiver - err = receiver.Shutdown(context.Background()) - require.NoError(t, err) + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) - // Verify that the newly added logs were received. - require.Equal(t, newLogCount, consumer.LogRecordCount()) - printLogs(consumer.AllLogs()) + require.Equal(t, newLogCount, consumer.LogRecordCount()) + }) + } } -func startPostgresDbContainer(t *testing.T, externalPort string) testcontainers.Container { - req := testcontainers.ContainerRequest{ - Image: "postgres:9.6.24", - Env: map[string]string{ - "POSTGRES_USER": "root", - "POSTGRES_PASSWORD": "otel", - "POSTGRES_DB": "otel", - }, - Files: []testcontainers.ContainerFile{{ - HostFilePath: filepath.Join("testdata", "integration", "postgresql", "init.sql"), - ContainerFilePath: "/docker-entrypoint-initdb.d/init.sql", - FileMode: 700, - }}, - ExposedPorts: []string{externalPort + ":" + postgresqlPort}, - WaitingFor: wait.ForListeningPort(nat.Port(postgresqlPort)). - WithStartupTimeout(2 * time.Minute), +func TestIntegrationLogsTrackingWithoutStorage(t *testing.T) { + tests := []struct { + name string + engine DbEngine + trackingColumn string + trackingStartValue string + trackingStartValueFormat string + }{ + {name: "PostgresById", engine: Postgres, trackingColumn: "id", trackingStartValue: "0", trackingStartValueFormat: ""}, + {name: "MySQLById", engine: MySql, trackingColumn: "id", trackingStartValue: "0", trackingStartValueFormat: ""}, + {name: "SqlServerById", engine: SqlServer, trackingColumn: "id", trackingStartValue: "0", trackingStartValueFormat: ""}, + {name: "OracleById", engine: Oracle, trackingColumn: "ID", trackingStartValue: "0", trackingStartValueFormat: ""}, + {name: "PostgresByTimestamp", engine: Postgres, trackingColumn: "insert_time", trackingStartValue: "2022-06-03 21:00:00+00", trackingStartValueFormat: ""}, + {name: "MySQLByTimestamp", engine: MySql, trackingColumn: "insert_time", trackingStartValue: "2022-06-03 21:00:00", trackingStartValueFormat: ""}, + {name: "SqlServerByTimestamp", engine: SqlServer, trackingColumn: "insert_time", trackingStartValue: "2022-06-03 21:00:00", trackingStartValueFormat: ""}, + {name: "OracleByTimestamp", engine: Oracle, trackingColumn: "INSERT_TIME", trackingStartValue: "2022-06-03T21:00:00.000Z", trackingStartValueFormat: "TO_TIMESTAMP_TZ(:1, 'YYYY-MM-DD\"T\"HH24:MI:SS.FF6TZH:TZM')"}, } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.engine.CheckCompatibility(t) + dbContainer, dbHost, externalPort := startDbContainerWithConfig(t, tt.engine) + defer func() { + require.NoError(t, dbContainer.Terminate(context.Background())) + }() + + receiverCreateSettings := receivertest.NewNopSettings() + receiver, config, consumer := createTestLogsReceiver(t, tt.engine.Driver, tt.engine.ConnectionString(dbHost, externalPort), receiverCreateSettings) + config.CollectionInterval = 100 * time.Millisecond + config.Telemetry.Logs.Query = true + + trackingColumn := tt.engine.ConvertColumnName(tt.trackingColumn) + trackingColumnParameter := tt.engine.SqlParameter + if tt.trackingStartValueFormat != "" { + trackingColumnParameter = tt.trackingStartValueFormat + } + + config.Queries = []sqlquery.Query{ + { + SQL: fmt.Sprintf("select * from simple_logs where %s > %s order by %s asc", trackingColumn, trackingColumnParameter, trackingColumn), + Logs: []sqlquery.LogsCfg{ + { + BodyColumn: tt.engine.ConvertColumnName("body"), + AttributeColumns: []string{tt.engine.ConvertColumnName("attribute")}, + }, + }, + TrackingColumn: trackingColumn, + TrackingStartValue: tt.trackingStartValue, + }, + } + host := componenttest.NewNopHost() + err := receiver.Start(context.Background(), host) + require.NoError(t, err) + + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 1*time.Minute, + 500*time.Millisecond, + "failed to receive more than 0 logs", + ) + require.Equal(t, 5, consumer.LogRecordCount()) + testAllSimpleLogs(t, consumer.AllLogs(), tt.engine.ConvertColumnName("attribute")) + + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) + }) + } +} + +func startDbContainerWithConfig(t *testing.T, engine DbEngine) (testcontainers.Container, string, nat.Port) { + ctx := context.Background() container, err := testcontainers.GenericContainer( - context.Background(), + ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: req, + ContainerRequest: engine.ContainerRequest, Started: true, }, ) require.NoError(t, err) - return container + dbPort, err := container.MappedPort(ctx, nat.Port(engine.Port)) + require.NoError(t, err) + host, err := container.Host(ctx) + require.NoError(t, err) + return container, host, dbPort +} + +func insertSimpleLogs(t *testing.T, engine DbEngine, container testcontainers.Container, existingLogID, newLogCount int) { + externalPort, err := container.MappedPort(context.Background(), nat.Port(engine.Port)) + require.NoError(t, err) + + host, err := container.Host(context.Background()) + require.NoError(t, err) + + db, err := sql.Open(engine.Driver, engine.ConnectionString(host, externalPort)) + require.NoError(t, err) + defer db.Close() + + for newLogID := existingLogID + 1; newLogID <= existingLogID+newLogCount; newLogID++ { + query := fmt.Sprintf("insert into simple_logs (id, insert_time, body, attribute) values (%d, %s, 'another log %d', 'TLSv1.2')", newLogID, engine.CurrentTimestampFunction, newLogID) + _, err := db.Exec(query) + require.NoError(t, err) + } + } -func createTestLogsReceiverForPostgres(t *testing.T, externalPort string, receiverCreateSettings receiver.Settings) (*logsReceiver, *Config, *consumertest.LogsSink) { +func createTestLogsReceiver(t *testing.T, driver, dataSource string, receiverCreateSettings receiver.Settings) (*logsReceiver, *Config, *consumertest.LogsSink) { factory := NewFactory() config := factory.CreateDefaultConfig().(*Config) - config.CollectionInterval = time.Second - config.Driver = "postgres" - config.DataSource = fmt.Sprintf("host=localhost port=%s user=otel password=otel sslmode=disable", externalPort) + config.Driver = driver + config.DataSource = dataSource consumer := &consumertest.LogsSink{} receiverCreateSettings.Logger = zap.NewExample() @@ -348,65 +424,44 @@ func createTestLogsReceiverForPostgres(t *testing.T, externalPort string, receiv return receiver.(*logsReceiver), config, consumer } -func printLogs(allLogs []plog.Logs) { - for logIndex := 0; logIndex < len(allLogs); logIndex++ { - logs := allLogs[logIndex] - for resourceIndex := 0; resourceIndex < logs.ResourceLogs().Len(); resourceIndex++ { - resource := logs.ResourceLogs().At(resourceIndex) - for scopeIndex := 0; scopeIndex < resource.ScopeLogs().Len(); scopeIndex++ { - scope := resource.ScopeLogs().At(scopeIndex) - for recordIndex := 0; recordIndex < scope.LogRecords().Len(); recordIndex++ { - logRecord := scope.LogRecords().At(recordIndex) - fmt.Printf("log %v resource %v scope %v log %v body: %v\n", logIndex, resourceIndex, scopeIndex, recordIndex, logRecord.Body().Str()) - } - } - } +func testAllSimpleLogs(t *testing.T, logs []plog.Logs, attributeColumnName string) { + assert.Len(t, logs, 1) + assert.Equal(t, 1, logs[0].ResourceLogs().Len()) + assert.Equal(t, 1, logs[0].ResourceLogs().At(0).ScopeLogs().Len()) + expectedLogBodies := []string{ + "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6197 4 \"-\" \"-\" 445af8e6c428303f -", + "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6205 5 \"-\" \"-\" 3285f43cd4baa202 -", + "- - - [03/Jun/2022:21:59:29 +0000] \"GET /api/health HTTP/1.1\" 200 6233 4 \"-\" \"-\" 579e8362d3185b61 -", + "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6207 5 \"-\" \"-\" 8c6ac61ae66e509f -", + "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6200 4 \"-\" \"-\" c163495861e873d8 -", } -} - -func insertPostgresSimpleLogs(t *testing.T, container testcontainers.Container, existingLogID, newLogCount int) { - for newLogID := existingLogID + 1; newLogID <= existingLogID+newLogCount; newLogID++ { - query := fmt.Sprintf("insert into simple_logs (id, insert_time, body, attribute) values (%d, now(), 'another log %d', 'TLSv1.2');", newLogID, newLogID) - returnValue, returnMessageReader, err := container.Exec(context.Background(), []string{ - "psql", "-U", "otel", "-c", query, - }) - require.NoError(t, err) - returnMessageBuffer := new(strings.Builder) - _, err = io.Copy(returnMessageBuffer, returnMessageReader) - require.NoError(t, err) - returnMessage := returnMessageBuffer.String() - - assert.Equal(t, 0, returnValue) - assert.Contains(t, returnMessage, "INSERT 0 1") + expectedLogAttributes := []string{ + "TLSv1.2", + "TLSv1", + "TLSv1.2", + "TLSv1", + "TLSv1.2", + } + assert.Equal(t, len(expectedLogBodies), logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) + for i := range expectedLogBodies { + logRecord := logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i) + assert.Equal(t, expectedLogBodies[i], logRecord.Body().Str()) + logAttribute, _ := logRecord.Attributes().Get(attributeColumnName) + assert.Equal(t, expectedLogAttributes[i], logAttribute.Str()) } } func TestPostgresqlIntegrationMetrics(t *testing.T) { + Postgres.CheckCompatibility(t) scraperinttest.NewIntegrationTest( NewFactory(), scraperinttest.WithContainerRequest( - testcontainers.ContainerRequest{ - Image: "postgres:9.6.24", - Env: map[string]string{ - "POSTGRES_USER": "root", - "POSTGRES_PASSWORD": "otel", - "POSTGRES_DB": "otel", - }, - Files: []testcontainers.ContainerFile{{ - HostFilePath: filepath.Join("testdata", "integration", "postgresql", "init.sql"), - ContainerFilePath: "/docker-entrypoint-initdb.d/init.sql", - FileMode: 700, - }}, - ExposedPorts: []string{postgresqlPort}, - WaitingFor: wait.ForListeningPort(nat.Port(postgresqlPort)). - WithStartupTimeout(2 * time.Minute), - }), + Postgres.ContainerRequest), scraperinttest.WithCustomConfig( func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) { rCfg := cfg.(*Config) - rCfg.Driver = "postgres" - rCfg.DataSource = fmt.Sprintf("host=%s port=%s user=otel password=otel sslmode=disable", - ci.Host(t), ci.MappedPort(t, postgresqlPort)) + rCfg.Driver = Postgres.Driver + rCfg.DataSource = Postgres.ConnectionString(ci.Host(t), nat.Port(ci.MappedPort(t, Postgres.Port))) rCfg.Queries = []sqlquery.Query{ { SQL: "select genre, count(*), avg(imdb_rating) from movie group by genre", @@ -495,34 +550,20 @@ func TestPostgresqlIntegrationMetrics(t *testing.T) { // This test ensures the collector can connect to an Oracle DB, and properly get metrics. It's not intended to // test the receiver itself. func TestOracleDBIntegrationMetrics(t *testing.T) { - t.Skip("Skipping the test until https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27577 is fixed") - if runtime.GOARCH == "arm64" { - t.Skip("Incompatible with arm64") - } + Oracle.CheckCompatibility(t) scraperinttest.NewIntegrationTest( NewFactory(), scraperinttest.WithContainerRequest( - testcontainers.ContainerRequest{ - FromDockerfile: testcontainers.FromDockerfile{ - Context: filepath.Join("testdata", "integration", "oracle"), - Dockerfile: "Dockerfile.oracledb", - }, - ExposedPorts: []string{oraclePort}, - // The Oracle DB container takes close to 10 minutes on a local machine - // to do the default setup, so the best way to account for startup time - // is to wait for the container to be healthy before continuing test. - WaitingFor: wait.NewHealthStrategy().WithStartupTimeout(30 * time.Minute), - }), + Oracle.ContainerRequest), scraperinttest.WithCreateContainerTimeout(30*time.Minute), scraperinttest.WithCustomConfig( func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) { rCfg := cfg.(*Config) - rCfg.Driver = "oracle" - rCfg.DataSource = fmt.Sprintf("oracle://otel:p@ssw%%25rd@%s:%s/XE", - ci.Host(t), ci.MappedPort(t, oraclePort)) + rCfg.Driver = Oracle.Driver + rCfg.DataSource = Oracle.ConnectionString(ci.Host(t), nat.Port(ci.MappedPort(t, Oracle.Port))) rCfg.Queries = []sqlquery.Query{ { - SQL: "select genre, count(*) as count, avg(imdb_rating) as avg from sys.movie group by genre", + SQL: "select genre, count(*) as count, avg(imdb_rating) as avg from movie group by genre", Metrics: []sqlquery.MetricCfg{ { MetricName: "genre.count", @@ -552,34 +593,18 @@ func TestOracleDBIntegrationMetrics(t *testing.T) { } func TestMysqlIntegrationMetrics(t *testing.T) { + MySql.CheckCompatibility(t) scraperinttest.NewIntegrationTest( NewFactory(), - scraperinttest.WithContainerRequest( - testcontainers.ContainerRequest{ - Image: "mysql:8.0.33", - Env: map[string]string{ - "MYSQL_USER": "otel", - "MYSQL_PASSWORD": "otel", - "MYSQL_ROOT_PASSWORD": "otel", - "MYSQL_DATABASE": "otel", - }, - Files: []testcontainers.ContainerFile{{ - HostFilePath: filepath.Join("testdata", "integration", "mysql", "init.sql"), - ContainerFilePath: "/docker-entrypoint-initdb.d/init.sql", - FileMode: 700, - }}, - ExposedPorts: []string{mysqlPort}, - WaitingFor: wait.ForListeningPort(nat.Port(mysqlPort)).WithStartupTimeout(2 * time.Minute), - }), + scraperinttest.WithContainerRequest(MySql.ContainerRequest), scraperinttest.WithCustomConfig( func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) { rCfg := cfg.(*Config) - rCfg.Driver = "mysql" - rCfg.DataSource = fmt.Sprintf("otel:otel@tcp(%s:%s)/otel", - ci.Host(t), ci.MappedPort(t, mysqlPort)) + rCfg.Driver = MySql.Driver + rCfg.DataSource = MySql.ConnectionString(ci.Host(t), nat.Port(ci.MappedPort(t, MySql.Port))) rCfg.Queries = []sqlquery.Query{ { - SQL: "select genre, count(*), avg(imdb_rating) from movie group by genre", + SQL: "select genre, count(*), avg(imdb_rating) from movie group by genre order by genre desc", Metrics: []sqlquery.MetricCfg{ { MetricName: "genre.count", @@ -647,38 +672,51 @@ func TestMysqlIntegrationMetrics(t *testing.T) { }, } }), - scraperinttest.WithExpectedFile( - filepath.Join("testdata", "integration", "mysql", "expected.yaml"), - ), + scraperinttest.WithExpectedFile(filepath.Join("testdata", "integration", "mysql", "expected.yaml")), scraperinttest.WithCompareOptions( pmetrictest.IgnoreTimestamp(), ), ).Run(t) } -func testAllSimpleLogs(t *testing.T, logs []plog.Logs) { - assert.Len(t, logs, 1) - assert.Equal(t, 1, logs[0].ResourceLogs().Len()) - assert.Equal(t, 1, logs[0].ResourceLogs().At(0).ScopeLogs().Len()) - expectedLogBodies := []string{ - "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6197 4 \"-\" \"-\" 445af8e6c428303f -", - "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6205 5 \"-\" \"-\" 3285f43cd4baa202 -", - "- - - [03/Jun/2022:21:59:29 +0000] \"GET /api/health HTTP/1.1\" 200 6233 4 \"-\" \"-\" 579e8362d3185b61 -", - "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6207 5 \"-\" \"-\" 8c6ac61ae66e509f -", - "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6200 4 \"-\" \"-\" c163495861e873d8 -", - } - expectedLogAttributes := []string{ - "TLSv1.2", - "TLSv1", - "TLSv1.2", - "TLSv1", - "TLSv1.2", - } - assert.Equal(t, len(expectedLogBodies), logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) - for i := range expectedLogBodies { - logRecord := logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i) - assert.Equal(t, expectedLogBodies[i], logRecord.Body().Str()) - logAttribute, _ := logRecord.Attributes().Get("attribute") - assert.Equal(t, expectedLogAttributes[i], logAttribute.Str()) - } +func TestSqlServerIntegrationMetrics(t *testing.T) { + SqlServer.CheckCompatibility(t) + scraperinttest.NewIntegrationTest( + NewFactory(), + scraperinttest.WithContainerRequest(SqlServer.ContainerRequest), + scraperinttest.WithCustomConfig( + func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) { + rCfg := cfg.(*Config) + rCfg.Driver = SqlServer.Driver + rCfg.DataSource = SqlServer.ConnectionString(ci.Host(t), nat.Port(ci.MappedPort(t, SqlServer.Port))) + rCfg.Queries = []sqlquery.Query{ + { + SQL: "select genre, count(*) as count, avg(imdb_rating) as avg from movie group by genre order by genre", + Metrics: []sqlquery.MetricCfg{ + { + MetricName: "genre.count", + ValueColumn: "count", + AttributeColumns: []string{"genre"}, + ValueType: sqlquery.MetricValueTypeInt, + DataType: sqlquery.MetricTypeGauge, + }, + { + MetricName: "genre.imdb", + ValueColumn: "avg", + AttributeColumns: []string{"genre"}, + ValueType: sqlquery.MetricValueTypeDouble, + DataType: sqlquery.MetricTypeGauge, + }, + }, + }, + } + }), + scraperinttest.WithExpectedFile( + filepath.Join("testdata", "integration", "sqlserver", "expected.yaml"), + ), + scraperinttest.WithCompareOptions( + pmetrictest.IgnoreTimestamp(), + pmetrictest.IgnoreMetricsOrder(), + ), + ).Run(t) } diff --git a/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql b/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql index 8597cfa846b6..fe0c5e4c09a7 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql @@ -1,9 +1,20 @@ +/* The alter session command is required to enable user creation in an Oracle docker container + This command shouldn't be used outside of test environments. */ +alter session set "_ORACLE_SCRIPT"=true; +CREATE USER OTEL IDENTIFIED BY "p@ssw%rd"; +GRANT CREATE SESSION TO OTEL; +GRANT ALL PRIVILEGES TO OTEL; +ALTER USER OTEL QUOTA UNLIMITED ON USERS; +-- Switch to the OTEL schema +ALTER SESSION SET CURRENT_SCHEMA = OTEL; + create table movie ( title VARCHAR2(256), genre VARCHAR(256), imdb_rating NUMBER ); +GRANT ALL ON movie TO OTEL; insert into movie (title, genre, imdb_rating) values ('E.T.', 'SciFi', 7.9); @@ -16,13 +27,6 @@ values ('Die Hard', 'Action', 8.2); insert into movie (title, genre, imdb_rating) values ('Mission Impossible', 'Action', 7.1); -/* The alter session command is required to enable user creation in an Oracle docker container - This command shouldn't be used outside of test environments. */ -alter session set "_ORACLE_SCRIPT"=true; -CREATE USER OTEL IDENTIFIED BY "p@ssw%rd"; -GRANT CREATE SESSION TO OTEL; -GRANT ALL ON movie TO OTEL; - create table simple_logs ( id number primary key, @@ -30,7 +34,7 @@ create table simple_logs body varchar2(4000), attribute varchar2(100) ); -grant select on simple_logs to otel; +GRANT ALL ON simple_logs TO OTEL; insert into simple_logs (id, insert_time, body, attribute) values (1, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'); diff --git a/receiver/sqlqueryreceiver/testdata/integration/sqlserver/Dockerfile b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/Dockerfile new file mode 100644 index 000000000000..ff730c070e52 --- /dev/null +++ b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/Dockerfile @@ -0,0 +1,12 @@ +FROM mcr.microsoft.com/mssql/server:2017-latest + +ENV ACCEPT_EULA=Y +ENV SA_PASSWORD=YourStrong!Passw0rd + +COPY entrypoint.sh /usr/src/app/entrypoint.sh +COPY configure-db.sh /usr/src/app/configure-db.sh +COPY init.sql /usr/src/app/init.sql + +RUN chmod +x /usr/src/app/entrypoint.sh /usr/src/app/configure-db.sh + +ENTRYPOINT ["/bin/bash", "/usr/src/app/entrypoint.sh"] \ No newline at end of file diff --git a/receiver/sqlqueryreceiver/testdata/integration/sqlserver/configure-db.sh b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/configure-db.sh new file mode 100644 index 000000000000..d8f058783c7c --- /dev/null +++ b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/configure-db.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +# Run the SQL script to initialize the database +/opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P YourStrong!Passw0rd -d master -i /usr/src/app/init.sql \ No newline at end of file diff --git a/receiver/sqlqueryreceiver/testdata/integration/sqlserver/entrypoint.sh b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/entrypoint.sh new file mode 100644 index 000000000000..cad7dab1451f --- /dev/null +++ b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/entrypoint.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# Start SQL Server in the background +/opt/mssql/bin/sqlservr & + +# Wait for SQL Server to start +sleep 30s + +# Run the setup script to create the DB and the schema in the DB +/usr/src/app/configure-db.sh + +# Call the original entrypoint script +wait \ No newline at end of file diff --git a/receiver/sqlqueryreceiver/testdata/integration/sqlserver/expected.yaml b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/expected.yaml new file mode 100644 index 000000000000..5c08dbe7ee1a --- /dev/null +++ b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/expected.yaml @@ -0,0 +1,42 @@ +resourceMetrics: + - resource: {} + scopeMetrics: + - metrics: + - gauge: + dataPoints: + - asInt: "2" + attributes: + - key: genre + value: + stringValue: Action + timeUnixNano: "1734876925424574000" + name: genre.count + - gauge: + dataPoints: + - asInt: "3" + attributes: + - key: genre + value: + stringValue: SciFi + timeUnixNano: "1734876925424574000" + name: genre.count + - gauge: + dataPoints: + - asDouble: 7.6499999999999995 + attributes: + - key: genre + value: + stringValue: Action + timeUnixNano: "1734876925424574000" + name: genre.imdb + - gauge: + dataPoints: + - asDouble: 8.200000000000001 + attributes: + - key: genre + value: + stringValue: SciFi + timeUnixNano: "1734876925424574000" + name: genre.imdb + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver \ No newline at end of file diff --git a/receiver/sqlqueryreceiver/testdata/integration/sqlserver/init.sql b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/init.sql new file mode 100644 index 000000000000..1d3422c8cfc3 --- /dev/null +++ b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/init.sql @@ -0,0 +1,51 @@ +USE master; +GO + +CREATE DATABASE otel; +GO + +CREATE LOGIN otel WITH PASSWORD = 'YourStrong!Passw0rd'; +GO + +USE otel; +GO + +CREATE USER otel FOR LOGIN otel; +GO + +ALTER ROLE db_owner ADD MEMBER otel; +GO + +CREATE TABLE movie +( + title NVARCHAR(256), + genre NVARCHAR(256), + imdb_rating FLOAT +); + +PRINT 'Inserting data into movie table...'; +INSERT INTO movie (title, genre, imdb_rating) +VALUES ('E.T.', 'SciFi', 7.9), + ('Blade Runner', 'SciFi', 8.1), + ('Star Wars', 'SciFi', 8.6), + ('Die Hard', 'Action', 8.2), + ('Mission Impossible', 'Action', 7.1); +PRINT 'Data inserted into movie table.'; + +CREATE TABLE simple_logs +( + id INT PRIMARY KEY, + insert_time DATETIME2, + body NVARCHAR(MAX), + attribute NVARCHAR(100) +); + +PRINT 'Inserting data into simple_logs table...'; +INSERT INTO simple_logs (id, insert_time, body, attribute) VALUES + (1, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'), + (2, '2022-06-03 21:59:26.692991', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'), + (3, '2022-06-03 21:59:29.212212', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'), + (4, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1'), + (5, '2022-06-03 21:59:31.332121', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2'); +PRINT 'Data inserted into simple_logs table.'; +PRINT 'Initiation of otel database is complete'; \ No newline at end of file