
Make schema field in source-snowflake mean a subset of the specified o… #20465

Merged · 14 commits · Jan 12, 2023

Changes from 2 commits
@@ -111,7 +111,10 @@ public void testDataTypes() throws Exception {
final List<AirbyteMessage> allMessages = runRead(catalog);
final UUID catalogId = runDiscover();
final Map<String, AirbyteStream> streams = getLastPersistedCatalog().getStreams().stream()
-        .collect(Collectors.toMap(AirbyteStream::getName, s -> s));
+        .collect(Collectors.toMap(
+            s ->
+                "%s.%s".formatted(s.getNamespace(), s.getName()),
@rodireich (Contributor, Author) commented on Dec 14, 2022:
This change makes the test more robust in case previous runs left junk schemas, or in case multiple instances of the acceptance test are running at the same time.

+            s -> s));
final List<AirbyteMessage> recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).toList();
final Map<String, List<String>> expectedValues = new HashMap<>();
testDataHolders.forEach(testDataHolder -> {
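As an aside to the comment above — a minimal standalone sketch (not part of the diff; StreamInfo and its values are hypothetical stand-ins for AirbyteStream) of why keying the map by name alone is fragile: Collectors.toMap throws IllegalStateException on duplicate keys, so a leftover table with the same name in another schema would break the discovery check.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class QualifiedKeyDemo {

  // Hypothetical stand-in for AirbyteStream's name/namespace accessors.
  record StreamInfo(String namespace, String name) {}

  public static void main(String[] args) {
    final List<StreamInfo> streams = List.of(
        new StreamInfo("SCHEMA_A", "CUSTOMERS"),
        new StreamInfo("SCHEMA_B", "CUSTOMERS")); // e.g. junk left by a previous run

    // The old keying by name alone would throw IllegalStateException here:
    // streams.stream().collect(Collectors.toMap(StreamInfo::name, s -> s));

    // The new keying qualifies the name with its namespace, so both entries survive.
    final Map<String, StreamInfo> byQualifiedName = streams.stream()
        .collect(Collectors.toMap(
            s -> "%s.%s".formatted(s.namespace(), s.name()),
            s -> s));
    System.out.println(byQualifiedName.keySet()); // SCHEMA_A.CUSTOMERS, SCHEMA_B.CUSTOMERS
  }

}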
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -37,8 +38,9 @@ public class SnowflakeDataSourceUtils {
public static final String AIRBYTE_OSS = "airbyte_oss";
public static final String AIRBYTE_CLOUD = "airbyte_cloud";
private static final String JDBC_CONNECTION_STRING =
"role=%s&warehouse=%s&database=%s&schema=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s&application=%s";
"role=%s&warehouse=%s&database=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s&application=%s";

+  private static final String JDBC_SCHEMA_PARAM = "&schema=%s&CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true";
A reviewer (Contributor) commented on this line:

Is the problem with the current implementation that it does not set CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX to true, and that the connector therefore ends up pulling all tables from all schemas?

@rodireich (Contributor, Author) replied:

Correct. So while the UI requires a schema field, it doesn't affect the resulting catalog.
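To make the behavior under discussion concrete, here is a minimal sketch (account, credentials, and object names are hypothetical; this is not part of the PR). With CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true, the Snowflake JDBC driver resolves metadata requests against the connection's database/schema context instead of returning objects from every schema the role can see:

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class MetadataContextDemo {

  public static void main(String[] args) throws Exception {
    // The same URL shape that buildJDBCUrl produces after this change.
    final String url = "jdbc:snowflake://myaccount.snowflakecomputing.com/?"
        + "role=MY_ROLE&warehouse=MY_WH&database=MY_DB"
        + "&schema=MY_SCHEMA&CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true";

    try (Connection connection = DriverManager.getConnection(url, "user", "password")) {
      final DatabaseMetaData metaData = connection.getMetaData();
      // With the flag set, null catalog/schema patterns resolve against the
      // connection context (MY_DB.MY_SCHEMA); without it, tables from every
      // visible schema come back, which is what discover was doing before.
      try (ResultSet tables = metaData.getTables(null, null, "%", new String[] {"TABLE"})) {
        while (tables.next()) {
          System.out.println(tables.getString("TABLE_SCHEM") + "." + tables.getString("TABLE_NAME"));
        }
      }
    }
  }

}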

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDataSourceUtils.class);
private static final int PAUSE_BETWEEN_TOKEN_REFRESH_MIN = 7; // snowflake access token's TTL is 10min and can't be modified
private static final String REFRESH_TOKEN_URL = "https://%s/oauth/token-request";
@@ -141,13 +143,16 @@ public static String buildJDBCUrl(final JsonNode config, final String airbyteEnvironment) {
config.get("role").asText(),
config.get("warehouse").asText(),
config.get(JdbcUtils.DATABASE_KEY).asText(),
config.get("schema").asText(),
// Needed for JDK17 - see
// https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
"JSON",
true,
airbyteEnvironment));

if (config.get("schema") != null && StringUtils.isNotBlank(config.get("schema").asText())) {
jdbcUrl.append(JDBC_SCHEMA_PARAM.formatted(config.get("schema").asText()));
}

// https://docs.snowflake.com/en/user-guide/jdbc-configure.html#jdbc-driver-connection-string
if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) {
jdbcUrl.append("&").append(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText());
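A short usage sketch of the patched method (config values are hypothetical; ObjectMapper is standard Jackson, already used by the connector), showing the two URL shapes buildJDBCUrl now produces:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class BuildJdbcUrlDemo {

  public static void main(String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();

    final JsonNode withSchema = mapper.readTree(
        "{\"host\":\"acct.snowflakecomputing.com\",\"role\":\"R\","
            + "\"warehouse\":\"W\",\"database\":\"DB\",\"schema\":\"S\"}");
    final JsonNode withoutSchema = mapper.readTree(
        "{\"host\":\"acct.snowflakecomputing.com\",\"role\":\"R\","
            + "\"warehouse\":\"W\",\"database\":\"DB\"}");

    // Schema present: "&schema=S&CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true" is appended.
    System.out.println(SnowflakeDataSourceUtils.buildJDBCUrl(withSchema, "airbyte_oss"));

    // Schema absent or blank: the schema param is omitted, so discover sees every schema in DB.
    System.out.println(SnowflakeDataSourceUtils.buildJDBCUrl(withoutSchema, "airbyte_oss"));
  }

}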
@@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Snowflake Source Spec",
"type": "object",
"required": ["host", "role", "warehouse", "database", "schema"],
"required": ["host", "role", "warehouse", "database"],
"properties": {
"credentials": {
"title": "Authorization Method",
@@ -110,7 +110,7 @@
"order": 4
},
"schema": {
"description": "The source Snowflake schema tables.",
"description": "The source Snowflake schema tables. Leave empty to access tables from multiple schemas.",
"examples": ["AIRBYTE_SCHEMA"],
"type": "string",
"title": "Schema",
@@ -6,6 +6,7 @@

import static io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils.AIRBYTE_OSS;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
@@ -37,6 +38,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -226,4 +228,46 @@ protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(final String
return expectedMessages;
}

+  @Test
+  void testDiscoverSchemaConfig() throws Exception {
+    // add table and data to a separate schema.
+    database.execute(connection -> {
+      connection.createStatement().execute(
+          String.format("CREATE TABLE %s(id VARCHAR(200) NOT NULL, name VARCHAR(200) NOT NULL)",
+              sourceOperations.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME)));
+      connection.createStatement()
+          .execute(String.format("INSERT INTO %s(id, name) VALUES ('1','picard')",
+              sourceOperations.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME)));
+      connection.createStatement()
+          .execute(String.format("INSERT INTO %s(id, name) VALUES ('2', 'crusher')",
+              sourceOperations.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME)));
+      connection.createStatement()
+          .execute(String.format("INSERT INTO %s(id, name) VALUES ('3', 'vash')",
+              sourceOperations.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME)));
+      connection.createStatement().execute(
+          String.format("CREATE TABLE %s(id VARCHAR(200) NOT NULL, name VARCHAR(200) NOT NULL)",
+              sourceOperations.getFullyQualifiedTableName(SCHEMA_NAME, Strings.addRandomSuffix(TABLE_NAME, "_", 4))));
+    });
+
+    JsonNode confWithSchema = ((ObjectNode) config).put("schema", SCHEMA_NAME);
+    AirbyteCatalog actual = source.discover(confWithSchema);
+
+    assertFalse(actual.getStreams().isEmpty());
+
+    var streams = actual.getStreams().stream()
+        .filter(s -> !s.getNamespace().equals(SCHEMA_NAME))
+        .collect(Collectors.toList());
+
+    assertTrue(streams.isEmpty());
+
+    confWithSchema = ((ObjectNode) config).put("schema", SCHEMA_NAME2);
+    actual = source.discover(confWithSchema);
+    assertFalse(actual.getStreams().isEmpty());
+
+    streams = actual.getStreams().stream()
+        .filter(s -> !s.getNamespace().equals(SCHEMA_NAME2))
+        .collect(Collectors.toList());
+
+    assertTrue(streams.isEmpty());
+  }
}
@@ -75,7 +75,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("ID"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", SCHEMA_NAME, STREAM_NAME1),
STREAM_NAME1, SCHEMA_NAME,
Field.of("ID", JsonSchemaType.NUMBER),
Field.of("NAME", JsonSchemaType.STRING))
.withSupportedSyncModes(
@@ -84,7 +84,7 @@
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", SCHEMA_NAME, STREAM_NAME2),
STREAM_NAME2, SCHEMA_NAME,
Field.of("ID", JsonSchemaType.NUMBER),
Field.of("NAME", JsonSchemaType.STRING))
.withSupportedSyncModes(
@@ -26,11 +26,10 @@ protected DataSource createDataSource() {
final StringBuilder jdbcUrl = new StringBuilder(
String.format("jdbc:snowflake://%s/?", config.get(JdbcUtils.HOST_KEY).asText()));
jdbcUrl.append(String.format(
"role=%s&warehouse=%s&database=%s&schema=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s",
"role=%s&warehouse=%s&database=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s",
config.get("role").asText(),
config.get("warehouse").asText(),
config.get(JdbcUtils.DATABASE_KEY).asText(),
-        config.get(JdbcUtils.SCHEMA_KEY).asText(),
// Needed for JDK17 - see
// https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
"JSON",
@@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Snowflake Source Spec",
"type": "object",
"required": ["host", "role", "warehouse", "database", "schema"],
"required": ["host", "role", "warehouse", "database"],
"properties": {
"credentials": {
"title": "Authorization Method",
@@ -110,7 +110,7 @@
"order": 4
},
"schema": {
"description": "The source Snowflake schema tables.",
"description": "The source Snowflake schema tables. Leave empty to access tables from multiple schemas.",
"examples": ["AIRBYTE_SCHEMA"],
"type": "string",
"title": "Schema",
@@ -30,7 +30,9 @@ class SnowflakeDataSourceUtilsTest {
}
""";
private final String expectedJdbcUrl =
"jdbc:snowflake://host/?role=role&warehouse=WAREHOUSE&database=DATABASE&schema=SOURCE_SCHEMA&JDBC_QUERY_RESULT_FORMAT=JSON&CLIENT_SESSION_KEEP_ALIVE=true&application=airbyte_oss";
"jdbc:snowflake://host/?role=role&warehouse=WAREHOUSE&database=DATABASE"
+ "&JDBC_QUERY_RESULT_FORMAT=JSON&CLIENT_SESSION_KEEP_ALIVE=true&application=airbyte_oss"
+ "&schema=SOURCE_SCHEMA&CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true";

@Test
void testBuildJDBCUrl() {