Skip to content

Commit

Permalink
build: fix ensure no file change check (#20328)
Browse files Browse the repository at this point in the history
  • Loading branch information
Conor authored Dec 9, 2022
1 parent 790dc42 commit 3fe3dea
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.slf4j.LoggerFactory;

public class DebeziumPropertiesManager {

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumPropertiesManager.class);
private final JsonNode config;
private final AirbyteFileOffsetBackingStore offsetManager;
Expand Down Expand Up @@ -95,14 +96,14 @@ protected Properties getDebeziumProperties() {
}

public static String getTableIncludelist(final ConfiguredAirbyteCatalog catalog) {
// Turn "stream": {
// "namespace": "schema1"
// "name": "table1
// },
// "stream": {
// "namespace": "schema2"
// "name": "table2
// } -------> info "schema1.table1, schema2.table2"
// Turn "stream": {
// "namespace": "schema1"
// "name": "table1
// },
// "stream": {
// "namespace": "schema2"
// "name": "table2
// } -------> info "schema1.table1, schema2.table2"

return catalog.getStreams().stream()
.filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL)
Expand All @@ -114,27 +115,27 @@ public static String getTableIncludelist(final ConfiguredAirbyteCatalog catalog)
}

public static String getColumnIncludeList(final ConfiguredAirbyteCatalog catalog) {
// Turn "stream": {
// "namespace": "schema1"
// "name": "table1"
// "jsonSchema": {
// "properties": {
// "column1": {
// },
// "column2": {
// }
// }
// }
// } -------> info "schema1.table1.(column1 | column2)"
// Turn "stream": {
// "namespace": "schema1"
// "name": "table1"
// "jsonSchema": {
// "properties": {
// "column1": {
// },
// "column2": {
// }
// }
// }
// } -------> info "schema1.table1.(column1 | column2)"

return catalog.getStreams().stream()
.filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL)
.map(ConfiguredAirbyteStream::getStream)
.map(s -> {
final String fields = parseFields(s.getJsonSchema().get("properties").fieldNames());
// schema.table.(col1|col2)
return Pattern.quote(s.getNamespace() + "." + s.getName()) + (StringUtils.isNotBlank(fields) ? "\\." + fields : "");
})
final String fields = parseFields(s.getJsonSchema().get("properties").fieldNames());
// schema.table.(col1|col2)
return Pattern.quote(s.getNamespace() + "." + s.getName()) + (StringUtils.isNotBlank(fields) ? "\\." + fields : "");
})
.map(x -> StringUtils.escape(x, ",".toCharArray(), "\\,"))
.collect(Collectors.joining(","));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,17 @@ public void testColumnIncludelistFiltersFullRefresh() {

@Test
public void testColumnIncludeListEscaping() {
// final String a = "public\\.products\\*\\^\\$\\+-\\\\";
// final String b = "public.products*^$+-\\";
// final Pattern p = Pattern.compile(a, Pattern.UNIX_LINES);
// assertTrue(p.matcher(b).find());
// assertTrue(Pattern.compile(Pattern.quote(b)).matcher(b).find());
// final String a = "public\\.products\\*\\^\\$\\+-\\\\";
// final String b = "public.products*^$+-\\";
// final Pattern p = Pattern.compile(a, Pattern.UNIX_LINES);
// assertTrue(p.matcher(b).find());
// assertTrue(Pattern.compile(Pattern.quote(b)).matcher(b).find());

final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(ImmutableList.of(
CatalogHelpers.createConfiguredAirbyteStream(
"id_and_name",
"public",
Field.of("fld1", JsonSchemaType.NUMBER), Field.of("fld2", JsonSchemaType.STRING)).withSyncMode(SyncMode.INCREMENTAL)));
"id_and_name",
"public",
Field.of("fld1", JsonSchemaType.NUMBER), Field.of("fld2", JsonSchemaType.STRING)).withSyncMode(SyncMode.INCREMENTAL)));

final String anchored = "^" + DebeziumPropertiesManager.getColumnIncludeList(catalog) + "$";
final Pattern pattern = Pattern.compile(anchored);
Expand All @@ -85,4 +85,5 @@ public void testColumnIncludeListEscaping() {
assertFalse(pattern.matcher("ppppublic.id_and_name.fld2333").find());
assertFalse(pattern.matcher("public.id_and_name.fld_wrong_wrong").find());
}

}

0 comments on commit 3fe3dea

Please sign in to comment.