Skip to content

Commit

Permalink
Merge branch 'master' into ddavydov/#20703-source-salesforce-fix-prop…
Browse files Browse the repository at this point in the history
…s-chunk-length
  • Loading branch information
davydov-d committed Feb 14, 2023
2 parents 2bdcf63 + ab10c6d commit 31782a3
Show file tree
Hide file tree
Showing 207 changed files with 1,952 additions and 975 deletions.
2 changes: 2 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2789,6 +2789,8 @@ components:
format: uuid
disable_cache:
type: boolean
notifySchemaChange:
type: boolean
SourceUpdate:
type: object
required:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-bootloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies {
implementation project(':airbyte-config:config-persistence')
implementation project(':airbyte-db:db-lib')
implementation project(":airbyte-json-validation")
implementation project(':airbyte-protocol:protocol-models')
implementation libs.airbyte.protocol
implementation project(':airbyte-persistence:job-persistence')

testAnnotationProcessor platform(libs.micronaut.bom)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.28.0
current_version = 0.28.1
commit = False

[bumpversion:file:setup.py]
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.28.1
Low-Code CDK: fix signature _parse_records_and_emit_request_and_responses

## 0.28.0
Low-Code: improve day_delta macro and MinMaxDatetime component

Expand Down
3 changes: 2 additions & 1 deletion airbyte-cdk/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ If the iteration you are working on includes changes to the models, you might wa
```commandline
SUB_BUILD=CONNECTORS_BASE ./gradlew format --scan --info --stacktrace
```
This will generate the files based on the schemas, add the license information and format the code. If you want to only do the former and rely on pre-commit to the others, you can run the appropriate generation command i.e. `./gradlew generateProtocolClassFiles` or `./gradlew generateComponentManifestClassFiles`.
This will generate the files based on the schemas, add the license information and format the code. If you want to only do the former and rely on
pre-commit to the others, you can run the appropriate generation command i.e. `./gradlew generateComponentManifestClassFiles`.

#### Testing

Expand Down
8 changes: 7 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# generated by generate-protocol-files
# The earlier versions of airbyte-cdk (0.28.0<=) had the airbyte_protocol python classes
# declared inline in the airbyte-cdk code. However, somewhere around Feb 2023 the
# Airbyte Protocol moved to its own repo/PyPi package, called airbyte-protocol-models.
# This directory including the airbyte_protocol.py and well_known_types.py files
# are just wrappers on top of that stand-alone package which do some namespacing magic
# to make the airbyte_protocol python classes available to the airbyte-cdk consumer as part
# of airbyte-cdk rather than a standalone package.
from .airbyte_protocol import *
from .well_known_types import *
396 changes: 1 addition & 395 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py

Large diffs are not rendered by default.

83 changes: 1 addition & 82 deletions airbyte-cdk/python/airbyte_cdk/models/well_known_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,85 +2,4 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

# generated by datamodel-codegen:
# filename: well_known_types.yaml

from __future__ import annotations

from enum import Enum
from typing import Any, Union

from pydantic import BaseModel, Field, constr


class Model(BaseModel):
__root__: Any


class String(BaseModel):
__root__: str = Field(..., description="Arbitrary text")


class BinaryData(BaseModel):
__root__: constr(regex=r"^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$") = Field(
...,
description="Arbitrary binary data. Represented as base64-encoded strings in the JSON transport. In the future, if we support other transports, may be encoded differently.\n",
)


class Date(BaseModel):
__root__: constr(regex=r"^\d{4}-\d{2}-\d{2}( BC)?$") = Field(
..., description="RFC 3339§5.6's full-date format, extended with BC era support"
)


class TimestampWithTimezone(BaseModel):
__root__: constr(regex=r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+\-]\d{1,2}:\d{2})( BC)?$") = Field(
...,
description='An instant in time. Frequently simply referred to as just a timestamp, or timestamptz. Uses RFC 3339§5.6\'s date-time format, requiring a "T" separator, and extended with BC era support. Note that we do _not_ accept Unix epochs here.\n',
)


class TimestampWithoutTimezone(BaseModel):
__root__: constr(regex=r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?( BC)?$") = Field(
...,
description='Also known as a localdatetime, or just datetime. Under RFC 3339§5.6, this would be represented as `full-date "T" partial-time`, extended with BC era support.\n',
)


class TimeWithTimezone(BaseModel):
__root__: constr(regex=r"^\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+\-]\d{1,2}:\d{2})$") = Field(..., description="An RFC 3339§5.6 full-time")


class TimeWithoutTimezone(BaseModel):
__root__: constr(regex=r"^\d{2}:\d{2}:\d{2}(\.\d+)?$") = Field(..., description="An RFC 3339§5.6 partial-time")


class NumberEnum(Enum):
Infinity = "Infinity"
_Infinity = "-Infinity"
NaN = "NaN"


class Number(BaseModel):
__root__: Union[Any, NumberEnum] = Field(
...,
description="Note the mix of regex validation for normal numbers, and enum validation for special values.",
)


class IntegerEnum(Enum):
Infinity = "Infinity"
_Infinity = "-Infinity"
NaN = "NaN"


class Integer(BaseModel):
__root__: Union[Any, IntegerEnum]


class Boolean(BaseModel):
__root__: bool = Field(
...,
description="Note the direct usage of a primitive boolean rather than string. Unlike Numbers and Integers, we don't expect unusual values here.",
)
from airbyte_protocol.models.well_known_types import *
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def state(self, value: StreamState):
"""State setter, accept state serialized by state getter."""
self.stream_slicer.update_cursor(value)

def _parse_records_and_emit_request_and_responses(self, request, response, stream_slice, stream_state) -> Iterable[StreamData]:
def _parse_records_and_emit_request_and_responses(self, request, response, stream_state, stream_slice) -> Iterable[StreamData]:
# Only emit requests and responses when running in debug mode
if self.logger.isEnabledFor(logging.DEBUG):
yield _prepared_request_to_airbyte_message(request)
Expand Down
26 changes: 0 additions & 26 deletions airbyte-cdk/python/bin/generate-protocol-files.sh

This file was deleted.

11 changes: 0 additions & 11 deletions airbyte-cdk/python/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@ airbytePython {
moduleDirectory 'airbyte_cdk'
}

task generateProtocolClassFiles(type: Exec) {
environment 'ROOT_DIR', rootDir.absolutePath
commandLine 'bin/generate-protocol-files.sh'
dependsOn ':tools:code-generator:airbyteDocker'
}

task generateComponentManifestClassFiles(type: Exec) {
environment 'ROOT_DIR', rootDir.absolutePath
commandLine 'bin/generate-component-manifest-files.sh'
Expand All @@ -24,11 +18,6 @@ task validateSourceYamlManifest(type: Exec) {
commandLine 'bin/validate-yaml-schema.sh'
}

blackFormat.dependsOn generateProtocolClassFiles
isortFormat.dependsOn generateProtocolClassFiles
flakeCheck.dependsOn generateProtocolClassFiles
installReqs.dependsOn generateProtocolClassFiles

blackFormat.dependsOn generateComponentManifestClassFiles
isortFormat.dependsOn generateComponentManifestClassFiles
flakeCheck.dependsOn generateComponentManifestClassFiles
Expand Down
3 changes: 2 additions & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.28.0",
version="0.28.1",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -44,6 +44,7 @@
packages=find_packages(exclude=("unit_tests",)),
package_data={"airbyte_cdk": ["py.typed", "sources/declarative/declarative_component_schema.yaml"]},
install_requires=[
"airbyte-protocol-models==1.0.0",
"backoff",
# pinned to the last working version for us temporarily while we fix
"dataclasses-jsonschema==2.15.1",
Expand Down
2 changes: 1 addition & 1 deletion airbyte-commons-protocol/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ dependencies {
implementation libs.bundles.micronaut.annotation
testImplementation libs.bundles.micronaut.test

implementation project(':airbyte-protocol:protocol-models')
implementation libs.airbyte.protocol
implementation project(':airbyte-json-validation')
}

Expand Down
2 changes: 1 addition & 1 deletion airbyte-commons-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies {
implementation project(':airbyte-db:db-lib')
implementation project(":airbyte-json-validation")
implementation project(':airbyte-oauth')
implementation project(':airbyte-protocol:protocol-models')
implementation libs.airbyte.protocol
implementation project(':airbyte-persistence:job-persistence')

implementation 'com.github.slugify:slugify:2.4'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.airbyte.config.persistence.SecretsRepositoryReader;
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WebUrlHelper;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
Expand All @@ -81,8 +82,10 @@
import java.util.Optional;
import java.util.UUID;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;

@Singleton
@Slf4j
public class SchedulerHandler {

private static final HashFunction HASH_FUNCTION = Hashing.md5();
Expand All @@ -100,6 +103,7 @@ public class SchedulerHandler {
private final JobConverter jobConverter;
private final EventRunner eventRunner;
private final FeatureFlags envVariableFeatureFlags;
private final WebUrlHelper webUrlHelper;

// TODO: Convert to be fully using micronaut
public SchedulerHandler(final ConfigRepository configRepository,
Expand All @@ -111,7 +115,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final LogConfigs logConfigs,
final EventRunner eventRunner,
final ConnectionsHandler connectionsHandler,
final FeatureFlags envVariableFeatureFlags) {
final FeatureFlags envVariableFeatureFlags,
final WebUrlHelper webUrlHelper) {
this(
configRepository,
secretsRepositoryWriter,
Expand All @@ -122,7 +127,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
eventRunner,
new JobConverter(workerEnvironment, logConfigs),
connectionsHandler,
envVariableFeatureFlags);
envVariableFeatureFlags,
webUrlHelper);
}

@VisibleForTesting
Expand All @@ -135,7 +141,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final EventRunner eventRunner,
final JobConverter jobConverter,
final ConnectionsHandler connectionsHandler,
final FeatureFlags envVariableFeatureFlags) {
final FeatureFlags envVariableFeatureFlags,
final WebUrlHelper webUrlHelper) {
this.configRepository = configRepository;
this.secretsRepositoryWriter = secretsRepositoryWriter;
this.synchronousSchedulerClient = synchronousSchedulerClient;
Expand All @@ -146,6 +153,7 @@ public SchedulerHandler(final ConfigRepository configRepository,
this.jobConverter = jobConverter;
this.connectionsHandler = connectionsHandler;
this.envVariableFeatureFlags = envVariableFeatureFlags;
this.webUrlHelper = webUrlHelper;
}

public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
Expand Down Expand Up @@ -272,7 +280,7 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source

if (persistedCatalogId.isSuccess() && discoverSchemaRequestBody.getConnectionId() != null) {
// modify discoveredSchema object to add CatalogDiff, containsBreakingChange, and connectionStatus
generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody);
generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody, source.getWorkspaceId());
}

return discoveredSchema;
Expand Down Expand Up @@ -394,9 +402,10 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE
// determine whether 1. the source schema change resulted in a broken connection or 2. the user
// wants the connection disabled when non-breaking changes are detected. If so, disable that
// connection. Modify the current discoveredSchema object to add a CatalogDiff,
// containsBreakingChange paramter, and connectionStatus parameter.
// containsBreakingChange parameter, and connectionStatus parameter.
private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDiscoverSchemaRead discoveredSchema,
final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody,
final UUID workspaceId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final ConnectionReadList connectionsForSource = connectionsHandler.listConnectionsForSource(discoverSchemaRequestBody.getSourceId(), false);
for (final ConnectionRead connectionRead : connectionsForSource.getConnections()) {
Expand All @@ -418,12 +427,23 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
}
updateObject.status(connectionStatus);
connectionsHandler.updateConnection(updateObject);
if (shouldNotifySchemaChange(diff, connectionRead, discoverSchemaRequestBody)) {
final String url = webUrlHelper.getConnectionUrl(workspaceId, connectionRead.getConnectionId());
eventRunner.sendSchemaChangeNotification(connectionRead.getConnectionId(), url);
}
if (connectionRead.getConnectionId().equals(discoverSchemaRequestBody.getConnectionId())) {
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus);
}
}
}

private boolean shouldNotifySchemaChange(final CatalogDiff diff,
final ConnectionRead connectionRead,
final SourceDiscoverSchemaRequestBody requestBody) {
return !diff.getTransforms().isEmpty() && connectionRead.getNotifySchemaChanges() && requestBody.getNotifySchemaChange() != null
&& requestBody.getNotifySchemaChange();
}

private boolean shouldDisableConnection(final boolean containsBreakingChange,
final NonBreakingChangesPreference preference,
final CatalogDiff diff) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceI
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody()
.sourceId(sourceId)
.disableCache(true)
.connectionId(connectionId);
.connectionId(connectionId)
.notifySchemaChange(false);
final SourceDiscoverSchemaRead schemaRead = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq);
return Optional.ofNullable(schemaRead);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface EventRunner {

void update(final UUID connectionId);

void sendSchemaChangeNotification(final UUID connectionId, final String url);

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ public void update(final UUID connectionId) {
temporalClient.update(connectionId);
}

@Override
public void sendSchemaChangeNotification(final UUID connectionId, final String url) {
temporalClient.sendSchemaChangeNotification(connectionId, url);
}

}
Loading

0 comments on commit 31782a3

Please sign in to comment.