Skip to content

Commit ee21229

Browse files
authored
Merge branch 'master' into assaadhjb/source-pg-fix-getlsn-function
2 parents 643421f + c4f00ab commit ee21229

File tree

333 files changed

+33318
-2811
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

333 files changed

+33318
-2811
lines changed
+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
name: Finalize connector rollout
2+
3+
on:
4+
repository_dispatch:
5+
types: [finalize-connector-rollout]
6+
workflow_dispatch:
7+
inputs:
8+
connector_name:
9+
description: "Connector name"
10+
required: true
11+
action:
12+
description: "Action to perform"
13+
required: true
14+
options: ["promote", "rollback"]
15+
jobs:
16+
finalize_rollout:
17+
name: Finalize connector rollout
18+
runs-on: connector-publish-large
19+
env:
20+
ACTION: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.action || github.event.client_payload.action }}
21+
steps:
22+
- name: Check action value
23+
run: |
24+
if [[ "${ACTION}" != "promote" && "${ACTION}" != "rollback" ]]; then
25+
echo "Invalid action: ${ACTION}"
26+
exit 1
27+
fi
28+
shell: bash
29+
- name: Checkout Airbyte
30+
uses: actions/checkout@v4
31+
- name: Promote {{ github.event.client_payload.connector_name }} release candidate
32+
id: promote-release-candidate
33+
if: ${{ env.ACTION == 'promote' }}
34+
uses: ./.github/actions/run-airbyte-ci
35+
with:
36+
context: "manual"
37+
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
38+
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
39+
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
40+
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
41+
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
42+
github_token: ${{ secrets.GITHUB_TOKEN }}
43+
metadata_service_gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
44+
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
45+
slack_webhook_url: ${{ secrets.PUBLISH_ON_MERGE_SLACK_WEBHOOK }}
46+
subcommand: "connectors --name=${{ github.event.client_payload.connector_name }} publish --promote-release-candidate"
47+
- name: Rollback {{ github.event.client_payload.connector_name }} release candidate
48+
id: rollback-release-candidate
49+
if: ${{ env.ACTION == 'rollback' }}
50+
uses: ./.github/actions/run-airbyte-ci
51+
with:
52+
context: "manual"
53+
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
54+
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
55+
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
56+
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
57+
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
58+
github_token: ${{ secrets.GITHUB_TOKEN }}
59+
metadata_service_gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
60+
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
61+
slack_webhook_url: ${{ secrets.PUBLISH_ON_MERGE_SLACK_WEBHOOK }}
62+
spec_cache_gcs_credentials: ${{ secrets.SPEC_CACHE_SERVICE_ACCOUNT_KEY_PUBLISH }}
63+
subcommand: "connectors --name=${{ github.event.client_payload.connector_name }} publish --rollback-release-candidate"

airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/util/Jsons.kt

-31
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package io.airbyte.cdk.util
33

44
import com.fasterxml.jackson.annotation.JsonInclude
55
import com.fasterxml.jackson.core.JsonGenerator
6-
import com.fasterxml.jackson.core.JsonProcessingException
76
import com.fasterxml.jackson.databind.DeserializationFeature
8-
import com.fasterxml.jackson.databind.JsonNode
97
import com.fasterxml.jackson.databind.ObjectMapper
108
import com.fasterxml.jackson.databind.node.ArrayNode
119
import com.fasterxml.jackson.databind.node.BinaryNode
@@ -16,7 +14,6 @@ import com.fasterxml.jackson.databind.node.TextNode
1614
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
1715
import com.fasterxml.jackson.module.afterburner.AfterburnerModule
1816
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
19-
import java.io.IOException
2017
import java.math.BigDecimal
2118
import java.math.BigInteger
2219
import java.nio.ByteBuffer
@@ -58,32 +55,4 @@ object Jsons : ObjectMapper() {
5855
}
5956

6057
fun booleanNode(boolean: Boolean): BooleanNode = nodeFactory.booleanNode(boolean)
61-
62-
fun <T> `object`(jsonNode: JsonNode?, klass: Class<T>?): T? {
63-
return convertValue(jsonNode, klass)
64-
}
65-
66-
fun <T> serialize(`object`: T): String {
67-
try {
68-
return writeValueAsString(`object`)
69-
} catch (e: JsonProcessingException) {
70-
throw RuntimeException(e)
71-
}
72-
}
73-
74-
fun <T> deserialize(jsonString: String?, klass: Class<T>?): T {
75-
try {
76-
return readValue(jsonString, klass)
77-
} catch (e: IOException) {
78-
throw RuntimeException(e)
79-
}
80-
}
81-
82-
fun deserialize(jsonString: String?): JsonNode {
83-
try {
84-
return readTree(jsonString)
85-
} catch (e: IOException) {
86-
throw RuntimeException(e)
87-
}
88-
}
8958
}

airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/output/BufferingOutputConsumer.kt

+12
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class BufferingOutputConsumer(
3434
private val catalogs = mutableListOf<AirbyteCatalog>()
3535
private val traces = mutableListOf<AirbyteTraceMessage>()
3636
private val messages = mutableListOf<AirbyteMessage>()
37+
private var messagesIndex: Int = 0
3738

3839
var callback: (AirbyteMessage) -> Unit = {}
3940
set(value) {
@@ -79,4 +80,15 @@ class BufferingOutputConsumer(
7980
fun traces(): List<AirbyteTraceMessage> = synchronized(this) { listOf(*traces.toTypedArray()) }
8081

8182
fun messages(): List<AirbyteMessage> = synchronized(this) { listOf(*messages.toTypedArray()) }
83+
84+
fun newMessages(): List<AirbyteMessage> =
85+
synchronized(this) {
86+
val newMessages = messages.subList(messagesIndex, messages.size)
87+
messagesIndex = messages.size
88+
newMessages
89+
}
90+
91+
fun resetNewMessagesCursor() {
92+
synchronized(this) { messagesIndex = 0 }
93+
}
8294
}

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/discover/Field.kt

+7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package io.airbyte.cdk.discover
33

44
import io.airbyte.cdk.data.AirbyteType
5+
import io.airbyte.cdk.data.IntCodec
56
import io.airbyte.cdk.data.JsonDecoder
67
import io.airbyte.cdk.data.JsonEncoder
78
import io.airbyte.cdk.data.JsonStringCodec
@@ -77,6 +78,12 @@ data object CdcStringMetaFieldType : LosslessFieldType {
7778
override val jsonDecoder: JsonDecoder<String> = JsonStringCodec
7879
}
7980

81+
data object CdcIntegerMetaFieldType : LosslessFieldType {
82+
override val airbyteType: AirbyteType = LeafAirbyteType.INTEGER
83+
override val jsonEncoder: JsonEncoder<Int> = IntCodec
84+
override val jsonDecoder: JsonDecoder<Int> = IntCodec
85+
}
86+
8087
data object CdcOffsetDateTimeMetaFieldType : LosslessFieldType {
8188
override val airbyteType: AirbyteType = LeafAirbyteType.TIMESTAMP_WITH_TIMEZONE
8289
override val jsonEncoder: JsonEncoder<OffsetDateTime> = OffsetDateTimeCodec

airbyte-cdk/bulk/core/extract/src/main/kotlin/io/airbyte/cdk/read/StateManager.kt

+91-52
Original file line numberDiff line numberDiff line change
@@ -94,48 +94,85 @@ class StateManager(
9494
private sealed class BaseStateManager<K : Feed>(
9595
override val feed: K,
9696
initialState: OpaqueStateValue?,
97-
private val isCheckpointUnique: Boolean = true,
9897
) : StateManagerScopedToFeed {
99-
private var current: OpaqueStateValue?
100-
private var pending: OpaqueStateValue?
101-
private var isPending: Boolean
102-
private var pendingNumRecords: Long
103-
104-
init {
105-
synchronized(this) {
106-
current = initialState
107-
pending = initialState
108-
isPending = initialState != null
109-
pendingNumRecords = 0L
110-
}
111-
}
98+
private var currentStateValue: OpaqueStateValue? = initialState
99+
private var pendingStateValue: OpaqueStateValue? = initialState
100+
private var pendingNumRecords: Long = 0L
112101

113-
override fun current(): OpaqueStateValue? = synchronized(this) { current }
102+
@Synchronized override fun current(): OpaqueStateValue? = currentStateValue
114103

104+
@Synchronized
115105
override fun set(
116106
state: OpaqueStateValue,
117107
numRecords: Long,
118108
) {
119-
synchronized(this) {
120-
pending = state
121-
isPending = true
122-
pendingNumRecords += numRecords
123-
}
109+
pendingStateValue = state
110+
pendingNumRecords += numRecords
124111
}
125112

126-
fun swap(): Pair<OpaqueStateValue?, Long>? {
127-
synchronized(this) {
128-
if (isCheckpointUnique && !isPending) {
129-
return null
130-
}
131-
val returnValue: Pair<OpaqueStateValue?, Long> = pending to pendingNumRecords
132-
current = pending
133-
pendingNumRecords = 0L
134-
return returnValue
135-
}
113+
/**
114+
* Called by [StateManager.checkpoint] to generate the Airbyte STATE messages for the
115+
* checkpoint.
116+
*
117+
* The return value is either [Fresh] or [Stale] depending on whether [set] has been called
118+
* since the last call to [takeForCheckpoint], or not, respectively.
119+
*
120+
* [Stale] messages are simply ignored when dealing only with [Stream] feeds, however these
121+
* may be required when emitting Airbyte STATE messages of type GLOBAL.
122+
*/
123+
@Synchronized
124+
fun takeForCheckpoint(): StateForCheckpoint {
125+
// Check if there is a pending state value or not.
126+
// If not, then set() HASN'T been called since the last call to takeForCheckpoint(),
127+
// because set() can only accept non-null state values.
128+
//
129+
// This means that there is nothing worth checkpointing for this particular feed.
130+
// In that case, exit early with the current state value.
131+
val freshStateValue: OpaqueStateValue =
132+
pendingStateValue ?: return Stale(currentStateValue)
133+
// This point is reached in the case where there is a pending state value.
134+
// This means that set() HAS been called since the last call to takeForCheckpoint().
135+
//
136+
// Keep a copy of the total number of records registered in all calls to set() since the
137+
// last call to takeForCheckpoint(), this number will be returned.
138+
val freshNumRecords: Long = pendingNumRecords
139+
// Update current state value.
140+
currentStateValue = freshStateValue
141+
// Reset the pending state, which will be overwritten by the next call to set().
142+
pendingStateValue = null
143+
pendingNumRecords = 0L
144+
// Return the latest state value as well as the total number of records seen since the
145+
// last call to takeForCheckpoint().
146+
return Fresh(freshStateValue, freshNumRecords)
136147
}
137148
}
138149

150+
/** Return value type for [BaseStateManager.takeForCheckpoint]. */
151+
private sealed interface StateForCheckpoint {
152+
val opaqueStateValue: OpaqueStateValue?
153+
val numRecords: Long
154+
}
155+
156+
/**
157+
* [StateForCheckpoint] implementation for when [StateManagerScopedToFeed.set] has been called
158+
* since the last call to [BaseStateManager.takeForCheckpoint].
159+
*/
160+
private data class Fresh(
161+
override val opaqueStateValue: OpaqueStateValue,
162+
override val numRecords: Long,
163+
) : StateForCheckpoint
164+
165+
/**
166+
* [StateForCheckpoint] implementation for when [StateManagerScopedToFeed.set] has NOT been
167+
* called since the last call to [BaseStateManager.takeForCheckpoint].
168+
*/
169+
private data class Stale(
170+
override val opaqueStateValue: OpaqueStateValue?,
171+
) : StateForCheckpoint {
172+
override val numRecords: Long
173+
get() = 0L
174+
}
175+
139176
private class GlobalStateManager(
140177
global: Global,
141178
initialGlobalState: OpaqueStateValue?,
@@ -147,34 +184,30 @@ class StateManager(
147184
.mapKeys { it.key.id }
148185

149186
fun checkpoint(): AirbyteStateMessage? {
150-
var numSwapped = 0
151-
var totalNumRecords: Long = 0L
152-
var globalStateValue: OpaqueStateValue? = current()
153-
val globalSwapped: Pair<OpaqueStateValue?, Long>? = swap()
154-
if (globalSwapped != null) {
155-
numSwapped++
156-
globalStateValue = globalSwapped.first
157-
totalNumRecords += globalSwapped.second
158-
}
187+
var shouldCheckpoint = false
188+
var totalNumRecords = 0L
189+
val globalStateForCheckpoint: StateForCheckpoint = takeForCheckpoint()
190+
totalNumRecords += globalStateForCheckpoint.numRecords
191+
if (globalStateForCheckpoint is Fresh) shouldCheckpoint = true
159192
val streamStates = mutableListOf<AirbyteStreamState>()
160193
for ((_, streamStateManager) in streamStateManagers) {
161-
var streamStateValue: OpaqueStateValue? = streamStateManager.current()
162-
val globalStreamSwapped: Pair<OpaqueStateValue?, Long>? = streamStateManager.swap()
163-
if (globalStreamSwapped != null) {
164-
numSwapped++
165-
streamStateValue = globalStreamSwapped.first
166-
totalNumRecords += globalStreamSwapped.second
167-
}
194+
val streamStateForCheckpoint: StateForCheckpoint =
195+
streamStateManager.takeForCheckpoint()
196+
totalNumRecords += streamStateForCheckpoint.numRecords
197+
if (streamStateForCheckpoint is Fresh) shouldCheckpoint = true
168198
val streamID: StreamIdentifier = streamStateManager.feed.id
169199
streamStates.add(
170200
AirbyteStreamState()
171201
.withStreamDescriptor(streamID.asProtocolStreamDescriptor())
172-
.withStreamState(streamStateValue),
202+
.withStreamState(streamStateForCheckpoint.opaqueStateValue),
173203
)
174204
}
205+
if (!shouldCheckpoint) {
206+
return null
207+
}
175208
val airbyteGlobalState =
176209
AirbyteGlobalState()
177-
.withSharedState(globalStateValue)
210+
.withSharedState(globalStateForCheckpoint.opaqueStateValue)
178211
.withStreamStates(streamStates)
179212
return AirbyteStateMessage()
180213
.withType(AirbyteStateMessage.AirbyteStateType.GLOBAL)
@@ -186,22 +219,28 @@ class StateManager(
186219
private class GlobalStreamStateManager(
187220
stream: Stream,
188221
initialState: OpaqueStateValue?,
189-
) : BaseStateManager<Stream>(stream, initialState, isCheckpointUnique = false)
222+
) : BaseStateManager<Stream>(stream, initialState)
190223

191224
private class NonGlobalStreamStateManager(
192225
stream: Stream,
193226
initialState: OpaqueStateValue?,
194227
) : BaseStateManager<Stream>(stream, initialState) {
195228
fun checkpoint(): AirbyteStateMessage? {
196-
val (opaqueStateValue: OpaqueStateValue?, numRecords: Long) = swap() ?: return null
229+
val streamStateForCheckpoint: StateForCheckpoint = takeForCheckpoint()
230+
if (streamStateForCheckpoint is Stale) {
231+
return null
232+
}
197233
val airbyteStreamState =
198234
AirbyteStreamState()
199235
.withStreamDescriptor(feed.id.asProtocolStreamDescriptor())
200-
.withStreamState(opaqueStateValue)
236+
.withStreamState(streamStateForCheckpoint.opaqueStateValue)
201237
return AirbyteStateMessage()
202238
.withType(AirbyteStateMessage.AirbyteStateType.STREAM)
203239
.withStream(airbyteStreamState)
204-
.withSourceStats(AirbyteStateStats().withRecordCount(numRecords.toDouble()))
240+
.withSourceStats(
241+
AirbyteStateStats()
242+
.withRecordCount(streamStateForCheckpoint.numRecords.toDouble())
243+
)
205244
}
206245
}
207246
}

0 commit comments

Comments
 (0)