-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refresh before syncs when feature flag is on #19888
Conversation
0a83a93
to
25cc417
Compare
9326cc7
to
527d2fc
Compare
try { | ||
airbyteApiClient.getSourceApi().discoverSchemaForSource(requestBody); | ||
} catch (final Exception e) { | ||
log.info("Attempted schema refresh, but failed."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be an error
log with the exception itself logged as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops yep, meant to update that! I also have a question about logging in general. These logs (and the ones in SyncWorkflowImpl) are not showing up in the UI logs and I'm not sure why.
Two questions on this. I'm seeing this CI error:
and I'm not seeing how that's related. Also - the logs I added in SyncWorkflowImpl are not showing up in the UI. Does anyone have insight into which logs end up in the UI? |
@@ -72,6 +86,23 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, | |||
|
|||
final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION); | |||
final String taskQueue = Workflow.getInfo().getTaskQueue(); | |||
|
|||
if (version > Workflow.DEFAULT_VERSION) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need a dedicated version for that. Re-using the existing version won't work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh I actually had this locally but didn't push up the change - doing that now
|
||
@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME) | ||
@Override | ||
public StandardSyncOutput run(final JobRunConfig jobRunConfig, | ||
final IntegrationLauncherConfig sourceLauncherConfig, | ||
final IntegrationLauncherConfig destinationLauncherConfig, | ||
final StandardSyncInput syncInput, | ||
final UUID connectionId) { | ||
final UUID connectionId) | ||
throws JsonValidationException, ConfigNotFoundException, IOException, ApiException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that those exception shouldn't be in the method signature, we should catch them and set the failure reason in the return StandardSyncOutput
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just updated this -- I actually think we shouldn't fail the sync just because a schema refresh fails. I logged the error, but I'm not sure if that would be confusing to users. What do you think?
a69ad8c
to
7db8ba9
Compare
da9aeb6
to
825e42a
Compare
|
||
public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository, SourceApi sourceApi) { | ||
public RefreshSchemaActivityImpl(Optional<ConfigRepository> configRepository, | ||
AirbyteApiClient airbyteApiClient, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we inject the specific ApiClient that are needed rather than the top level one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried this but was getting a micronaut bean error, so I followed how you did it in the PersistStateActivity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the time PersistStateActivity
was added, I don't think we had the independent clients yet.
The way to solve the micronaut bean error should be to add the client to the API factory:
airbyte/airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java
Line 38 in 8c780bd
public AirbyteApiClient airbyteApiClient( |
@benmoriceau, any specific reasons we do not have those yet other than did not have time to get to it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not that I know of. We should add them
airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java
Outdated
Show resolved
Hide resolved
@@ -423,4 +450,13 @@ private static void verifyDbtTransform(final DbtTransformationActivity dbtTransf | |||
operatorDbtInput); | |||
} | |||
|
|||
private static void verifyShouldRefreshSchema(final RefreshSchemaActivity refreshSchemaActivity) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java
Outdated
Show resolved
Hide resolved
try { | ||
sourceApi.discoverSchemaForSource(requestBody); | ||
} catch (final Exception e) { | ||
log.error("Attempted schema refresh, but failed with error: ", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could be worth adding a comment explaining that we swallow the exception here to avoid blocking the replication if we fail to refresh.
This reverts commit 2b045a9.
Refresh schemas before each sync using feature flag
The main logic here is in
SyncWorkflowImpl.java