Skip to content

Commit

Permalink
Add API source to tracking data (#22320)
Browse files Browse the repository at this point in the history
* Add API source to tracking data

* PR feedback

* Fix formatting

* Add analytic source header to CORS filter
  • Loading branch information
jdpgrailsdev authored Feb 6, 2023
1 parent 57b0088 commit ac370c9
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 8 deletions.
8 changes: 6 additions & 2 deletions airbyte-analytics/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
dependencies {
implementation 'com.segment.analytics.java:analytics:2.1.1'
plugins {
id 'java-library'
}

dependencies {
api libs.segment.java.analytics
api libs.micronaut.http

implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import io.airbyte.config.StandardWorkspace;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.context.ServerRequestContext;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -45,10 +48,12 @@ public class SegmentTrackingClient implements TrackingClient {

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentTrackingClient.class);

public static final String AIRBYTE_ANALYTIC_SOURCE_HEADER = "X-Airbyte-Analytic-Source";
public static final String CUSTOMER_ID_KEY = "user_id";
private static final String SEGMENT_WRITE_KEY = "7UDdp5K55CyiGgsauOr2pNNujGvmhaeu";
private static final String AIRBYTE_VERSION_KEY = "airbyte_version";
protected static final String AIRBYTE_VERSION_KEY = "airbyte_version";
private static final String AIRBYTE_ROLE = "airbyte_role";
protected static final String AIRBYTE_SOURCE = "airbyte_source";
private static final String AIRBYTE_TRACKED_AT = "tracked_at";

// Analytics is threadsafe.
Expand Down Expand Up @@ -123,6 +128,9 @@ public void track(@Nullable final UUID workspaceId, final String action, final M
}
final Map<String, Object> mapCopy = new HashMap<>(metadata);
final TrackingIdentity trackingIdentity = identityFetcher.apply(workspaceId);
final Optional<String> airbyteSource = getAirbyteSource();

airbyteSource.ifPresent(a -> mapCopy.put(AIRBYTE_SOURCE, a));

// Always add these traits.
mapCopy.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion().serialize());
Expand All @@ -138,4 +146,13 @@ public void track(@Nullable final UUID workspaceId, final String action, final M
.properties(mapCopy));
}

private Optional<String> getAirbyteSource() {
final Optional<HttpRequest<Object>> currentRequest = ServerRequestContext.currentRequest();
if (currentRequest.isPresent()) {
return Optional.ofNullable(currentRequest.get().getHeaders().get(AIRBYTE_ANALYTIC_SOURCE_HEADER));
}

return Optional.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.analytics;

import static io.airbyte.analytics.SegmentTrackingClient.AIRBYTE_ANALYTIC_SOURCE_HEADER;
import static io.airbyte.analytics.SegmentTrackingClient.AIRBYTE_SOURCE;
import static io.airbyte.analytics.SegmentTrackingClient.AIRBYTE_VERSION_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -20,6 +23,9 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.context.ServerRequestContext;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
Expand All @@ -37,8 +43,8 @@ class SegmentTrackingClientTest {
private static final TrackingIdentity IDENTITY = new TrackingIdentity(AIRBYTE_VERSION, UUID.randomUUID(), EMAIL, false, false, true);
private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final Function<UUID, TrackingIdentity> MOCK_TRACKING_IDENTITY = (workspaceId) -> IDENTITY;
private static final String AIRBYTE_VERSION_KEY = "airbyte_version";
private static final String JUMP = "jump";
private static final String EMAIL_KEY = "email";

private Analytics analytics;
private SegmentTrackingClient segmentTrackingClient;
Expand Down Expand Up @@ -69,7 +75,7 @@ void testIdentify() {
.put("deployment_env", DEPLOYMENT.getDeploymentEnv())
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("email", IDENTITY.getEmail().get())
.put(EMAIL_KEY, IDENTITY.getEmail().get())
.put("subscribed_newsletter", IDENTITY.isNews())
.put("subscribed_security", IDENTITY.isSecurityUpdates())
.build();
Expand All @@ -96,7 +102,7 @@ void testIdentifyWithRole() {
.put("deployment_env", DEPLOYMENT.getDeploymentEnv())
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("email", IDENTITY.getEmail().get())
.put(EMAIL_KEY, IDENTITY.getEmail().get())
.put("subscribed_newsletter", IDENTITY.isNews())
.put("subscribed_security", IDENTITY.isSecurityUpdates())
.build();
Expand Down Expand Up @@ -124,7 +130,7 @@ void testTrackWithMetadata() {
final ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ImmutableMap<String, Object> metadata = ImmutableMap.of(
AIRBYTE_VERSION_KEY, AIRBYTE_VERSION.serialize(),
"email", EMAIL,
EMAIL_KEY, EMAIL,
"height", "80 meters",
"user_id", IDENTITY.getCustomerId());

Expand All @@ -144,6 +150,30 @@ void testTrackNullWorkspace() {
verify(analytics, never()).enqueue(any());
}

@Test
void testTrackAirbyteAnalyticSource() {
final String analyticSource = "test";
final HttpHeaders httpHeaders = mock(HttpHeaders.class);
final HttpRequest<?> httpRequest = mock(HttpRequest.class);

when(httpHeaders.get(AIRBYTE_ANALYTIC_SOURCE_HEADER)).thenReturn(analyticSource);
when(httpRequest.getHeaders()).thenReturn(httpHeaders);
ServerRequestContext.set(httpRequest);

final ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ImmutableMap<String, Object> metadata = ImmutableMap.of(
AIRBYTE_VERSION_KEY, AIRBYTE_VERSION.serialize(),
EMAIL_KEY, EMAIL,
"height", "80 meters",
"user_id", IDENTITY.getCustomerId());

segmentTrackingClient.track(WORKSPACE_ID, JUMP, metadata);

verify(analytics).enqueue(mockBuilder.capture());
final TrackMessage actual = mockBuilder.getValue().build();
assertEquals(analyticSource, actual.properties().get(AIRBYTE_SOURCE));
}

private static ImmutableMap<String, Object> filterTrackedAtProperty(final Map<String, ?> properties) {
final String trackedAtKey = "tracked_at";
assertTrue(properties.containsKey(trackedAtKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class CorsFilter implements ContainerResponseFilter {

public static final ImmutableMap<String, String> MAP = ImmutableMap.of(
HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, "*",
HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, "Origin, Content-Type, Accept, Content-Encoding",
HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, "Origin, Content-Type, Accept, Content-Encoding, X-Airbyte-Analytic-Source",
HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, "GET, POST, PUT, DELETE, OPTIONS, HEAD");

@Override
Expand Down
2 changes: 2 additions & 0 deletions deps.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ micronaut-test = "3.8.0"
platform-testcontainers = "1.17.3"
postgresql = "42.3.5"
reactor = "3.5.2"
segment = "2.1.1"
slf4j = "1.7.36"
temporal = "1.17.0"

Expand Down Expand Up @@ -108,6 +109,7 @@ quartz-scheduler = { module = "org.quartz-scheduler:quartz", version = "2.3.2" }
reactor-core = { module = "io.projectreactor:reactor-core", version.ref = "reactor" }
reactor-test = { module = "io.projectreactor:reactor-test", version.ref = "reactor" }
s3 = { module = "software.amazon.awssdk:s3", version = "2.16.84" }
segment-java-analytics = { module = "com.segment.analytics.java:analytics", version.ref = "segment" }
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
spotbugs-annotations = { module = "com.github.spotbugs:spotbugs-annotations", version = "4.7.3" }
temporal-sdk = { module = "io.temporal:temporal-sdk", version.ref = "temporal" }
Expand Down

0 comments on commit ac370c9

Please sign in to comment.