diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java b/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java index e97cc9d3cec3..ec243b362793 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java +++ b/airbyte-integrations/bases/debezium-v1-9-6/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumPropertiesManager.java @@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory; public class DebeziumPropertiesManager { + private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumPropertiesManager.class); private final JsonNode config; private final AirbyteFileOffsetBackingStore offsetManager; @@ -95,14 +96,14 @@ protected Properties getDebeziumProperties() { } public static String getTableIncludelist(final ConfiguredAirbyteCatalog catalog) { - // Turn "stream": { - // "namespace": "schema1" - // "name": "table1 - // }, - // "stream": { - // "namespace": "schema2" - // "name": "table2 - // } -------> info "schema1.table1, schema2.table2" + // Turn "stream": { + // "namespace": "schema1" + // "name": "table1 + // }, + // "stream": { + // "namespace": "schema2" + // "name": "table2 + // } -------> info "schema1.table1, schema2.table2" return catalog.getStreams().stream() .filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL) @@ -114,27 +115,27 @@ public static String getTableIncludelist(final ConfiguredAirbyteCatalog catalog) } public static String getColumnIncludeList(final ConfiguredAirbyteCatalog catalog) { - // Turn "stream": { - // "namespace": "schema1" - // "name": "table1" - // "jsonSchema": { - // "properties": { - // "column1": { - // }, - // "column2": { - // } - // } - // } - // } -------> info "schema1.table1.(column1 | column2)" + // Turn "stream": { + // "namespace": "schema1" + // "name": "table1" + // "jsonSchema": { + // "properties": { + // "column1": { + // }, + // "column2": { + // } + // } + // } + // } -------> info "schema1.table1.(column1 | column2)" return catalog.getStreams().stream() .filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL) .map(ConfiguredAirbyteStream::getStream) .map(s -> { - final String fields = parseFields(s.getJsonSchema().get("properties").fieldNames()); - // schema.table.(col1|col2) - return Pattern.quote(s.getNamespace() + "." + s.getName()) + (StringUtils.isNotBlank(fields) ? "\\." + fields : ""); - }) + final String fields = parseFields(s.getJsonSchema().get("properties").fieldNames()); + // schema.table.(col1|col2) + return Pattern.quote(s.getNamespace() + "." + s.getName()) + (StringUtils.isNotBlank(fields) ? "\\." + fields : ""); + }) .map(x -> StringUtils.escape(x, ",".toCharArray(), "\\,")) .collect(Collectors.joining(",")); } diff --git a/airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java b/airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java index fda7b89601a9..253469fede89 100644 --- a/airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java +++ b/airbyte-integrations/bases/debezium-v1-9-6/src/test/java/io/airbyte/integrations/debezium/DebeziumRecordPublisherTest.java @@ -64,17 +64,17 @@ public void testColumnIncludelistFiltersFullRefresh() { @Test public void testColumnIncludeListEscaping() { -// final String a = "public\\.products\\*\\^\\$\\+-\\\\"; -// final String b = "public.products*^$+-\\"; -// final Pattern p = Pattern.compile(a, Pattern.UNIX_LINES); -// assertTrue(p.matcher(b).find()); -// assertTrue(Pattern.compile(Pattern.quote(b)).matcher(b).find()); + // final String a = "public\\.products\\*\\^\\$\\+-\\\\"; + // final String b = "public.products*^$+-\\"; + // final Pattern p = Pattern.compile(a, Pattern.UNIX_LINES); + // assertTrue(p.matcher(b).find()); + // assertTrue(Pattern.compile(Pattern.quote(b)).matcher(b).find()); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(ImmutableList.of( CatalogHelpers.createConfiguredAirbyteStream( - "id_and_name", - "public", - Field.of("fld1", JsonSchemaType.NUMBER), Field.of("fld2", JsonSchemaType.STRING)).withSyncMode(SyncMode.INCREMENTAL))); + "id_and_name", + "public", + Field.of("fld1", JsonSchemaType.NUMBER), Field.of("fld2", JsonSchemaType.STRING)).withSyncMode(SyncMode.INCREMENTAL))); final String anchored = "^" + DebeziumPropertiesManager.getColumnIncludeList(catalog) + "$"; final Pattern pattern = Pattern.compile(anchored); @@ -85,4 +85,5 @@ public void testColumnIncludeListEscaping() { assertFalse(pattern.matcher("ppppublic.id_and_name.fld2333").find()); assertFalse(pattern.matcher("public.id_and_name.fld_wrong_wrong").find()); } + }