-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce the AirbyteStreamNamespaceNamePair to the DefaultReplicationWorker. #19360
Conversation
…tehq/airbyte into davinchia/progress-bar-persistence
airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerMetricReporter.java
Show resolved
Hide resolved
...protocol-models/src/main/java/io/airbyte/protocol/models/AirbyteStreamNameNamespacePair.java
Outdated
Show resolved
Hide resolved
airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerMetricReporter.java
Show resolved
Hide resolved
…ocol/models/AirbyteStreamNameNamespacePair.java Co-authored-by: Jimmy Ma <gosusnp@users.noreply.github.com>
/test connector=source-postgres |
/test connector=destination-snowflake |
@@ -77,7 +77,7 @@ private static <T> Map<AirbyteStreamNameNamespacePair, StreamCopier> createWrite | |||
final String stagingFolder = UUID.randomUUID().toString(); | |||
for (final var configuredStream : catalog.getStreams()) { | |||
final var stream = configuredStream.getStream(); | |||
final var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream); | |||
final var pair = AirbyteStreamNameNamespacePair.fromAirbyteStream(stream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
non-blocking: since we're being more explicit with AirbyteStreamNameNamespacePair
, should this var
be AirbyteStreamNameNamespacePair
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/test connector=source-postgres |
/test connector=destination-snowflake
Build PassedTest summary info:
|
/test connector=source-postgres
Build FailedTest summary info:
|
@grishick @ryankfu the source-postgres build has been failing for the last 5 days for reasons unrelated to this PR. From the connector-health dash, looks like this has been failing for a few days now: Since this is a package change, I think this change set should be fine to merge. I want to check in with both of you first. I would prefer to merge in sooner rather than later due to the freeze next week & this blocking other feature work. |
Follow up to #19360. This PR adjusts the MessageTracker interface to use the new Pair object.
…nWorker. (#19360) #19191 made me realise the DefaultReplicationWorker's metric tracking today has a bug where we aren't accounting for namespace when tracking metrics today. i.e. Streams with the same name and duplicate namespace will merge metrics. While reading the code to figure out a fix, I realised we don't have a good conceptual representation of stream namespace <> name pairs within the platform today. We use a concatenated string. Though this works, it will become harder and harder to read/track as we do more operations that involve namespace i.e. progress bars and column selection. This PR introduces the AirbyteStreamNameNamespacePair object into the platform code to make it more convenient to work with Streams in the future. (Especially if we proceed with the project to make streams a first-class citizen!) The AirbyteStreamNameNamespacePair object was written to deal with the same issue of namespace <> name pair manipulation within the Java destination code. It implements the Comparable interface, which makes it convenient to use for Collections operations. For an example of how this is consumed, see #19361.
Follow up to #19360. This PR adjusts the MessageTracker interface to use the new Pair object.
Now that there is the v0 version of the protocol models, will there be an updated version of AirbyteStreamNameNamespacePair that accepts the v0 or v1 models? |
Eventually I think we'll want to get something similar (possibly extracting an interface from the protocol models a la #20905). De facto, v0 and v1 don't have any structural changes (since the only diff is in the JSON blob fields, i.e. record data / stream schemas) so we can get away with using v0 everywhere for now. cc @gosusnp to fact-check me on that though |
@Eusebiotrigo, |
What
#19191 made me realise the DefaultReplicationWorker's metric tracking today has a bug where we aren't accounting for namespace when tracking metrics today. i.e. Streams with the same name and duplicate namespace will merge metrics.
While reading the code to figure out a fix, I realised we don't have a good conceptual representation of stream namespace <> name pairs within the platform today. We use a concatenated string. Though this works, it will become harder and harder to read/track as we do more operations that involve namespace i.e. progress bars and column selection.
This PR introduces the
AirbyteStreamNameNamespacePair
object into the platform code to make it more convenient to work with Streams in the future. (Especially if we proceed with the project to make streams a first-class citizen!)The
AirbyteStreamNameNamespacePair
object was written to deal with the same issue of namespace <> name pair manipulation within the Java destination code. It implements the Comparable interface, which makes it convenient to use for Collections operations.For an example of how this is consumed, see #19361.
How
AirbyteStreamNameNamespacePair
object from thebase-java
module to theprotocol_models
module. This location already contains models that make it easier to interact with pure Airbyte protocol-defined models, so moving it here is appropriate. All consumer modules already have dependencies on this, so there are build.gradle changes.No logic changes here - all modifications are to use the
AirbyteStreamNameNamespacePair
object.Recommended reading order
AirbyteStreamNameNamespacePair.java
- the main file being moved.DefaultReplicatonWorker.java
andRecordSchemaValidator.java
- the main consumers being changed.RecordSchemaValidatorTest.Java
,WorkerUtilsTest.java
andDefaultReplicationWorkerTest.java
- for the test modifications.🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampledocs/integrations/README.md
airbyte-integrations/builds.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereUpdating a connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampleAirbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing/publish
command described hereConnector Generator
-scaffold
in their name) have been updated with the latest scaffold by running./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates
then checking in your changesTests
Unit
Put your unit tests output here.
Integration
Put your integration tests output here.
Acceptance
Put your acceptance tests output here.