diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/NotificationUtils.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/NotificationClient.java similarity index 96% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/NotificationUtils.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/NotificationClient.java index da0a45c2ee6..b5f8437e5ee 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/NotificationUtils.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/NotificationClient.java @@ -26,12 +26,12 @@ */ @Singleton @Slf4j -public class NotificationUtils { +public class NotificationClient { private final FeatureFlagClient featureFlagClient; private final WorkflowClient client; - public NotificationUtils(final FeatureFlagClient featureFlagClient, WorkflowClient client) { + public NotificationClient(final FeatureFlagClient featureFlagClient, WorkflowClient client) { this.featureFlagClient = featureFlagClient; this.client = client; } diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java index 8d6fbecd55d..1c565e5cf94 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java @@ -73,7 +73,7 @@ public class TemporalClient { private final WorkflowServiceStubs service; private final StreamResetPersistence streamResetPersistence; private final ConnectionManagerUtils connectionManagerUtils; - private final NotificationUtils notificationUtils; + private final NotificationClient notificationClient; private final StreamResetRecordsHelper streamResetRecordsHelper; public TemporalClient(@Named("workspaceRootTemporal") final Path workspaceRoot, @@ -81,14 +81,14 @@ public TemporalClient(@Named("workspaceRootTemporal") final Path workspaceRoot, final WorkflowServiceStubs service, final StreamResetPersistence streamResetPersistence, final ConnectionManagerUtils connectionManagerUtils, - final NotificationUtils notificationUtils, + final NotificationClient notificationClient, final StreamResetRecordsHelper streamResetRecordsHelper) { this.workspaceRoot = workspaceRoot; this.client = client; this.service = service; this.streamResetPersistence = streamResetPersistence; this.connectionManagerUtils = connectionManagerUtils; - this.notificationUtils = notificationUtils; + this.notificationClient = notificationClient; this.streamResetRecordsHelper = streamResetRecordsHelper; } @@ -533,7 +533,7 @@ public void forceDeleteWorkflow(final UUID connectionId) { } public void sendSchemaChangeNotification(final UUID connectionId, final String url, final boolean containsBreakingChange) { - notificationUtils.sendSchemaChangeNotification(connectionId, url, containsBreakingChange); + notificationClient.sendSchemaChangeNotification(connectionId, url, containsBreakingChange); } /** diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/NotificationUtilsTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/NotificationClientTest.java similarity index 79% rename from airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/NotificationUtilsTest.java rename to airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/NotificationClientTest.java index d1e5b5c78cd..1d13955cb9f 100644 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/NotificationUtilsTest.java +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/NotificationClientTest.java @@ -28,7 +28,7 @@ /** * Test the notificationUtils. */ -class NotificationUtilsTest { +class NotificationClientTest { private static final String WEBHOOK_URL = "url"; @@ -37,7 +37,7 @@ class NotificationUtilsTest { private final FeatureFlagClient featureFlagClient = mock(TestClient.class); private final WorkflowClient workflowClient = mock(WorkflowClient.class); - private final NotificationUtils notificationUtils = spy(new NotificationUtils(featureFlagClient, workflowClient)); + private final NotificationClient notificationClient = spy(new NotificationClient(featureFlagClient, workflowClient)); @Test void testCallNewNotifyWorkflow() { @@ -47,7 +47,7 @@ void testCallNewNotifyWorkflow() { when(workflowClient.newWorkflowStub(NotificationWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.NOTIFY))) .thenReturn(notificationWorkflow); - notificationUtils.sendSchemaChangeNotification(connectionId, "", false); + notificationClient.sendSchemaChangeNotification(connectionId, "", false); verify(notificationWorkflow).sendNotification(eq(connectionId), any(), any(), any()); } @@ -60,12 +60,12 @@ void testCallRightTemplate() throws IOException { when(workflowClient.newWorkflowStub(NotificationWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.NOTIFY))) .thenReturn(notificationWorkflow); - notificationUtils.sendSchemaChangeNotification(connectionId, WEBHOOK_URL, false); - verify(notificationUtils).renderTemplate("slack/non_breaking_schema_change_slack_notification_template.txt", connectionId.toString(), + notificationClient.sendSchemaChangeNotification(connectionId, WEBHOOK_URL, false); + verify(notificationClient).renderTemplate("slack/non_breaking_schema_change_slack_notification_template.txt", connectionId.toString(), WEBHOOK_URL); - notificationUtils.sendSchemaChangeNotification(connectionId, WEBHOOK_URL, true); - verify(notificationUtils).renderTemplate("slack/breaking_schema_change_slack_notification_template.txt", connectionId.toString(), WEBHOOK_URL); + notificationClient.sendSchemaChangeNotification(connectionId, WEBHOOK_URL, true); + verify(notificationClient).renderTemplate("slack/breaking_schema_change_slack_notification_template.txt", connectionId.toString(), WEBHOOK_URL); } @Test @@ -76,7 +76,7 @@ void testCallOldNotifyWorkflow() throws JsonValidationException, ConfigNotFoundE when(workflowClient.newWorkflowStub(ConnectionNotificationWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.NOTIFY))) .thenReturn(connectionNotificationWorkflow); - notificationUtils.sendSchemaChangeNotification(connectionId, WEBHOOK_URL, false); + notificationClient.sendSchemaChangeNotification(connectionId, WEBHOOK_URL, false); verify(connectionNotificationWorkflow).sendSchemaChangeNotification(connectionId, WEBHOOK_URL); } diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java index b5802d4e4da..6da1b2d2983 100644 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java @@ -108,7 +108,7 @@ public class TemporalClientTest { private WorkflowServiceBlockingStub workflowServiceBlockingStub; private StreamResetPersistence streamResetPersistence; private ConnectionManagerUtils connectionManagerUtils; - private NotificationUtils notificationUtils; + private NotificationClient notificationClient; private StreamResetRecordsHelper streamResetRecordsHelper; private Path workspaceRoot; @@ -127,10 +127,11 @@ void setup() throws IOException { streamResetPersistence = mock(StreamResetPersistence.class); mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING); connectionManagerUtils = spy(new ConnectionManagerUtils()); - notificationUtils = spy(new NotificationUtils(featureFlagClient, workflowClient)); + notificationClient = spy(new NotificationClient(featureFlagClient, workflowClient)); streamResetRecordsHelper = mock(StreamResetRecordsHelper.class); temporalClient = - spy(new TemporalClient(workspaceRoot, workflowClient, workflowServiceStubs, streamResetPersistence, connectionManagerUtils, notificationUtils, + spy(new TemporalClient(workspaceRoot, workflowClient, workflowServiceStubs, streamResetPersistence, connectionManagerUtils, + notificationClient, streamResetRecordsHelper)); } @@ -138,15 +139,16 @@ void setup() throws IOException { class RestartPerStatus { private ConnectionManagerUtils mConnectionManagerUtils; - private NotificationUtils mNotificationUtils; + private NotificationClient mNotificationClient; @BeforeEach void init() { mConnectionManagerUtils = mock(ConnectionManagerUtils.class); - mNotificationUtils = mock(NotificationUtils.class); + mNotificationClient = mock(NotificationClient.class); temporalClient = spy( - new TemporalClient(workspaceRoot, workflowClient, workflowServiceStubs, streamResetPersistence, mConnectionManagerUtils, mNotificationUtils, + new TemporalClient(workspaceRoot, workflowClient, workflowServiceStubs, streamResetPersistence, mConnectionManagerUtils, + mNotificationClient, streamResetRecordsHelper)); }