-
Notifications
You must be signed in to change notification settings - Fork 4.4k
/
Copy pathPostgresCdcConnectorMetadataInjector.java
38 lines (29 loc) · 1.27 KB
/
PostgresCdcConnectorMetadataInjector.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.postgres.cdc;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_LSN;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.debezium.CdcMetadataInjector;
/**
 * {@link CdcMetadataInjector} for the Postgres CDC source, parameterised on a {@code Long}
 * position value that is written out under {@code CDC_LSN}.
 */
public class PostgresCdcConnectorMetadataInjector implements CdcMetadataInjector<Long> {

  /**
   * Copies the {@code lsn} field of {@code source} onto {@code event} under {@code CDC_LSN},
   * converted with {@link JsonNode#asLong()}.
   *
   * @param event node to enrich; mutated in place
   * @param source node that must contain an {@code lsn} field (presumably the Debezium
   *        {@code source} block of a change event; confirm against the caller)
   */
  @Override
  public void addMetaData(final ObjectNode event, final JsonNode source) {
    event.put(CDC_LSN, source.get("lsn").asLong());
  }

  /**
   * Stamps CDC metadata onto a row that did not come through Debezium: sets
   * {@code CDC_UPDATED_AT} to the supplied timestamp, {@code CDC_LSN} to the supplied LSN and
   * {@code CDC_DELETED_AT} to JSON null.
   *
   * @param record node to enrich; mutated in place
   * @param transactionTimestamp value stored verbatim under {@code CDC_UPDATED_AT}
   * @param lsn value stored under {@code CDC_LSN}
   */
  @Override
  public void addMetaDataToRowsFetchedOutsideDebezium(final ObjectNode record,
                                                      final String transactionTimestamp,
                                                      final Long lsn) {
    // Keep this order: ObjectNode preserves insertion order, so it determines field order
    // in the serialized record.
    record.put(CDC_UPDATED_AT, transactionTimestamp);
    record.put(CDC_LSN, lsn);
    record.putNull(CDC_DELETED_AT);
  }

  /**
   * Reads the namespace from the {@code schema} field of {@code source}.
   *
   * @param source node that must contain a {@code schema} field
   * @return the text value of {@code source.schema}
   */
  @Override
  public String namespace(final JsonNode source) {
    final JsonNode schemaNode = source.get("schema");
    return schemaNode.asText();
  }

  /**
   * Reads the name from the {@code table} field of {@code source}.
   *
   * @param source node that must contain a {@code table} field
   * @return the text value of {@code source.table}
   */
  @Override
  public String name(final JsonNode source) {
    final JsonNode tableNode = source.get("table");
    return tableNode.asText();
  }

}