-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Geography support to RouterService #17902
Conversation
@pmossman is this ready for review? |
@davinchia yeah I think so! |
9bfe03b
to
8cc89fb
Compare
f021777
to
9fed5f0
Compare
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.commons.temporal.scheduling; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This interface is in the airbyte-commons-temporal
module, so that on the Cloud side, the new cloud-workers
module can depend on airbyte-commons-temporal
without needing a dependency on airbyte-workers
.
I debated putting this interface in airbyte-commons-worker
but based on the contents of the two modules, airbyte-commons-temporal
seemed like a slightly better fit. Definitely open to other opinions here!
@@ -0,0 +1,37 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This diff shows up as a brand new file because I moved the RouterService from /sync/ to
/scheduling/` because it's used by the ConnectionManagerWorkflow, not the SyncWorkflow.
This diff also contains the new functionality of reading the connection's Geography from the database instead of from the environment variable
|
||
return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId); | ||
return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId); | ||
} catch (final IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that the routerService queries the database, an IOException is possible. Throw a RetryableException if encountered
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remind me again, where do we catch the RetryableException? I can't look since as my IntelliJ is pooping out on me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Asked here: https://airbytehq-team.slack.com/archives/C03AS1GAQV6/p1666217128557719
I think it extends RuntimeException
which means Temporal knows to retry activities that fail with it as a cause, but it isn't super clear
@@ -0,0 +1,37 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's the OSS implementation of the GeographyMapper. In Cloud, this class will be overridden with a @Replaces
annotation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@davinchia They are two different options for the same result. Any bean marked @Primary
will always win. @Replace
is used to take the place of an existing bean of the same type. I haven't played with it, but I suspect that @Replace
wouldn't work if no other bean exists of that type, but @Primary
would. Generally, one is a bit more loose than the other, so it really depends on if we have more than 2 versions, as we won't want to mark two of the three of the same type with @Primary
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@davinchia That is my assessment as well.
@jdpgrailsdev @davinchia I think this should be ready for review in tandem with the Cloud PR |
*/ | ||
@Singleton | ||
@Slf4j | ||
@AllArgsConstructor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Our recommendation is to not use Lombok and instead create an explicit constructor with the beans to be injected. This allows us to use annotations on the fields should that become necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jdpgrailsdev for my knowledge, what does 'allows us to use annotations on the fields should that become necessary' mean?
(I understand and agree on not mixing constructor annotations from separate AOP frameworks for clarity/best practices.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@davinchia With the Lombok generated constructor, there is no code for us to modify if we end up needing to add a @Qualifier
annotation to specify which version of a singleton to inject, nor is there the ability for us to use @Value
or @Property
to inject configuration values via the constructor. Put another way, the Lombok generated constructor only works for constructor-based injection if we do not need to annotate any of the parameters in order for Micronaut to make the correct injection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Thanks for explaining.
I think we should also start phasing out Lombok in our system for clarity. Does Micronaut have an @Slf4j
annotation equivalent? I believe these two are the main Lombok annotations we use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@davinchia No, not that I am aware of. It provides the configuration of the logging system, but the use of a logger is up to the developer. I'm not sure that the @Slf4j
annotation really saves us a whole lot, as declaring a logger is a 1 line statement, much like the annotation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. I think the benefit of the annotation is mostly not need to remember to type the long string and the hideous class import - all cosmetic.
MIcronaut docs do indicate some clashing of Lombok and Micronaut - let's get rid of Lombok!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a5430be
to
c6b30ea
Compare
c6b30ea
to
9c79c5a
Compare
@@ -579,16 +579,6 @@ public interface Configs { | |||
*/ | |||
boolean shouldRunConnectionManagerWorkflows(); | |||
|
|||
// Worker - Control Plane configs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
private final ConfigRepository configRepository; | ||
private final GeographyMapper geographyMapper; | ||
|
||
public RouterService(final ConfigRepository configRepository, final GeographyMapper geographyMapper) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is where Micronaut will inject the Cloud specific bean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. It will inject which ever implementation of the GeographyMapper
wins. In OSS, this will be the default implementation (the only one present). In cloud, both are present on the classpath, but only the one provided by the cloud-worker
library will win because of the Primary
annotation.
return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId); | ||
return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId); | ||
} catch (final IOException e) { | ||
throw new RetryableException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: log.warn here?
public class DefaultGeographyMapper implements GeographyMapper { | ||
|
||
@VisibleForTesting | ||
static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
* Maps a {@link Geography} to a Temporal Task Queue that should be used to run syncs for the given | ||
* Geography. | ||
*/ | ||
public interface GeographyMapper { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking out loud:
Would a better name not include Geography? Thinking about the future where we want the input to getTaskQueue
to not be tied to geography. Maybe QueueMapper?
Landing what we have now and evolving this later is also okay to me.
|
||
/** | ||
* If this test fails, it likely means that a new value was added to the {@link Geography} enum. A | ||
* new entry must be added to {@link DefaultGeographyMapper#GEOGRAPHY_TASK_QUEUE_MAP} to get this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @pmossman
Both the code comments and the PR comments made it very easy to review.
I have one medium comment about naming of the geography mapper. I have some smaller comments for my own understanding.
None of these are blocking. We can iterate on a follow up PR.
39cf2ab
to
0301e31
Compare
0301e31
to
9a5ab5a
Compare
move geography map to a helper that can be overridden with a separate implementation in cloud format pmd fix import move geography mapper interface to airbyte-commons-temporal
9a5ab5a
to
53045c1
Compare
* router service uses geography in database instead of env var move geography map to a helper that can be overridden with a separate implementation in cloud format pmd fix import move geography mapper interface to airbyte-commons-temporal * add DefaultGeographyMapper back in * remove all args constructor and extranneous import * rename GeographyMapper to TaskQueueMapper
What
RouterService selects a task queue based on a connection's Geography stored in the config database, rather than using environment variables.
How
Reading Order