Skip to content

Commit

Permalink
Annotate webbackend traces (#6591)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed May 15, 2023
1 parent fa04e8d commit f73f34d
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public static final class Tags {
*/
public static final String DESTINATION_DOCKER_IMAGE_KEY = "destination.docker_image";

/**
* Name of the APM trace tag that holds the destination ID value associated with the trace.
*/
public static final String DESTINATION_ID_KEY = "destination.id";

/**
* Name of the APM trace tag that holds the Docker image value associated with the trace.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

import io.airbyte.metrics.lib.ApmTraceConstants.Tags;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
* Helps Annotating Traces.
*/
public class TracingHelper {

/**
* Add ConnectionId ta the current Trace.
*/
public static void addConnection(final UUID connectionId) {
if (connectionId != null) {
ApmTraceUtils.addTagsToTrace(Map.of(Tags.CONNECTION_ID_KEY, connectionId));
}
}

/**
* Add WorkspaceId ta the current Trace.
*/
public static void addWorkspace(final UUID workspaceId) {
if (workspaceId != null) {
ApmTraceUtils.addTagsToTrace(Map.of(Tags.WORKSPACE_ID_KEY, workspaceId));
}
}

/**
* Add SourceId and DestinationId ta the current Trace.
*/
public static void addSourceDestination(final UUID sourceId, final UUID destinationId) {
final Map<String, Object> tags = new HashMap<>();
if (sourceId != null) {
tags.put(Tags.SOURCE_ID_KEY, sourceId);
}
if (destinationId != null) {
tags.put(Tags.DESTINATION_ID_KEY, destinationId);
}
ApmTraceUtils.addTagsToTrace(tags);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.metrics.lib;

import static io.airbyte.metrics.lib.ApmTraceUtils.formatTag;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.metrics.lib.ApmTraceConstants.Tags;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracerTestUtil;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class TracingHelperTest {

Span span;
Tracer tracer;

@BeforeEach
void beforeEach() {
span = mock(Span.class);
tracer = mock(Tracer.class);
when(tracer.activeSpan()).thenReturn(span);
GlobalTracerTestUtil.setGlobalTracerUnconditionally(tracer);
}

@Test
void testAddConnection() {
TracingHelper.addConnection(null);
verifySpanNotSet(Tags.CONNECTION_ID_KEY);

final UUID connectionId = UUID.randomUUID();
TracingHelper.addConnection(connectionId);
verifySpanSetTag(Tags.CONNECTION_ID_KEY, connectionId);
}

@Test
void testAddSourceDestination() {
final UUID sourceId = UUID.randomUUID();
final UUID destinationId = UUID.randomUUID();

TracingHelper.addSourceDestination(null, null);
verifySpanNotSet(Tags.SOURCE_ID_KEY);
verifySpanNotSet(Tags.DESTINATION_ID_KEY);

TracingHelper.addSourceDestination(sourceId, null);
verifySpanSetTag(Tags.SOURCE_ID_KEY, sourceId);
verifySpanNotSet(Tags.DESTINATION_ID_KEY);
reset(span);

TracingHelper.addSourceDestination(null, destinationId);
verifySpanNotSet(Tags.SOURCE_ID_KEY);
verifySpanSetTag(Tags.DESTINATION_ID_KEY, destinationId);
reset(span);

TracingHelper.addSourceDestination(sourceId, destinationId);
verifySpanSetTag(Tags.SOURCE_ID_KEY, sourceId);
verifySpanSetTag(Tags.DESTINATION_ID_KEY, destinationId);
}

@Test
void testAddWorkspace() {
TracingHelper.addWorkspace(null);
verifySpanNotSet(Tags.WORKSPACE_ID_KEY);

final UUID workspaceId = UUID.randomUUID();
TracingHelper.addWorkspace(workspaceId);
verifySpanSetTag(Tags.WORKSPACE_ID_KEY, workspaceId);
}

private void verifySpanNotSet(final String tag) {
verify(span, never()).setTag(formatTag(tag), eq(anyString()));
}

private void verifySpanSetTag(final String tag, final Object value) {
verify(span).setTag(formatTag(tag), value.toString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.airbyte.commons.server.handlers.WebBackendConnectionsHandler;
import io.airbyte.commons.server.handlers.WebBackendGeographiesHandler;
import io.airbyte.commons.server.scheduling.AirbyteTaskExecutors;
import io.airbyte.metrics.lib.TracingHelper;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.micronaut.scheduling.annotation.ExecuteOn;
Expand Down Expand Up @@ -55,7 +56,10 @@ public WebBackendApiController(final WebBackendConnectionsHandler webBackendConn
@ExecuteOn(AirbyteTaskExecutors.IO)
@Override
public ConnectionStateType getStateType(final ConnectionIdRequestBody connectionIdRequestBody) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.getStateType(connectionIdRequestBody));
return ApiHelper.execute(() -> {
TracingHelper.addConnection(connectionIdRequestBody.getConnectionId());
return webBackendConnectionsHandler.getStateType(connectionIdRequestBody);
});
}

@Post("/check_updates")
Expand All @@ -72,7 +76,10 @@ public WebBackendCheckUpdatesRead webBackendCheckUpdates() {
@ExecuteOn(AirbyteTaskExecutors.SCHEDULER)
@Override
public WebBackendConnectionRead webBackendCreateConnection(final WebBackendConnectionCreate webBackendConnectionCreate) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendCreateConnection(webBackendConnectionCreate));
return ApiHelper.execute(() -> {
TracingHelper.addSourceDestination(webBackendConnectionCreate.getSourceId(), webBackendConnectionCreate.getDestinationId());
return webBackendConnectionsHandler.webBackendCreateConnection(webBackendConnectionCreate);
});
}

@Post("/connections/get")
Expand All @@ -81,7 +88,10 @@ public WebBackendConnectionRead webBackendCreateConnection(final WebBackendConne
@ExecuteOn(AirbyteTaskExecutors.IO)
@Override
public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnectionRequestBody webBackendConnectionRequestBody) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendGetConnection(webBackendConnectionRequestBody));
return ApiHelper.execute(() -> {
TracingHelper.addConnection(webBackendConnectionRequestBody.getConnectionId());
return webBackendConnectionsHandler.webBackendGetConnection(webBackendConnectionRequestBody);
});
}

@Post("/workspace/state")
Expand All @@ -90,7 +100,10 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
@ExecuteOn(AirbyteTaskExecutors.IO)
@Override
public WebBackendWorkspaceStateResult webBackendGetWorkspaceState(final WebBackendWorkspaceState webBackendWorkspaceState) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.getWorkspaceState(webBackendWorkspaceState));
return ApiHelper.execute(() -> {
TracingHelper.addWorkspace(webBackendWorkspaceState.getWorkspaceId());
return webBackendConnectionsHandler.getWorkspaceState(webBackendWorkspaceState);
});
}

@SuppressWarnings("LineLength")
Expand All @@ -100,7 +113,10 @@ public WebBackendWorkspaceStateResult webBackendGetWorkspaceState(final WebBacke
@ExecuteOn(AirbyteTaskExecutors.IO)
@Override
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WebBackendConnectionListRequestBody webBackendConnectionListRequestBody) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(webBackendConnectionListRequestBody));
return ApiHelper.execute(() -> {
TracingHelper.addWorkspace(webBackendConnectionListRequestBody.getWorkspaceId());
return webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(webBackendConnectionListRequestBody);
});
}

@Post("/geographies/list")
Expand All @@ -117,7 +133,10 @@ public WebBackendGeographiesListResult webBackendListGeographies() {
@ExecuteOn(AirbyteTaskExecutors.IO)
@Override
public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConnectionUpdate webBackendConnectionUpdate) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate));
return ApiHelper.execute(() -> {
TracingHelper.addConnection(webBackendConnectionUpdate.getConnectionId());
return webBackendConnectionsHandler.webBackendUpdateConnection(webBackendConnectionUpdate);
});
}

}

0 comments on commit f73f34d

Please sign in to comment.