-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add incremental to ~~jdbc~~ jooq source (and postgres) #1172
Conversation
cb0a1f0
to
a57c953
Compare
...ectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcState1.java
Outdated
Show resolved
Hide resolved
|
||
public class MoreStreams { | ||
|
||
public static <T> Stream<T> toStream(Iterator<T> iterator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will find replace other instances of this in a separate pr.
...nnectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/IncrementalUtils.java
Outdated
Show resolved
Hide resolved
return Optional.ofNullable(map.get(streamName)); | ||
} | ||
|
||
public Optional<String> getOriginalCursorField(String streamName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a little goofy as opposed to just returning the cursor info. i didn't want to have to worry about defensive copy issues though.
@@ -0,0 +1,24 @@ | |||
import org.jsonschema2pojo.SourceType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was running into a weird issue while trying to nest this model inside of airbyte-integrations/connectors/source-jdbc
so I pulled it out to be a top level connector while I was developing. I will fix the issue and nest everything that is currently in airbyte-integrations/connectors/source-jdbc-models
in airbyte-integrations/connectors/source-jdbc/models
before merging.
...ectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java
Outdated
Show resolved
Hide resolved
775e2ce
to
f70c5af
Compare
configuredStream.setSyncMode(io.airbyte.protocol.models.SyncMode.INCREMENTAL); | ||
if (Optional.ofNullable(configuredStream.getStream().getSourceDefinedCursor()).orElse(false)) { | ||
configuredStream.setCursorField(configuredStream.getStream().getDefaultCursorField()); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as the todo describes, this is really bad. i don't think we need to block postgres on fixing this, but as part of our goal to "launch" incremental, we need to figure out how to include it into our standard tests more ergonomically and transparently. will add an issue for next week. #1217
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public abstract class AbstractJooqSource implements Source { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see main description. intentionally alot of copy and paste going on right now. goal next week is to reconsolidate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reconsolidate with JDBC source correct? sounds good if so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.
} else if (jooqDataType.isString() || jooqDataType.isTemporal()) { | ||
// todo (cgardens) - this is bad because only works for mysql, psql, and sqlite. these are also the | ||
// only dialects that jooq supports in the free version, so this should not actually be an issue. it | ||
// is still sketchy. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again something i want to try to resolve next week.
if (messageStream.hasNext()) { | ||
final AirbyteMessage message = messageStream.next(); | ||
final String cursorCandidate = message.getRecord().getData().get(cursorField).asText(); | ||
if (IncrementalUtils.compareCursors(maxCursor, cursorCandidate, cursorType) < 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no guarantee here that our comparators will do the exact same thing as the databases. This is pretty scary, but haven't come up with a bullet proof way of fixing it yet.
I was thinking about what query I could submit to the database to do this for me. Doing it as individual queries would be too expensive, but if I can batch them somehow it might be okay. Haven't decided if this is a right now problem or not. Open to input.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Main comments are around:
- might be missing something, but I'm unsure if the current dynamic of current vs. initial cursor is correct
- why do we have two methods of discovery? can we have just one to avoid risk of them going out of sync?
- is it possible/needed to paginate full refresh queries to the db? ideally incremental too.
- is it possible to do cursor comparison DB side?
Rest of the comments are style or nits
...andard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestSource.java
Outdated
Show resolved
Hide resolved
...ectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJooqSource.java
Show resolved
Hide resolved
...ectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJooqSource.java
Show resolved
Hide resolved
...nnectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/JdbcStateManager.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
private static Stream<Record> executeFullRefreshQuery(Database database, List<org.jooq.Field<?>> jooqFields, Table<?> table) throws SQLException { | ||
return database.query(ctx -> ctx.select(jooqFields) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do you think about doing paging in this query to avoid huge queries on the db? ideally we'd do the same with incremental
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't think that this should be an issue. postgres should do all of this in a streaming fashion so it shouldn't technically cause problems. like i think the preferred way to handle this is have the db do this for you. i think you only mess with pagination if you really have to for doing stuff in web apps and what not.
...ectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJooqSource.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Override | ||
public AirbyteCatalog discover(JsonNode config) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why doesn't this exclude schemas the same way internal does? feels like right now there are two distinct discoverable schemas, one external and one internal. Ideally we have the same I think, but I may be missing a detail here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it does. if you follow discover => getTables => discoverInternal. so they use the same code path. discover just has to wrap it in our data model, while internally we don't wrap it so we can use the more granularly defined types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok so I'm missing a detail then ;)
...ectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJooqSource.java
Show resolved
Hide resolved
final Optional<String> initialCursorOptional = stateManager.getOriginalCursor(streamName); | ||
|
||
final Stream<AirbyteMessage> internalMessageStream; | ||
if (initialCursorOptional.isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the logic right now seems to be: if there was ever a cursor, do incremental. else do fullrefresh.
Shouldn't it be the other way around i.e: if there is a current cursor AND that cursor is the same as the initial cursor do incremental otherwise FR? might be missing something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think you're right. i'll double check this.
// x < 0 mean replace original | ||
// x == 0 means keep original | ||
// x > 0 means keep original | ||
public static int compareCursors(String original, String candidate, JsonSchemaPrimitive type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this comparison be done DB side?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we can we should. see: https://github.com/airbytehq/airbyte/pull/1172/files#r536510322. i have some performance concerns around it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😏 select MAX(col1) from ( values ('a'), ('b'), ('c')) as x (col1);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😏😏😏😏😏🤡 🥨 😏 ctx.select(DSL.max(DSL.field("col1"))).from(DSL.values(DSL.row("1"), DSL.row("3"), DSL.row("2")).as("t", "col1"))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will decide tomorrow if i'm going to do this in this iteration. but it's nifty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
going to hold off on this. i think it is low risk for right now and i don't want to put the work in here if it looks like i'm going to replace it with a jdbcsource anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not ready to merge right?
} | ||
|
||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType") | ||
private static CursorInfo createCursorInfoForStream(String streamName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this function is swallowing errors with the logging. I don't think we should log if we are unable to find the cursor fields and just do a full refresh. I think we should fail instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not what is happening here. None of these are failure states. Given that it is a valid configuration, these are all possible states that need to be handled. Let me know if you you see a case in here that should be a failure case. I've reviewed each of them and they all seem reasonable to me.
return new AirbyteMessage().withType(Type.STATE).withState(stateMessage); | ||
} else { | ||
return endOfData(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: remove new line
correct still iterating on this based on sherif's feedback. |
This is ready for another pass. I am working on the last todos mentioned above, but I don't think any of them will make or break your review. |
…irbyte/integrations/source/jdbc/AbstractJooqSource.java Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
…irbyte/integrations/source/jdbc/AbstractJooqSource.java Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
68e5a8f
to
f6a49d7
Compare
…ng of timezones across platforms
What
jdbcjooq-compatible sources. The goal of this PR is to set the base for all jdbc source and to be able to ship the postgres source using incremental.How
JdbcStateManager
to track original and new cursor states as we progress. This should be reusable for all database style source I think.StateDecoratingIterator
) that handles tracking the max cursor value and emitting the state message at the end of syncing each stream. Note: 1 state message per stream, even if no records are replicated for the stream.AbstractJdbcSource
for now.Decisions to make
WHERE
clauses with the correct syntax to do the comparison we need for the cursor based on the type of the column. Jooq seems to make that easier (though as you go through the PR, you'll see it is not perfect). What I plan to do next is see what a just jdbc version of this looks like. If it is good enough, I will just get rid of all jooq from the database sources and they can all use one. That's the dream.JdbcSourceAbstraction
because it trusted that our types were good enough.Checklist
Reading Order
AbstractJdbcSource
JdbcStateManager
StateDecoratingIterator