Skip to content

Commit

Permalink
Fix the tag in openAPI (#18445)
Browse files Browse the repository at this point in the history
* Fix the tag

* remove unused
  • Loading branch information
benmoriceau authored and nataly committed Nov 3, 2022
1 parent a810170 commit ebc282a
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.generated.SourceDefinitionApi;
import io.airbyte.api.client.generated.SourceDefinitionSpecificationApi;
import io.airbyte.api.client.generated.StateApi;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;

Expand Down Expand Up @@ -46,8 +47,8 @@ public class AirbyteApiClient {
private final WorkspaceApi workspaceApi;
private final HealthApi healthApi;
private final DbMigrationApi dbMigrationApi;

private final AttemptApi attemptApi;
private final StateApi stateApi;

public AirbyteApiClient(final ApiClient apiClient) {
connectionApi = new ConnectionApi(apiClient);
Expand All @@ -64,6 +65,7 @@ public AirbyteApiClient(final ApiClient apiClient) {
healthApi = new HealthApi(apiClient);
dbMigrationApi = new DbMigrationApi(apiClient);
attemptApi = new AttemptApi(apiClient);
stateApi = new StateApi(apiClient);
}

public ConnectionApi getConnectionApi() {
Expand Down Expand Up @@ -122,4 +124,8 @@ public AttemptApi getAttemptApi() {
return attemptApi;
}

public StateApi getStateApi() {
return stateApi;
}

}
8 changes: 5 additions & 3 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ tags:
description: Export/Import Airbyte Configuration and Database resources.
- name: attempt
description: Interactions with attempt related resources.
- name: state
description: Interactions with state related resources.

paths:
/v1/workspaces/create:
Expand Down Expand Up @@ -1389,7 +1391,7 @@ paths:
/v1/state/get:
post:
tags:
- connection
- state
summary: Fetch the current state for a connection.
operationId: getState
requestBody:
Expand All @@ -1412,7 +1414,7 @@ paths:
/v1/state/create_or_update:
post:
tags:
- connection
- state
- internal
summary: Create or update the state for a connection.
operationId: createOrUpdateState
Expand Down Expand Up @@ -1994,7 +1996,7 @@ paths:
/v1/web_backend/state/get_type:
post:
tags:
- connection
- web_backend
summary: Fetch the current state type for a connection.
operationId: getStateType
requestBody:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,11 +797,11 @@ public static JobRead waitWhileJobHasStatus(final JobsApi jobsApi,
@SuppressWarnings("BusyWait")
public static ConnectionState waitForConnectionState(final AirbyteApiClient apiClient, final UUID connectionId)
throws ApiException, InterruptedException {
ConnectionState connectionState = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
ConnectionState connectionState = apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
int count = 0;
while (count < 60 && (connectionState.getState() == null || connectionState.getState().isNull())) {
LOGGER.info("fetching connection state. attempt: {}", count++);
connectionState = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
connectionState = apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
sleep(1000);
}
return connectionState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ void testIncrementalSync() throws Exception {
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
LOGGER.info(STATE_AFTER_SYNC_ONE, apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info(STATE_AFTER_SYNC_ONE, apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

Expand All @@ -631,7 +631,7 @@ void testIncrementalSync() throws Exception {
final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
LOGGER.info(STATE_AFTER_SYNC_TWO, apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info(STATE_AFTER_SYNC_TWO, apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertRawDestinationContains(expectedRecords, new SchemaTableNamePair(PUBLIC, STREAM_NAME));

Expand All @@ -642,7 +642,7 @@ void testIncrementalSync() throws Exception {
waitWhileJobHasStatus(apiClient.getJobsApi(), jobInfoRead.getJob(),
Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED));

LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info("state after reset: {}", apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair(PUBLIC,
STREAM_NAME));
Expand All @@ -652,7 +652,7 @@ void testIncrementalSync() throws Exception {
final JobInfoRead connectionSyncRead3 =
apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info("state after sync 3: {}", apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

Expand Down Expand Up @@ -906,7 +906,7 @@ void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Except
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
LOGGER.info(STATE_AFTER_SYNC_ONE, apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info(STATE_AFTER_SYNC_ONE, apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

Expand All @@ -930,7 +930,7 @@ void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Except
final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
LOGGER.info(STATE_AFTER_SYNC_TWO, apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info(STATE_AFTER_SYNC_TWO, apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertRawDestinationContains(expectedRecords, new SchemaTableNamePair(PUBLIC, STREAM_NAME));

Expand All @@ -940,7 +940,7 @@ void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Except
waitWhileJobHasStatus(apiClient.getJobsApi(), jobInfoRead.getJob(),
Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED));

LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info("state after reset: {}", apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair(PUBLIC,
STREAM_NAME));
Expand All @@ -952,7 +952,7 @@ void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Except
final JobInfoRead connectionSyncRead3 =
apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
final ConnectionState state = apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
LOGGER.info("state after sync 3: {}", state);

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
Expand Down Expand Up @@ -995,7 +995,7 @@ void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo testInfo)
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
LOGGER.info(STATE_AFTER_SYNC_ONE, apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info(STATE_AFTER_SYNC_ONE, apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

Expand All @@ -1008,7 +1008,7 @@ void testSyncAfterUpgradeToPerStreamStateWithNoNewData(final TestInfo testInfo)
final JobInfoRead connectionSyncRead2 =
apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
LOGGER.info(STATE_AFTER_SYNC_TWO, apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info(STATE_AFTER_SYNC_TWO, apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

final JobInfoRead syncJob = apiClient.getJobsApi().getJobInfo(new JobIdRequestBody().id(connectionSyncRead2.getJob().getId()));
final Optional<AttemptInfoRead> result = syncJob.getAttempts().stream()
Expand Down Expand Up @@ -1086,7 +1086,7 @@ void testResetAllWhenSchemaIsModifiedForLegacySource() throws Exception {
return null;
});
final ConnectionState initSyncState =
apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId()));
apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId()));
LOGGER.info("ConnectionState after the initial sync: " + initSyncState.toString());

testHarness.assertSourceAndDestinationDbInSync(false);
Expand Down Expand Up @@ -1127,7 +1127,7 @@ void testResetAllWhenSchemaIsModifiedForLegacySource() throws Exception {
return null;
});
final ConnectionState postResetState =
apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId()));
apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId()));
LOGGER.info("ConnectionState after the update request: {}", postResetState.toString());

// Wait until the sync from the UpdateConnection is finished
Expand All @@ -1136,7 +1136,7 @@ void testResetAllWhenSchemaIsModifiedForLegacySource() throws Exception {
waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate);

final ConnectionState postUpdateState =
apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId()));
apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connection.getConnectionId()));
LOGGER.info("ConnectionState after the final sync: {}", postUpdateState.toString());

LOGGER.info("Inspecting DBs After the final sync");
Expand Down Expand Up @@ -1202,7 +1202,7 @@ void testIncrementalSyncMultipleStreams() throws Exception {
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
LOGGER.info(STATE_AFTER_SYNC_ONE, apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info(STATE_AFTER_SYNC_ONE, apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

Expand Down Expand Up @@ -1232,7 +1232,7 @@ void testIncrementalSyncMultipleStreams() throws Exception {
final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
LOGGER.info(STATE_AFTER_SYNC_TWO, apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info(STATE_AFTER_SYNC_TWO, apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertRawDestinationContains(expectedRecordsIdAndName, new SchemaTableNamePair(PUBLIC_SCHEMA_NAME, STREAM_NAME));
testHarness.assertRawDestinationContains(expectedRecordsCoolEmployees, new SchemaTableNamePair(STAGING_SCHEMA_NAME, COOL_EMPLOYEES_TABLE_NAME));
Expand All @@ -1245,7 +1245,7 @@ void testIncrementalSyncMultipleStreams() throws Exception {
waitWhileJobHasStatus(apiClient.getJobsApi(), jobInfoRead.getJob(),
Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED));

LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info("state after reset: {}", apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair(PUBLIC,
STREAM_NAME));
Expand All @@ -1255,7 +1255,7 @@ void testIncrementalSyncMultipleStreams() throws Exception {
final JobInfoRead connectionSyncRead3 =
apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
LOGGER.info("state after sync 3: {}", apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);

Expand Down Expand Up @@ -1407,7 +1407,7 @@ void testPartialResetResetAllWhenSchemaIsModified(final TestInfo testInfo) throw
}

private void assertStreamStateContainsStream(final UUID connectionId, final List<StreamDescriptor> expectedStreamDescriptors) throws ApiException {
final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
final ConnectionState state = apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
final List<StreamDescriptor> streamDescriptors = state.getStreamState().stream().map(StreamState::getStreamDescriptor).toList();

Assertions.assertTrue(streamDescriptors.containsAll(expectedStreamDescriptors) && expectedStreamDescriptors.containsAll(streamDescriptors));
Expand Down Expand Up @@ -1441,14 +1441,14 @@ private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exc
* @param maxRetries the number of times to retry
* @throws InterruptedException
*/
private void waitForSuccessfulJobWithRetries(final UUID connectionId, int maxRetries) throws InterruptedException {
private void waitForSuccessfulJobWithRetries(final UUID connectionId, final int maxRetries) throws InterruptedException {
int i;
for (i = 0; i < maxRetries; i++) {
try {
final JobRead jobInfo = testHarness.getMostRecentSyncJobId(connectionId);
waitForSuccessfulJob(apiClient.getJobsApi(), jobInfo);
break;
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.info("Something went wrong querying jobs API, retrying...");
}
sleep(Duration.ofSeconds(30).toMillis());
Expand Down
Loading

0 comments on commit ebc282a

Please sign in to comment.