Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into artem1205/source-amazon-ads-migrate-low-code
Browse files (browse the repository at this point in the history)

# Conflicts:
#	airbyte-integrations/connectors/source-amazon-ads/metadata.yaml
#	airbyte-integrations/connectors/source-amazon-ads/pyproject.toml
#	airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/attribution_report.py
#	airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/common.py
#	airbyte-integrations/connectors/source-amazon-ads/source_amazon_ads/streams/portfolios.py
#	docs/integrations/sources/amazon-ads.md
  • Loading branch information
artem1205 committed Nov 12, 2024
2 parents e4972fb + cba7c04 commit 7078ee4
Show file tree
Hide file tree
Showing 386 changed files with 95,340 additions and 3,294 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/python-cdk-pdoc-generate.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Generates the Python CDK documentation and uploads the output as a workflow artifact.
name: "Python CDK: Generate Docs"

# Runs on pushes to `main` and on every pull request.
on:
  push:
    branches:
      - main
  pull_request: {}

jobs:
  preview_docs:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      # Poetry is set up before Python because the `cache: "poetry"` option below
      # relies on Poetry already being available on the runner.
      - name: Set up Poetry
        uses: Gr1N/setup-poetry@v9
        with:
          poetry-version: "1.7.1"
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"
          cache: "poetry"

      - name: Install dependencies
        run: cd airbyte-cdk/python && poetry install --all-extras

      - name: Generate documentation
        run: |
          cd airbyte-cdk/python && poetry run poe docs-generate
      # Publishes the generated docs directory under the artifact name "docs-generated".
      - name: Upload artifact
        uses: actions/upload-artifact@v4
        with:
          name: "docs-generated"
          path: "airbyte-cdk/python/docs/generated"
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package io.airbyte.cdk.read
import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.asProtocolStreamDescriptor
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteGlobalState
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
Expand Down Expand Up @@ -199,7 +200,9 @@ class StateManager(
streamStates.add(
AirbyteStreamState()
.withStreamDescriptor(streamID.asProtocolStreamDescriptor())
.withStreamState(streamStateForCheckpoint.opaqueStateValue),
.withStreamState(
streamStateForCheckpoint.opaqueStateValue ?: Jsons.objectNode()
),
)
}
if (!shouldCheckpoint) {
Expand Down Expand Up @@ -233,7 +236,9 @@ class StateManager(
val airbyteStreamState =
AirbyteStreamState()
.withStreamDescriptor(feed.id.asProtocolStreamDescriptor())
.withStreamState(streamStateForCheckpoint.opaqueStateValue)
.withStreamState(
streamStateForCheckpoint.opaqueStateValue ?: Jsons.objectNode()
)
return AirbyteStateMessage()
.withType(AirbyteStateMessage.AirbyteStateType.STREAM)
.withStream(airbyteStreamState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,152 +9,191 @@ import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason

/**
 * Maps an [AirbyteValue] against the [AirbyteType] schema that describes it.
 *
 * NOTE(review): this span comes from a rendered diff, so it lists two forms of the same `map`
 * declaration side by side: one taking `path`/`nullable` and returning a bare [AirbyteValue]
 * (with changes exposed through `collectedChanges`), and one taking a `changes` list and
 * returning the value paired with a list of changes. Presumably the `Pair`-returning form is the
 * post-commit one; confirm against the repository before relying on either.
 */
interface AirbyteValueMapper {
val collectedChanges: List<DestinationRecord.Change>
fun map(
value: AirbyteValue,
schema: AirbyteType,
path: List<String> = emptyList(),
nullable: Boolean = false
): AirbyteValue
// Changes supplied by the caller; defaults to none.
changes: List<DestinationRecord.Change> = emptyList()
): Pair<AirbyteValue, List<DestinationRecord.Change>>
}

/**
 * An optimized identity mapper that just passes through.
 *
 * The value is returned untouched and the schema is never inspected. `collectedChanges` is always
 * empty, and the `Pair`-returning form of `map` hands the incoming `changes` back unmodified.
 */
class AirbyteValueNoopMapper : AirbyteValueMapper {
override val collectedChanges: List<DestinationRecord.Change> = emptyList()
override fun map(
value: AirbyteValue,
schema: AirbyteType,
path: List<String>,
nullable: Boolean
): AirbyteValue = value
changes: List<DestinationRecord.Change>
): Pair<AirbyteValue, List<DestinationRecord.Change>> = value to changes
}

/**
 * Schema-driven mapper whose per-type hooks all return their input unchanged.
 *
 * The dispatcher walks a value together with its schema and calls one `open` hook per schema
 * type, so a subclass can override only the hooks it needs. Any exception raised while mapping a
 * value is converted into a `NULLED` change and the value is replaced by `NullValue`.
 *
 * NOTE(review): this span comes from a rendered diff. Each member appears in two forms on
 * adjacent lines: a `path: List<String>` form returning [AirbyteValue], and a [Context] form
 * returning `Pair<AirbyteValue, Context>`.
 */
open class AirbyteValueIdentityMapper : AirbyteValueMapper {
// Reading this property returns a snapshot of the recorded changes and clears the backing list.
override val collectedChanges: List<DestinationRecord.Change>
get() = changes.toList().also { changes.clear() }

private val changes: MutableList<DestinationRecord.Change> = mutableListOf()

// Records a NULLED change for the dot-joined path unless a change for that field already exists.
private fun collectFailure(
path: List<String>,
reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR
) {
val joined = path.joinToString(".")
if (changes.none { it.field == joined }) {
changes.add(DestinationRecord.Change(path.joinToString("."), Change.NULLED, reason))
}
}
/**
 * Traversal state handed to every hook.
 *
 * @property nullable whether a `NullValue` is acceptable at the current position.
 * @property path segments leading to the current position; joined with "." when reported.
 * @property changes changes recorded so far. `copy()` is shallow, so contexts derived with
 * `copy(path = ..., nullable = ...)` keep the same set instance and additions made while mapping
 * nested values remain visible through the parent context.
 */
data class Context(
val nullable: Boolean = false,
val path: List<String> = emptyList(),
val changes: MutableSet<DestinationRecord.Change> = mutableSetOf(),
)

/**
 * Entry point. In the [Context] form it seeds a fresh context with a copy of the incoming
 * `changes`, delegates to [mapInner], and returns the mapped value with the context's changes
 * converted to a list. The seeded context uses the default `nullable = false`, so a top-level
 * `NullValue` makes [mapInner] throw.
 */
override fun map(
value: AirbyteValue,
schema: AirbyteType,
path: List<String>,
nullable: Boolean,
): AirbyteValue =
changes: List<DestinationRecord.Change>
): Pair<AirbyteValue, List<DestinationRecord.Change>> =
mapInner(value, schema, Context(changes = changes.toMutableSet())).let {
it.first to it.second.changes.toList()
}

/**
 * Dispatches [value] to the hook matching [schema].
 *
 * A `NullValue` goes to [mapNull] when the position is nullable; otherwise an
 * [IllegalStateException] naming the path is thrown. For any other value, the value is cast to
 * the type the schema expects and passed to the matching hook.
 *
 * Any [Exception] from the cast or the hook is caught, a `NULLED` change with reason
 * `DESTINATION_SERIALIZATION_ERROR` is recorded for the current path, and the mapping is retried
 * with `NullValue`. If the position is not nullable, that retry throws the
 * [IllegalStateException] above, and because it is raised inside the `catch` block it
 * propagates to the caller.
 */
fun mapInner(
value: AirbyteValue,
schema: AirbyteType,
context: Context,
): Pair<AirbyteValue, Context> =
if (value is NullValue) {
if (!nullable) {
if (!context.nullable) {
throw IllegalStateException(
"null value for non-nullable field at path: ${path.joinToString(".")}"
"null value for non-nullable field at path: ${context.path.joinToString(".")}"
)
}
mapNull(path)
mapNull(context)
} else
try {
// A value whose runtime type does not match the schema fails the `as` cast below;
// the resulting exception is handled by the catch block like any other mapping failure.
when (schema) {
is ObjectType -> mapObject(value as ObjectValue, schema, path)
is ObjectType -> mapObject(value as ObjectValue, schema, context)
is ObjectTypeWithoutSchema ->
mapObjectWithoutSchema(value as ObjectValue, schema, path)
mapObjectWithoutSchema(value as ObjectValue, schema, context)
is ObjectTypeWithEmptySchema ->
mapObjectWithEmptySchema(value as ObjectValue, schema, path)
is ArrayType -> mapArray(value as ArrayValue, schema, path)
mapObjectWithEmptySchema(value as ObjectValue, schema, context)
is ArrayType -> mapArray(value as ArrayValue, schema, context)
is ArrayTypeWithoutSchema ->
mapArrayWithoutSchema(value as ArrayValue, schema, path)
is UnionType -> mapUnion(value, schema, path)
is BooleanType -> mapBoolean(value as BooleanValue, path)
is NumberType -> mapNumber(value as NumberValue, path)
is StringType -> mapString(value as StringValue, path)
is IntegerType -> mapInteger(value as IntegerValue, path)
is DateType -> mapDate(value as DateValue, path)
mapArrayWithoutSchema(value as ArrayValue, schema, context)
is UnionType -> mapUnion(value, schema, context)
is BooleanType -> mapBoolean(value as BooleanValue, context)
is NumberType -> mapNumber(value as NumberValue, context)
is StringType -> mapString(value as StringValue, context)
is IntegerType -> mapInteger(value as IntegerValue, context)
is DateType -> mapDate(value as DateValue, context)
is TimeTypeWithTimezone ->
mapTimeWithTimezone(
value as TimeValue,
path,
context,
)
is TimeTypeWithoutTimezone ->
mapTimeWithoutTimezone(
value as TimeValue,
path,
context,
)
is TimestampTypeWithTimezone ->
mapTimestampWithTimezone(value as TimestampValue, path)
mapTimestampWithTimezone(value as TimestampValue, context)
is TimestampTypeWithoutTimezone ->
mapTimestampWithoutTimezone(value as TimestampValue, path)
is UnknownType -> mapUnknown(value as UnknownValue, path)
mapTimestampWithoutTimezone(value as TimestampValue, context)
is UnknownType -> mapUnknown(value as UnknownValue, context)
}
} catch (e: Exception) {
// Record the failure for this path, then map a null in place of the offending value.
collectFailure(path)
map(NullValue, schema, path, nullable)
context.changes.add(
DestinationRecord.Change(
context.path.joinToString("."),
Change.NULLED,
Reason.DESTINATION_SERIALIZATION_ERROR
)
)
mapInner(NullValue, schema, context)
}

/**
 * Maps every property declared in [schema], in the schema's iteration order.
 *
 * A property missing from [value] is mapped as `NullValue`. Each property is mapped with the
 * property name appended to the path and with that field's own nullability. Only names present
 * in `schema.properties` are written to the result, so entries of [value] that the schema does
 * not declare are not carried over.
 */
open fun mapObject(value: ObjectValue, schema: ObjectType, path: List<String>): AirbyteValue {
open fun mapObject(
value: ObjectValue,
schema: ObjectType,
context: Context
): Pair<AirbyteValue, Context> {
val values = LinkedHashMap<String, AirbyteValue>()
schema.properties.forEach { (name, field) ->
values[name] =
map(value.values[name] ?: NullValue, field.type, path + name, field.nullable)
mapInner(
value.values[name] ?: NullValue,
field.type,
context.copy(path = context.path + name, nullable = field.nullable)
)
.first
}
return ObjectValue(values)
return ObjectValue(values) to context
}

// Objects without a usable schema are returned as-is; their contents are not traversed.
open fun mapObjectWithoutSchema(
value: ObjectValue,
schema: ObjectTypeWithoutSchema,
path: List<String>
): AirbyteValue = value
context: Context
): Pair<AirbyteValue, Context> = value to context

open fun mapObjectWithEmptySchema(
value: ObjectValue,
schema: ObjectTypeWithEmptySchema,
path: List<String>
): AirbyteValue = value
context: Context
): Pair<AirbyteValue, Context> = value to context

/**
 * Maps each element against the array's item type. The element's path segment is "[index]" and
 * its nullability comes from `schema.items.nullable`.
 */
open fun mapArray(value: ArrayValue, schema: ArrayType, path: List<String>): AirbyteValue {
return ArrayValue(
open fun mapArray(
value: ArrayValue,
schema: ArrayType,
context: Context
): Pair<AirbyteValue, Context> {
val mapped =
value.values.mapIndexed { index, element ->
map(element, schema.items.type, path + "[$index]", schema.items.nullable)
mapInner(
element,
schema.items.type,
context.copy(
path = context.path + "[$index]",
nullable = schema.items.nullable
)
)
.first
}
)
return ArrayValue(mapped) to context
}

// The remaining hooks return their input unchanged (paired with the unchanged context in the
// Context form). They are `open` so subclasses can override individual types.
open fun mapArrayWithoutSchema(
value: ArrayValue,
schema: ArrayTypeWithoutSchema,
path: List<String>
): AirbyteValue = value
context: Context
): Pair<AirbyteValue, Context> = value to context

// Union values are passed through without being matched against the union's options.
open fun mapUnion(value: AirbyteValue, schema: UnionType, path: List<String>): AirbyteValue =
value
open fun mapUnion(
value: AirbyteValue,
schema: UnionType,
context: Context
): Pair<AirbyteValue, Context> = value to context

open fun mapBoolean(value: BooleanValue, path: List<String>): AirbyteValue = value
open fun mapBoolean(value: BooleanValue, context: Context): Pair<AirbyteValue, Context> =
value to context

open fun mapNumber(value: NumberValue, path: List<String>): AirbyteValue = value
open fun mapNumber(value: NumberValue, context: Context): Pair<AirbyteValue, Context> =
value to context

open fun mapString(value: StringValue, path: List<String>): AirbyteValue = value
open fun mapString(value: StringValue, context: Context): Pair<AirbyteValue, Context> =
value to context

open fun mapInteger(value: IntegerValue, path: List<String>): AirbyteValue = value
open fun mapInteger(value: IntegerValue, context: Context): Pair<AirbyteValue, Context> =
value to context

open fun mapDate(value: DateValue, path: List<String>): AirbyteValue = value
open fun mapDate(value: DateValue, context: Context): Pair<AirbyteValue, Context> =
value to context

open fun mapTimeWithTimezone(value: TimeValue, path: List<String>): AirbyteValue = value
open fun mapTimeWithTimezone(value: TimeValue, context: Context): Pair<AirbyteValue, Context> =
value to context

open fun mapTimeWithoutTimezone(value: TimeValue, path: List<String>): AirbyteValue = value
open fun mapTimeWithoutTimezone(
value: TimeValue,
context: Context
): Pair<AirbyteValue, Context> = value to context

open fun mapTimestampWithTimezone(value: TimestampValue, path: List<String>): AirbyteValue =
value
open fun mapTimestampWithTimezone(
value: TimestampValue,
context: Context
): Pair<AirbyteValue, Context> = value to context

open fun mapTimestampWithoutTimezone(value: TimestampValue, path: List<String>): AirbyteValue =
value
open fun mapTimestampWithoutTimezone(
value: TimestampValue,
context: Context
): Pair<AirbyteValue, Context> = value to context

// Called for a NullValue at a nullable position; yields NullValue.
open fun mapNull(path: List<String>): AirbyteValue = NullValue
open fun mapNull(context: Context): Pair<AirbyteValue, Context> = NullValue to context

open fun mapUnknown(value: UnknownValue, path: List<String>): AirbyteValue = value
open fun mapUnknown(value: UnknownValue, context: Context): Pair<AirbyteValue, Context> =
value to context
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,12 @@ class MapperPipeline(
finalSchema = schemas.last()
}

/**
 * Runs [data] through every (schema, mapper) entry of `schemasWithMappers` in order and returns
 * the final value together with the resulting list of changes. A null [changes] argument is
 * treated as an empty list.
 *
 * NOTE(review): this span comes from a rendered diff and shows two bodies for the same function.
 * The `runningFold` body maps the value stage by stage, then gathers each mapper's
 * `collectedChanges` plus the incoming changes and de-duplicates them via `toSet()`. The `fold`
 * body threads the change list through each `mapper.map` call, so the list returned by the last
 * stage is the result. In the `fold` lambda the destructured `changes` shadows the function
 * parameter of the same name.
 */
fun map(data: AirbyteValue, changes: List<Change>? = null): Pair<AirbyteValue, List<Change>> {
val results =
schemasWithMappers.runningFold(data) { value, (schema, mapper) ->
mapper.map(value, schema)
}
val changesFlattened =
schemasWithMappers
.flatMap { it.second.collectedChanges + (changes ?: emptyList()) }
.toSet()
.toList()
return results.last() to changesFlattened
}
fun map(data: AirbyteValue, changes: List<Change>? = null): Pair<AirbyteValue, List<Change>> =
schemasWithMappers.fold(data to (changes ?: emptyList())) {
(value, changes),
(schema, mapper) ->
mapper.map(value, schema, changes)
}
}

interface MapperPipelineFactory {
Expand Down

This file was deleted.

Loading

0 comments on commit 7078ee4

Please sign in to comment.