-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Discover worker starts to use API to write schema result #21875
Changes from all commits
8fb6b7a
56606b5
bfdaef3
a7cf331
e11c17b
21ec7f1
37080ff
8f5ec25
3b8e233
8700b1e
26a7b10
a85aef7
ed2780c
d4556f1
d54c842
fbf6501
35c83ff
f0aebd8
a2b6932
6c46b65
a2fab11
3c3b935
37ebd3a
05988b8
66ffbfd
3456a19
73bc4cc
6d577c8
cbeefb9
544ebd3
9e5b359
d2e47a7
971982f
e877991
56e383b
db559a2
c858453
94296aa
0036de8
a4296c8
d998cc2
c6aec13
2851158
036db6c
947baeb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
xiaohansong marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.helper; | ||
|
||
import io.airbyte.commons.enums.Enums; | ||
import io.airbyte.commons.text.Names; | ||
import io.airbyte.protocol.models.AirbyteStream; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Utilities to convert Catalog protocol to Catalog API client. This class was similar to existing | ||
* logic in CatalogConverter.java; But code can't be shared because the protocol model is | ||
* essentially converted to two different api models. Thus, if we need to change logic on either | ||
* place we have to take care of the other one too. | ||
*/ | ||
public class CatalogClientConverters { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mfsiega-airbyte curious if you have thoughts here on adding more 'catalog conversion' logic to a new module. These conversion functions are new in that they're converting to the client (ie, 'api.client.model.generated') types as opposed to the api types that the existing CatalogConverter.java handles. I think the 'airbyte-commons-worker' module probably makes sense for client-specific conversions (since the worker here is after all a client of the API) but I feel like I want to raise a discussion here to make sure we aren't fragmenting messy catalog conversion logic all over the place if we can avoid it. Thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Want to add that I can't add this logic into CatalogConverters because it is defined in airbyte-common-server, which has already use airbyte-common-workers as its dependency; introducing this will cause the dependency loop error. I was also thinking about moving both code into a common library so airbyte-common-server and airbyte-common-worker can both depend on it, but not sure where to put it; plus seems server side code will never be used by worker and client side code will never be used by server. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think something like Last but not least I'd really like to refactor the way catalogs are stored and handled: (1) we don't have a distinct "persistence" model, but rather re-use the model from the protocol, which is limiting; (2) we have a loose relationship between discovered source schemas and configured connection catalogs which causes no end of confusion. All in all I do think having this logic fragmented all over is a bit of a problem, but I think I'd say it already is a problem, and I don't think this PR makes it much worse. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i feel this might be out of scope for this PR - I'll create a separate ticket to track this and we probably should discuss refactoring goal/plan in a refining meeting https://github.com/airbytehq/airbyte/issues/21999 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of the above makes sense to me, thanks for creating that followup ticket @xiaohansong !
xiaohansong marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/** | ||
* Converts a protocol AirbyteCatalog to an OpenAPI client versioned AirbyteCatalog. | ||
*/ | ||
public static io.airbyte.api.client.model.generated.AirbyteCatalog toAirbyteCatalogClientApi( | ||
final io.airbyte.protocol.models.AirbyteCatalog catalog) { | ||
return new io.airbyte.api.client.model.generated.AirbyteCatalog() | ||
.streams(catalog.getStreams() | ||
.stream() | ||
.map(stream -> toAirbyteStreamClientApi(stream)) | ||
.map(s -> new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration() | ||
.stream(s) | ||
.config(generateDefaultConfiguration(s))) | ||
.collect(Collectors.toList())); | ||
} | ||
|
||
private static io.airbyte.api.client.model.generated.AirbyteStreamConfiguration generateDefaultConfiguration( | ||
final io.airbyte.api.client.model.generated.AirbyteStream stream) { | ||
final io.airbyte.api.client.model.generated.AirbyteStreamConfiguration result = | ||
new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration() | ||
.aliasName(Names.toAlphanumericAndUnderscore(stream.getName())) | ||
.cursorField(stream.getDefaultCursorField()) | ||
.destinationSyncMode(io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND) | ||
.primaryKey(stream.getSourceDefinedPrimaryKey()) | ||
.selected(true); | ||
if (stream.getSupportedSyncModes().size() > 0) { | ||
result.setSyncMode(Enums.convertTo(stream.getSupportedSyncModes().get(0), | ||
io.airbyte.api.client.model.generated.SyncMode.class)); | ||
} else { | ||
result.setSyncMode(io.airbyte.api.client.model.generated.SyncMode.INCREMENTAL); | ||
} | ||
return result; | ||
} | ||
|
||
private static io.airbyte.api.client.model.generated.AirbyteStream toAirbyteStreamClientApi( | ||
final AirbyteStream stream) { | ||
return new io.airbyte.api.client.model.generated.AirbyteStream() | ||
.name(stream.getName()) | ||
.jsonSchema(stream.getJsonSchema()) | ||
.supportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(), | ||
io.airbyte.api.client.model.generated.SyncMode.class)) | ||
.sourceDefinedCursor(stream.getSourceDefinedCursor()) | ||
.defaultCursorField(stream.getDefaultCursorField()) | ||
.sourceDefinedPrimaryKey(stream.getSourceDefinedPrimaryKey()) | ||
.namespace(stream.getNamespace()); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.helper; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
||
import com.google.common.collect.Lists; | ||
import io.airbyte.commons.text.Names; | ||
import io.airbyte.protocol.models.AirbyteCatalog; | ||
import io.airbyte.protocol.models.AirbyteStream; | ||
import io.airbyte.protocol.models.CatalogHelpers; | ||
import io.airbyte.protocol.models.Field; | ||
import io.airbyte.protocol.models.JsonSchemaType; | ||
import io.airbyte.protocol.models.SyncMode; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import org.junit.jupiter.api.Test; | ||
|
||
class CatalogClientConvertersTest { | ||
|
||
public static final String ID_FIELD_NAME = "id"; | ||
private static final String STREAM_NAME = "users-data"; | ||
private static final AirbyteStream STREAM = new AirbyteStream() | ||
.withName(STREAM_NAME) | ||
.withJsonSchema( | ||
CatalogHelpers.fieldsToJsonSchema(Field.of(ID_FIELD_NAME, JsonSchemaType.STRING))) | ||
.withDefaultCursorField(Lists.newArrayList(ID_FIELD_NAME)) | ||
.withSourceDefinedCursor(false) | ||
.withSourceDefinedPrimaryKey(Collections.emptyList()) | ||
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)); | ||
|
||
private static final io.airbyte.api.client.model.generated.AirbyteStream CLIENT_STREAM = | ||
new io.airbyte.api.client.model.generated.AirbyteStream() | ||
.name(STREAM_NAME) | ||
.jsonSchema(CatalogHelpers.fieldsToJsonSchema(Field.of(ID_FIELD_NAME, JsonSchemaType.STRING))) | ||
.defaultCursorField(Lists.newArrayList(ID_FIELD_NAME)) | ||
.sourceDefinedCursor(false) | ||
.sourceDefinedPrimaryKey(Collections.emptyList()) | ||
.supportedSyncModes(List.of(io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH, | ||
io.airbyte.api.client.model.generated.SyncMode.INCREMENTAL)); | ||
private static final io.airbyte.api.client.model.generated.AirbyteStreamConfiguration CLIENT_DEFAULT_STREAM_CONFIGURATION = | ||
new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration() | ||
.syncMode(io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH) | ||
.cursorField(Lists.newArrayList(ID_FIELD_NAME)) | ||
.destinationSyncMode(io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND) | ||
.primaryKey(Collections.emptyList()) | ||
.aliasName(Names.toAlphanumericAndUnderscore(STREAM_NAME)) | ||
.selected(true); | ||
|
||
private static final AirbyteCatalog BASIC_MODEL_CATALOG = new AirbyteCatalog().withStreams( | ||
Lists.newArrayList(STREAM)); | ||
|
||
private static final io.airbyte.api.client.model.generated.AirbyteCatalog EXPECTED_CLIENT_CATALOG = | ||
new io.airbyte.api.client.model.generated.AirbyteCatalog() | ||
.streams(Lists.newArrayList( | ||
new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration() | ||
.stream(CLIENT_STREAM) | ||
.config(CLIENT_DEFAULT_STREAM_CONFIGURATION))); | ||
|
||
@Test | ||
void testConvertToClientAPI() { | ||
assertEquals(EXPECTED_CLIENT_CATALOG, | ||
CatalogClientConverters.toAirbyteCatalogClientApi(BASIC_MODEL_CATALOG)); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ import org.jsoup.Jsoup; | |
|
||
dependencies { | ||
implementation project(':airbyte-db:db-lib') | ||
implementation project(':airbyte-api') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this be a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes exactly... removed |
||
implementation project(':airbyte-commons-worker') | ||
implementation project(':airbyte-config:config-models') | ||
implementation project(':airbyte-config:config-persistence') | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we define these as 'CONNECT_TIMEOUT_DURATION
and
READ_TIMEOUT_DURATION` constants?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't see this comment - I made the fix in the next PR