diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorBreakingChanges.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorBreakingChanges.py new file mode 100644 index 000000000000..adaa55b5f4f4 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorBreakingChanges.py @@ -0,0 +1,49 @@ +# generated by datamodel-codegen: +# filename: ConnectorBreakingChanges.yaml + +from __future__ import annotations + +from datetime import date +from typing import Any, Dict, List, Optional + +from pydantic import AnyUrl, BaseModel, Extra, Field, constr + + +class StreamBreakingChangeScope(BaseModel): + class Config: + extra = Extra.forbid + + scopeType: Any = Field("stream", const=True) + impactedScopes: List[str] = Field(..., description="List of streams that are impacted by the breaking change.", min_items=1) + + +class BreakingChangeScope(BaseModel): + __root__: StreamBreakingChangeScope = Field(..., description="A scope that can be used to limit the impact of a breaking change.") + + +class VersionBreakingChange(BaseModel): + class Config: + extra = Extra.forbid + + upgradeDeadline: date = Field(..., description="The deadline by which to upgrade before the breaking change takes effect.") + message: str = Field(..., description="Descriptive message detailing the breaking change.") + migrationDocumentationUrl: Optional[AnyUrl] = Field( + None, + description="URL to documentation on how to migrate to the current version. Defaults to ${documentationUrl}-migrations#${version}", + ) + scopedImpact: Optional[List[BreakingChangeScope]] = Field( + None, + description="List of scopes that are impacted by the breaking change. 
If not specified, the breaking change cannot be scoped to reduce impact via the supported scope types.", + min_items=1, + ) + + +class ConnectorBreakingChanges(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field( + ..., + description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.", + title="ConnectorBreakingChanges", + ) diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorMetadataDefinitionV0.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorMetadataDefinitionV0.py index 05c366aed62e..2e0af418fe6c 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorMetadataDefinitionV0.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorMetadataDefinitionV0.py @@ -267,7 +267,9 @@ class Config: extra = Extra.forbid __root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field( - ..., description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade." 
+ ..., + description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.", + title="ConnectorBreakingChanges", ) diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryDestinationDefinition.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryDestinationDefinition.py index c10a1e246d07..92e64e1c11c4 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryDestinationDefinition.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryDestinationDefinition.py @@ -86,6 +86,16 @@ class Config: impactedScopes: List[str] = Field(..., description="List of streams that are impacted by the breaking change.", min_items=1) +class SuggestedStreams(BaseModel): + class Config: + extra = Extra.allow + + streams: Optional[List[str]] = Field( + None, + description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. An empty list here means that no streams are suggested.", + ) + + class AirbyteInternal(BaseModel): class Config: extra = Extra.allow @@ -182,20 +192,9 @@ class Config: extra = Extra.forbid __root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field( - ..., description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade." 
- ) - - -class ConnectorReleases(BaseModel): - class Config: - extra = Extra.forbid - - isReleaseCandidate: Optional[bool] = Field(False, description="Whether the release is eligible to be a release candidate.") - rolloutConfiguration: Optional[RolloutConfiguration] = None - breakingChanges: ConnectorBreakingChanges - migrationDocumentationUrl: Optional[AnyUrl] = Field( - None, - description="URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations", + ..., + description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.", + title="ConnectorBreakingChanges", ) @@ -230,11 +229,82 @@ class Config: description="an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.", ) allowedHosts: Optional[AllowedHosts] = None - releases: Optional[ConnectorReleases] = None + releases: Optional[ConnectorRegistryReleases] = None ab_internal: Optional[AirbyteInternal] = None supportsRefreshes: Optional[bool] = False generated: Optional[GeneratedFields] = None packageInfo: Optional[ConnectorPackageInfo] = None - language: Optional[str] = Field( - None, description="The language the connector is written in" + language: Optional[str] = Field(None, description="The language the connector is written in") + + +class ConnectorRegistryReleases(BaseModel): + class Config: + extra = Extra.forbid + + releaseCandidates: Optional[ConnectorReleaseCandidates] = None + rolloutConfiguration: Optional[RolloutConfiguration] = None + breakingChanges: Optional[ConnectorBreakingChanges] = None + migrationDocumentationUrl: Optional[AnyUrl] = Field( + None, + description="URL to documentation on how to migrate from the previous version to the current version. 
Defaults to ${documentationUrl}-migrations", + ) + + +class ConnectorReleaseCandidates(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionReleaseCandidate] = Field( + ..., description="Each entry denotes a release candidate version of a connector." + ) + + +class VersionReleaseCandidate(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition] = Field( + ..., description="Contains information about a release candidate version of a connector." + ) + + +class ConnectorRegistrySourceDefinition(BaseModel): + class Config: + extra = Extra.allow + + sourceDefinitionId: UUID + name: str + dockerRepository: str + dockerImageTag: str + documentationUrl: str + icon: Optional[str] = None + iconUrl: Optional[str] = None + sourceType: Optional[Literal["api", "file", "database", "custom"]] = None + spec: Dict[str, Any] + tombstone: Optional[bool] = Field( + False, description="if false, the configuration is active. if true, then this configuration is permanently off." ) + public: Optional[bool] = Field(False, description="true if this connector definition is available to all workspaces") + custom: Optional[bool] = Field(False, description="whether this is a custom connector definition") + releaseStage: Optional[ReleaseStage] = None + supportLevel: Optional[SupportLevel] = None + releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.") + resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None + protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector") + allowedHosts: Optional[AllowedHosts] = None + suggestedStreams: Optional[SuggestedStreams] = None + maxSecondsBetweenMessages: Optional[int] = Field( + None, description="Number of seconds allowed between 2 airbyte protocol messages. 
The source will timeout if this delay is reach" + ) + erdUrl: Optional[str] = Field(None, description="The URL where you can visualize the ERD") + releases: Optional[ConnectorRegistryReleases] = None + ab_internal: Optional[AirbyteInternal] = None + generated: Optional[GeneratedFields] = None + packageInfo: Optional[ConnectorPackageInfo] = None + language: Optional[str] = Field(None, description="The language the connector is written in") + + +ConnectorRegistryDestinationDefinition.update_forward_refs() +ConnectorRegistryReleases.update_forward_refs() +ConnectorReleaseCandidates.update_forward_refs() +VersionReleaseCandidate.update_forward_refs() diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryReleases.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryReleases.py new file mode 100644 index 000000000000..80c930e04109 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryReleases.py @@ -0,0 +1,309 @@ +# generated by datamodel-codegen: +# filename: ConnectorRegistryReleases.yaml + +from __future__ import annotations + +from datetime import date, datetime +from typing import Any, Dict, List, Optional, Union +from uuid import UUID + +from pydantic import AnyUrl, BaseModel, Extra, Field, conint, constr +from typing_extensions import Literal + + +class RolloutConfiguration(BaseModel): + class Config: + extra = Extra.forbid + + initialPercentage: Optional[conint(ge=0, le=100)] = Field( + 0, description="The percentage of users that should receive the new version initially." + ) + maxPercentage: Optional[conint(ge=0, le=100)] = Field( + 50, description="The percentage of users who should receive the release candidate during the test phase before full rollout." + ) + advanceDelayMinutes: Optional[conint(ge=10)] = Field( + 10, description="The number of minutes to wait before advancing the rollout percentage." 
+ ) + + +class StreamBreakingChangeScope(BaseModel): + class Config: + extra = Extra.forbid + + scopeType: Any = Field("stream", const=True) + impactedScopes: List[str] = Field(..., description="List of streams that are impacted by the breaking change.", min_items=1) + + +class ReleaseStage(BaseModel): + __root__: Literal["alpha", "beta", "generally_available", "custom"] = Field( + ..., description="enum that describes a connector's release stage", title="ReleaseStage" + ) + + +class SupportLevel(BaseModel): + __root__: Literal["community", "certified", "archived"] = Field( + ..., description="enum that describes a connector's release stage", title="SupportLevel" + ) + + +class ResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + cpu_request: Optional[str] = None + cpu_limit: Optional[str] = None + memory_request: Optional[str] = None + memory_limit: Optional[str] = None + + +class JobType(BaseModel): + __root__: Literal["get_spec", "check_connection", "discover_schema", "sync", "reset_connection", "connection_updater", "replicate"] = ( + Field(..., description="enum that describes the different types of jobs that the platform runs.", title="JobType") + ) + + +class AllowedHosts(BaseModel): + class Config: + extra = Extra.allow + + hosts: Optional[List[str]] = Field( + None, + description="An array of hosts that this connector can connect to. AllowedHosts not being present for the source or destination means that access to all hosts is allowed. An empty list here means that no network access is granted.", + ) + + +class SuggestedStreams(BaseModel): + class Config: + extra = Extra.allow + + streams: Optional[List[str]] = Field( + None, + description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. 
An empty list here means that no streams are suggested.", + ) + + +class AirbyteInternal(BaseModel): + class Config: + extra = Extra.allow + + sl: Optional[Literal[100, 200, 300]] = None + ql: Optional[Literal[100, 200, 300, 400, 500, 600]] = None + + +class GitInfo(BaseModel): + class Config: + extra = Extra.forbid + + commit_sha: Optional[str] = Field(None, description="The git commit sha of the last commit that modified this file.") + commit_timestamp: Optional[datetime] = Field(None, description="The git commit timestamp of the last commit that modified this file.") + commit_author: Optional[str] = Field(None, description="The git commit author of the last commit that modified this file.") + commit_author_email: Optional[str] = Field(None, description="The git commit author email of the last commit that modified this file.") + + +class SourceFileInfo(BaseModel): + metadata_etag: Optional[str] = None + metadata_file_path: Optional[str] = None + metadata_bucket_name: Optional[str] = None + metadata_last_modified: Optional[str] = None + registry_entry_generated_at: Optional[str] = None + + +class ConnectorMetrics(BaseModel): + all: Optional[Any] = None + cloud: Optional[Any] = None + oss: Optional[Any] = None + + +class ConnectorMetric(BaseModel): + class Config: + extra = Extra.allow + + usage: Optional[Union[str, Literal["low", "medium", "high"]]] = None + sync_success_rate: Optional[Union[str, Literal["low", "medium", "high"]]] = None + connector_version: Optional[str] = None + + +class ConnectorPackageInfo(BaseModel): + cdk_version: Optional[str] = None + + +class NormalizationDestinationDefinitionConfig(BaseModel): + class Config: + extra = Extra.allow + + normalizationRepository: str = Field( + ..., + description="a field indicating the name of the repository to be used for normalization. 
If the value of the flag is NULL - normalization is not used.", + ) + normalizationTag: str = Field(..., description="a field indicating the tag of the docker repository to be used for normalization.") + normalizationIntegrationType: str = Field( + ..., description="a field indicating the type of integration dialect to use for normalization." + ) + + +class BreakingChangeScope(BaseModel): + __root__: StreamBreakingChangeScope = Field(..., description="A scope that can be used to limit the impact of a breaking change.") + + +class JobTypeResourceLimit(BaseModel): + class Config: + extra = Extra.forbid + + jobType: JobType + resourceRequirements: ResourceRequirements + + +class GeneratedFields(BaseModel): + git: Optional[GitInfo] = None + source_file_info: Optional[SourceFileInfo] = None + metrics: Optional[ConnectorMetrics] = None + sbomUrl: Optional[str] = Field(None, description="URL to the SBOM file") + + +class VersionBreakingChange(BaseModel): + class Config: + extra = Extra.forbid + + upgradeDeadline: date = Field(..., description="The deadline by which to upgrade before the breaking change takes effect.") + message: str = Field(..., description="Descriptive message detailing the breaking change.") + migrationDocumentationUrl: Optional[AnyUrl] = Field( + None, + description="URL to documentation on how to migrate to the current version. Defaults to ${documentationUrl}-migrations#${version}", + ) + scopedImpact: Optional[List[BreakingChangeScope]] = Field( + None, + description="List of scopes that are impacted by the breaking change. If not specified, the breaking change cannot be scoped to reduce impact via the supported scope types.", + min_items=1, + ) + + +class ActorDefinitionResourceRequirements(BaseModel): + class Config: + extra = Extra.forbid + + default: Optional[ResourceRequirements] = Field( + None, description="if set, these are the requirements that should be set for ALL jobs run for this actor definition." 
+ ) + jobSpecific: Optional[List[JobTypeResourceLimit]] = None + + +class ConnectorBreakingChanges(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field( + ..., + description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.", + title="ConnectorBreakingChanges", + ) + + +class ConnectorRegistryReleases(BaseModel): + class Config: + extra = Extra.forbid + + releaseCandidates: Optional[ConnectorReleaseCandidates] = None + rolloutConfiguration: Optional[RolloutConfiguration] = None + breakingChanges: Optional[ConnectorBreakingChanges] = None + migrationDocumentationUrl: Optional[AnyUrl] = Field( + None, + description="URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations", + ) + + +class ConnectorReleaseCandidates(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionReleaseCandidate] = Field( + ..., description="Each entry denotes a release candidate version of a connector." + ) + + +class VersionReleaseCandidate(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition] = Field( + ..., description="Contains information about a release candidate version of a connector." + ) + + +class ConnectorRegistrySourceDefinition(BaseModel): + class Config: + extra = Extra.allow + + sourceDefinitionId: UUID + name: str + dockerRepository: str + dockerImageTag: str + documentationUrl: str + icon: Optional[str] = None + iconUrl: Optional[str] = None + sourceType: Optional[Literal["api", "file", "database", "custom"]] = None + spec: Dict[str, Any] + tombstone: Optional[bool] = Field( + False, description="if false, the configuration is active. if true, then this configuration is permanently off." 
+ ) + public: Optional[bool] = Field(False, description="true if this connector definition is available to all workspaces") + custom: Optional[bool] = Field(False, description="whether this is a custom connector definition") + releaseStage: Optional[ReleaseStage] = None + supportLevel: Optional[SupportLevel] = None + releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.") + resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None + protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector") + allowedHosts: Optional[AllowedHosts] = None + suggestedStreams: Optional[SuggestedStreams] = None + maxSecondsBetweenMessages: Optional[int] = Field( + None, description="Number of seconds allowed between 2 airbyte protocol messages. The source will timeout if this delay is reach" + ) + erdUrl: Optional[str] = Field(None, description="The URL where you can visualize the ERD") + releases: Optional[ConnectorRegistryReleases] = None + ab_internal: Optional[AirbyteInternal] = None + generated: Optional[GeneratedFields] = None + packageInfo: Optional[ConnectorPackageInfo] = None + language: Optional[str] = Field(None, description="The language the connector is written in") + + +class ConnectorRegistryDestinationDefinition(BaseModel): + class Config: + extra = Extra.allow + + destinationDefinitionId: UUID + name: str + dockerRepository: str + dockerImageTag: str + documentationUrl: str + icon: Optional[str] = None + iconUrl: Optional[str] = None + spec: Dict[str, Any] + tombstone: Optional[bool] = Field( + False, description="if false, the configuration is active. if true, then this configuration is permanently off." 
+ ) + public: Optional[bool] = Field(False, description="true if this connector definition is available to all workspaces") + custom: Optional[bool] = Field(False, description="whether this is a custom connector definition") + releaseStage: Optional[ReleaseStage] = None + supportLevel: Optional[SupportLevel] = None + releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.") + tags: Optional[List[str]] = Field( + None, description="An array of tags that describe the connector. E.g: language:python, keyword:rds, etc." + ) + resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None + protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector") + normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None + supportsDbt: Optional[bool] = Field( + None, + description="an optional flag indicating whether DBT is used in the normalization. 
If the flag value is NULL - DBT is not used.", + ) + allowedHosts: Optional[AllowedHosts] = None + releases: Optional[ConnectorRegistryReleases] = None + ab_internal: Optional[AirbyteInternal] = None + supportsRefreshes: Optional[bool] = False + generated: Optional[GeneratedFields] = None + packageInfo: Optional[ConnectorPackageInfo] = None + language: Optional[str] = Field(None, description="The language the connector is written in") + + +ConnectorRegistryReleases.update_forward_refs() +ConnectorReleaseCandidates.update_forward_refs() +VersionReleaseCandidate.update_forward_refs() diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistrySourceDefinition.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistrySourceDefinition.py index 36e8c1f1e856..be46a26c7213 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistrySourceDefinition.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistrySourceDefinition.py @@ -82,6 +82,20 @@ class Config: impactedScopes: List[str] = Field(..., description="List of streams that are impacted by the breaking change.", min_items=1) +class NormalizationDestinationDefinitionConfig(BaseModel): + class Config: + extra = Extra.allow + + normalizationRepository: str = Field( + ..., + description="a field indicating the name of the repository to be used for normalization. If the value of the flag is NULL - normalization is not used.", + ) + normalizationTag: str = Field(..., description="a field indicating the tag of the docker repository to be used for normalization.") + normalizationIntegrationType: str = Field( + ..., description="a field indicating the type of integration dialect to use for normalization." 
+ ) + + class AirbyteInternal(BaseModel): class Config: extra = Extra.allow @@ -178,35 +192,90 @@ class Config: extra = Extra.forbid __root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field( - ..., description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade." + ..., + description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.", + title="ConnectorBreakingChanges", ) -class ConnectorReleases(BaseModel): +class ConnectorRegistrySourceDefinition(BaseModel): + class Config: + extra = Extra.allow + + sourceDefinitionId: UUID + name: str + dockerRepository: str + dockerImageTag: str + documentationUrl: str + icon: Optional[str] = None + iconUrl: Optional[str] = None + sourceType: Optional[Literal["api", "file", "database", "custom"]] = None + spec: Dict[str, Any] + tombstone: Optional[bool] = Field( + False, description="if false, the configuration is active. if true, then this configuration is permanently off." + ) + public: Optional[bool] = Field(False, description="true if this connector definition is available to all workspaces") + custom: Optional[bool] = Field(False, description="whether this is a custom connector definition") + releaseStage: Optional[ReleaseStage] = None + supportLevel: Optional[SupportLevel] = None + releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.") + resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None + protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector") + allowedHosts: Optional[AllowedHosts] = None + suggestedStreams: Optional[SuggestedStreams] = None + maxSecondsBetweenMessages: Optional[int] = Field( + None, description="Number of seconds allowed between 2 airbyte protocol messages. 
The source will timeout if this delay is reach" + ) + erdUrl: Optional[str] = Field(None, description="The URL where you can visualize the ERD") + releases: Optional[ConnectorRegistryReleases] = None + ab_internal: Optional[AirbyteInternal] = None + generated: Optional[GeneratedFields] = None + packageInfo: Optional[ConnectorPackageInfo] = None + language: Optional[str] = Field(None, description="The language the connector is written in") + + +class ConnectorRegistryReleases(BaseModel): class Config: extra = Extra.forbid - isReleaseCandidate: Optional[bool] = Field(False, description="Whether the release is eligible to be a release candidate.") + releaseCandidates: Optional[ConnectorReleaseCandidates] = None rolloutConfiguration: Optional[RolloutConfiguration] = None - breakingChanges: ConnectorBreakingChanges + breakingChanges: Optional[ConnectorBreakingChanges] = None migrationDocumentationUrl: Optional[AnyUrl] = Field( None, description="URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations", ) -class ConnectorRegistrySourceDefinition(BaseModel): +class ConnectorReleaseCandidates(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionReleaseCandidate] = Field( + ..., description="Each entry denotes a release candidate version of a connector." + ) + + +class VersionReleaseCandidate(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition] = Field( + ..., description="Contains information about a release candidate version of a connector." 
+ ) + + +class ConnectorRegistryDestinationDefinition(BaseModel): class Config: extra = Extra.allow - sourceDefinitionId: UUID + destinationDefinitionId: UUID name: str dockerRepository: str dockerImageTag: str documentationUrl: str icon: Optional[str] = None iconUrl: Optional[str] = None - sourceType: Optional[Literal["api", "file", "database", "custom"]] = None spec: Dict[str, Any] tombstone: Optional[bool] = Field( False, description="if false, the configuration is active. if true, then this configuration is permanently off." @@ -216,18 +285,26 @@ class Config: releaseStage: Optional[ReleaseStage] = None supportLevel: Optional[SupportLevel] = None releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.") + tags: Optional[List[str]] = Field( + None, description="An array of tags that describe the connector. E.g: language:python, keyword:rds, etc." + ) resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector") - allowedHosts: Optional[AllowedHosts] = None - suggestedStreams: Optional[SuggestedStreams] = None - maxSecondsBetweenMessages: Optional[int] = Field( - None, description="Number of seconds allowed between 2 airbyte protocol messages. The source will timeout if this delay is reach" + normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None + supportsDbt: Optional[bool] = Field( + None, + description="an optional flag indicating whether DBT is used in the normalization. 
If the flag value is NULL - DBT is not used.", ) - erdUrl: Optional[str] = Field(None, description="The URL where you can visualize the ERD") - releases: Optional[ConnectorReleases] = None + allowedHosts: Optional[AllowedHosts] = None + releases: Optional[ConnectorRegistryReleases] = None ab_internal: Optional[AirbyteInternal] = None + supportsRefreshes: Optional[bool] = False generated: Optional[GeneratedFields] = None packageInfo: Optional[ConnectorPackageInfo] = None - language: Optional[str] = Field( - None, description="The language the connector is written in" - ) + language: Optional[str] = Field(None, description="The language the connector is written in") + + +ConnectorRegistrySourceDefinition.update_forward_refs() +ConnectorRegistryReleases.update_forward_refs() +ConnectorReleaseCandidates.update_forward_refs() +VersionReleaseCandidate.update_forward_refs() diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryV0.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryV0.py index a14e04684aa6..739f395e33d2 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryV0.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorRegistryV0.py @@ -86,6 +86,16 @@ class Config: impactedScopes: List[str] = Field(..., description="List of streams that are impacted by the breaking change.", min_items=1) +class SuggestedStreams(BaseModel): + class Config: + extra = Extra.allow + + streams: Optional[List[str]] = Field( + None, + description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. 
An empty list here means that no streams are suggested.", + ) + + class AirbyteInternal(BaseModel): class Config: extra = Extra.allow @@ -131,16 +141,6 @@ class ConnectorPackageInfo(BaseModel): cdk_version: Optional[str] = None -class SuggestedStreams(BaseModel): - class Config: - extra = Extra.allow - - streams: Optional[List[str]] = Field( - None, - description="An array of streams that this connector suggests the average user will want. SuggestedStreams not being present for the source means that all streams are suggested. An empty list here means that no streams are suggested.", - ) - - class JobTypeResourceLimit(BaseModel): class Config: extra = Extra.forbid @@ -192,35 +192,28 @@ class Config: extra = Extra.forbid __root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field( - ..., description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade." + ..., + description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.", + title="ConnectorBreakingChanges", ) -class ConnectorReleases(BaseModel): - class Config: - extra = Extra.forbid - - isReleaseCandidate: Optional[bool] = Field(False, description="Whether the release is eligible to be a release candidate.") - rolloutConfiguration: Optional[RolloutConfiguration] = None - breakingChanges: ConnectorBreakingChanges - migrationDocumentationUrl: Optional[AnyUrl] = Field( - None, - description="URL to documentation on how to migrate from the previous version to the current version. 
Defaults to ${documentationUrl}-migrations", - ) +class ConnectorRegistryV0(BaseModel): + destinations: List[ConnectorRegistryDestinationDefinition] + sources: List[ConnectorRegistrySourceDefinition] -class ConnectorRegistrySourceDefinition(BaseModel): +class ConnectorRegistryDestinationDefinition(BaseModel): class Config: extra = Extra.allow - sourceDefinitionId: UUID + destinationDefinitionId: UUID name: str dockerRepository: str dockerImageTag: str documentationUrl: str icon: Optional[str] = None iconUrl: Optional[str] = None - sourceType: Optional[Literal["api", "file", "database", "custom"]] = None spec: Dict[str, Any] tombstone: Optional[bool] = Field( False, description="if false, the configuration is active. if true, then this configuration is permanently off." @@ -230,34 +223,68 @@ class Config: releaseStage: Optional[ReleaseStage] = None supportLevel: Optional[SupportLevel] = None releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.") + tags: Optional[List[str]] = Field( + None, description="An array of tags that describe the connector. E.g: language:python, keyword:rds, etc." + ) resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector") - allowedHosts: Optional[AllowedHosts] = None - suggestedStreams: Optional[SuggestedStreams] = None - maxSecondsBetweenMessages: Optional[int] = Field( - None, description="Number of seconds allowed between 2 airbyte protocol messages. The source will timeout if this delay is reach" + normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None + supportsDbt: Optional[bool] = Field( + None, + description="an optional flag indicating whether DBT is used in the normalization. 
If the flag value is NULL - DBT is not used.", ) - erdUrl: Optional[str] = Field(None, description="The URL where you can visualize the ERD") - releases: Optional[ConnectorReleases] = None + allowedHosts: Optional[AllowedHosts] = None + releases: Optional[ConnectorRegistryReleases] = None ab_internal: Optional[AirbyteInternal] = None + supportsRefreshes: Optional[bool] = False generated: Optional[GeneratedFields] = None packageInfo: Optional[ConnectorPackageInfo] = None - language: Optional[str] = Field( - None, description="The language the connector is written in" + language: Optional[str] = Field(None, description="The language the connector is written in") + + +class ConnectorRegistryReleases(BaseModel): + class Config: + extra = Extra.forbid + + releaseCandidates: Optional[ConnectorReleaseCandidates] = None + rolloutConfiguration: Optional[RolloutConfiguration] = None + breakingChanges: Optional[ConnectorBreakingChanges] = None + migrationDocumentationUrl: Optional[AnyUrl] = Field( + None, + description="URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations", ) -class ConnectorRegistryDestinationDefinition(BaseModel): +class ConnectorReleaseCandidates(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionReleaseCandidate] = Field( + ..., description="Each entry denotes a release candidate version of a connector." + ) + + +class VersionReleaseCandidate(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition] = Field( + ..., description="Contains information about a release candidate version of a connector." 
+ ) + + +class ConnectorRegistrySourceDefinition(BaseModel): class Config: extra = Extra.allow - destinationDefinitionId: UUID + sourceDefinitionId: UUID name: str dockerRepository: str dockerImageTag: str documentationUrl: str icon: Optional[str] = None iconUrl: Optional[str] = None + sourceType: Optional[Literal["api", "file", "database", "custom"]] = None spec: Dict[str, Any] tombstone: Optional[bool] = Field( False, description="if false, the configuration is active. if true, then this configuration is permanently off." @@ -267,27 +294,23 @@ class Config: releaseStage: Optional[ReleaseStage] = None supportLevel: Optional[SupportLevel] = None releaseDate: Optional[date] = Field(None, description="The date when this connector was first released, in yyyy-mm-dd format.") - tags: Optional[List[str]] = Field( - None, description="An array of tags that describe the connector. E.g: language:python, keyword:rds, etc." - ) resourceRequirements: Optional[ActorDefinitionResourceRequirements] = None protocolVersion: Optional[str] = Field(None, description="the Airbyte Protocol version supported by the connector") - normalizationConfig: Optional[NormalizationDestinationDefinitionConfig] = None - supportsDbt: Optional[bool] = Field( - None, - description="an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.", - ) allowedHosts: Optional[AllowedHosts] = None - releases: Optional[ConnectorReleases] = None + suggestedStreams: Optional[SuggestedStreams] = None + maxSecondsBetweenMessages: Optional[int] = Field( + None, description="Number of seconds allowed between 2 airbyte protocol messages. 
The source will timeout if this delay is reach" + ) + erdUrl: Optional[str] = Field(None, description="The URL where you can visualize the ERD") + releases: Optional[ConnectorRegistryReleases] = None ab_internal: Optional[AirbyteInternal] = None - supportsRefreshes: Optional[bool] = False generated: Optional[GeneratedFields] = None packageInfo: Optional[ConnectorPackageInfo] = None - language: Optional[str] = Field( - None, description="The language the connector is written in" - ) + language: Optional[str] = Field(None, description="The language the connector is written in") -class ConnectorRegistryV0(BaseModel): - destinations: List[ConnectorRegistryDestinationDefinition] - sources: List[ConnectorRegistrySourceDefinition] +ConnectorRegistryV0.update_forward_refs() +ConnectorRegistryDestinationDefinition.update_forward_refs() +ConnectorRegistryReleases.update_forward_refs() +ConnectorReleaseCandidates.update_forward_refs() +VersionReleaseCandidate.update_forward_refs() diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorReleases.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorReleases.py index 4e33fa7f6358..ab3399094a93 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorReleases.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/ConnectorReleases.py @@ -58,7 +58,9 @@ class Config: extra = Extra.forbid __root__: Dict[constr(regex=r"^\d+\.\d+\.\d+$"), VersionBreakingChange] = Field( - ..., description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade." 
+ ..., + description="Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade.", + title="ConnectorBreakingChanges", ) diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/__init__.py b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/__init__.py index 00c75f10006c..fe744caace4b 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/__init__.py +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/__init__.py @@ -2,11 +2,13 @@ from .ActorDefinitionResourceRequirements import * from .AirbyteInternal import * from .AllowedHosts import * +from .ConnectorBreakingChanges import * from .ConnectorBuildOptions import * from .ConnectorMetadataDefinitionV0 import * from .ConnectorMetrics import * from .ConnectorPackageInfo import * from .ConnectorRegistryDestinationDefinition import * +from .ConnectorRegistryReleases import * from .ConnectorRegistrySourceDefinition import * from .ConnectorRegistryV0 import * from .ConnectorReleases import * diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorBreakingChanges.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorBreakingChanges.yaml new file mode 100644 index 000000000000..584d29ae8c8c --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorBreakingChanges.yaml @@ -0,0 +1,59 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorBreakingChanges.yaml +title: ConnectorBreakingChanges +description: Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade. 
+type: object +additionalProperties: false +minProperties: 1 +patternProperties: + "^\\d+\\.\\d+\\.\\d+$": + $ref: "#/definitions/VersionBreakingChange" +definitions: + VersionBreakingChange: + description: Contains information about a breaking change, including the deadline to upgrade and a message detailing the change. + type: object + additionalProperties: false + required: + - upgradeDeadline + - message + properties: + upgradeDeadline: + description: The deadline by which to upgrade before the breaking change takes effect. + type: string + format: date + message: + description: Descriptive message detailing the breaking change. + type: string + migrationDocumentationUrl: + description: URL to documentation on how to migrate to the current version. Defaults to ${documentationUrl}-migrations#${version} + type: string + format: uri + scopedImpact: + description: List of scopes that are impacted by the breaking change. If not specified, the breaking change cannot be scoped to reduce impact via the supported scope types. + type: array + minItems: 1 + items: + $ref: "#/definitions/BreakingChangeScope" + BreakingChangeScope: + description: A scope that can be used to limit the impact of a breaking change. + type: object + oneOf: + - $ref: "#/definitions/StreamBreakingChangeScope" + StreamBreakingChangeScope: + description: A scope that can be used to limit the impact of a breaking change to specific streams. + type: object + additionalProperties: false + required: + - scopeType + - impactedScopes + properties: + scopeType: + type: const + const: stream + impactedScopes: + description: List of streams that are impacted by the breaking change. 
+ type: array + minItems: 1 + items: + type: string diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryDestinationDefinition.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryDestinationDefinition.yaml index 9ee74b57c8c9..32d4fc47ba2b 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryDestinationDefinition.yaml +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryDestinationDefinition.yaml @@ -69,7 +69,7 @@ properties: allowedHosts: "$ref": AllowedHosts.yaml releases: - "$ref": ConnectorReleases.yaml + "$ref": ConnectorRegistryReleases.yaml ab_internal: "$ref": AirbyteInternal.yaml supportsRefreshes: diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryReleases.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryReleases.yaml new file mode 100644 index 000000000000..c219572fb393 --- /dev/null +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryReleases.yaml @@ -0,0 +1,35 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistryReleases.yaml +title: ConnectorRegistryReleases +description: Contains information about different types of releases for a connector. +type: object +additionalProperties: false +properties: + releaseCandidates: + $ref: "#/definitions/ConnectorReleaseCandidates" + rolloutConfiguration: + $ref: RolloutConfiguration.yaml + breakingChanges: + $ref: ConnectorBreakingChanges.yaml + migrationDocumentationUrl: + description: URL to documentation on how to migrate from the previous version to the current version. 
Defaults to ${documentationUrl}-migrations + type: string + format: uri +definitions: + ConnectorReleaseCandidates: + description: Each entry denotes a release candidate version of a connector. + type: object + additionalProperties: false + minProperties: 1 + maxProperties: 1 + patternProperties: + "^\\d+\\.\\d+\\.\\d+$": + $ref: "#/definitions/VersionReleaseCandidate" + VersionReleaseCandidate: + description: Contains information about a release candidate version of a connector. + additionalProperties: false + type: object + oneOf: + - $ref: ConnectorRegistrySourceDefinition.yaml + - $ref: ConnectorRegistryDestinationDefinition.yaml diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistrySourceDefinition.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistrySourceDefinition.yaml index 9052fa841185..31b424c00ad6 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistrySourceDefinition.yaml +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorRegistrySourceDefinition.yaml @@ -74,7 +74,7 @@ properties: type: string description: The URL where you can visualize the ERD releases: - "$ref": ConnectorReleases.yaml + "$ref": ConnectorRegistryReleases.yaml ab_internal: "$ref": AirbyteInternal.yaml generated: diff --git a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorReleases.yaml b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorReleases.yaml index b670415bf938..390974f6fbb8 100644 --- a/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorReleases.yaml +++ b/airbyte-ci/connectors/metadata_service/lib/metadata_service/models/src/ConnectorReleases.yaml @@ -15,64 +15,8 @@ properties: rolloutConfiguration: $ref: RolloutConfiguration.yaml breakingChanges: - $ref: "#/definitions/ConnectorBreakingChanges" + $ref: 
ConnectorBreakingChanges.yaml migrationDocumentationUrl: description: URL to documentation on how to migrate from the previous version to the current version. Defaults to ${documentationUrl}-migrations type: string format: uri -definitions: - ConnectorBreakingChanges: - description: Each entry denotes a breaking change in a specific version of a connector that requires user action to upgrade. - type: object - additionalProperties: false - minProperties: 1 - patternProperties: - "^\\d+\\.\\d+\\.\\d+$": - $ref: "#/definitions/VersionBreakingChange" - VersionBreakingChange: - description: Contains information about a breaking change, including the deadline to upgrade and a message detailing the change. - type: object - additionalProperties: false - required: - - upgradeDeadline - - message - properties: - upgradeDeadline: - description: The deadline by which to upgrade before the breaking change takes effect. - type: string - format: date - message: - description: Descriptive message detailing the breaking change. - type: string - migrationDocumentationUrl: - description: URL to documentation on how to migrate to the current version. Defaults to ${documentationUrl}-migrations#${version} - type: string - format: uri - scopedImpact: - description: List of scopes that are impacted by the breaking change. If not specified, the breaking change cannot be scoped to reduce impact via the supported scope types. - type: array - minItems: 1 - items: - $ref: "#/definitions/BreakingChangeScope" - BreakingChangeScope: - description: A scope that can be used to limit the impact of a breaking change. - type: object - oneOf: - - $ref: "#/definitions/StreamBreakingChangeScope" - StreamBreakingChangeScope: - description: A scope that can be used to limit the impact of a breaking change to specific streams. 
- type: object - additionalProperties: false - required: - - scopeType - - impactedScopes - properties: - scopeType: - type: const - const: stream - impactedScopes: - description: List of streams that are impacted by the breaking change. - type: array - minItems: 1 - items: - type: string diff --git a/airbyte-ci/connectors/metadata_service/lib/pyproject.toml b/airbyte-ci/connectors/metadata_service/lib/pyproject.toml index d97f8edd7a1e..bbdeadf96d2d 100644 --- a/airbyte-ci/connectors/metadata_service/lib/pyproject.toml +++ b/airbyte-ci/connectors/metadata_service/lib/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metadata-service" -version = "0.13.1" +version = "0.14.0" description = "" authors = ["Ben Church "] readme = "README.md" diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py index d4866dd5f878..b919db828162 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py @@ -94,7 +94,7 @@ ), "latest_metadata_file_blobs": gcs_directory_blobs.configured( {"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*latest/{METADATA_FILE_NAME}$"} - ), + ) } DATA_WAREHOUSE_RESOURCE_TREE = { @@ -125,6 +125,12 @@ "latest_oss_registry_entries_file_blobs": gcs_directory_blobs.configured( {"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*latest/oss.json$"} ), + "release_candidate_cloud_registry_entries_file_blobs": gcs_directory_blobs.configured( + {"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*release_candidate/cloud.json$"} + ), + "release_candidate_oss_registry_entries_file_blobs": gcs_directory_blobs.configured( + {"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": 
f".*release_candidate/oss.json$"} + ), } CONNECTOR_TEST_REPORT_SENSOR_RESOURCE_TREE = { @@ -167,12 +173,26 @@ gcs_blobs_resource_key="latest_oss_registry_entries_file_blobs", interval=60, ), + new_gcs_blobs_sensor( + job=generate_oss_registry, + resources_def=REGISTRY_ENTRY_RESOURCE_TREE, + gcs_blobs_resource_key="release_candidate_oss_registry_entries_file_blobs", + interval=60, + allow_duplicate_runs=True, + ), new_gcs_blobs_sensor( job=generate_cloud_registry, resources_def=REGISTRY_ENTRY_RESOURCE_TREE, gcs_blobs_resource_key="latest_cloud_registry_entries_file_blobs", interval=60, ), + new_gcs_blobs_sensor( + job=generate_cloud_registry, + resources_def=REGISTRY_ENTRY_RESOURCE_TREE, + gcs_blobs_resource_key="release_candidate_cloud_registry_entries_file_blobs", + interval=60, + allow_duplicate_runs=True, + ), new_gcs_blobs_sensor( job=generate_nightly_reports, resources_def=CONNECTOR_TEST_REPORT_SENSOR_RESOURCE_TREE, @@ -184,7 +204,7 @@ SCHEDULES = [ ScheduleDefinition(job=add_new_metadata_partitions, cron_schedule="*/2 * * * *", tags={"dagster/priority": HIGH_QUEUE_PRIORITY}), ScheduleDefinition( - cron_schedule="0 1 * * *", # Daily at 1am US/Pacific + cron_schedule="*/2 * * * *", # Every 2 minutes execution_timezone="US/Pacific", job=remove_stale_metadata_partitions, ), diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/connector_metrics.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/connector_metrics.py index a8cda7468c20..20afa5a6b391 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/connector_metrics.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/connector_metrics.py @@ -59,7 +59,7 @@ def _convert_json_to_metrics_dict(jsonl_string: str) -> dict: @asset(required_resource_keys={"latest_metrics_gcs_blob"}, group_name=GROUP_NAME) @sentry.instrument_asset_op -def latest_connnector_metrics(context: OpExecutionContext) -> dict: +def 
latest_connector_metrics(context: OpExecutionContext) -> dict: latest_metrics_gcs_blob = context.resources.latest_metrics_gcs_blob latest_metrics_jsonl = _safe_read_gcs_file(latest_metrics_gcs_blob) diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/github.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/github.py index a6c25648ffbf..137a7d7a7222 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/github.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/github.py @@ -13,7 +13,7 @@ from dagster import OpExecutionContext, Output, asset from github import Repository from orchestrator.logging import sentry -from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition, PartialMetadataDefinition +from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition from orchestrator.ops.slack import send_slack_message from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe @@ -102,27 +102,56 @@ def github_metadata_definitions(context): return Output(metadata_definitions, metadata={"preview": [md.json() for md in metadata_definitions]}) +def entry_is_younger_than_grace_period(entry: LatestMetadataEntry) -> bool: + grace_period_marker = datetime.datetime.now(datetime.timezone.utc) - PUBLISH_GRACE_PERIOD + entry_last_modified = datetime.datetime.strptime(entry.last_modified, "%a, %d %b %Y %H:%M:%S %Z").replace(tzinfo=datetime.timezone.utc) + return entry_last_modified > grace_period_marker + + +def entry_should_be_on_gcs(metadata_entry: LatestMetadataEntry) -> bool: + """For stale metadata detection, we only want to scan latest metadata files from our master branch that are expected to be on GCS. 
+ A metadata file should be on GCS, in the latest directory, if: + - it is not archived + - not a release candidate + - has been published for more than the grace period (just to reduce false positives when the publish pipeline and stale detection run concurrently). + + Args: + metadata_entry (LatestMetadataEntry): The metadata entry to check + + Returns: + bool: True if the metadata entry should be on GCS, False otherwise + """ + if metadata_entry.metadata_definition.data.supportLevel == "archived": + return False + if getattr(metadata_entry.metadata_definition.releases, "isReleaseCandidate", False): + return False + if entry_is_younger_than_grace_period(metadata_entry): + return False + return True + + @asset(required_resource_keys={"slack"}, group_name=GROUP_NAME) -def stale_gcs_latest_metadata_file(context, github_metadata_definitions: list, metadata_definitions: list) -> OutputDataFrame: +def stale_gcs_latest_metadata_file(context, github_metadata_definitions: list, latest_metadata_entries: list) -> OutputDataFrame: """ Return a list of all metadata files in the github repo and denote whether they are stale or not. Stale means that the file in the github repo is not in the latest metadata file blobs. """ + + # TODO: + # The logic here is not bulletproof. It can't find release candidate metadata which did not make their way to GCS. + # We should improve this logic to be able to detect those cases as well.
+ latest_versions_on_gcs = { metadata_entry.metadata_definition.data.dockerRepository: metadata_entry.metadata_definition.data.dockerImageTag - for metadata_entry in metadata_definitions + for metadata_entry in latest_metadata_entries if metadata_entry.metadata_definition.data.supportLevel != "archived" } - now = datetime.datetime.now(datetime.timezone.utc) latest_versions_on_github = { metadata_entry.metadata_definition.data.dockerRepository: metadata_entry.metadata_definition.data.dockerImageTag for metadata_entry in github_metadata_definitions - if metadata_entry.metadata_definition.data.supportLevel - != "archived" # We give a 2 hour grace period for the metadata to be updated - and datetime.datetime.strptime(metadata_entry.last_modified, "%a, %d %b %Y %H:%M:%S %Z").replace(tzinfo=datetime.timezone.utc) - < now - PUBLISH_GRACE_PERIOD + if entry_should_be_on_gcs(metadata_entry) } stale_connectors = [] diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/metadata.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/metadata.py index 60159fe35e04..ea3d0fb54a7d 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/metadata.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/metadata.py @@ -103,16 +103,9 @@ def compute_registry_overrides(merged_df): return registries -# ASSETS - - -@asset(required_resource_keys={"latest_metadata_file_blobs"}, group_name=GROUP_NAME) -@sentry.instrument_asset_op -def metadata_definitions(context: OpExecutionContext) -> List[LatestMetadataEntry]: - latest_metadata_file_blobs = context.resources.latest_metadata_file_blobs - +def get_metadata_entries(blob_resource) -> Output: metadata_entries = [] - for blob in latest_metadata_file_blobs: + for blob in blob_resource: yaml_string = blob.download_as_string().decode("utf-8") metadata_dict = yaml.safe_load(yaml_string) metadata_def = MetadataDefinition.parse_obj(metadata_dict) 
@@ -137,4 +130,12 @@ def metadata_definitions(context: OpExecutionContext) -> List[LatestMetadataEntr ) metadata_entries.append(metadata_entry) - return metadata_entries + return Output(metadata_entries, metadata={"preview": [m.file_path for m in metadata_entries]}) + + +# ASSETS +@asset(required_resource_keys={"latest_metadata_file_blobs"}, group_name=GROUP_NAME) +@sentry.instrument_asset_op +def latest_metadata_entries(context: OpExecutionContext) -> Output[List[LatestMetadataEntry]]: + latest_metadata_file_blobs = context.resources.latest_metadata_file_blobs + return get_metadata_entries(latest_metadata_file_blobs) diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry.py index b81b15be1d8f..67de2c6791b0 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry.py @@ -2,21 +2,26 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# +import copy import json -from typing import List +from typing import List, Union import sentry_sdk -from dagster import MetadataValue, OpExecutionContext, Output, asset +from dagster import AutoMaterializePolicy, MetadataValue, OpExecutionContext, Output, asset from dagster_gcp.gcs.file_manager import GCSFileHandle, GCSFileManager -from google.cloud import storage +from metadata_service.models.generated.ConnectorRegistryDestinationDefinition import ConnectorRegistryDestinationDefinition +from metadata_service.models.generated.ConnectorRegistrySourceDefinition import ConnectorRegistrySourceDefinition from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0 from metadata_service.models.transform import to_json_sanitized_dict from orchestrator.assets.registry_entry import ConnectorTypePrimaryKey, ConnectorTypes, read_registry_entry_blob from orchestrator.logging import sentry from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus +from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition from orchestrator.utils.object_helpers import default_none_to_dict from pydash.objects import set_with +PolymorphicRegistryEntry = Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition] + GROUP_NAME = "registry" @@ -79,13 +84,54 @@ def persist_registry_to_json( return file_handle +@sentry_sdk.trace +def apply_release_candidates( + latest_registry_entry: dict, + release_candidate_registry_entry: PolymorphicRegistryEntry, +) -> dict: + updated_registry_entry = copy.deepcopy(latest_registry_entry) + updated_registry_entry.setdefault("releases", {}) + updated_registry_entry["releases"]["releaseCandidates"] = { + release_candidate_registry_entry.dockerImageTag: to_json_sanitized_dict(release_candidate_registry_entry) + } + return updated_registry_entry + + +def apply_release_candidate_entries(registry_entry_dict: dict, 
docker_repository_to_rc_registry_entry: dict) -> dict: + """Apply the optionally existing release candidate entries to the registry entry. + We need both the release candidate metadata entry and the release candidate registry entry because the metadata entry contains the rollout configuration, and the registry entry contains the actual RC registry entry. + + Args: + registry_entry_dict (dict): The registry entry. + docker_repository_to_rc_registry_entry (dict): Mapping of docker repository to release candidate registry entry. + + Returns: + dict: The registry entry with release candidates applied. + """ + registry_entry_dict = copy.deepcopy(registry_entry_dict) + if registry_entry_dict["dockerRepository"] in docker_repository_to_rc_registry_entry: + release_candidate_registry_entry = docker_repository_to_rc_registry_entry[registry_entry_dict["dockerRepository"]] + registry_entry_dict = apply_release_candidates(registry_entry_dict, release_candidate_registry_entry) + return registry_entry_dict + + +def get_connector_type_from_registry_entry(registry_entry: PolymorphicRegistryEntry) -> ConnectorTypes: + if hasattr(registry_entry, "sourceDefinitionId"): + return ConnectorTypes.SOURCE + elif hasattr(registry_entry, "destinationDefinitionId"): + return ConnectorTypes.DESTINATION + else: + raise ValueError("Registry entry is not a source or destination") + + @sentry_sdk.trace def generate_and_persist_registry( context: OpExecutionContext, - registry_entry_file_blobs: List[storage.Blob], + latest_registry_entries: List, + release_candidate_registry_entries: List, registry_directory_manager: GCSFileManager, registry_name: str, - latest_connnector_metrics: dict, + latest_connector_metrics: dict, ) -> Output[ConnectorRegistryV0]: """Generate the selected registry from the metadata files, and persist it to GCS. 
@@ -104,14 +150,21 @@ def generate_and_persist_registry( ) registry_dict = {"sources": [], "destinations": []} - for blob in registry_entry_file_blobs: - connector_type, registry_entry = read_registry_entry_blob(blob) - plural_connector_type = f"{connector_type}s" - # We santiize the registry entry to ensure its in a format + docker_repository_to_rc_registry_entry = { + release_candidate_registry_entries.dockerRepository: release_candidate_registry_entries + for release_candidate_registry_entries in release_candidate_registry_entries + } + + for latest_registry_entry in latest_registry_entries: + connector_type = get_connector_type_from_registry_entry(latest_registry_entry) + plural_connector_type = f"{connector_type.value}s" + + # We sanitize the registry entry to ensure its in a format # that can be parsed by pydantic. - registry_entry_dict = to_json_sanitized_dict(registry_entry) - enriched_registry_entry_dict = apply_metrics_to_registry_entry(registry_entry_dict, connector_type, latest_connnector_metrics) + registry_entry_dict = to_json_sanitized_dict(latest_registry_entry) + enriched_registry_entry_dict = apply_metrics_to_registry_entry(registry_entry_dict, connector_type, latest_connector_metrics) + enriched_registry_entry_dict = apply_release_candidate_entries(enriched_registry_entry_dict, docker_repository_to_rc_registry_entry) registry_dict[plural_connector_type].append(enriched_registry_entry_dict) @@ -137,46 +190,66 @@ def generate_and_persist_registry( @asset( - required_resource_keys={"slack", "registry_directory_manager", "latest_oss_registry_entries_file_blobs", "latest_metrics_gcs_blob"}, + required_resource_keys={ + "slack", + "registry_directory_manager", + "latest_oss_registry_entries_file_blobs", + "release_candidate_oss_registry_entries_file_blobs", + "latest_metrics_gcs_blob", + }, group_name=GROUP_NAME, ) @sentry.instrument_asset_op -def persisted_oss_registry(context: OpExecutionContext, latest_connnector_metrics: dict) -> 
Output[ConnectorRegistryV0]: +def persisted_oss_registry( + context: OpExecutionContext, + latest_connector_metrics: dict, + latest_oss_registry_entries: List, + release_candidate_oss_registry_entries: List, +) -> Output[ConnectorRegistryV0]: """ This asset is used to generate the oss registry from the registry entries. """ registry_name = "oss" registry_directory_manager = context.resources.registry_directory_manager - latest_oss_registry_entries_file_blobs = context.resources.latest_oss_registry_entries_file_blobs - return generate_and_persist_registry( context=context, - registry_entry_file_blobs=latest_oss_registry_entries_file_blobs, + latest_registry_entries=latest_oss_registry_entries, + release_candidate_registry_entries=release_candidate_oss_registry_entries, registry_directory_manager=registry_directory_manager, registry_name=registry_name, - latest_connnector_metrics=latest_connnector_metrics, + latest_connector_metrics=latest_connector_metrics, ) @asset( - required_resource_keys={"slack", "registry_directory_manager", "latest_cloud_registry_entries_file_blobs", "latest_metrics_gcs_blob"}, + required_resource_keys={ + "slack", + "registry_directory_manager", + "latest_cloud_registry_entries_file_blobs", + "release_candidate_cloud_registry_entries_file_blobs", + "latest_metrics_gcs_blob", + }, group_name=GROUP_NAME, ) @sentry.instrument_asset_op -def persisted_cloud_registry(context: OpExecutionContext, latest_connnector_metrics: dict) -> Output[ConnectorRegistryV0]: +def persisted_cloud_registry( + context: OpExecutionContext, + latest_connector_metrics: dict, + latest_cloud_registry_entries: List, + release_candidate_cloud_registry_entries: List, +) -> Output[ConnectorRegistryV0]: """ This asset is used to generate the cloud registry from the registry entries. 
""" registry_name = "cloud" registry_directory_manager = context.resources.registry_directory_manager - latest_cloud_registry_entries_file_blobs = context.resources.latest_cloud_registry_entries_file_blobs - return generate_and_persist_registry( context=context, - registry_entry_file_blobs=latest_cloud_registry_entries_file_blobs, + latest_registry_entries=latest_cloud_registry_entries, + release_candidate_registry_entries=release_candidate_cloud_registry_entries, registry_directory_manager=registry_directory_manager, registry_name=registry_name, - latest_connnector_metrics=latest_connnector_metrics, + latest_connector_metrics=latest_connector_metrics, ) diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry_entry.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry_entry.py index 3f143d22e574..77f821b6d3e0 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry_entry.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry_entry.py @@ -11,6 +11,7 @@ import orchestrator.hacks as HACKS import pandas as pd +import semver import sentry_sdk from dagster import AutoMaterializePolicy, DynamicPartitionsDefinition, MetadataValue, OpExecutionContext, Output, asset from dagster_gcp.gcs.file_manager import GCSFileHandle, GCSFileManager @@ -90,25 +91,29 @@ def calculate_migration_documentation_url(releases_or_breaking_change: dict, doc @deep_copy_params -def apply_connector_release_defaults(metadata: dict) -> Optional[pd.DataFrame]: - metadata_releases = metadata.get("releases") +def apply_connector_releases(metadata: dict) -> Optional[pd.DataFrame]: documentation_url = metadata.get("documentationUrl") - if metadata_releases is None: - return None + final_registry_releases = {} - # apply defaults for connector releases - metadata_releases["migrationDocumentationUrl"] = calculate_migration_documentation_url(metadata_releases, 
documentation_url) + if metadata.get("releases", {}).get("breakingChanges"): + # apply defaults for connector releases + final_registry_releases["migrationDocumentationUrl"] = calculate_migration_documentation_url( + metadata["releases"], documentation_url + ) - # releases has a dictionary field called breakingChanges, where the key is the version and the value is the data for the breaking change - # each breaking change has a migrationDocumentationUrl field that is optional, so we need to apply defaults to it - breaking_changes = metadata_releases["breakingChanges"] - if breaking_changes is not None: - for version, breaking_change in breaking_changes.items(): - breaking_change["migrationDocumentationUrl"] = calculate_migration_documentation_url( - breaking_change, documentation_url, version - ) + # releases has a dictionary field called breakingChanges, where the key is the version and the value is the data for the breaking change + # each breaking change has a migrationDocumentationUrl field that is optional, so we need to apply defaults to it + breaking_changes = metadata["releases"]["breakingChanges"] + if breaking_changes is not None: + for version, breaking_change in breaking_changes.items(): + breaking_change["migrationDocumentationUrl"] = calculate_migration_documentation_url( + breaking_change, documentation_url, version + ) + final_registry_releases["breakingChanges"] = breaking_changes - return metadata_releases + if metadata.get("releases", {}).get("rolloutConfiguration"): + final_registry_releases["rolloutConfiguration"] = metadata["releases"]["rolloutConfiguration"] + return final_registry_releases @deep_copy_params @@ -278,8 +283,7 @@ def metadata_to_registry_entry(metadata_entry: LatestMetadataEntry, override_reg # apply generated fields overridden_metadata_data["iconUrl"] = metadata_entry.icon_url - overridden_metadata_data["releases"] = apply_connector_release_defaults(overridden_metadata_data) - + overridden_metadata_data["releases"] = 
apply_connector_releases(overridden_metadata_data) return overridden_metadata_data @@ -303,7 +307,7 @@ def get_connector_type_from_registry_entry(registry_entry: dict) -> TaggedRegist raise Exception("Could not determine connector type from registry entry") -def _get_latest_entry_write_path(metadata_path: Optional[str], registry_name: str) -> str: +def _get_directory_write_path(metadata_path: Optional[str], registry_name: str) -> str: """Get the write path for the registry entry, assuming the metadata entry is the latest version.""" if metadata_path is None: raise Exception(f"Metadata entry {metadata_entry} does not have a file path") @@ -316,9 +320,9 @@ def get_registry_entry_write_path( registry_entry: Optional[PolymorphicRegistryEntry], metadata_entry: LatestMetadataEntry, registry_name: str ) -> str: """Get the write path for the registry entry.""" - if metadata_entry.is_latest_version_path: - # if the metadata entry is the latest version, write the registry entry to the same path as the metadata entry - return _get_latest_entry_write_path(metadata_entry.file_path, registry_name) + if metadata_entry.is_latest_version_path or metadata_entry.is_release_candidate_version_path: + # if the metadata entry is the latest or RC version, write the registry entry to the same path as the metadata entry + return _get_directory_write_path(metadata_entry.file_path, registry_name) else: if registry_entry is None: raise Exception(f"Could not determine write path for registry entry {registry_entry} because it is None") @@ -353,29 +357,48 @@ def persist_registry_entry_to_json( return file_handle -@sentry_sdk.trace -def generate_and_persist_registry_entry( +def generate_registry_entry( metadata_entry: LatestMetadataEntry, spec_cache: SpecCache, - metadata_directory_manager: GCSFileManager, registry_name: str, -) -> str: - """Generate the selected registry from the metadata files, and persist it to GCS. 
+) -> PolymorphicRegistryEntry: + """Generate a registry entry given a metadata entry. + Enriches the metadata entry with spec and release candidate information. Args: - context (OpExecutionContext): The execution context. - metadata_entry (List[LatestMetadataEntry]): The metadata definitions. - cached_specs (OutputDataFrame): The cached specs. + metadata_entry (LatestMetadataEntry): The metadata entry. + spec_cache (SpecCache): The spec cache. + registry_name (str): The name of the registry_entry. One of "cloud" or "oss". Returns: - Output[ConnectorRegistryV0]: The registry. + PolymorphicRegistryEntry: The registry entry (could be a source or destination entry). """ raw_entry_dict = metadata_to_registry_entry(metadata_entry, registry_name) registry_entry_with_spec = apply_spec_to_registry_entry(raw_entry_dict, spec_cache, registry_name) _, ConnectorModel = get_connector_type_from_registry_entry(registry_entry_with_spec) - registry_model = ConnectorModel.parse_obj(registry_entry_with_spec) + return ConnectorModel.parse_obj(registry_entry_with_spec) + + +@sentry_sdk.trace +def generate_and_persist_registry_entry( + metadata_entry: LatestMetadataEntry, + spec_cache: SpecCache, + metadata_directory_manager: GCSFileManager, + registry_name: str, +) -> str: + """Generate the selected registry from the metadata files, and persist it to GCS. + + Args: + metadata_entry (List[LatestMetadataEntry]): The metadata entry. + spec_cache (SpecCache): The spec cache. + metadata_directory_manager (GCSFileManager): The metadata directory manager. + registry_name (str): The name of the registry_entry. One of "cloud" or "oss". + Returns: + str: The public url of the registry entry. 
+ """ + registry_model = generate_registry_entry(metadata_entry, spec_cache, registry_name) file_handle = persist_registry_entry_to_json(registry_model, registry_name, metadata_entry, metadata_directory_manager) @@ -585,7 +608,10 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST), ) @sentry.instrument_asset_op -def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry]) -> Output[Optional[dict]]: +def registry_entry( + context: OpExecutionContext, + metadata_entry: Optional[LatestMetadataEntry], +) -> Output[Optional[dict]]: """ Generate the registry entry files from the given metadata file, and persist it to GCS. """ @@ -613,7 +639,12 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM enabled_registries, disabled_registries = get_registry_status_lists(metadata_entry) persisted_registry_entries = { - registry_name: generate_and_persist_registry_entry(metadata_entry, spec_cache, root_metadata_directory_manager, registry_name) + registry_name: generate_and_persist_registry_entry( + metadata_entry, + spec_cache, + root_metadata_directory_manager, + registry_name, + ) for registry_name in enabled_registries } @@ -663,3 +694,36 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM ) return Output(metadata=dagster_metadata, value=persisted_registry_entries) + + +def get_registry_entries(blob_resource) -> Output[List]: + registry_entries = [] + for blob in blob_resource: + _, registry_entry = read_registry_entry_blob(blob) + registry_entries.append(registry_entry) + + return Output(registry_entries) + + +@asset(required_resource_keys={"latest_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME) +@sentry.instrument_asset_op +def latest_cloud_registry_entries(context: OpExecutionContext) -> Output[List]: + return 
get_registry_entries(context.resources.latest_cloud_registry_entries_file_blobs) + + +@asset(required_resource_keys={"latest_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME) +@sentry.instrument_asset_op +def latest_oss_registry_entries(context: OpExecutionContext) -> Output[List]: + return get_registry_entries(context.resources.latest_oss_registry_entries_file_blobs) + + +@asset(required_resource_keys={"release_candidate_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME) +@sentry.instrument_asset_op +def release_candidate_cloud_registry_entries(context: OpExecutionContext) -> Output[List]: + return get_registry_entries(context.resources.release_candidate_cloud_registry_entries_file_blobs) + + +@asset(required_resource_keys={"release_candidate_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME) +@sentry.instrument_asset_op +def release_candidate_oss_registry_entries(context: OpExecutionContext) -> Output[List]: + return get_registry_entries(context.resources.release_candidate_oss_registry_entries_file_blobs) diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py index 166ebf4f3fb5..322c2e5d3002 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/jobs/registry.py @@ -3,13 +3,14 @@ # from dagster import AssetSelection, SkipReason, define_asset_job, job, op -from orchestrator.assets import registry_entry +from orchestrator.assets import metadata, registry, registry_entry, specs_secrets_mask from orchestrator.config import HIGH_QUEUE_PRIORITY, MAX_METADATA_PARTITION_RUN_REQUEST from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus oss_registry_inclusive = AssetSelection.keys("persisted_oss_registry", 
"specs_secrets_mask_yaml").upstream() generate_oss_registry = define_asset_job(name="generate_oss_registry", selection=oss_registry_inclusive) + cloud_registry_inclusive = AssetSelection.keys("persisted_cloud_registry", "specs_secrets_mask_yaml").upstream() generate_cloud_registry = define_asset_job(name="generate_cloud_registry", selection=cloud_registry_inclusive) diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/models/metadata.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/models/metadata.py index 4445aa1356aa..9a5f074c0954 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/models/metadata.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/models/metadata.py @@ -65,6 +65,14 @@ def is_latest_version_path(self) -> bool: ending_path = f"latest/{METADATA_FILE_NAME}" return self.file_path.endswith(ending_path) + @property + def is_release_candidate_version_path(self) -> bool: + """ + Path is considered a latest version path if the subfolder containing METADATA_FILE_NAME is "latest" + """ + ending_path = f"release_candidate/{METADATA_FILE_NAME}" + return self.file_path.endswith(ending_path) + @property def dependency_file_url(self) -> Optional[str]: if not self.bucket_name or not self.metadata_definition: diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/gcs.py b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/gcs.py index b6fda58d291c..d83a1ae4c183 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/gcs.py +++ b/airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/sensors/gcs.py @@ -2,6 +2,8 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# +import time + from dagster import ( DefaultSensorStatus, RunRequest, @@ -21,6 +23,7 @@ def new_gcs_blobs_sensor( job, interval, resources_def, + allow_duplicate_runs=False, ) -> SensorDefinition: """ This sensor is responsible for polling a list of gcs blobs and triggering a job when the list changes. @@ -42,7 +45,6 @@ def new_gcs_blobs_sensor_definition(context: SensorEvaluationContext): context.log.info(f"Old etag cursor: {context.cursor}") gcs_blobs_resource = getattr(resources, gcs_blobs_resource_key) - new_etags_cursor = string_array_to_hash([blob.etag for blob in gcs_blobs_resource]) context.log.info(f"New etag cursor: {new_etags_cursor}") @@ -54,6 +56,11 @@ def new_gcs_blobs_sensor_definition(context: SensorEvaluationContext): context.update_cursor(new_etags_cursor) context.log.info(f"New {gcs_blobs_resource_key} in GCS bucket") run_key = f"{sensor_name}:{new_etags_cursor}" + # Dagster skips runs with the same run_key + # It means that if the GCS blob list changed back to a state which was already processed, the run will be skipped + # This is not desirable in cases we want to reprocess the same data again after a blob deletion + if allow_duplicate_runs: + run_key += f":{int(time.time())}" return RunRequest(run_key=run_key) return new_gcs_blobs_sensor_definition diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/poetry.lock b/airbyte-ci/connectors/metadata_service/orchestrator/poetry.lock index 681fba946c20..65b4c30a2a86 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/poetry.lock +++ b/airbyte-ci/connectors/metadata_service/orchestrator/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. 
[[package]] name = "alembic" @@ -1747,7 +1747,7 @@ files = [ [[package]] name = "metadata-service" -version = "0.13.0" +version = "0.14.0" description = "" optional = false python-versions = "^3.9" @@ -2514,6 +2514,7 @@ files = [ {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"}, + {file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"}, {file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"}, {file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"}, {file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"}, @@ -2521,8 +2522,15 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"}, + {file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"}, {file = 
"PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"}, {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, + {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, + {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, + {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, + {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, + {file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"}, {file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"}, @@ -2539,6 +2547,7 @@ files = [ {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"}, {file = 
"PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"}, + {file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"}, {file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"}, {file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"}, {file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"}, @@ -2546,6 +2555,7 @@ files = [ {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"}, + {file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"}, {file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"}, {file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"}, {file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"}, diff --git a/airbyte-ci/connectors/metadata_service/orchestrator/pyproject.toml b/airbyte-ci/connectors/metadata_service/orchestrator/pyproject.toml index a182237f2b22..b96544b70d21 100644 --- a/airbyte-ci/connectors/metadata_service/orchestrator/pyproject.toml 
+++ b/airbyte-ci/connectors/metadata_service/orchestrator/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "orchestrator" -version = "0.4.1" +version = "0.5.0" description = "" authors = ["Ben Church "] readme = "README.md"