Skip to content

Commit

Permalink
fix column selection handling for syncs without cursors (#21647)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfsiega-airbyte authored Jan 20, 2023
1 parent 8531530 commit 5257f69
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,21 @@ private static io.airbyte.protocol.models.AirbyteStream toProtocol(final Airbyte
}
}
// Only include the selected fields.
// NOTE: we verified above that each selected field has at least one element in the field path.
final Set<String> selectedFieldNames =
config.getSelectedFields().stream().map((field) -> field.getFieldPath().get(0)).collect(Collectors.toSet());
// TODO(mfsiega-airbyte): we only check the top level of the cursor/primary key fields because we
// don't support filtering nested fields yet.
if (!selectedFieldNames.contains(config.getCursorField().get(0)) && config.getSyncMode().equals(SyncMode.INCREMENTAL)) {
if (config.getSyncMode().equals(SyncMode.INCREMENTAL) // INCREMENTAL sync mode, AND
&& !config.getCursorField().isEmpty() // There is a cursor configured, AND
&& !selectedFieldNames.contains(config.getCursorField().get(0))) { // The cursor isn't in the selected fields.
throw new JsonValidationException("Cursor field cannot be de-selected in INCREMENTAL syncs");
}
for (final List<String> primaryKeyComponent : config.getPrimaryKey()) {
if (!selectedFieldNames.contains(primaryKeyComponent.get(0)) && config.getDestinationSyncMode().equals(DestinationSyncMode.APPEND_DEDUP)) {
throw new JsonValidationException("Primary key field cannot be de-selected in DEDUP mode");
if (config.getDestinationSyncMode().equals(DestinationSyncMode.APPEND_DEDUP)) {
for (final List<String> primaryKeyComponent : config.getPrimaryKey()) {
if (!selectedFieldNames.contains(primaryKeyComponent.get(0))) {
throw new JsonValidationException("Primary key field cannot be de-selected in DEDUP mode");
}
}
}
for (final String selectedFieldName : selectedFieldNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,20 +221,8 @@ void setUp() throws JsonValidationException, ConfigNotFoundException, IOExceptio
@Nested
class CreateConnection {

@Test
void testCreateConnection() throws JsonValidationException, ConfigNotFoundException, IOException {

final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog();

// set a defaultGeography on the workspace as EU, but expect connection to be
// created AUTO because the ConnectionCreate geography takes precedence over the workspace
// defaultGeography.
final StandardWorkspace workspace = new StandardWorkspace()
.withWorkspaceId(workspaceId)
.withDefaultGeography(Geography.EU);
when(configRepository.getStandardWorkspaceNoSecrets(workspaceId, true)).thenReturn(workspace);

final ConnectionCreate connectionCreate = new ConnectionCreate()
private ConnectionCreate buildConnectionCreateRequest(final StandardSync standardSync, final AirbyteCatalog catalog) {
return new ConnectionCreate()
.sourceId(standardSync.getSourceId())
.destinationId(standardSync.getDestinationId())
.operationIds(standardSync.getOperationIds())
Expand All @@ -252,6 +240,22 @@ void testCreateConnection() throws JsonValidationException, ConfigNotFoundExcept
.memoryLimit(standardSync.getResourceRequirements().getMemoryLimit()))
.sourceCatalogId(standardSync.getSourceCatalogId())
.geography(ApiPojoConverters.toApiGeography(standardSync.getGeography()));
}

@Test
void testCreateConnection() throws JsonValidationException, ConfigNotFoundException, IOException {

final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog();

// set a defaultGeography on the workspace as EU, but expect connection to be
// created AUTO because the ConnectionCreate geography takes precedence over the workspace
// defaultGeography.
final StandardWorkspace workspace = new StandardWorkspace()
.withWorkspaceId(workspaceId)
.withDefaultGeography(Geography.EU);
when(configRepository.getStandardWorkspaceNoSecrets(workspaceId, true)).thenReturn(workspace);

final ConnectionCreate connectionCreate = buildConnectionCreateRequest(standardSync, catalog);

final ConnectionRead actualConnectionRead = connectionsHandler.createConnection(connectionCreate);

Expand All @@ -277,23 +281,7 @@ void testCreateConnectionUsesDefaultGeographyFromWorkspace() throws JsonValidati
final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog();

// don't set a geography on the ConnectionCreate to force inheritance from workspace default
final ConnectionCreate connectionCreate = new ConnectionCreate()
.sourceId(standardSync.getSourceId())
.destinationId(standardSync.getDestinationId())
.operationIds(standardSync.getOperationIds())
.name(PRESTO_TO_HUDI)
.namespaceDefinition(NamespaceDefinitionType.SOURCE)
.namespaceFormat(null)
.prefix(PRESTO_TO_HUDI_PREFIX)
.status(ConnectionStatus.ACTIVE)
.schedule(ConnectionHelpers.generateBasicConnectionSchedule())
.syncCatalog(catalog)
.resourceRequirements(new io.airbyte.api.model.generated.ResourceRequirements()
.cpuRequest(standardSync.getResourceRequirements().getCpuRequest())
.cpuLimit(standardSync.getResourceRequirements().getCpuLimit())
.memoryRequest(standardSync.getResourceRequirements().getMemoryRequest())
.memoryLimit(standardSync.getResourceRequirements().getMemoryLimit()))
.sourceCatalogId(standardSync.getSourceCatalogId());
final ConnectionCreate connectionCreate = buildConnectionCreateRequest(standardSync, catalog).geography(null);

// set the workspace default to EU
final StandardWorkspace workspace = new StandardWorkspace()
Expand All @@ -319,29 +307,12 @@ void testCreateConnectionWithSelectedFields() throws IOException, JsonValidation
.withDefaultGeography(Geography.AUTO);
when(configRepository.getStandardWorkspaceNoSecrets(workspaceId, true)).thenReturn(workspace);

final AirbyteCatalog catalog = ConnectionHelpers.generateApiCatalogWithTwoFields();
final AirbyteCatalog catalogWithSelectedFields = ConnectionHelpers.generateApiCatalogWithTwoFields();
// Only select one of the two fields.
catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true)
catalogWithSelectedFields.getStreams().get(0).getConfig().fieldSelectionEnabled(true)
.selectedFields(List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME)));

final ConnectionCreate connectionCreate = new ConnectionCreate()
.sourceId(standardSync.getSourceId())
.destinationId(standardSync.getDestinationId())
.operationIds(standardSync.getOperationIds())
.name(PRESTO_TO_HUDI)
.namespaceDefinition(NamespaceDefinitionType.SOURCE)
.namespaceFormat(null)
.prefix(PRESTO_TO_HUDI_PREFIX)
.status(ConnectionStatus.ACTIVE)
.schedule(ConnectionHelpers.generateBasicConnectionSchedule())
.syncCatalog(catalog)
.resourceRequirements(new io.airbyte.api.model.generated.ResourceRequirements()
.cpuRequest(standardSync.getResourceRequirements().getCpuRequest())
.cpuLimit(standardSync.getResourceRequirements().getCpuLimit())
.memoryRequest(standardSync.getResourceRequirements().getMemoryRequest())
.memoryLimit(standardSync.getResourceRequirements().getMemoryLimit()))
.sourceCatalogId(standardSync.getSourceCatalogId())
.geography(ApiPojoConverters.toApiGeography(standardSync.getGeography()));
final ConnectionCreate connectionCreate = buildConnectionCreateRequest(standardSync, catalogWithSelectedFields);

final ConnectionRead actualConnectionRead = connectionsHandler.createConnection(connectionCreate);

Expand All @@ -354,6 +325,35 @@ void testCreateConnectionWithSelectedFields() throws IOException, JsonValidation
verify(configRepository).writeStandardSync(standardSync);
}

/**
 * Creates a connection whose first stream is FULL_REFRESH, has field selection enabled with a
 * single selected field, and has no cursor field configured.
 *
 * <p>Asserts that the returned {@code ConnectionRead} matches the one generated from
 * {@code standardSync}, then updates {@code standardSync} with the field selection data, sync mode
 * and (absent) cursor used in the request and verifies that this sync was written to the config
 * repository.
 */
@Test
void testCreateFullRefreshConnectionWithSelectedFields() throws IOException, JsonValidationException, ConfigNotFoundException {
  // Workspace lookup performed while creating the connection resolves to an AUTO-geography workspace.
  final StandardWorkspace autoGeographyWorkspace = new StandardWorkspace()
      .withWorkspaceId(workspaceId)
      .withDefaultGeography(Geography.AUTO);
  when(configRepository.getStandardWorkspaceNoSecrets(workspaceId, true)).thenReturn(autoGeographyWorkspace);

  // Select only one of the two fields, and run the stream as a full refresh with no cursor.
  final List<SelectedFieldInfo> singleSelectedField = List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME));
  final AirbyteCatalog catalog = ConnectionHelpers.generateApiCatalogWithTwoFields();
  catalog.getStreams().get(0).getConfig()
      .fieldSelectionEnabled(true)
      .selectedFields(singleSelectedField)
      .cursorField(null)
      .syncMode(SyncMode.FULL_REFRESH);

  final ConnectionRead createdConnection =
      connectionsHandler.createConnection(buildConnectionCreateRequest(standardSync, catalog));

  assertEquals(ConnectionHelpers.generateExpectedConnectionRead(standardSync), createdConnection);

  // Bring the expected sync in line with the request before checking what was persisted.
  standardSync
      .withFieldSelectionData(new FieldSelectionData().withAdditionalProperty("null/users-data0", true))
      .getCatalog().getStreams().get(0)
      .withSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)
      .withCursorField(null);

  verify(configRepository).writeStandardSync(standardSync);
}

@Test
void testFieldSelectionRemoveCursorFails() throws JsonValidationException, ConfigNotFoundException, IOException {
// Test that if we try to de-select a field that's being used for the cursor, the request will fail.
Expand All @@ -365,7 +365,8 @@ void testFieldSelectionRemoveCursorFails() throws JsonValidationException, Confi
catalogForUpdate.getStreams().get(0).getConfig()
.fieldSelectionEnabled(true)
.selectedFields(List.of(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME)))
.cursorField(List.of(SECOND_FIELD_NAME));
.cursorField(List.of(SECOND_FIELD_NAME))
.syncMode(SyncMode.INCREMENTAL);

final ConnectionUpdate connectionUpdate = new ConnectionUpdate()
.connectionId(standardSync.getConnectionId())
Expand Down

0 comments on commit 5257f69

Please sign in to comment.