-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bmoric/update connection list with breaking #18125
Changes from 1 commit
abaea95
f428057
5a3df67
6b8e1f0
5ac2cde
024b3d3
4fcc5a6
b8eeec2
ce5b574
b14b322
9593ebc
558022c
17a9681
e03cdbb
2686be6
62ca0f7
b78fd6e
7941c76
c42af6b
c444399
2359d9d
50770ae
5bc3e3e
6d13a42
20f9e30
21eb1d0
32613c0
5320f73
826cfb0
1270de2
f10e413
1c9a008
5781ce0
5203c9f
f81285c
6c4d620
9e1db24
e16a35d
1e9cc8a
f38b682
8f28aa5
acf9ec1
fc085cf
95a6dc0
cdf1065
d1683d5
6662bb7
f93f67d
a2b00ca
53937da
8c8cd4b
0a0c500
f4959c8
68d6825
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,7 +71,6 @@ | |
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
import lombok.Data; | ||
import org.apache.commons.lang3.ArrayUtils; | ||
import org.jooq.Condition; | ||
import org.jooq.DSLContext; | ||
|
@@ -976,45 +975,20 @@ public Optional<ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSo | |
return records.stream().findFirst().map(DbConverter::buildActorCatalogFetchEvent); | ||
} | ||
|
||
@Data | ||
public static class ActorCatalogFetchEventWithCreationDate { | ||
|
||
private final ActorCatalogFetchEvent actorCatalogFetchEvent; | ||
private final OffsetDateTime createdAt; | ||
|
||
} | ||
|
||
public Map<UUID, ActorCatalogFetchEventWithCreationDate> getMostRecentActorCatalogFetchEventForSources(final List<UUID> sourceIds) | ||
public Map<UUID, ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSources(final List<UUID> sourceIds) | ||
throws IOException { | ||
|
||
return database.query(ctx -> ctx.select(ACTOR_CATALOG_FETCH_EVENT.asterisk()) | ||
.from(ACTOR_CATALOG_FETCH_EVENT) | ||
.where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.in(sourceIds)) | ||
.fetch() | ||
.stream() | ||
.map(record -> new ActorCatalogFetchEventWithCreationDate( | ||
DbConverter.buildActorCatalogFetchEvent(record), | ||
record.get(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT))) | ||
.collect( | ||
() -> new HashMap<>(), | ||
this::insertInAccumulatorIfNeeded, | ||
(left, right) -> { | ||
right.forEach((actorId, value) -> { | ||
insertInAccumulatorIfNeeded(left, value); | ||
}); | ||
})); | ||
} | ||
|
||
private void insertInAccumulatorIfNeeded(final Map<UUID, ActorCatalogFetchEventWithCreationDate> acc, | ||
final ActorCatalogFetchEventWithCreationDate value) { | ||
if (acc.containsKey(value.getActorCatalogFetchEvent().getActorId())) { | ||
final ActorCatalogFetchEventWithCreationDate currentNewest = acc.get(value.actorCatalogFetchEvent.getActorId()); | ||
if (currentNewest.getCreatedAt().isBefore(value.getCreatedAt())) { | ||
acc.put(value.getActorCatalogFetchEvent().getActorId(), value); | ||
} | ||
} else { | ||
acc.put(value.actorCatalogFetchEvent.getActorId(), value); | ||
} | ||
return database.query(ctx -> ctx.fetch( | ||
""" | ||
select actor_catalog_id, actor_id from | ||
(select id, actor_catalog_id, actor_id, config_hash, actor_version, created_at, rank() over (partition by actor_id order by created_at desc) as creation_order_rank, modified_at | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. kind of nitpicky, but do we need to select all these fields or just actor_id, actor_catalog_id, & creation_order_rank? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes most of the column are not needed. I have remove them. Thanks for catching that. |
||
from public.actor_catalog_fetch_event | ||
) table_with_rank | ||
where creation_order_rank = 1; | ||
""")) | ||
.stream().map(DbConverter::buildActorCatalogFetchEvent) | ||
.collect(Collectors.toMap(record -> record.getActorId(), | ||
record -> record)); | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,7 +48,6 @@ | |
import io.airbyte.config.StandardSync; | ||
import io.airbyte.config.persistence.ConfigNotFoundException; | ||
import io.airbyte.config.persistence.ConfigRepository; | ||
import io.airbyte.config.persistence.ConfigRepository.ActorCatalogFetchEventWithCreationDate; | ||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; | ||
import io.airbyte.server.converters.ApiPojoConverters; | ||
import io.airbyte.server.handlers.helpers.CatalogConverter; | ||
|
@@ -115,7 +114,7 @@ public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final | |
final Map<UUID, JobRead> runningJobByConnectionId = | ||
getRunningJobByConnectionId(standardSyncs.stream().map(StandardSync::getConnectionId).toList()); | ||
|
||
final Map<UUID, ActorCatalogFetchEventWithCreationDate> newestFetchEventsByActorId = | ||
final Map<UUID, ActorCatalogFetchEvent> newestFetchEventsByActorId = | ||
configRepository.getMostRecentActorCatalogFetchEventForSources(new ArrayList<>()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benmoriceau why are we adding more db queries into this handler? it's really specifically not supposed to be making direct database calls. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cgardens It was not very clear that the repository shouldn't have been used here. Especially since it was recently in in the same endpoint and other endpoints to get data from the DB. The ticket related to this PR #17526 is only needing the described functionality in the webBackend endpoint only so it made sense to add that direct call in the WebBackendHandler. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What else do we need to add to make it clear? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment is easy to miss when the same field is being used within the same function here. When you are looking at the function to update, it is not explicit that this is a deprecated field. Something like this would make it very explicit that this is deprecated. It will show the getter are an error and would have prevent any use of it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks good. let's do that. i also put up a PR that adds more explanation in the javadocs #19719 |
||
|
||
final List<WebBackendConnectionListItem> connectionItems = Lists.newArrayList(); | ||
|
@@ -197,7 +196,7 @@ private WebBackendConnectionListItem buildWebBackendConnectionListItem( | |
final Map<UUID, DestinationRead> destinationReadById, | ||
final Map<UUID, JobRead> latestJobByConnectionId, | ||
final Map<UUID, JobRead> runningJobByConnectionId, | ||
final Optional<ActorCatalogFetchEventWithCreationDate> latestFetchEvent) | ||
final Optional<ActorCatalogFetchEvent> latestFetchEvent) | ||
throws JsonValidationException, ConfigNotFoundException, IOException { | ||
|
||
final SourceRead source = sourceReadById.get(standardSync.getSourceId()); | ||
|
@@ -207,11 +206,7 @@ private WebBackendConnectionListItem buildWebBackendConnectionListItem( | |
final ConnectionRead connectionRead = connectionsHandler.getConnection(standardSync.getConnectionId()); | ||
final Optional<UUID> currentCatalogId = connectionRead == null ? Optional.empty() : Optional.ofNullable(connectionRead.getSourceCatalogId()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it possible for connectionread to be null? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nothing tells me that the field is not nullable so I prefered to handle the possibility. |
||
|
||
final Optional<ActorCatalogFetchEvent> mostRecentFetchEvent = | ||
latestFetchEvent | ||
.map(actorCatalogFetchEventWithCreationDate -> actorCatalogFetchEventWithCreationDate.getActorCatalogFetchEvent()); | ||
|
||
final SchemaChange schemaChange = getSchemaChange(connectionRead, currentCatalogId, mostRecentFetchEvent); | ||
final SchemaChange schemaChange = getSchemaChange(connectionRead, currentCatalogId, latestFetchEvent); | ||
|
||
final WebBackendConnectionListItem listItem = new WebBackendConnectionListItem() | ||
.connectionId(standardSync.getConnectionId()) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benmoriceau what is this method supposed to do? the argument that is passed in is unused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cgardens. It is doing a similar operation than
getMostRecentActorCatalogFetchEventForSource
but for a list of sourceIds instead of a single source Id. The use of the input got lost during PR updatesFixed in #19668