Skip to content

Commit

Permalink
Merge branch 'master' into ddavydov/#829-source-iterable-better-proce…
Browse files Browse the repository at this point in the history
…ssing-for-429-401
  • Loading branch information
davydov-d committed Oct 25, 2022
2 parents 45acb8e + e933de0 commit 903ebd4
Show file tree
Hide file tree
Showing 56 changed files with 1,149 additions and 394 deletions.
3 changes: 3 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4649,6 +4649,7 @@ components:
- destination
- status
- isSyncing
- schemaChange
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
Expand All @@ -4674,6 +4675,8 @@ components:
$ref: "#/components/schemas/JobStatus"
isSyncing:
type: boolean
schemaChange:
$ref: "#/components/schemas/SchemaChange"
WebBackendConnectionRead:
type: object
required:
Expand Down
Empty file.
1 change: 1 addition & 0 deletions airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"Tracker": "https://github.com/airbytehq/airbyte/issues",
},
packages=find_packages(exclude=("unit_tests",)),
package_data={"airbyte_cdk": ["py.typed"]},
install_requires=[
"backoff",
# pinned to the last working version for us temporarily while we fix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,23 @@ public List<SourceConnection> listSourceConnection() throws JsonValidationExcept
return persistence.listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class);
}

/**
* Returns all sources for a workspace. Does not contain secrets.
*
* @param workspaceId - id of the workspace
* @return sources
* @throws JsonValidationException - throws if returned sources are invalid
* @throws IOException - you never know when you IO
*/
public List<SourceConnection> listWorkspaceSourceConnection(final UUID workspaceId) throws IOException {
final Result<Record> result = database.query(ctx -> ctx.select(asterisk())
.from(ACTOR)
.where(ACTOR.ACTOR_TYPE.eq(ActorType.source))
.and(ACTOR.WORKSPACE_ID.eq(workspaceId))
.andNot(ACTOR.TOMBSTONE).fetch());
return result.stream().map(DbConverter::buildSourceConnection).collect(Collectors.toList());
}

/**
* Returns destination with a given id. Does not contain secrets. To hydrate with secrets see
* { @link SecretsRepositoryReader#getDestinationConnectionWithSecrets(final UUID destinationId) }.
Expand Down Expand Up @@ -975,6 +992,22 @@ public Optional<ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSo
return records.stream().findFirst().map(DbConverter::buildActorCatalogFetchEvent);
}

public Map<UUID, ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSources(final List<UUID> sourceIds)
throws IOException {

return database.query(ctx -> ctx.fetch(
"""
select actor_catalog_id, actor_id from
(select actor_catalog_id, actor_id, rank() over (partition by actor_id order by created_at desc) as creation_order_rank
from public.actor_catalog_fetch_event
) table_with_rank
where creation_order_rank = 1;
"""))
.stream().map(DbConverter::buildActorCatalogFetchEvent)
.collect(Collectors.toMap(record -> record.getActorId(),
record -> record));
}

/**
* Stores source catalog information.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ public static ActorCatalog buildActorCatalog(final Record record) {

public static ActorCatalogFetchEvent buildActorCatalogFetchEvent(final Record record) {
return new ActorCatalogFetchEvent()
.withActorId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID))
.withActorCatalogId(record.get(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,15 @@ void testListPublicSourceDefinitions() throws IOException {
assertEquals(List.of(MockData.publicSourceDefinition()), actualDefinitions);
}

@Test
void testListWorkspaceSources() throws IOException {
UUID workspaceId = MockData.standardWorkspaces().get(1).getWorkspaceId();
final List<SourceConnection> expectedSources = MockData.sourceConnections().stream()
.filter(source -> source.getWorkspaceId().equals(workspaceId)).collect(Collectors.toList());
final List<SourceConnection> sources = configRepository.listWorkspaceSourceConnection(workspaceId);
assertThat(sources).hasSameElementsAs(expectedSources);
}

@Test
void testSourceDefinitionGrants() throws IOException {
final UUID workspaceId = MockData.standardWorkspaces().get(0).getWorkspaceId();
Expand Down Expand Up @@ -506,17 +515,17 @@ void testGetGeographyForConnection() throws IOException {
}

@Test
void testGetMostRecentActorCatalogFetchEventForSources() throws SQLException, IOException, JsonValidationException {
void testGetMostRecentActorCatalogFetchEventForSource() throws SQLException, IOException, JsonValidationException {
for (final ActorCatalog actorCatalog : MockData.actorCatalogs()) {
configPersistence.writeConfig(ConfigSchema.ACTOR_CATALOG, actorCatalog.getId().toString(), actorCatalog);
}

OffsetDateTime now = OffsetDateTime.now();
OffsetDateTime yesterday = now.minusDays(1l);
final OffsetDateTime now = OffsetDateTime.now();
final OffsetDateTime yesterday = now.minusDays(1l);

List<ActorCatalogFetchEvent> fetchEvents = MockData.actorCatalogFetchEventsSameSource();
ActorCatalogFetchEvent fetchEvent1 = fetchEvents.get(0);
ActorCatalogFetchEvent fetchEvent2 = fetchEvents.get(1);
final List<ActorCatalogFetchEvent> fetchEvents = MockData.actorCatalogFetchEventsSameSource();
final ActorCatalogFetchEvent fetchEvent1 = fetchEvents.get(0);
final ActorCatalogFetchEvent fetchEvent2 = fetchEvents.get(1);

database.transaction(ctx -> {
insertCatalogFetchEvent(
Expand All @@ -533,13 +542,37 @@ void testGetMostRecentActorCatalogFetchEventForSources() throws SQLException, IO
return null;
});

Optional<ActorCatalogFetchEvent> result =
final Optional<ActorCatalogFetchEvent> result =
configRepository.getMostRecentActorCatalogFetchEventForSource(fetchEvent1.getActorId());

assertEquals(fetchEvent2.getActorCatalogId(), result.get().getActorCatalogId());
}

private void insertCatalogFetchEvent(DSLContext ctx, UUID sourceId, UUID catalogId, OffsetDateTime creationDate) {
@Test
void testGetMostRecentActorCatalogFetchEventForSources() throws SQLException, IOException, JsonValidationException {
for (final ActorCatalog actorCatalog : MockData.actorCatalogs()) {
configPersistence.writeConfig(ConfigSchema.ACTOR_CATALOG, actorCatalog.getId().toString(), actorCatalog);
}

database.transaction(ctx -> {
MockData.actorCatalogFetchEventsForAggregationTest().forEach(actorCatalogFetchEvent -> insertCatalogFetchEvent(
ctx,
actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorId(),
actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorCatalogId(),
actorCatalogFetchEvent.getCreatedAt()));

return null;
});

final Map<UUID, ActorCatalogFetchEvent> result =
configRepository.getMostRecentActorCatalogFetchEventForSources(List.of(MockData.SOURCE_ID_1,
MockData.SOURCE_ID_2));

assertEquals(MockData.ACTOR_CATALOG_ID_1, result.get(MockData.SOURCE_ID_1).getActorCatalogId());
assertEquals(MockData.ACTOR_CATALOG_ID_3, result.get(MockData.SOURCE_ID_2).getActorCatalogId());
}

private void insertCatalogFetchEvent(final DSLContext ctx, final UUID sourceId, final UUID catalogId, final OffsetDateTime creationDate) {
ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT)
.columns(
ACTOR_CATALOG_FETCH_EVENT.ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,19 @@
import io.airbyte.protocol.models.SyncMode;
import java.net.URI;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.Data;

public class MockData {

private static final UUID WORKSPACE_ID_1 = UUID.randomUUID();
public static final UUID WORKSPACE_ID_1 = UUID.randomUUID();
private static final UUID WORKSPACE_ID_2 = UUID.randomUUID();
private static final UUID WORKSPACE_ID_3 = UUID.randomUUID();
private static final UUID WORKSPACE_CUSTOMER_ID = UUID.randomUUID();
Expand All @@ -72,8 +74,8 @@ public class MockData {
private static final UUID DESTINATION_DEFINITION_ID_2 = UUID.randomUUID();
private static final UUID DESTINATION_DEFINITION_ID_3 = UUID.randomUUID();
private static final UUID DESTINATION_DEFINITION_ID_4 = UUID.randomUUID();
private static final UUID SOURCE_ID_1 = UUID.randomUUID();
private static final UUID SOURCE_ID_2 = UUID.randomUUID();
public static final UUID SOURCE_ID_1 = UUID.randomUUID();
public static final UUID SOURCE_ID_2 = UUID.randomUUID();
private static final UUID SOURCE_ID_3 = UUID.randomUUID();
private static final UUID DESTINATION_ID_1 = UUID.randomUUID();
private static final UUID DESTINATION_ID_2 = UUID.randomUUID();
Expand All @@ -91,11 +93,12 @@ public class MockData {
private static final UUID SOURCE_OAUTH_PARAMETER_ID_2 = UUID.randomUUID();
private static final UUID DESTINATION_OAUTH_PARAMETER_ID_1 = UUID.randomUUID();
private static final UUID DESTINATION_OAUTH_PARAMETER_ID_2 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_ID_1 = UUID.randomUUID();
public static final UUID ACTOR_CATALOG_ID_1 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_ID_2 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_ID_3 = UUID.randomUUID();
public static final UUID ACTOR_CATALOG_ID_3 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_FETCH_EVENT_ID_1 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_FETCH_EVENT_ID_2 = UUID.randomUUID();
private static final UUID ACTOR_CATALOG_FETCH_EVENT_ID_3 = UUID.randomUUID();

public static final String MOCK_SERVICE_ACCOUNT_1 = "{\n"
+ " \"type\" : \"service_account\",\n"
Expand Down Expand Up @@ -622,8 +625,8 @@ public static List<ActorCatalogFetchEvent> actorCatalogFetchEvents() {
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_2)
.withActorCatalogId(ACTOR_CATALOG_ID_2)
.withActorId(SOURCE_ID_2)
.withConfigHash("1394")
.withConnectorVersion("1.2.0");
.withConfigHash("1395")
.withConnectorVersion("1.42.0");
return Arrays.asList(actorCatalogFetchEvent1, actorCatalogFetchEvent2);
}

Expand All @@ -643,6 +646,42 @@ public static List<ActorCatalogFetchEvent> actorCatalogFetchEventsSameSource() {
return Arrays.asList(actorCatalogFetchEvent1, actorCatalogFetchEvent2);
}

@Data
public static class ActorCatalogFetchEventWithCreationDate {

private final ActorCatalogFetchEvent actorCatalogFetchEvent;
private final OffsetDateTime createdAt;

}

public static List<ActorCatalogFetchEventWithCreationDate> actorCatalogFetchEventsForAggregationTest() {
final OffsetDateTime now = OffsetDateTime.now();
final OffsetDateTime yesterday = OffsetDateTime.now().minusDays(1l);

final ActorCatalogFetchEvent actorCatalogFetchEvent1 = new ActorCatalogFetchEvent()
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_1)
.withActorCatalogId(ACTOR_CATALOG_ID_1)
.withActorId(SOURCE_ID_1)
.withConfigHash("CONFIG_HASH")
.withConnectorVersion("1.0.0");
final ActorCatalogFetchEvent actorCatalogFetchEvent2 = new ActorCatalogFetchEvent()
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_2)
.withActorCatalogId(ACTOR_CATALOG_ID_2)
.withActorId(SOURCE_ID_2)
.withConfigHash("1394")
.withConnectorVersion("1.2.0");
final ActorCatalogFetchEvent actorCatalogFetchEvent3 = new ActorCatalogFetchEvent()
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_3)
.withActorCatalogId(ACTOR_CATALOG_ID_3)
.withActorId(SOURCE_ID_2)
.withConfigHash("1394")
.withConnectorVersion("1.2.0");
return Arrays.asList(
new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent1, now),
new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent2, yesterday),
new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent3, now));
}

public static List<WorkspaceServiceAccount> workspaceServiceAccounts() {
final WorkspaceServiceAccount workspaceServiceAccount = new WorkspaceServiceAccount()
.withWorkspaceId(WORKSPACE_ID_1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.12
Declare `bypass_reason` field in test configuration. [#18364](https://github.com/airbytehq/airbyte/pull/18364).

## 0.2.11
Declare `test_strictness_level` field in test configuration. [#18218](https://github.com/airbytehq/airbyte/pull/18218).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.2.11
LABEL io.airbyte.version=0.2.12
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
13 changes: 12 additions & 1 deletion airbyte-integrations/bases/source-acceptance-test/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,15 @@ These iterations are more conveniently achieved by remaining in the current dire
12. Open a PR on our GitHub repository
13. Run the unit test on the CI by running `/test connector=bases/source-acceptance-test` in a GitHub comment
14. Publish the new SAT version if your PR is approved by running `/publish connector=bases/source-acceptance-test auto-bump-version=false` in a GitHub comment
15. Merge your PR
15. Merge your PR

## Migrating `acceptance-test-config.yml` to latest configuration format
We introduced changes in the structure of `acceptance-test-config.yml` files in version 0.2.12.
The *legacy* configuration format is still supported but should be deprecated soon.
To migrate a legacy configuration to the latest configuration format please run:

```bash
python -m venv .venv # If you don't have a virtualenv already
source ./.venv/bin/activate # If you're not in your virtualenv already
python source_acceptance_test/utils/config_migration.py ../../connectors/source-to-migrate/acceptance-test-config.yml
```
Loading

0 comments on commit 903ebd4

Please sign in to comment.