-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix record count and add acceptance test to the new scheduler #9487
Conversation
23dca6c
to
57238c9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the quick turn around!
Couple of questions to make sure I understand what is going on.
Did you double check in a dev env to see if the counts are working as expected?
configs.getWebappUrl(), | ||
configRepository, | ||
new WorkspaceHelper(configRepository, jobPersistence), | ||
TrackingClientSingleton.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not seeing this be initialised? Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is initialized in the get method
public static TrackingClient get() {
synchronized (lock) {
if (trackingClient == null) {
initialize();
}
return trackingClient;
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah I was referring to properly initialising it with this call https://github.com/airbytehq/airbyte/blob/master/airbyte-analytics/src/main/java/io/airbyte/analytics/TrackingClientSingleton.java#L39. e.g. https://github.com/airbytehq/airbyte/blob/master/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java#L259
I think this is needed otherwise we won't be sending events to Segment. If you look at the Scheduler code, the linked code is called before we call get
. I'm assuming we want to do the same here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching it, Done
@@ -116,6 +116,7 @@ services: | |||
- SYNC_JOB_MAX_TIMEOUT_DAYS=${SYNC_JOB_MAX_TIMEOUT_DAYS} | |||
- TEMPORAL_HOST=${TEMPORAL_HOST} | |||
- TRACKING_STRATEGY=${TRACKING_STRATEGY} | |||
- WEBAPP_URL=${WEBAPP_URL} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is this coming from?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is needed to initialize the jobNotifier call by the temporal activity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will also need to be updated for Kustomize/Helm in the future.
This is probably beyond the scope for this PR but for internal uses like this we should probably aim at using INTERNAL_API_HOST
+ API_URL
and not WEBAPP_URL
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about the kustomize files? should we also update those in this PR so we at least have one good working Kube deploy?
...o/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java
Show resolved
Hide resolved
airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java
Show resolved
Hide resolved
...o/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java
Show resolved
Hide resolved
@@ -3408,7 +3408,7 @@ | |||
supported_destination_sync_modes: | |||
- "append" | |||
- "overwrite" | |||
- dockerImage: "airbyte/destination-s3:0.2.4" | |||
- dockerImage: "airbyte/destination-s3:0.2.3" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Update to 4
configs.getWebappUrl(), | ||
configRepository, | ||
new WorkspaceHelper(configRepository, jobPersistence), | ||
TrackingClientSingleton.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is initialized in the get method
public static TrackingClient get() {
synchronized (lock) {
if (trackingClient == null) {
initialize();
}
return trackingClient;
}
}
...o/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java
Show resolved
Hide resolved
airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java
Show resolved
Hide resolved
...o/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java
Show resolved
Hide resolved
@@ -116,6 +116,7 @@ services: | |||
- SYNC_JOB_MAX_TIMEOUT_DAYS=${SYNC_JOB_MAX_TIMEOUT_DAYS} | |||
- TEMPORAL_HOST=${TEMPORAL_HOST} | |||
- TRACKING_STRATEGY=${TRACKING_STRATEGY} | |||
- WEBAPP_URL=${WEBAPP_URL} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is needed to initialize the jobNotifier call by the temporal activity
@davinchia I tested locally and not on dev yet (I need to merge for that). The issue has been reproduce and visually fixed with this change. |
57238c9
to
e1ae796
Compare
fa42598
to
67bde1b
Compare
67bde1b
to
bd28eee
Compare
bd28eee
to
2319305
Compare
2319305
to
3ce9ce7
Compare
256e30b
to
b8b506a
Compare
@@ -467,7 +467,8 @@ public void testManualSync() throws Exception { | |||
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); | |||
final UUID connectionId = | |||
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); | |||
|
|||
// Avoid Race condition with the new scheduler | |||
Thread.sleep(10 * 1000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
weird. what exactly is the issue? the connection is created but not in temporal yet? should createConnect
just not return until everything is set up properly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The connection is created, which means that the a workflow is plan. The temporal workflow runs asynchronously, that's why I put this wait to make sure that the job gots created when we call final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
because if it was done without a wait the async temporal process may have done it or not. CreateConnection might return the UUID of a connection for which the job is not created yet
// waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); | ||
final JobRead job = | ||
waitForJob(apiClient.getJobsApi(), connectionSyncRead2.getJob(), | ||
Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this changing? don't we want to wait for it to complete instead of to just be in running.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have changed the behavior of the reset, it is now cancelling a running job instead of waiting for it to terminate. That's why we are also expecting to have failed status.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add another look and this was a mistake, I restored the previous way to do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
b8b506a
to
cf41304
Compare
cf41304
to
f9fc450
Compare
f9fc450
to
ff647de
Compare
8b7e3dc
to
5745704
Compare
The new scheduler was missing a notification step after the job is done. This is needed in order to report the number of record of a sync.
Add a new github action task to run the acceptances test with the new scheduler
5745704
to
a0f20f6
Compare
a0f20f6
to
69b2ab0
Compare
It is following the process describe as the first proposal of https://docs.google.com/document/d/11_wFPSVDNgm0_EyeBdP8huYXrJRzo4YaX9AcaR9XYl8/edit#heading=h.2v6egvd3w0f6
The description of the PR are in the commit message. Let me know if having the description here would be better.
This change is