Skip to content

Commit

Permalink
Merge branch 'master' into augustin/sat/capture-config-messages
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored Dec 2, 2022
2 parents f879c01 + 764496e commit 0cd81af
Show file tree
Hide file tree
Showing 75 changed files with 1,743 additions and 1,130 deletions.
2 changes: 1 addition & 1 deletion airbyte-bootloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies {
implementation libs.flyway.core

testImplementation libs.platform.testcontainers.postgresql
testImplementation 'uk.org.webcompere:system-stubs-jupiter:1.2.0'
testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'
}

application {
Expand Down
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.11.2
Low-code: Include the HTTP method used by the request in logging output of the `airbyte-cdk`

## 0.11.1
Low-code: Fix the component manifest schema and validate `check` instead of `checker`

Expand Down
6 changes: 4 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def _read_incremental(
sync_mode=SyncMode.incremental,
stream_state=stream_state,
)
logger.debug(f"Processing stream slices for {stream_name}", extra={"stream_slices": slices})
logger.debug(f"Processing stream slices for {stream_name} (sync_mode: incremental)", extra={"stream_slices": slices})

total_records_counter = 0
has_slices = False
Expand Down Expand Up @@ -276,7 +276,9 @@ def _read_full_refresh(
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)
logger.debug(f"Processing stream slices for {configured_stream.stream.name}", extra={"stream_slices": slices})
logger.debug(
f"Processing stream slices for {configured_stream.stream.name} (sync_mode: full_refresh)", extra={"stream_slices": slices}
)
total_records_counter = 0
for _slice in slices:
logger.debug("Processing stream slice", extra={"slice": _slice})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def parse_records_and_emit_request_and_responses(self, request, response, stream

def _create_trace_message_from_request(self, request: requests.PreparedRequest):
    # FIXME: this should return some sort of trace message
    # Serialize the outgoing request (url, HTTP verb, headers, body) so it can be
    # surfaced as a log message; secrets are scrubbed before emission.
    payload = {"url": request.url, "http_method": request.method, "headers": dict(request.headers), "body": request.body}
    scrubbed = filter_secrets(f"request:{json.dumps(payload)}")
    return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=scrubbed))

Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.11.1",
version="0.11.2",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public boolean healthCheck() {

/**
 * Fetch a single workspace by id, without hydrating secrets.
 *
 * @param workspaceId id of the workspace to fetch
 * @param includeTombstone when true, tombstoned workspaces are also considered
 * @throws ConfigNotFoundException if no matching workspace survives the id/tombstone filter
 */
public StandardWorkspace getStandardWorkspaceNoSecrets(final UUID workspaceId, final boolean includeTombstone)
throws JsonValidationException, IOException, ConfigNotFoundException {
// Narrow the shared workspace query to this id; absence maps to ConfigNotFoundException.
return listWorkspaceQuery(Optional.of(workspaceId), includeTombstone)
.findFirst()
.orElseThrow(() -> new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, workspaceId));
}
Expand All @@ -158,13 +158,14 @@ public StandardWorkspace getWorkspaceBySlug(final String slug, final boolean inc
}

/**
 * List all workspaces (no id filter), optionally including tombstoned ones.
 */
public List<StandardWorkspace> listStandardWorkspaces(final boolean includeTombstone) throws IOException {
// Optional.empty() means "no workspace-id filter" in the shared query helper.
return listWorkspaceQuery(Optional.empty(), includeTombstone).toList();
}

private Stream<StandardWorkspace> listWorkspaceQuery(final boolean includeTombstone) throws IOException {
private Stream<StandardWorkspace> listWorkspaceQuery(final Optional<UUID> workspaceId, final boolean includeTombstone) throws IOException {
return database.query(ctx -> ctx.select(WORKSPACE.asterisk())
.from(WORKSPACE)
.where(includeTombstone ? noCondition() : WORKSPACE.TOMBSTONE.notEqual(true))
.and(workspaceId.map(WORKSPACE.ID::eq).orElse(noCondition()))
.fetch())
.stream()
.map(DbConverter::buildStandardWorkspace);
Expand Down Expand Up @@ -907,7 +908,7 @@ private static StandardSyncOperation buildStandardSyncOperation(final Record rec
}

/**
 * Fetch a single sync operation by id.
 *
 * @param operationId id of the operation to fetch
 * @throws ConfigNotFoundException if no operation with the given id exists
 */
public StandardSyncOperation getStandardSyncOperation(final UUID operationId) throws JsonValidationException, IOException, ConfigNotFoundException {
// Filter the shared query by the requested id (previously Optional.empty(), which returned an arbitrary row).
return listStandardSyncOperationQuery(Optional.of(operationId))
.findFirst()
.orElseThrow(() -> new ConfigNotFoundException(ConfigSchema.STANDARD_SYNC_OPERATION, operationId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ private SourceConnection createSourceConnection(final UUID workspaceId, final St
}

private DestinationConnection createDestinationConnection(final UUID workspaceId, final StandardDestinationDefinition destDef)
throws JsonValidationException, IOException {
throws IOException {
final UUID destinationId = UUID.randomUUID();
final DestinationConnection dest = new DestinationConnection()
.withName("source-" + destinationId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

class StatePersistenceTest extends BaseConfigDatabaseTest {

private ConfigRepository configRepository;
private StatePersistence statePersistence;
private UUID connectionId;
private static final String STATE_ONE = "\"state1\"";
Expand All @@ -58,7 +57,7 @@ void beforeEach() throws DatabaseInitializationException, IOException, JsonValid
}

private void setupTestData() throws JsonValidationException, IOException {
configRepository = new ConfigRepository(
final ConfigRepository configRepository = new ConfigRepository(
database,
new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)),
new StandardSyncPersistence(database));
Expand All @@ -68,15 +67,14 @@ private void setupTestData() throws JsonValidationException, IOException {
final SourceConnection sourceConnection = MockData.sourceConnections().get(0);
final StandardDestinationDefinition destinationDefinition = MockData.publicDestinationDefinition();
final DestinationConnection destinationConnection = MockData.destinationConnections().get(0);
final StandardSync sync = MockData.standardSyncs().get(0);
// we don't need sync operations in this test suite, zero them out.
final StandardSync sync = Jsons.clone(MockData.standardSyncs().get(0)).withOperationIds(Collections.emptyList());

configRepository.writeStandardWorkspaceNoSecrets(workspace);
configRepository.writeStandardSourceDefinition(sourceDefinition);
configRepository.writeSourceConnectionNoSecrets(sourceConnection);
configRepository.writeStandardDestinationDefinition(destinationDefinition);
configRepository.writeDestinationConnectionNoSecrets(destinationConnection);
configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(0));
configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(1));
configRepository.writeStandardSync(sync);

connectionId = sync.getConnectionId();
Expand Down Expand Up @@ -239,7 +237,7 @@ void testGlobalPartialReset() throws IOException {
.withType(AirbyteStateType.GLOBAL)
.withGlobal(new AirbyteGlobalState()
.withSharedState(Jsons.deserialize(GLOBAL_STATE))
.withStreamStates(Arrays.asList(
.withStreamStates(List.of(
new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withName("s1"))
.withStreamState(Jsons.deserialize(STATE_TWO))))));
Expand Down Expand Up @@ -424,7 +422,7 @@ void testStreamPartialUpdates() throws IOException {
assertEquals(
new StateWrapper()
.withStateType(StateType.STREAM)
.withStateMessages(Arrays.asList(
.withStateMessages(List.of(
new AirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import io.airbyte.config.Geography;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.OperatorNormalization.Option;
import io.airbyte.config.OperatorWebhook;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class SyncOperationPersistenceTest extends BaseConfigDatabaseTest {

private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final UUID WEBHOOK_CONFIG_ID = UUID.randomUUID();
private static final String WEBHOOK_OPERATION_EXECUTION_URL = "test-webhook-url";
private static final String WEBHOOK_OPERATION_EXECUTION_BODY = "test-webhook-body";

private ConfigRepository configRepository;

private static final StandardSyncOperation DBT_OP = new StandardSyncOperation()
.withName("operation-1")
.withTombstone(false)
.withOperationId(UUID.randomUUID())
.withWorkspaceId(WORKSPACE_ID)
.withOperatorDbt(new OperatorDbt()
.withDbtArguments("dbt-arguments")
.withDockerImage("image-tag")
.withGitRepoBranch("git-repo-branch")
.withGitRepoUrl("git-repo-url"))
.withOperatorNormalization(null)
.withOperatorType(OperatorType.DBT);
private static final StandardSyncOperation NORMALIZATION_OP = new StandardSyncOperation()
.withName("operation-1")
.withTombstone(false)
.withOperationId(UUID.randomUUID())
.withWorkspaceId(WORKSPACE_ID)
.withOperatorDbt(null)
.withOperatorNormalization(new OperatorNormalization().withOption(Option.BASIC))
.withOperatorType(OperatorType.NORMALIZATION);
private static final StandardSyncOperation WEBHOOK_OP = new StandardSyncOperation()
.withName("webhook-operation")
.withTombstone(false)
.withOperationId(UUID.randomUUID())
.withWorkspaceId(WORKSPACE_ID)
.withOperatorType(OperatorType.WEBHOOK)
.withOperatorDbt(null)
.withOperatorNormalization(null)
.withOperatorWebhook(
new OperatorWebhook()
.withWebhookConfigId(WEBHOOK_CONFIG_ID)
.withExecutionUrl(WEBHOOK_OPERATION_EXECUTION_URL)
.withExecutionBody(WEBHOOK_OPERATION_EXECUTION_BODY));
private static final List<StandardSyncOperation> OPS = List.of(DBT_OP, NORMALIZATION_OP, WEBHOOK_OP);

@BeforeEach
void beforeEach() throws Exception {
truncateAllTables();

configRepository = new ConfigRepository(database);
createWorkspace();

for (final StandardSyncOperation op : OPS) {
configRepository.writeStandardSyncOperation(op);
}
}

@Test
void testReadWrite() throws IOException, ConfigNotFoundException, JsonValidationException {
for (final StandardSyncOperation op : OPS) {
assertEquals(op, configRepository.getStandardSyncOperation(op.getOperationId()));
}
}

@Test
void testReadNotExists() {
assertThrows(ConfigNotFoundException.class, () -> configRepository.getStandardSyncOperation(UUID.randomUUID()));
}

@Test
void testList() throws IOException, JsonValidationException {
assertEquals(OPS, configRepository.listStandardSyncOperations());
}

@Test
void testDelete() throws IOException, ConfigNotFoundException, JsonValidationException {
for (final StandardSyncOperation op : OPS) {
assertEquals(op, configRepository.getStandardSyncOperation(op.getOperationId()));
configRepository.deleteStandardSyncOperation(op.getOperationId());
assertThrows(ConfigNotFoundException.class, () -> configRepository.getStandardSyncOperation(UUID.randomUUID()));

}
}

private void createWorkspace() throws IOException, JsonValidationException {
final StandardWorkspace workspace = new StandardWorkspace()
.withWorkspaceId(WORKSPACE_ID)
.withName("Another Workspace")
.withSlug("another-workspace")
.withInitialSetupComplete(true)
.withTombstone(false)
.withDefaultGeography(Geography.AUTO);
configRepository.writeStandardWorkspaceNoSecrets(workspace);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ void setup() {
null));
}

@Test
// Writes a second workspace under a different random id, then verifies the lookup still returns
// the base workspace — presumably exercising the id filter; confirm against assertReturnsWorkspace.
void testGetWorkspace() throws ConfigNotFoundException, IOException, JsonValidationException {
configRepository.writeStandardWorkspaceNoSecrets(createBaseStandardWorkspace().withWorkspaceId(UUID.randomUUID()));
assertReturnsWorkspace(createBaseStandardWorkspace());
}

@Test
void testWorkspaceWithNullTombstone() throws ConfigNotFoundException, IOException, JsonValidationException {
assertReturnsWorkspace(createBaseStandardWorkspace());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,10 @@
icon: facebook.svg
sourceType: api
releaseStage: alpha
- name: "Sample Data (faker)"
- name: Sample Data (Faker)
sourceDefinitionId: dfd88b22-b603-4c3d-aad7-3701784586b1
dockerRepository: airbyte/source-faker
dockerImageTag: 0.2.1
dockerImageTag: 1.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/faker
sourceType: api
releaseStage: alpha
Expand Down Expand Up @@ -554,7 +554,7 @@
- name: Gitlab
sourceDefinitionId: 5e6175e5-68e1-4c17-bff9-56103bbb0d80
dockerRepository: airbyte/source-gitlab
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
documentationUrl: https://docs.airbyte.com/integrations/sources/gitlab
icon: gitlab.svg
sourceType: api
Expand Down Expand Up @@ -753,7 +753,7 @@
- name: Iterable
sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
dockerRepository: airbyte/source-iterable
dockerImageTag: 0.1.21
dockerImageTag: 0.1.22
documentationUrl: https://docs.airbyte.com/integrations/sources/iterable
icon: iterable.svg
sourceType: api
Expand Down Expand Up @@ -1484,7 +1484,7 @@
- name: Slack
sourceDefinitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd
dockerRepository: airbyte/source-slack
dockerImageTag: 0.1.18
dockerImageTag: 0.1.19
documentationUrl: https://docs.airbyte.com/integrations/sources/slack
icon: slack.svg
sourceType: api
Expand Down
Loading

0 comments on commit 0cd81af

Please sign in to comment.