From 18d8d888eefe83acc1bb3c300e1244d4d5a76e3f Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Thu, 27 Oct 2022 10:27:16 -0400 Subject: [PATCH 1/3] Ensure that restart delay is set before any other activity is executed --- .../temporal/scheduling/ConnectionManagerWorkflowImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 837d0f483935..9e3b9fc6cb97 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -161,8 +161,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws RetryableException { try { ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionUpdaterInput.getConnectionId())); - recordMetric(new RecordMetricInput(connectionUpdaterInput, Optional.empty(), OssMetricsRegistry.TEMPORAL_WORKFLOW_ATTEMPT, null)); workflowDelay = getWorkflowRestartDelaySeconds(); + recordMetric(new RecordMetricInput(connectionUpdaterInput, Optional.empty(), OssMetricsRegistry.TEMPORAL_WORKFLOW_ATTEMPT, null)); try { cancellableSyncWorkflow = generateSyncWorkflowRunnable(connectionUpdaterInput); From 48e204bbed2842e40f5e87636a3bcaeb31d34ddc Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Thu, 27 Oct 2022 11:42:29 -0400 Subject: [PATCH 2/3] Add version check --- .../scheduling/ConnectionManagerWorkflowImpl.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 9e3b9fc6cb97..0daf13e52d3d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -121,6 +121,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private static final int RECORD_METRIC_CURRENT_VERSION = 1; private static final String WORKFLOW_CONFIG_TAG = "workflow_config"; private static final int WORKFLOW_CONFIG_CURRENT_VERSION = 1; + private static final String WORKFLOW_DELAY_TAG = "get_workflow_delay"; + private static final int WORKFLOW_DELAY_TAG_CURRENT_VERSION = 1; private static final String ROUTE_ACTIVITY_TAG = "route_activity"; private static final int ROUTE_ACTIVITY_CURRENT_VERSION = 1; @@ -161,8 +163,13 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws RetryableException { try { ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionUpdaterInput.getConnectionId())); - workflowDelay = getWorkflowRestartDelaySeconds(); - recordMetric(new RecordMetricInput(connectionUpdaterInput, Optional.empty(), OssMetricsRegistry.TEMPORAL_WORKFLOW_ATTEMPT, null)); + if(Workflow.getVersion(WORKFLOW_DELAY_TAG, Workflow.DEFAULT_VERSION, WORKFLOW_DELAY_TAG_CURRENT_VERSION) < WORKFLOW_DELAY_TAG_CURRENT_VERSION) { + recordMetric(new RecordMetricInput(connectionUpdaterInput, Optional.empty(), OssMetricsRegistry.TEMPORAL_WORKFLOW_ATTEMPT, null)); + workflowDelay = getWorkflowRestartDelaySeconds(); + } else { + workflowDelay = getWorkflowRestartDelaySeconds(); + recordMetric(new RecordMetricInput(connectionUpdaterInput, Optional.empty(), OssMetricsRegistry.TEMPORAL_WORKFLOW_ATTEMPT, null)); + } try { cancellableSyncWorkflow = generateSyncWorkflowRunnable(connectionUpdaterInput); From 3d632af9a8123d4cb68bb60c0ac19f50ee905ffe Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Thu, 27 Oct 2022 11:50:30 -0400 Subject: [PATCH 3/3] Formatting --- .../temporal/scheduling/ConnectionManagerWorkflowImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 0daf13e52d3d..4a24ee92151f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -163,7 +163,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws RetryableException { try { ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionUpdaterInput.getConnectionId())); - if(Workflow.getVersion(WORKFLOW_DELAY_TAG, Workflow.DEFAULT_VERSION, WORKFLOW_DELAY_TAG_CURRENT_VERSION) < WORKFLOW_DELAY_TAG_CURRENT_VERSION) { + if (Workflow.getVersion(WORKFLOW_DELAY_TAG, Workflow.DEFAULT_VERSION, + WORKFLOW_DELAY_TAG_CURRENT_VERSION) < WORKFLOW_DELAY_TAG_CURRENT_VERSION) { recordMetric(new RecordMetricInput(connectionUpdaterInput, Optional.empty(), OssMetricsRegistry.TEMPORAL_WORKFLOW_ATTEMPT, null)); workflowDelay = getWorkflowRestartDelaySeconds(); } else {