Checkpoint snapshot skeleton #33838
base: 12-29-Model_update
Warning: 🚨 Connector code freeze is in effect until 2024-01-02. This PR is changing connector code. Please contact the current OC engineers if you want to merge this change to master.
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
final Instant emittedAt) {
final List<AutoCloseableIterator<AirbyteMessage>> iteratorList = new ArrayList<>();
nit: this is beyond the scope of this change, but does spamming the `final` keyword really improve code quality? I wonder especially in cases like this one, where we're assigning a value which is in reality obviously mutable, even if the enclosing container reference doesn't change.
The narrowest response to why we add `final` is that something will complain if you don't mark it as `final` and try to build.
It improves performance, I think.
I tried removing `final` in a bunch of places and nothing complained. Perhaps that's because I ripped out PMD a few months ago (nobody noticed, I think).
I don't buy the performance argument. Very little of our code is in the hot path. Not to mention that if PMD can figure out which variables are `final`, then I trust that javac can too.
IMHO lack of readability is a fantastic source of bugs and our code isn't great in this regard. Let's all improve it a little bit at a time.
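For readers following the thread, a minimal sketch of the mutability point raised above. The class and method names are hypothetical and not taken from this PR; the sketch only shows that `final` prevents rebinding a variable while leaving the object it refers to fully mutable.

```java
import java.util.ArrayList;
import java.util.List;

public class FinalReferenceDemo {

  // Hypothetical helper: `final` pins the reference, not the contents of the list.
  static List<String> collect() {
    final List<String> iteratorList = new ArrayList<>();
    iteratorList.add("first");  // allowed: this mutates the object, not the variable
    iteratorList.add("second");
    // iteratorList = new ArrayList<>();  // would not compile: a final variable cannot be reassigned
    return iteratorList;
  }

  public static void main(String[] args) {
    System.out.println(collect());
  }
}
```

So the keyword documents "this name is never rebound", which is a weaker guarantee than "this value never changes".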
.map(CommonField::getName)
.filter(CatalogHelpers.getTopLevelFieldNames(airbyteStream)::contains)
.toList();
primaryKeys.forEach(pk -> {
nit: can we either have a vanilla for-loop, or use the Streams constructs fully, instead of this mix of styles?
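A rough sketch of the two consistent styles this nit asks for, under the assumption that the goal is to filter a list of field names and transform each survivor. All names here are hypothetical stand-ins, not the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class LoopStyles {

  // Style 1: a plain for-loop from start to finish.
  static List<String> quoteWithLoop(List<String> fieldNames, Set<String> topLevelNames) {
    List<String> result = new ArrayList<>();
    for (String name : fieldNames) {
      if (topLevelNames.contains(name)) {
        result.add("\"" + name + "\"");
      }
    }
    return result;
  }

  // Style 2: a single stream pipeline, with no trailing forEach used for side effects.
  static List<String> quoteWithStream(List<String> fieldNames, Set<String> topLevelNames) {
    return fieldNames.stream()
        .filter(topLevelNames::contains)
        .map(name -> "\"" + name + "\"")
        .toList();
  }
}
```

Both methods return the same elements in the same order; the difference is only whether the reader has to switch mental models halfway through.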
new MssqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair, calculateChunkSize(), isCompositePrimaryKey(airbyteStream));
final AutoCloseableIterator<AirbyteMessage> recordIterator =
getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, pair);
nit: can we use `var` more, please? The LHS types in this line and those above are not ambiguous.
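As an illustration of the suggestion, a small sketch using hypothetical names and plain `java.util` types in place of the PR's iterators. It assumes Java 10 or later, where local variable type inference is available.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class VarDemo {

  static int countAll() {
    // Explicit declaration: the element type is spelled out on the left-hand side.
    final List<Iterator<String>> explicit = new ArrayList<>();
    // With var, the compiler infers ArrayList<Iterator<String>> from the initializer.
    final var inferred = new ArrayList<Iterator<String>>();
    inferred.add(List.of("a", "b").iterator());
    explicit.addAll(inferred);

    int count = 0;
    for (var it : explicit) {  // var also works for loop variables
      while (it.hasNext()) {
        it.next();
        count++;
      }
    }
    return count;
  }
}
```

Note that `var` infers the concrete initializer type (`ArrayList` here), not the interface, which only matters if the variable is later reassigned.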
.withNamespace(namespace)
.withEmittedAt(emittedAt)
.withData(r)));
}
nit: perhaps it's just me but if this were inlined I'd find the code more readable. If we're worried about cyclomatic complexity then let's lift L80:99 into a separate function or something.
// Augments the given iterator with record count logs.
private AutoCloseableIterator<AirbyteMessage> augmentWithLogs(final AutoCloseableIterator<AirbyteMessage> iterator,
final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair,
why is this type name fully qualified?
final OrderedColumnLoadStatus currentOcLoadStatus = initialLoadStateManager.getOrderedColumnLoadStatus(pair);
final JsonNode incrementalState =
(currentOcLoadStatus == null || currentOcLoadStatus.getIncrementalState() == null) ? streamStateForIncrementalRunSupplier.apply(pair)
:currentOcLoadStatus.getIncrementalState();
nit: formatting
private boolean shouldBuildNextSubquery() {
// The next sub-query should be built if (i) it is the first subquery in the sequence. (ii) the
// previous subquery has finished.
nit: comment formatting
final String schemaName = pair.getNamespace();
LOGGER.info("Preparing query for table: {}", tableName);
final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName,
quoteString);
similar comment regarding use of `fullTableName` in log msg
final String wrappedColumnNames = RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString);
final OrderedColumnLoadStatus ocLoadStatus = initialLoadStateManager.getOrderedColumnLoadStatus(pair);
if (ocLoadStatus == null) {
LOGGER.info("pkLoadStatus is null");
consider improving this log msg, which is not terribly informative
AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamespacePair pair, final JsonNode streamStateForIncrementalRun);

// Returns the previous state emitted, represented as a {@link OrderedColumnLoadStatus} associated with
// the stream.
nit: javadoc instead of // comments?
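A sketch of what the Javadoc form could look like. `LoadStatus`, `StateManager`, and `fromMap` are hypothetical stand-ins for the PR's types, introduced only so the example compiles on its own. The point is that `{@link ...}` is only resolved by tooling inside a `/** ... */` block, not inside `//` comments.

```java
import java.util.Map;

public class JavadocDemo {

  // Hypothetical stand-in for the PR's OrderedColumnLoadStatus.
  record LoadStatus(String lastValue) {}

  interface StateManager {

    /**
     * Returns the previous state emitted for the given stream, represented as a
     * {@link LoadStatus}.
     *
     * @param streamName name of the stream to look up
     * @return the last emitted status, or {@code null} if none is recorded
     */
    LoadStatus getLoadStatus(String streamName);
  }

  // Minimal map-backed implementation so the sketch can be exercised.
  static StateManager fromMap(Map<String, LoadStatus> statuses) {
    return statuses::get;
  }
}
```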
final AirbyteStreamNameNamespacePair updatedPair = new AirbyteStreamNameNamespacePair(pair.getName(), pair.getNamespace());
map.put(updatedPair, ocStatus);
});
return map;
nit: use Streams constructs
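One possible shape for this, sketched with hypothetical record types rather than the PR's classes: instead of filling a map inside `forEach`, the pipeline ends in `Collectors.toMap`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapDemo {

  // Hypothetical stand-ins for the stream pair and its load status.
  record StreamPair(String name, String namespace) {}
  record StreamState(StreamPair pair, String status) {}

  // Builds the map in one expression; no mutable map is visible to the caller's code path.
  static Map<StreamPair, String> indexByPair(List<StreamState> states) {
    return states.stream()
        .collect(Collectors.toMap(
            s -> new StreamPair(s.pair().name(), s.pair().namespace()),
            StreamState::status));
  }
}
```

One behavioral difference to keep in mind: unlike `map.put`, the two-argument `Collectors.toMap` throws `IllegalStateException` on duplicate keys and `NullPointerException` on null values, so it is not always a drop-in replacement.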
}

private AirbyteStreamState getAirbyteStreamState(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair, final JsonNode stateData) {
LOGGER.info("STATE DATA FOR {}: {}", pair.getNamespace().concat("_").concat(pair.getName()), stateData);
nit: caps
private AirbyteStreamState getAirbyteStreamState(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair, final JsonNode stateData) {
LOGGER.info("STATE DATA FOR {}: {}", pair.getNamespace().concat("_").concat(pair.getName()), stateData);
assert Objects.nonNull(pair.getName());
assert Objects.nonNull(pair.getNamespace());
similar comment as earlier regarding assertions
}

public record OrderedColumnInfo(String ocFieldName, JDBCType fieldType, String ocMaxValue) {}
curious why are these defined as inner classes? files are cheap
Just a bunch of nits on my end.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("try")
What's the reason we have to use `@SuppressWarnings` here? Usually those warnings are pretty useful in preventing resource leaks.