diff --git a/airbyte-analytics/src/main/java/io/airbyte/analytics/SegmentTrackingClient.java b/airbyte-analytics/src/main/java/io/airbyte/analytics/SegmentTrackingClient.java index 3599fb91fb70..575b101a471a 100644 --- a/airbyte-analytics/src/main/java/io/airbyte/analytics/SegmentTrackingClient.java +++ b/airbyte-analytics/src/main/java/io/airbyte/analytics/SegmentTrackingClient.java @@ -11,6 +11,7 @@ import com.segment.analytics.messages.IdentifyMessage; import com.segment.analytics.messages.TrackMessage; import io.airbyte.config.StandardWorkspace; +import java.time.Instant; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -43,6 +44,7 @@ public class SegmentTrackingClient implements TrackingClient { private static final String SEGMENT_WRITE_KEY = "7UDdp5K55CyiGgsauOr2pNNujGvmhaeu"; private static final String AIRBYTE_VERSION_KEY = "airbyte_version"; private static final String AIRBYTE_ROLE = "airbyte_role"; + private static final String AIRBYTE_TRACKED_AT = "tracked_at"; // Analytics is threadsafe. private final Analytics analytics; @@ -116,6 +118,7 @@ public void track(final UUID workspaceId, final String action, final Map mapCopy.put("email", email)); } diff --git a/airbyte-analytics/src/test/java/io/airbyte/analytics/SegmentTrackingClientTest.java b/airbyte-analytics/src/test/java/io/airbyte/analytics/SegmentTrackingClientTest.java index a3164108fc35..d6d0bb9eb0e9 100644 --- a/airbyte-analytics/src/test/java/io/airbyte/analytics/SegmentTrackingClientTest.java +++ b/airbyte-analytics/src/test/java/io/airbyte/analytics/SegmentTrackingClientTest.java @@ -5,11 +5,13 @@ package io.airbyte.analytics; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.segment.analytics.Analytics; import com.segment.analytics.messages.IdentifyMessage; import com.segment.analytics.messages.TrackMessage; @@ -17,6 +19,7 @@ import io.airbyte.config.Configs; import io.airbyte.config.Configs.WorkerEnvironment; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.function.Function; import java.util.function.Supplier; @@ -109,7 +112,7 @@ void testTrack() { final TrackMessage actual = mockBuilder.getValue().build(); assertEquals("jump", actual.event()); assertEquals(IDENTITY.getCustomerId().toString(), actual.userId()); - assertEquals(metadata, actual.properties()); + assertEquals(metadata, filterTrackedAtProperty(Objects.requireNonNull(actual.properties()))); } @Test @@ -127,7 +130,18 @@ void testTrackWithMetadata() { final TrackMessage actual = mockBuilder.getValue().build(); assertEquals("jump", actual.event()); assertEquals(IDENTITY.getCustomerId().toString(), actual.userId()); - assertEquals(metadata, actual.properties()); + assertEquals(metadata, filterTrackedAtProperty(Objects.requireNonNull(actual.properties()))); + } + + private static ImmutableMap filterTrackedAtProperty(final Map properties) { + assertTrue(properties.containsKey("tracked_at")); + final Builder builder = ImmutableMap.builder(); + properties.forEach((key, value) -> { + if (!key.equals("tracked_at")) { + builder.put(key, value); + } + }); + return builder.build(); } } diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java index 8d575214b678..1cebcd4b600c 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/AzureBlobStorageDestinationConfig.java @@ -64,11 +64,11 @@ public static AzureBlobStorageDestinationConfig getAzureBlobStorageConfig(final final JsonNode endpointFromConfig = config .get("azure_blob_storage_endpoint_domain_name"); final JsonNode containerName = config.get("azure_blob_storage_container_name"); - final int outputStreamBufferSizeFromConfig = + final int outputStreamBufferSizeFromConfig = config.get("azure_blob_storage_output_buffer_size") != null ? config.get("azure_blob_storage_output_buffer_size").asInt(DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE) : DEFAULT_STORAGE_OUTPUT_BUFFER_SIZE; - + final JsonNode blobName = config.get("azure_blob_storage_blob_name"); // streamId final String endpointComputed = String.format(Locale.ROOT, DEFAULT_STORAGE_ENDPOINT_FORMAT, diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java index 23e31bbf4d9c..f31aaab64fa0 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/csv/AzureBlobStorageCsvWriter.java @@ -5,7 +5,6 @@ package io.airbyte.integrations.destination.azure_blob_storage.csv; import com.azure.storage.blob.specialized.AppendBlobClient; -import com.azure.storage.blob.specialized.BlobOutputStream; import io.airbyte.integrations.destination.azure_blob_storage.AzureBlobStorageDestinationConfig; import io.airbyte.integrations.destination.azure_blob_storage.writer.AzureBlobStorageWriter; import io.airbyte.integrations.destination.azure_blob_storage.writer.BaseAzureBlobStorageWriter; diff --git a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java index 6a0406be7a7e..77ed63b7dee6 100644 --- a/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java +++ b/airbyte-integrations/connectors/destination-azure-blob-storage/src/main/java/io/airbyte/integrations/destination/azure_blob_storage/jsonl/AzureBlobStorageJsonlWriter.java @@ -5,7 +5,6 @@ package io.airbyte.integrations.destination.azure_blob_storage.jsonl; import com.azure.storage.blob.specialized.AppendBlobClient; -import com.azure.storage.blob.specialized.BlobOutputStream; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.node.ObjectNode; diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java index 73c951c08ae1..4de7b58b3c54 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java @@ -66,6 +66,7 @@ public static ImmutableMap generateDestinationDefinitionMetadata final Builder metadata = ImmutableMap.builder(); metadata.put("connector_destination", destinationDefinition.getName()); metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId()); + metadata.put("connector_destination_docker_repository", destinationDefinition.getDockerRepository()); final String imageTag = destinationDefinition.getDockerImageTag(); if (!Strings.isEmpty(imageTag)) { metadata.put("connector_destination_version", imageTag); @@ -77,6 +78,7 @@ public static ImmutableMap generateSourceDefinitionMetadata(fina final Builder metadata = ImmutableMap.builder(); metadata.put("connector_source", sourceDefinition.getName()); metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId()); + metadata.put("connector_source_docker_repository", sourceDefinition.getDockerRepository()); final String imageTag = sourceDefinition.getDockerImageTag(); if (!Strings.isEmpty(imageTag)) { metadata.put("connector_source_version", imageTag); @@ -94,6 +96,7 @@ public static ImmutableMap generateJobAttemptMetadata(final Job final JobOutput jobOutput = lastAttempt.getOutput().get(); if (jobOutput.getSync() != null) { final StandardSyncSummary syncSummary = jobOutput.getSync().getStandardSyncSummary(); + metadata.put("sync_start_time", syncSummary.getStartTime()); metadata.put("duration", Math.round((syncSummary.getEndTime() - syncSummary.getStartTime()) / 1000.0)); metadata.put("volume_mb", syncSummary.getBytesSynced()); metadata.put("volume_rows", syncSummary.getRecordsSynced()); diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java index f55a8d6a5876..83385d404ffb 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/JobNotifierTest.java @@ -99,9 +99,11 @@ void testFailJob() throws IOException, InterruptedException, JsonValidationExcep metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId()); metadata.put("connector_source", "source-test"); metadata.put("connector_source_version", TEST_DOCKER_TAG); + metadata.put("connector_source_docker_repository", sourceDefinition.getDockerRepository()); metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId()); metadata.put("connector_destination", "destination-test"); metadata.put("connector_destination_version", TEST_DOCKER_TAG); + metadata.put("connector_destination_docker_repository", destinationDefinition.getDockerRepository()); metadata.put("notification_type", NotificationType.SLACK); verify(trackingClient).track(WORKSPACE_ID, JobNotifier.FAILURE_NOTIFICATION, metadata.build()); } diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplierTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplierTest.java index 3c375a217cfc..9a08190f0c56 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplierTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/OAuthConfigSupplierTest.java @@ -141,6 +141,7 @@ public void testOAuthFullInjectionBecauseNoOAuthSpec() throws JsonValidationExce .thenReturn(new StandardSourceDefinition() .withSourceDefinitionId(sourceDefinitionId) .withName("test") + .withDockerRepository("test/test") .withDockerImageTag("dev") .withSpec(null)); setupOAuthParamMocks(oauthParameters); @@ -222,6 +223,7 @@ private void setupStandardDefinitionMock(final AdvancedAuth advancedAuth) throws when(configRepository.getStandardSourceDefinition(any())).thenReturn(new StandardSourceDefinition() .withSourceDefinitionId(sourceDefinitionId) .withName("test") + .withDockerRepository("test/test") .withDockerImageTag("dev") .withSpec(new ConnectorSpecification().withAdvancedAuth(advancedAuth))); } @@ -277,6 +279,7 @@ private void assertTracking(final UUID workspaceId) { verify(trackingClient, times(1)).track(workspaceId, "OAuth Injection - Backend", Map.of( "connector_source", "test", "connector_source_definition_id", sourceDefinitionId, + "connector_source_docker_repository", "test/test", "connector_source_version", "dev")); } diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java index a6096f950c8c..3e5bf9a0b0cd 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java @@ -65,7 +65,8 @@ class JobTrackerTest { private static final UUID CONNECTION_ID = UUID.randomUUID(); private static final String SOURCE_DEF_NAME = "postgres"; private static final String DESTINATION_DEF_NAME = "bigquery"; - public static final String CONNECTOR_VERSION = "test"; + private static final String CONNECTOR_REPOSITORY = "test/test"; + private static final String CONNECTOR_VERSION = "test"; private static final long SYNC_START_TIME = 1000L; private static final long SYNC_END_TIME = 10000L; private static final long SYNC_DURATION = 9L; // in sync between end and start time @@ -84,6 +85,7 @@ class JobTrackerTest { .put("attempt_completion_status", JobState.FAILED) .build(); private static final ImmutableMap ATTEMPT_METADATA = ImmutableMap.builder() + .put("sync_start_time", SYNC_START_TIME) .put("duration", SYNC_DURATION) .put("volume_rows", SYNC_RECORDS_SYNC) .put("volume_mb", SYNC_BYTES_SYNC) @@ -122,6 +124,7 @@ void testTrackCheckConnectionSource() throws ConfigNotFoundException, IOExceptio .put("attempt_id", 0) .put("connector_source", SOURCE_DEF_NAME) .put("connector_source_definition_id", UUID1) + .put("connector_source_docker_repository", CONNECTOR_REPOSITORY) .put("connector_source_version", CONNECTOR_VERSION) .build(); @@ -129,6 +132,7 @@ void testTrackCheckConnectionSource() throws ConfigNotFoundException, IOExceptio .thenReturn(new StandardSourceDefinition() .withSourceDefinitionId(UUID1) .withName(SOURCE_DEF_NAME) + .withDockerRepository(CONNECTOR_REPOSITORY) .withDockerImageTag(CONNECTOR_VERSION)); when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)) .thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME)); @@ -150,6 +154,7 @@ void testTrackCheckConnectionDestination() throws ConfigNotFoundException, IOExc .put("attempt_id", 0) .put("connector_destination", DESTINATION_DEF_NAME) .put("connector_destination_definition_id", UUID2) + .put("connector_destination_docker_repository", CONNECTOR_REPOSITORY) .put("connector_destination_version", CONNECTOR_VERSION) .build(); @@ -157,6 +162,7 @@ void testTrackCheckConnectionDestination() throws ConfigNotFoundException, IOExc .thenReturn(new StandardDestinationDefinition() .withDestinationDefinitionId(UUID2) .withName(DESTINATION_DEF_NAME) + .withDockerRepository(CONNECTOR_REPOSITORY) .withDockerImageTag(CONNECTOR_VERSION)); when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)) .thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME)); @@ -178,6 +184,7 @@ void testTrackDiscover() throws ConfigNotFoundException, IOException, JsonValida .put("attempt_id", 0) .put("connector_source", SOURCE_DEF_NAME) .put("connector_source_definition_id", UUID1) + .put("connector_source_docker_repository", CONNECTOR_REPOSITORY) .put("connector_source_version", CONNECTOR_VERSION) .build(); @@ -185,6 +192,7 @@ void testTrackDiscover() throws ConfigNotFoundException, IOException, JsonValida .thenReturn(new StandardSourceDefinition() .withSourceDefinitionId(UUID1) .withName(SOURCE_DEF_NAME) + .withDockerRepository(CONNECTOR_REPOSITORY) .withDockerImageTag(CONNECTOR_VERSION)); when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)) .thenReturn(new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME)); @@ -296,22 +304,26 @@ private Job getJobMock(final ConfigType configType, final long jobId) throws Con .thenReturn(new StandardSourceDefinition() .withSourceDefinitionId(UUID1) .withName(SOURCE_DEF_NAME) + .withDockerRepository(CONNECTOR_REPOSITORY) .withDockerImageTag(CONNECTOR_VERSION)); when(configRepository.getDestinationDefinitionFromConnection(CONNECTION_ID)) .thenReturn(new StandardDestinationDefinition() .withDestinationDefinitionId(UUID2) .withName(DESTINATION_DEF_NAME) + .withDockerRepository(CONNECTOR_REPOSITORY) .withDockerImageTag(CONNECTOR_VERSION)); when(configRepository.getStandardSourceDefinition(UUID1)) .thenReturn(new StandardSourceDefinition() .withSourceDefinitionId(UUID1) .withName(SOURCE_DEF_NAME) + .withDockerRepository(CONNECTOR_REPOSITORY) .withDockerImageTag(CONNECTOR_VERSION)); when(configRepository.getStandardDestinationDefinition(UUID2)) .thenReturn(new StandardDestinationDefinition() .withDestinationDefinitionId(UUID2) .withName(DESTINATION_DEF_NAME) + .withDockerRepository(CONNECTOR_REPOSITORY) .withDockerImageTag(CONNECTOR_VERSION)); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( @@ -368,9 +380,11 @@ private ImmutableMap getJobMetadata(final ConfigType configType, .put("connection_id", CONNECTION_ID) .put("connector_source", SOURCE_DEF_NAME) .put("connector_source_definition_id", UUID1) + .put("connector_source_docker_repository", CONNECTOR_REPOSITORY) .put("connector_source_version", CONNECTOR_VERSION) .put("connector_destination", DESTINATION_DEF_NAME) .put("connector_destination_definition_id", UUID2) + .put("connector_destination_docker_repository", CONNECTOR_REPOSITORY) .put("connector_destination_version", CONNECTOR_VERSION) .put("namespace_definition", NamespaceDefinitionType.SOURCE) .put("table_prefix", false)